001 /*
002 * JBoss, Home of Professional Open Source.
003 * Copyright 2008, Red Hat Middleware LLC, and individual contributors
004 * as indicated by the @author tags. See the copyright.txt file in the
005 * distribution for a full listing of individual contributors.
006 *
007 * This is free software; you can redistribute it and/or modify it
008 * under the terms of the GNU Lesser General Public License as
009 * published by the Free Software Foundation; either version 2.1 of
010 * the License, or (at your option) any later version.
011 *
012 * This software is distributed in the hope that it will be useful,
013 * but WITHOUT ANY WARRANTY; without even the implied warranty of
014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015 * Lesser General Public License for more details.
016 *
017 * You should have received a copy of the GNU Lesser General Public
018 * License along with this software; if not, write to the Free
019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021 */
022 package org.jboss.dna.repository.observation;
023
024 import java.util.ArrayList;
025 import java.util.Collections;
026 import java.util.HashSet;
027 import java.util.Iterator;
028 import java.util.List;
029 import java.util.Set;
030 import java.util.concurrent.CopyOnWriteArrayList;
031 import java.util.concurrent.TimeUnit;
032 import java.util.concurrent.atomic.AtomicLong;
033 import java.util.concurrent.locks.ReadWriteLock;
034 import java.util.concurrent.locks.ReentrantReadWriteLock;
035 import javax.jcr.RepositoryException;
036 import javax.jcr.Session;
037 import javax.jcr.UnsupportedRepositoryOperationException;
038 import javax.jcr.observation.Event;
039 import javax.jcr.observation.EventIterator;
040 import javax.jcr.observation.EventListener;
041 import javax.jcr.observation.ObservationManager;
042 import net.jcip.annotations.GuardedBy;
043 import net.jcip.annotations.ThreadSafe;
044 import org.jboss.dna.common.util.CheckArg;
045 import org.jboss.dna.common.util.Logger;
046 import org.jboss.dna.repository.RepositoryI18n;
047 import org.jboss.dna.repository.services.AbstractServiceAdministrator;
048 import org.jboss.dna.repository.services.AdministeredService;
049 import org.jboss.dna.repository.services.ServiceAdministrator;
050 import org.jboss.dna.repository.util.SessionFactory;
051
052 /**
053 * @author Randall Hauch
054 */
055 public class ObservationService implements AdministeredService {
056
057 /**
058 * Interface to which problems with particular events are logged.
059 *
060 * @author Randall Hauch
061 */
062 public static interface ProblemLog {
063
064 void error( String repositoryWorkspaceName,
065 Throwable t );
066 }
067
068 /**
069 * Problem log implementation that records problems in the log.
070 *
071 * @author Randall Hauch
072 */
073 public class DefaultProblemLog implements ProblemLog {
074
075 /**
076 * {@inheritDoc}
077 */
078 public void error( String repositoryWorkspaceName,
079 Throwable t ) {
080 getLogger().error(t, RepositoryI18n.errorProcessingEvents, repositoryWorkspaceName);
081 }
082 }
083
084 protected static class NoOpProblemLog implements ProblemLog {
085
086 /**
087 * {@inheritDoc}
088 */
089 public void error( String repositoryWorkspaceName,
090 Throwable t ) {
091 }
092 }
093
094 public static final ProblemLog NO_OP_PROBLEM_LOG = new NoOpProblemLog();
095
096 /**
097 * The administrative component for this service.
098 *
099 * @author Randall Hauch
100 */
101 protected class Administrator extends AbstractServiceAdministrator {
102
103 protected Administrator() {
104 super(RepositoryI18n.observationServiceName, State.STARTED);
105 }
106
107 /**
108 * {@inheritDoc}
109 */
110 @Override
111 protected void doShutdown( State fromState ) {
112 super.doShutdown(fromState);
113 shutdownService();
114 }
115
116 /**
117 * {@inheritDoc}
118 */
119 public boolean awaitTermination( long timeout,
120 TimeUnit unit ) {
121 return true;
122 }
123
124 /**
125 * {@inheritDoc}
126 */
127 @Override
128 protected boolean doCheckIsTerminated() {
129 return true;
130 }
131
132 }
133
134 private Logger logger = Logger.getLogger(this.getClass());
135 private ProblemLog problemLog = new DefaultProblemLog();
136 private final Statistics statistics = new Statistics();
137 private final SessionFactory sessionFactory;
138 private final CopyOnWriteArrayList<WorkspaceListener> workspaceListeners = new CopyOnWriteArrayList<WorkspaceListener>();
139 private final CopyOnWriteArrayList<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();
140 private final CopyOnWriteArrayList<NodeChangeListener> nodeChangeListeners = new CopyOnWriteArrayList<NodeChangeListener>();
141 private final Administrator administrator = new Administrator();
142
143 public ObservationService( SessionFactory sessionFactory ) {
144 CheckArg.isNotNull(sessionFactory, "session factory");
145 this.sessionFactory = sessionFactory;
146 }
147
148 /**
149 * {@inheritDoc}
150 */
151 public ServiceAdministrator getAdministrator() {
152 return this.administrator;
153 }
154
155 /**
156 * @return sessionFactory
157 */
158 public SessionFactory getSessionFactory() {
159 return this.sessionFactory;
160 }
161
162 /**
163 * Get the statistics for this system.
164 *
165 * @return the statistics, which are updated as the system is used
166 */
167 public Statistics getStatistics() {
168 return this.statistics;
169 }
170
171 /**
172 * Get the logger for this system
173 *
174 * @return the logger
175 */
176 public Logger getLogger() {
177 return this.logger;
178 }
179
180 /**
181 * Set the logger for this system.
182 *
183 * @param logger the logger, or null if the standard logging should be used
184 */
185 public void setLogger( Logger logger ) {
186 this.logger = logger != null ? logger : Logger.getLogger(this.getClass());
187 }
188
189 /**
190 * @return problemLog
191 */
192 public ProblemLog getProblemLog() {
193 return this.problemLog;
194 }
195
196 /**
197 * Set the problem log that will be notified of problems handling events. By default, such problems are sent to the log.
198 *
199 * @param problemLog the new problem log implementation; if null, then the default problem log is used
200 */
201 public void setProblemLog( ProblemLog problemLog ) {
202 this.problemLog = problemLog != null ? problemLog : new DefaultProblemLog();
203 }
204
205 public boolean addListener( EventListener listener ) {
206 if (listener == null) return false;
207 return this.eventListeners.addIfAbsent(listener);
208 }
209
210 public boolean removeListener( EventListener listener ) {
211 if (listener == null) return false;
212 return this.eventListeners.remove(listener);
213 }
214
215 public boolean addListener( NodeChangeListener listener ) {
216 return this.nodeChangeListeners.addIfAbsent(listener);
217 }
218
219 public boolean removeListener( NodeChangeListener listener ) {
220 if (listener == null) return false;
221 return this.nodeChangeListeners.remove(listener);
222 }
223
224 protected void shutdownService() {
225 // Unregister all listeners ...
226 for (WorkspaceListener listener : this.workspaceListeners) {
227 try {
228 listener.unregister();
229 } catch (RepositoryException e) {
230 this.logger.error(e, RepositoryI18n.errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService);
231 }
232 }
233 }
234
235 /**
236 * Monitor the supplied workspace for events of the given type on any node at or under the supplied path.
237 * <p>
238 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
239 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
240 * repository and workspace name.
241 * </p>
242 * <p>
243 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
244 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
245 * workspace and garbage collected. If this service is {@link ServiceAdministrator#shutdown() shutdown} while there are still
246 * active listeners, those listeners will disconnect themselves from this service and the workspace with which they're
247 * registered when they attempt to forward the next events.
248 * </p>
249 * <p>
250 * The set of events that are monitored can be filtered by specifying restrictions based on characteristics of the node
251 * associated with the event. In the case of event types {@link Event#NODE_ADDED NODE_ADDED} and
252 * {@link Event#NODE_REMOVED NODE_REMOVED}, the node associated with an event is the node at (or formerly at) the path
253 * returned by {@link Event#getPath() Event.getPath()}. In the case of event types
254 * {@link Event#PROPERTY_ADDED PROPERTY_ADDED}, {@link Event#PROPERTY_REMOVED PROPERTY_REMOVED} and
255 * {@link Event#PROPERTY_CHANGED PROPERTY_CHANGED}, the node associated with an event is the parent node of the property at
256 * (or formerly at) the path returned by <code>Event.getPath</code>:
257 * <ul>
258 * <li> <code>absolutePath</code>, <code>isDeep</code>: Only events whose associated node is at
259 * <code>absolutePath</code> (or within its subtree, if <code>isDeep</code> is <code>true</code>) will be received. It
260 * is permissible to register a listener for a path where no node currently exists. </li>
261 * <li> <code>uuids</code>: Only events whose associated node has one of the UUIDs in this list will be received. If his
262 * parameter is <code>null</code> then no UUID-related restriction is placed on events received. </li>
263 * <li> <code>nodeTypeNames</code>: Only events whose associated node has one of the node types (or a subtype of one of the
264 * node types) in this list will be received. If this parameter is <code>null</code> then no node type-related restriction
265 * is placed on events received. </li>
266 * </ul>
267 * The restrictions are "ANDed" together. In other words, for a particular node to be "listened to" it must meet all the
268 * restrictions.
269 * </p>
270 * <p>
271 * Additionally, if <code>noLocal</code> is <code>true</code>, then events generated by the session through which the
272 * listener was registered are ignored. Otherwise, they are not ignored.
273 * </p>
274 * <p>
275 * The filters of an already-registered {@link WorkspaceListener} can be changed at runtime by changing the attributes and
276 * {@link WorkspaceListener#reregister() registering}.
277 * </p>
278 *
279 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
280 * workspace that is to be monitored
281 * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
282 * in the workspace are to be monitored
283 * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
284 * @param isDeep true if events below the node given by the <code>absolutePath</code> or by the <code>uuids</code> are to
285 * be processed, or false if only the events at the node
286 * @param uuids array of UUIDs of nodes that are to be monitored; may be null or empty if the UUIDs are not known
287 * @param nodeTypeNames array of node type names that are to be monitored; may be null or empty if the monitoring has no node
288 * type restrictions
289 * @param noLocal true if the events originating in the supplied workspace are to be ignored, or false if they are also to be
290 * processed.
291 * @return the listener that was created and registered to perform the monitoring
292 * @throws RepositoryException if there is a problem registering the listener
293 */
294 public WorkspaceListener monitor( String repositoryWorkspaceName,
295 String absolutePath,
296 int eventTypes,
297 boolean isDeep,
298 String[] uuids,
299 String[] nodeTypeNames,
300 boolean noLocal ) throws RepositoryException {
301 WorkspaceListener listener = new WorkspaceListener(repositoryWorkspaceName, eventTypes, absolutePath, isDeep, uuids,
302 nodeTypeNames, noLocal);
303 listener.register();
304 this.workspaceListeners.add(listener);
305 return listener;
306 }
307
308 /**
309 * Monitor the supplied workspace for {@link WorkspaceListener#DEFAULT_EVENT_TYPES default event types} on any node at or
310 * under the supplied path.
311 * <p>
312 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
313 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
314 * repository and workspace name.
315 * </p>
316 * <p>
317 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
318 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
319 * workspace and garbage collected.
320 * </p>
321 *
322 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
323 * workspace that is to be monitored
324 * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
325 * in the workspace are to be monitored
326 * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
327 * node type restrictions
328 * @return the listener that was created and registered to perform the monitoring
329 * @throws RepositoryException if there is a problem registering the listener
330 */
331 public WorkspaceListener monitor( String repositoryWorkspaceName,
332 String absolutePath,
333 String... nodeTypeNames ) throws RepositoryException {
334 return monitor(repositoryWorkspaceName,
335 absolutePath,
336 WorkspaceListener.DEFAULT_EVENT_TYPES,
337 WorkspaceListener.DEFAULT_IS_DEEP,
338 null,
339 nodeTypeNames,
340 WorkspaceListener.DEFAULT_NO_LOCAL);
341 }
342
343 /**
344 * Monitor the supplied workspace for the supplied event types on any node in the workspace.
345 * <p>
346 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
347 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
348 * repository and workspace name.
349 * </p>
350 * <p>
351 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
352 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
353 * workspace and garbage collected.
354 * </p>
355 *
356 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
357 * workspace that is to be monitored
358 * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
359 * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
360 * node type restrictions
361 * @return the listener that was created and registered to perform the monitoring
362 * @throws RepositoryException if there is a problem registering the listener
363 */
364 public WorkspaceListener monitor( String repositoryWorkspaceName,
365 int eventTypes,
366 String... nodeTypeNames ) throws RepositoryException {
367 return monitor(repositoryWorkspaceName,
368 WorkspaceListener.DEFAULT_ABSOLUTE_PATH,
369 eventTypes,
370 WorkspaceListener.DEFAULT_IS_DEEP,
371 null,
372 nodeTypeNames,
373 WorkspaceListener.DEFAULT_NO_LOCAL);
374 }
375
376 protected void unregisterListener( WorkspaceListener listener ) {
377 if (listener != null) this.workspaceListeners.remove(listener);
378 }
379
380 /**
381 * From section 2.8.8 of the JSR-170 specification:
382 * <p>
383 * On each persistent change, those listeners that are entitled to receive one or more events will have their onEvent method
384 * called and be passed an EventIterator. The EventIterator will contain the event bundle reflecting the persistent changes
385 * made but excluding those to which that particular listener is not entitled, according to the listeners access permissions
386 * and filters.
387 * </p>
388 *
389 * @param eventIterator
390 * @param listener
391 */
392 protected void processEvents( EventIterator eventIterator,
393 WorkspaceListener listener ) {
394 if (eventIterator == null) return;
395 List<Event> events = new ArrayList<Event>();
396 // Copy the events ...
397 while (eventIterator.hasNext()) {
398 events.add((Event)eventIterator.next());
399 }
400 if (!getAdministrator().isStarted()) {
401 this.statistics.recordIgnoredEventSet(events.size());
402 return;
403 }
404
405 // Notify the event listeners ...
406 boolean notifiedSomebody = false;
407 List<EventListener> eventListeners = this.eventListeners; // use one consistent snapshot
408 if (!eventListeners.isEmpty()) {
409 DelegatingEventIterator eventIter = new DelegatingEventIterator(events.iterator(), events.size());
410 for (EventListener eventListener : eventListeners) {
411 eventListener.onEvent(eventIter);
412 }
413 notifiedSomebody = true;
414 }
415
416 // Now create the node change events ...
417 List<NodeChangeListener> nodeChangeListeners = this.nodeChangeListeners; // use one consistent snapshot
418 if (!nodeChangeListeners.isEmpty()) {
419 final String repositoryWorkspaceName = listener.getRepositoryWorkspaceName();
420 try {
421 NodeChanges nodeChanges = NodeChanges.create(repositoryWorkspaceName, events);
422
423 // And notify the node change listeners ...
424 int nodeChangeCount = nodeChanges.size();
425 this.statistics.recordNodesChanged(nodeChangeCount);
426 for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
427 nodeChangeListener.onNodeChanges(nodeChanges);
428 }
429 } catch (Throwable t) {
430 getProblemLog().error(repositoryWorkspaceName, t);
431 }
432 notifiedSomebody = true;
433 }
434
435 if (notifiedSomebody) {
436 this.statistics.recordEventSet(events.size());
437 } else {
438 this.statistics.recordIgnoredEventSet(events.size());
439 }
440 }
441
442 protected class DelegatingEventIterator implements EventIterator {
443
444 private final Iterator<Event> events;
445 private final int size;
446 private int position = 0;
447
448 protected DelegatingEventIterator( Iterator<Event> events,
449 int size ) {
450 this.events = events;
451 this.size = size;
452 }
453
454 /**
455 * {@inheritDoc}
456 */
457 public Event nextEvent() {
458 ++position;
459 return events.next();
460 }
461
462 /**
463 * {@inheritDoc}
464 */
465 public long getPosition() {
466 return position;
467 }
468
469 /**
470 * {@inheritDoc}
471 */
472 public long getSize() {
473 return size;
474 }
475
476 /**
477 * {@inheritDoc}
478 */
479 public void skip( long skipNum ) {
480 for (int i = 0; i != skipNum; ++i) {
481 next();
482 }
483 }
484
485 /**
486 * {@inheritDoc}
487 */
488 public boolean hasNext() {
489 return events.hasNext();
490 }
491
492 /**
493 * {@inheritDoc}
494 */
495 public Object next() {
496 return events.next();
497 }
498
499 /**
500 * {@inheritDoc}
501 */
502 public void remove() {
503 // does nothing
504 }
505
506 }
507
508 /**
509 * Implementation of the {@link EventListener JCR EventListener} interface, returned by the sequencing system.
510 *
511 * @author Randall Hauch
512 */
513 @ThreadSafe
514 public class WorkspaceListener implements EventListener {
515
516 public static final boolean DEFAULT_IS_DEEP = true;
517 public static final boolean DEFAULT_NO_LOCAL = false;
518 public static final int DEFAULT_EVENT_TYPES = Event.NODE_ADDED | /* Event.NODE_REMOVED| */Event.PROPERTY_ADDED
519 | Event.PROPERTY_CHANGED /* |Event.PROPERTY_REMOVED */;
520 public static final String DEFAULT_ABSOLUTE_PATH = "/";
521
522 private final String repositoryWorkspaceName;
523 private final Set<String> uuids;
524 private final Set<String> nodeTypeNames;
525 private final int eventTypes;
526 private final String absolutePath;
527 private final boolean deep;
528 private final boolean noLocal;
529 @GuardedBy( "this" )
530 private transient Session session;
531
532 protected WorkspaceListener( String repositoryWorkspaceName,
533 int eventTypes,
534 String absPath,
535 boolean isDeep,
536 String[] uuids,
537 String[] nodeTypeNames,
538 boolean noLocal ) {
539 this.repositoryWorkspaceName = repositoryWorkspaceName;
540 this.eventTypes = eventTypes;
541 this.deep = isDeep;
542 this.noLocal = noLocal;
543 this.absolutePath = absPath != null && absPath.trim().length() != 0 ? absPath.trim() : null;
544 // Set the UUIDs ...
545 Set<String> newUuids = new HashSet<String>();
546 if (uuids != null) {
547 for (String uuid : uuids) {
548 if (uuid != null && uuid.trim().length() != 0) newUuids.add(uuid.trim());
549 }
550 }
551 this.uuids = Collections.unmodifiableSet(newUuids);
552 // Set the node type names
553 Set<String> newNodeTypeNames = new HashSet<String>();
554 if (nodeTypeNames != null) {
555 for (String nodeTypeName : nodeTypeNames) {
556 if (nodeTypeName != null && nodeTypeName.trim().length() != 0) newNodeTypeNames.add(nodeTypeName.trim());
557 }
558 }
559 this.nodeTypeNames = Collections.unmodifiableSet(newNodeTypeNames);
560 }
561
562 /**
563 * @return repositoryWorkspaceName
564 */
565 public String getRepositoryWorkspaceName() {
566 return this.repositoryWorkspaceName;
567 }
568
569 /**
570 * @return eventTypes
571 */
572 public int getEventTypes() {
573 return this.eventTypes;
574 }
575
576 /**
577 * @return absolutePath
578 */
579 public String getAbsolutePath() {
580 return this.absolutePath;
581 }
582
583 /**
584 * @return deep
585 */
586 public boolean isDeep() {
587 return this.deep;
588 }
589
590 /**
591 * @return noLocal
592 */
593 public boolean isNoLocal() {
594 return this.noLocal;
595 }
596
597 /**
598 * @return uuids
599 */
600 public Set<String> getUuids() {
601 return this.uuids;
602 }
603
604 /**
605 * @return nodeTypeNames
606 */
607 public Set<String> getNodeTypeNames() {
608 return this.nodeTypeNames;
609 }
610
611 public synchronized boolean isRegistered() {
612 if (this.session != null && getAdministrator().isShutdown()) {
613 // This sequencing system has been shutdown, so unregister this listener
614 try {
615 unregister();
616 } catch (RepositoryException re) {
617 String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
618 Logger.getLogger(this.getClass()).debug(re, msg);
619 }
620 }
621 return this.session != null;
622 }
623
624 public synchronized WorkspaceListener register() throws UnsupportedRepositoryOperationException, RepositoryException {
625 if (this.session != null) return this;
626 this.session = ObservationService.this.getSessionFactory().createSession(this.repositoryWorkspaceName);
627 String[] uuids = this.uuids.isEmpty() ? null : this.uuids.toArray(new String[this.uuids.size()]);
628 String[] nodeTypeNames = this.nodeTypeNames.isEmpty() ? null : this.nodeTypeNames.toArray(new String[this.nodeTypeNames.size()]);
629 this.session.getWorkspace().getObservationManager().addEventListener(this,
630 eventTypes,
631 absolutePath,
632 deep,
633 uuids,
634 nodeTypeNames,
635 noLocal);
636 return this;
637 }
638
639 public synchronized WorkspaceListener unregister() throws UnsupportedRepositoryOperationException, RepositoryException {
640 if (this.session == null) return this;
641 try {
642 if (this.session.isLive()) {
643 this.session.getWorkspace().getObservationManager().removeEventListener(this);
644 this.session.logout();
645 }
646 } finally {
647 this.session = null;
648 unregisterListener(this);
649 }
650 return this;
651 }
652
653 public synchronized WorkspaceListener reregister() throws UnsupportedRepositoryOperationException, RepositoryException {
654 unregister();
655 register();
656 return this;
657 }
658
659 /**
660 * {@inheritDoc}
661 */
662 public void onEvent( EventIterator events ) {
663 if (events != null) {
664 if (getAdministrator().isShutdown()) {
665 // This sequencing system has been shutdown, so unregister this listener
666 try {
667 unregister();
668 } catch (RepositoryException re) {
669 String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
670 Logger.getLogger(this.getClass()).debug(re, msg);
671 }
672 } else {
673 ObservationService.this.processEvents(events, this);
674 }
675 }
676 }
677 }
678
679 /**
680 * The statistics for the system. Each sequencing system has an instance of this class that is updated.
681 *
682 * @author Randall Hauch
683 */
684 @ThreadSafe
685 public class Statistics {
686
687 @GuardedBy( "lock" )
688 private long numberOfEventsIgnored;
689 @GuardedBy( "lock" )
690 private long numberOfEventsEnqueued;
691 @GuardedBy( "lock" )
692 private long numberOfEventSetsIgnored;
693 @GuardedBy( "lock" )
694 private long numberOfEventSetsEnqueued;
695 private final AtomicLong numberOfNodeChangesEnqueued = new AtomicLong(0);
696 private final ReadWriteLock lock = new ReentrantReadWriteLock();
697 private final AtomicLong startTime;
698
699 protected Statistics() {
700 startTime = new AtomicLong(System.currentTimeMillis());
701 }
702
703 public Statistics reset() {
704 try {
705 lock.writeLock().lock();
706 this.startTime.set(System.currentTimeMillis());
707 this.numberOfEventsIgnored = 0;
708 this.numberOfEventsEnqueued = 0;
709 this.numberOfEventSetsIgnored = 0;
710 this.numberOfEventSetsEnqueued = 0;
711 this.numberOfNodeChangesEnqueued.set(0);
712 } finally {
713 lock.writeLock().unlock();
714 }
715 return this;
716 }
717
718 /**
719 * @return the system time when the statistics were started
720 */
721 public long getStartTime() {
722 return this.startTime.get();
723 }
724
725 /**
726 * @return the number of node changes that were processed
727 */
728 public long getNumberOfNodeChangesEnqueued() {
729 return this.numberOfNodeChangesEnqueued.get();
730 }
731
732 /**
733 * @return the number of events that were ignored because the system was not running
734 */
735 public long getNumberOfEventsIgnored() {
736 try {
737 lock.readLock().lock();
738 return this.numberOfEventsIgnored;
739 } finally {
740 lock.readLock().unlock();
741 }
742 }
743
744 /**
745 * @return the number of events that were enqueued for processing
746 */
747 public long getNumberOfEventsEnqueued() {
748 try {
749 lock.readLock().lock();
750 return this.numberOfEventsEnqueued;
751 } finally {
752 lock.readLock().unlock();
753 }
754 }
755
756 /**
757 * @return the number of event sets (transactions) that were enqueued for processing
758 */
759 public long getNumberOfEventSetsEnqueued() {
760 try {
761 lock.readLock().lock();
762 return this.numberOfEventSetsEnqueued;
763 } finally {
764 lock.readLock().unlock();
765 }
766 }
767
768 /**
769 * @return the number of event sets (transactions) that were ignored because the system was not running
770 */
771 public long getNumberOfEventSetsIgnored() {
772 try {
773 lock.readLock().lock();
774 return this.numberOfEventSetsIgnored;
775 } finally {
776 lock.readLock().unlock();
777 }
778 }
779
780 protected void recordNodesChanged( long changeCount ) {
781 this.numberOfNodeChangesEnqueued.addAndGet(changeCount);
782 }
783
784 protected void recordEventSet( long eventsInSet ) {
785 try {
786 lock.writeLock().lock();
787 this.numberOfEventsEnqueued += eventsInSet;
788 ++this.numberOfEventSetsEnqueued;
789 } finally {
790 lock.writeLock().unlock();
791 }
792 }
793
794 protected void recordIgnoredEventSet( long eventsInSet ) {
795 try {
796 lock.writeLock().lock();
797 this.numberOfEventsIgnored += eventsInSet;
798 this.numberOfEventSetsIgnored += 1;
799 ++this.numberOfEventSetsEnqueued;
800 } finally {
801 lock.writeLock().unlock();
802 }
803 }
804 }
805 }