001 /*
002 * JBoss DNA (http://www.jboss.org/dna)
003 * See the COPYRIGHT.txt file distributed with this work for information
004 * regarding copyright ownership. Some portions may be licensed
005 * to Red Hat, Inc. under one or more contributor license agreements.
006 * See the AUTHORS.txt file in the distribution for a full listing of
007 * individual contributors.
008 *
009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010 * is licensed to you under the terms of the GNU Lesser General Public License as
011 * published by the Free Software Foundation; either version 2.1 of
012 * the License, or (at your option) any later version.
013 *
014 * JBoss DNA is distributed in the hope that it will be useful,
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017 * Lesser General Public License for more details.
018 *
019 * You should have received a copy of the GNU Lesser General Public
020 * License along with this software; if not, write to the Free
021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023 */
024 package org.jboss.dna.repository.observation;
025
026 import java.util.ArrayList;
027 import java.util.Collections;
028 import java.util.HashSet;
029 import java.util.Iterator;
030 import java.util.List;
031 import java.util.Set;
032 import java.util.concurrent.CopyOnWriteArrayList;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.atomic.AtomicLong;
035 import java.util.concurrent.locks.ReadWriteLock;
036 import java.util.concurrent.locks.ReentrantReadWriteLock;
037 import javax.jcr.RepositoryException;
038 import javax.jcr.Session;
039 import javax.jcr.UnsupportedRepositoryOperationException;
040 import javax.jcr.observation.Event;
041 import javax.jcr.observation.EventIterator;
042 import javax.jcr.observation.EventListener;
043 import javax.jcr.observation.ObservationManager;
044 import net.jcip.annotations.GuardedBy;
045 import net.jcip.annotations.ThreadSafe;
046 import org.jboss.dna.common.util.CheckArg;
047 import org.jboss.dna.common.util.Logger;
048 import org.jboss.dna.repository.RepositoryI18n;
049 import org.jboss.dna.repository.service.AbstractServiceAdministrator;
050 import org.jboss.dna.repository.service.AdministeredService;
051 import org.jboss.dna.repository.service.ServiceAdministrator;
052 import org.jboss.dna.repository.util.SessionFactory;
053
054 /**
055 * @author Randall Hauch
056 */
057 public class ObservationService implements AdministeredService {
058
/**
 * Callback through which this service reports problems that occur while handling the events of a workspace.
 *
 * @author Randall Hauch
 */
public interface ProblemLog {

    /**
     * Report a problem that was caught while processing events.
     *
     * @param repositoryWorkspaceName the name of the repository workspace whose events were being processed
     * @param t the problem that was caught
     */
    void error( String repositoryWorkspaceName,
                Throwable t );
}
069
070 /**
071 * Problem log implementation that records problems in the log.
072 *
073 * @author Randall Hauch
074 */
075 public class DefaultProblemLog implements ProblemLog {
076
077 /**
078 * {@inheritDoc}
079 */
080 public void error( String repositoryWorkspaceName,
081 Throwable t ) {
082 getLogger().error(t, RepositoryI18n.errorProcessingEvents, repositoryWorkspaceName);
083 }
084 }
085
086 protected static class NoOpProblemLog implements ProblemLog {
087
088 /**
089 * {@inheritDoc}
090 */
091 public void error( String repositoryWorkspaceName,
092 Throwable t ) {
093 }
094 }
095
096 public static final ProblemLog NO_OP_PROBLEM_LOG = new NoOpProblemLog();
097
098 /**
099 * The administrative component for this service.
100 *
101 * @author Randall Hauch
102 */
103 protected class Administrator extends AbstractServiceAdministrator {
104
105 protected Administrator() {
106 super(RepositoryI18n.observationServiceName, State.STARTED);
107 }
108
109 /**
110 * {@inheritDoc}
111 */
112 @Override
113 protected void doShutdown( State fromState ) {
114 super.doShutdown(fromState);
115 shutdownService();
116 }
117
118 /**
119 * {@inheritDoc}
120 */
121 public boolean awaitTermination( long timeout,
122 TimeUnit unit ) {
123 return true;
124 }
125
126 /**
127 * {@inheritDoc}
128 */
129 @Override
130 protected boolean doCheckIsTerminated() {
131 return true;
132 }
133
134 }
135
136 private Logger logger = Logger.getLogger(this.getClass());
137 private ProblemLog problemLog = new DefaultProblemLog();
138 private final Statistics statistics = new Statistics();
139 private final SessionFactory sessionFactory;
140 private final CopyOnWriteArrayList<WorkspaceListener> workspaceListeners = new CopyOnWriteArrayList<WorkspaceListener>();
141 private final CopyOnWriteArrayList<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();
142 private final CopyOnWriteArrayList<NodeChangeListener> nodeChangeListeners = new CopyOnWriteArrayList<NodeChangeListener>();
143 private final Administrator administrator = new Administrator();
144
/**
 * Create an observation service that obtains its repository sessions from the supplied factory.
 *
 * @param sessionFactory the factory used to create the sessions through which workspaces are monitored; checked with
 *        {@link CheckArg#isNotNull(Object, String)} and therefore may not be null
 */
public ObservationService( final SessionFactory sessionFactory ) {
    CheckArg.isNotNull(sessionFactory, "session factory");
    this.sessionFactory = sessionFactory;
}
149
150 /**
151 * {@inheritDoc}
152 */
153 public ServiceAdministrator getAdministrator() {
154 return this.administrator;
155 }
156
157 /**
158 * @return sessionFactory
159 */
160 public SessionFactory getSessionFactory() {
161 return this.sessionFactory;
162 }
163
164 /**
165 * Get the statistics for this system.
166 *
167 * @return the statistics, which are updated as the system is used
168 */
169 public Statistics getStatistics() {
170 return this.statistics;
171 }
172
173 /**
174 * Get the logger for this system
175 *
176 * @return the logger
177 */
178 public Logger getLogger() {
179 return this.logger;
180 }
181
182 /**
183 * Set the logger for this system.
184 *
185 * @param logger the logger, or null if the standard logging should be used
186 */
187 public void setLogger( Logger logger ) {
188 this.logger = logger != null ? logger : Logger.getLogger(this.getClass());
189 }
190
191 /**
192 * @return problemLog
193 */
194 public ProblemLog getProblemLog() {
195 return this.problemLog;
196 }
197
198 /**
199 * Set the problem log that will be notified of problems handling events. By default, such problems are sent to the log.
200 *
201 * @param problemLog the new problem log implementation; if null, then the default problem log is used
202 */
203 public void setProblemLog( ProblemLog problemLog ) {
204 this.problemLog = problemLog != null ? problemLog : new DefaultProblemLog();
205 }
206
207 public boolean addListener( EventListener listener ) {
208 if (listener == null) return false;
209 return this.eventListeners.addIfAbsent(listener);
210 }
211
212 public boolean removeListener( EventListener listener ) {
213 if (listener == null) return false;
214 return this.eventListeners.remove(listener);
215 }
216
217 public boolean addListener( NodeChangeListener listener ) {
218 return this.nodeChangeListeners.addIfAbsent(listener);
219 }
220
221 public boolean removeListener( NodeChangeListener listener ) {
222 if (listener == null) return false;
223 return this.nodeChangeListeners.remove(listener);
224 }
225
226 protected void shutdownService() {
227 // Unregister all listeners ...
228 for (WorkspaceListener listener : this.workspaceListeners) {
229 try {
230 listener.unregister();
231 } catch (RepositoryException e) {
232 this.logger.error(e, RepositoryI18n.errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService);
233 }
234 }
235 }
236
237 /**
238 * Monitor the supplied workspace for events of the given type on any node at or under the supplied path.
239 * <p>
240 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
241 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
242 * repository and workspace name.
243 * </p>
244 * <p>
245 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
246 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
247 * workspace and garbage collected. If this service is {@link ServiceAdministrator#shutdown() shutdown} while there are still
248 * active listeners, those listeners will disconnect themselves from this service and the workspace with which they're
249 * registered when they attempt to forward the next events.
250 * </p>
251 * <p>
252 * The set of events that are monitored can be filtered by specifying restrictions based on characteristics of the node
253 * associated with the event. In the case of event types {@link Event#NODE_ADDED NODE_ADDED} and
254 * {@link Event#NODE_REMOVED NODE_REMOVED}, the node associated with an event is the node at (or formerly at) the path
255 * returned by {@link Event#getPath() Event.getPath()}. In the case of event types
256 * {@link Event#PROPERTY_ADDED PROPERTY_ADDED}, {@link Event#PROPERTY_REMOVED PROPERTY_REMOVED} and
257 * {@link Event#PROPERTY_CHANGED PROPERTY_CHANGED}, the node associated with an event is the parent node of the property at
258 * (or formerly at) the path returned by <code>Event.getPath</code>:
259 * <ul>
260 * <li> <code>absolutePath</code>, <code>isDeep</code>: Only events whose associated node is at
261 * <code>absolutePath</code> (or within its subtree, if <code>isDeep</code> is <code>true</code>) will be received. It
262 * is permissible to register a listener for a path where no node currently exists. </li>
263 * <li> <code>uuids</code>: Only events whose associated node has one of the UUIDs in this list will be received. If his
264 * parameter is <code>null</code> then no UUID-related restriction is placed on events received. </li>
265 * <li> <code>nodeTypeNames</code>: Only events whose associated node has one of the node types (or a subtype of one of the
266 * node types) in this list will be received. If this parameter is <code>null</code> then no node type-related restriction
267 * is placed on events received. </li>
268 * </ul>
269 * The restrictions are "ANDed" together. In other words, for a particular node to be "listened to" it must meet all the
270 * restrictions.
271 * </p>
272 * <p>
273 * Additionally, if <code>noLocal</code> is <code>true</code>, then events generated by the session through which the
274 * listener was registered are ignored. Otherwise, they are not ignored.
275 * </p>
276 * <p>
277 * The filters of an already-registered {@link WorkspaceListener} can be changed at runtime by changing the attributes and
278 * {@link WorkspaceListener#reregister() registering}.
279 * </p>
280 *
281 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
282 * workspace that is to be monitored
283 * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
284 * in the workspace are to be monitored
285 * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
286 * @param isDeep true if events below the node given by the <code>absolutePath</code> or by the <code>uuids</code> are to
287 * be processed, or false if only the events at the node
288 * @param uuids array of UUIDs of nodes that are to be monitored; may be null or empty if the UUIDs are not known
289 * @param nodeTypeNames array of node type names that are to be monitored; may be null or empty if the monitoring has no node
290 * type restrictions
291 * @param noLocal true if the events originating in the supplied workspace are to be ignored, or false if they are also to be
292 * processed.
293 * @return the listener that was created and registered to perform the monitoring
294 * @throws RepositoryException if there is a problem registering the listener
295 */
296 public WorkspaceListener monitor( String repositoryWorkspaceName,
297 String absolutePath,
298 int eventTypes,
299 boolean isDeep,
300 String[] uuids,
301 String[] nodeTypeNames,
302 boolean noLocal ) throws RepositoryException {
303 WorkspaceListener listener = new WorkspaceListener(repositoryWorkspaceName, eventTypes, absolutePath, isDeep, uuids,
304 nodeTypeNames, noLocal);
305 listener.register();
306 this.workspaceListeners.add(listener);
307 return listener;
308 }
309
310 /**
311 * Monitor the supplied workspace for {@link WorkspaceListener#DEFAULT_EVENT_TYPES default event types} on any node at or
312 * under the supplied path.
313 * <p>
314 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
315 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
316 * repository and workspace name.
317 * </p>
318 * <p>
319 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
320 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
321 * workspace and garbage collected.
322 * </p>
323 *
324 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
325 * workspace that is to be monitored
326 * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
327 * in the workspace are to be monitored
328 * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
329 * node type restrictions
330 * @return the listener that was created and registered to perform the monitoring
331 * @throws RepositoryException if there is a problem registering the listener
332 */
333 public WorkspaceListener monitor( String repositoryWorkspaceName,
334 String absolutePath,
335 String... nodeTypeNames ) throws RepositoryException {
336 return monitor(repositoryWorkspaceName,
337 absolutePath,
338 WorkspaceListener.DEFAULT_EVENT_TYPES,
339 WorkspaceListener.DEFAULT_IS_DEEP,
340 null,
341 nodeTypeNames,
342 WorkspaceListener.DEFAULT_NO_LOCAL);
343 }
344
345 /**
346 * Monitor the supplied workspace for the supplied event types on any node in the workspace.
347 * <p>
348 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
349 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
350 * repository and workspace name.
351 * </p>
352 * <p>
353 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
354 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
355 * workspace and garbage collected.
356 * </p>
357 *
358 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
359 * workspace that is to be monitored
360 * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
361 * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
362 * node type restrictions
363 * @return the listener that was created and registered to perform the monitoring
364 * @throws RepositoryException if there is a problem registering the listener
365 */
366 public WorkspaceListener monitor( String repositoryWorkspaceName,
367 int eventTypes,
368 String... nodeTypeNames ) throws RepositoryException {
369 return monitor(repositoryWorkspaceName,
370 WorkspaceListener.DEFAULT_ABSOLUTE_PATH,
371 eventTypes,
372 WorkspaceListener.DEFAULT_IS_DEEP,
373 null,
374 nodeTypeNames,
375 WorkspaceListener.DEFAULT_NO_LOCAL);
376 }
377
378 protected void unregisterListener( WorkspaceListener listener ) {
379 if (listener != null) this.workspaceListeners.remove(listener);
380 }
381
382 /**
383 * From section 2.8.8 of the JSR-170 specification:
384 * <p>
385 * On each persistent change, those listeners that are entitled to receive one or more events will have their onEvent method
386 * called and be passed an EventIterator. The EventIterator will contain the event bundle reflecting the persistent changes
387 * made but excluding those to which that particular listener is not entitled, according to the listeners access permissions
388 * and filters.
389 * </p>
390 *
391 * @param eventIterator
392 * @param listener
393 */
394 protected void processEvents( EventIterator eventIterator,
395 WorkspaceListener listener ) {
396 if (eventIterator == null) return;
397 List<Event> events = new ArrayList<Event>();
398 // Copy the events ...
399 while (eventIterator.hasNext()) {
400 events.add((Event)eventIterator.next());
401 }
402 if (!getAdministrator().isStarted()) {
403 this.statistics.recordIgnoredEventSet(events.size());
404 return;
405 }
406
407 // Notify the event listeners ...
408 boolean notifiedSomebody = false;
409 List<EventListener> eventListeners = this.eventListeners; // use one consistent snapshot
410 if (!eventListeners.isEmpty()) {
411 DelegatingEventIterator eventIter = new DelegatingEventIterator(events.iterator(), events.size());
412 for (EventListener eventListener : eventListeners) {
413 eventListener.onEvent(eventIter);
414 }
415 notifiedSomebody = true;
416 }
417
418 // Now create the node change events ...
419 List<NodeChangeListener> nodeChangeListeners = this.nodeChangeListeners; // use one consistent snapshot
420 if (!nodeChangeListeners.isEmpty()) {
421 final String repositoryWorkspaceName = listener.getRepositoryWorkspaceName();
422 try {
423 NodeChanges nodeChanges = NodeChanges.create(repositoryWorkspaceName, events);
424
425 // And notify the node change listeners ...
426 int nodeChangeCount = nodeChanges.size();
427 this.statistics.recordNodesChanged(nodeChangeCount);
428 for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
429 nodeChangeListener.onNodeChanges(nodeChanges);
430 }
431 } catch (Throwable t) {
432 getProblemLog().error(repositoryWorkspaceName, t);
433 }
434 notifiedSomebody = true;
435 }
436
437 if (notifiedSomebody) {
438 this.statistics.recordEventSet(events.size());
439 } else {
440 this.statistics.recordIgnoredEventSet(events.size());
441 }
442 }
443
444 protected class DelegatingEventIterator implements EventIterator {
445
446 private final Iterator<Event> events;
447 private final int size;
448 private int position = 0;
449
450 protected DelegatingEventIterator( Iterator<Event> events,
451 int size ) {
452 this.events = events;
453 this.size = size;
454 }
455
456 /**
457 * {@inheritDoc}
458 */
459 public Event nextEvent() {
460 ++position;
461 return events.next();
462 }
463
464 /**
465 * {@inheritDoc}
466 */
467 public long getPosition() {
468 return position;
469 }
470
471 /**
472 * {@inheritDoc}
473 */
474 public long getSize() {
475 return size;
476 }
477
478 /**
479 * {@inheritDoc}
480 */
481 public void skip( long skipNum ) {
482 for (int i = 0; i != skipNum; ++i) {
483 next();
484 }
485 }
486
487 /**
488 * {@inheritDoc}
489 */
490 public boolean hasNext() {
491 return events.hasNext();
492 }
493
494 /**
495 * {@inheritDoc}
496 */
497 public Object next() {
498 return events.next();
499 }
500
501 /**
502 * {@inheritDoc}
503 */
504 public void remove() {
505 // does nothing
506 }
507
508 }
509
510 /**
511 * Implementation of the {@link EventListener JCR EventListener} interface, returned by the sequencing system.
512 *
513 * @author Randall Hauch
514 */
515 @ThreadSafe
516 public class WorkspaceListener implements EventListener {
517
518 public static final boolean DEFAULT_IS_DEEP = true;
519 public static final boolean DEFAULT_NO_LOCAL = false;
520 public static final int DEFAULT_EVENT_TYPES = Event.NODE_ADDED | /* Event.NODE_REMOVED| */Event.PROPERTY_ADDED
521 | Event.PROPERTY_CHANGED /* |Event.PROPERTY_REMOVED */;
522 public static final String DEFAULT_ABSOLUTE_PATH = "/";
523
524 private final String repositoryWorkspaceName;
525 private final Set<String> uuids;
526 private final Set<String> nodeTypeNames;
527 private final int eventTypes;
528 private final String absolutePath;
529 private final boolean deep;
530 private final boolean noLocal;
531 @GuardedBy( "this" )
532 private transient Session session;
533
534 protected WorkspaceListener( String repositoryWorkspaceName,
535 int eventTypes,
536 String absPath,
537 boolean isDeep,
538 String[] uuids,
539 String[] nodeTypeNames,
540 boolean noLocal ) {
541 this.repositoryWorkspaceName = repositoryWorkspaceName;
542 this.eventTypes = eventTypes;
543 this.deep = isDeep;
544 this.noLocal = noLocal;
545 this.absolutePath = absPath != null && absPath.trim().length() != 0 ? absPath.trim() : null;
546 // Set the UUIDs ...
547 Set<String> newUuids = new HashSet<String>();
548 if (uuids != null) {
549 for (String uuid : uuids) {
550 if (uuid != null && uuid.trim().length() != 0) newUuids.add(uuid.trim());
551 }
552 }
553 this.uuids = Collections.unmodifiableSet(newUuids);
554 // Set the node type names
555 Set<String> newNodeTypeNames = new HashSet<String>();
556 if (nodeTypeNames != null) {
557 for (String nodeTypeName : nodeTypeNames) {
558 if (nodeTypeName != null && nodeTypeName.trim().length() != 0) newNodeTypeNames.add(nodeTypeName.trim());
559 }
560 }
561 this.nodeTypeNames = Collections.unmodifiableSet(newNodeTypeNames);
562 }
563
564 /**
565 * @return repositoryWorkspaceName
566 */
567 public String getRepositoryWorkspaceName() {
568 return this.repositoryWorkspaceName;
569 }
570
571 /**
572 * @return eventTypes
573 */
574 public int getEventTypes() {
575 return this.eventTypes;
576 }
577
578 /**
579 * @return absolutePath
580 */
581 public String getAbsolutePath() {
582 return this.absolutePath;
583 }
584
585 /**
586 * @return deep
587 */
588 public boolean isDeep() {
589 return this.deep;
590 }
591
592 /**
593 * @return noLocal
594 */
595 public boolean isNoLocal() {
596 return this.noLocal;
597 }
598
599 /**
600 * @return uuids
601 */
602 public Set<String> getUuids() {
603 return this.uuids;
604 }
605
606 /**
607 * @return nodeTypeNames
608 */
609 public Set<String> getNodeTypeNames() {
610 return this.nodeTypeNames;
611 }
612
613 public synchronized boolean isRegistered() {
614 if (this.session != null && getAdministrator().isShutdown()) {
615 // This sequencing system has been shutdown, so unregister this listener
616 try {
617 unregister();
618 } catch (RepositoryException re) {
619 String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
620 Logger.getLogger(this.getClass()).debug(re, msg);
621 }
622 }
623 return this.session != null;
624 }
625
626 public synchronized WorkspaceListener register() throws UnsupportedRepositoryOperationException, RepositoryException {
627 if (this.session != null) return this;
628 this.session = ObservationService.this.getSessionFactory().createSession(this.repositoryWorkspaceName);
629 String[] uuids = this.uuids.isEmpty() ? null : this.uuids.toArray(new String[this.uuids.size()]);
630 String[] nodeTypeNames = this.nodeTypeNames.isEmpty() ? null : this.nodeTypeNames.toArray(new String[this.nodeTypeNames.size()]);
631 this.session.getWorkspace().getObservationManager().addEventListener(this,
632 eventTypes,
633 absolutePath,
634 deep,
635 uuids,
636 nodeTypeNames,
637 noLocal);
638 return this;
639 }
640
641 public synchronized WorkspaceListener unregister() throws UnsupportedRepositoryOperationException, RepositoryException {
642 if (this.session == null) return this;
643 try {
644 if (this.session.isLive()) {
645 this.session.getWorkspace().getObservationManager().removeEventListener(this);
646 this.session.logout();
647 }
648 } finally {
649 this.session = null;
650 unregisterListener(this);
651 }
652 return this;
653 }
654
655 public synchronized WorkspaceListener reregister() throws UnsupportedRepositoryOperationException, RepositoryException {
656 unregister();
657 register();
658 return this;
659 }
660
661 /**
662 * {@inheritDoc}
663 */
664 public void onEvent( EventIterator events ) {
665 if (events != null) {
666 if (getAdministrator().isShutdown()) {
667 // This sequencing system has been shutdown, so unregister this listener
668 try {
669 unregister();
670 } catch (RepositoryException re) {
671 String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
672 Logger.getLogger(this.getClass()).debug(re, msg);
673 }
674 } else {
675 ObservationService.this.processEvents(events, this);
676 }
677 }
678 }
679 }
680
681 /**
682 * The statistics for the system. Each sequencing system has an instance of this class that is updated.
683 *
684 * @author Randall Hauch
685 */
686 @ThreadSafe
687 public class Statistics {
688
689 @GuardedBy( "lock" )
690 private long numberOfEventsIgnored;
691 @GuardedBy( "lock" )
692 private long numberOfEventsEnqueued;
693 @GuardedBy( "lock" )
694 private long numberOfEventSetsIgnored;
695 @GuardedBy( "lock" )
696 private long numberOfEventSetsEnqueued;
697 private final AtomicLong numberOfNodeChangesEnqueued = new AtomicLong(0);
698 private final ReadWriteLock lock = new ReentrantReadWriteLock();
699 private final AtomicLong startTime;
700
701 protected Statistics() {
702 startTime = new AtomicLong(System.currentTimeMillis());
703 }
704
705 public Statistics reset() {
706 try {
707 lock.writeLock().lock();
708 this.startTime.set(System.currentTimeMillis());
709 this.numberOfEventsIgnored = 0;
710 this.numberOfEventsEnqueued = 0;
711 this.numberOfEventSetsIgnored = 0;
712 this.numberOfEventSetsEnqueued = 0;
713 this.numberOfNodeChangesEnqueued.set(0);
714 } finally {
715 lock.writeLock().unlock();
716 }
717 return this;
718 }
719
720 /**
721 * @return the system time when the statistics were started
722 */
723 public long getStartTime() {
724 return this.startTime.get();
725 }
726
727 /**
728 * @return the number of node changes that were processed
729 */
730 public long getNumberOfNodeChangesEnqueued() {
731 return this.numberOfNodeChangesEnqueued.get();
732 }
733
734 /**
735 * @return the number of events that were ignored because the system was not running
736 */
737 public long getNumberOfEventsIgnored() {
738 try {
739 lock.readLock().lock();
740 return this.numberOfEventsIgnored;
741 } finally {
742 lock.readLock().unlock();
743 }
744 }
745
746 /**
747 * @return the number of events that were enqueued for processing
748 */
749 public long getNumberOfEventsEnqueued() {
750 try {
751 lock.readLock().lock();
752 return this.numberOfEventsEnqueued;
753 } finally {
754 lock.readLock().unlock();
755 }
756 }
757
758 /**
759 * @return the number of event sets (transactions) that were enqueued for processing
760 */
761 public long getNumberOfEventSetsEnqueued() {
762 try {
763 lock.readLock().lock();
764 return this.numberOfEventSetsEnqueued;
765 } finally {
766 lock.readLock().unlock();
767 }
768 }
769
770 /**
771 * @return the number of event sets (transactions) that were ignored because the system was not running
772 */
773 public long getNumberOfEventSetsIgnored() {
774 try {
775 lock.readLock().lock();
776 return this.numberOfEventSetsIgnored;
777 } finally {
778 lock.readLock().unlock();
779 }
780 }
781
782 protected void recordNodesChanged( long changeCount ) {
783 this.numberOfNodeChangesEnqueued.addAndGet(changeCount);
784 }
785
786 protected void recordEventSet( long eventsInSet ) {
787 try {
788 lock.writeLock().lock();
789 this.numberOfEventsEnqueued += eventsInSet;
790 ++this.numberOfEventSetsEnqueued;
791 } finally {
792 lock.writeLock().unlock();
793 }
794 }
795
796 protected void recordIgnoredEventSet( long eventsInSet ) {
797 try {
798 lock.writeLock().lock();
799 this.numberOfEventsIgnored += eventsInSet;
800 this.numberOfEventSetsIgnored += 1;
801 ++this.numberOfEventSetsEnqueued;
802 } finally {
803 lock.writeLock().unlock();
804 }
805 }
806 }
807 }