001    /*
002     * JBoss DNA (http://www.jboss.org/dna)
003     * See the COPYRIGHT.txt file distributed with this work for information
004     * regarding copyright ownership.  Some portions may be licensed
005     * to Red Hat, Inc. under one or more contributor license agreements.
006     * See the AUTHORS.txt file in the distribution for a full listing of 
007     * individual contributors. 
008     *
009     * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010     * is licensed to you under the terms of the GNU Lesser General Public License as
011     * published by the Free Software Foundation; either version 2.1 of
012     * the License, or (at your option) any later version.
013     *
014     * JBoss DNA is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017     * Lesser General Public License for more details.
018     *
019     * You should have received a copy of the GNU Lesser General Public
020     * License along with this software; if not, write to the Free
021     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023     */
024    package org.jboss.dna.repository.observation;
025    
026    import java.util.ArrayList;
027    import java.util.Collections;
028    import java.util.HashSet;
029    import java.util.Iterator;
030    import java.util.List;
031    import java.util.Set;
032    import java.util.concurrent.CopyOnWriteArrayList;
033    import java.util.concurrent.TimeUnit;
034    import java.util.concurrent.atomic.AtomicLong;
035    import java.util.concurrent.locks.ReadWriteLock;
036    import java.util.concurrent.locks.ReentrantReadWriteLock;
037    import javax.jcr.RepositoryException;
038    import javax.jcr.Session;
039    import javax.jcr.UnsupportedRepositoryOperationException;
040    import javax.jcr.observation.Event;
041    import javax.jcr.observation.EventIterator;
042    import javax.jcr.observation.EventListener;
043    import javax.jcr.observation.ObservationManager;
044    import net.jcip.annotations.GuardedBy;
045    import net.jcip.annotations.ThreadSafe;
046    import org.jboss.dna.common.util.CheckArg;
047    import org.jboss.dna.common.util.Logger;
048    import org.jboss.dna.repository.RepositoryI18n;
049    import org.jboss.dna.repository.service.AbstractServiceAdministrator;
050    import org.jboss.dna.repository.service.AdministeredService;
051    import org.jboss.dna.repository.service.ServiceAdministrator;
052    import org.jboss.dna.repository.util.SessionFactory;
053    
054    /**
055     * @author Randall Hauch
056     */
057    public class ObservationService implements AdministeredService {
058    
059        /**
060         * Interface to which problems with particular events are logged.
061         * 
062         * @author Randall Hauch
063         */
064        public static interface ProblemLog {
065    
066            void error( String repositoryWorkspaceName,
067                        Throwable t );
068        }
069    
070        /**
071         * Problem log implementation that records problems in the log.
072         * 
073         * @author Randall Hauch
074         */
075        public class DefaultProblemLog implements ProblemLog {
076    
077            /**
078             * {@inheritDoc}
079             */
080            public void error( String repositoryWorkspaceName,
081                               Throwable t ) {
082                getLogger().error(t, RepositoryI18n.errorProcessingEvents, repositoryWorkspaceName);
083            }
084        }
085    
086        protected static class NoOpProblemLog implements ProblemLog {
087    
088            /**
089             * {@inheritDoc}
090             */
091            public void error( String repositoryWorkspaceName,
092                               Throwable t ) {
093            }
094        }
095    
    /**
     * A shared {@link ProblemLog} whose {@link ProblemLog#error(String, Throwable) error} method does nothing.
     */
    public static final ProblemLog NO_OP_PROBLEM_LOG = new NoOpProblemLog();
097    
098        /**
099         * The administrative component for this service.
100         * 
101         * @author Randall Hauch
102         */
103        protected class Administrator extends AbstractServiceAdministrator {
104    
105            protected Administrator() {
106                super(RepositoryI18n.observationServiceName, State.STARTED);
107            }
108    
109            /**
110             * {@inheritDoc}
111             */
112            @Override
113            protected void doShutdown( State fromState ) {
114                super.doShutdown(fromState);
115                shutdownService();
116            }
117    
118            /**
119             * {@inheritDoc}
120             */
121            public boolean awaitTermination( long timeout,
122                                             TimeUnit unit ) {
123                return true;
124            }
125    
126            /**
127             * {@inheritDoc}
128             */
129            @Override
130            protected boolean doCheckIsTerminated() {
131                return true;
132            }
133    
134        }
135    
    // Logger used by this service; replaced through setLogger(Logger)
    private Logger logger = Logger.getLogger(this.getClass());
    // Receives the problems caught while node change listeners are being notified; replaced through setProblemLog(ProblemLog)
    private ProblemLog problemLog = new DefaultProblemLog();
    // Counters updated by processEvents(...)
    private final Statistics statistics = new Statistics();
    // Used by each WorkspaceListener to create the session through which it registers itself
    private final SessionFactory sessionFactory;
    // The listeners created by monitor(...); a listener is removed again by unregisterListener(...)
    private final CopyOnWriteArrayList<WorkspaceListener> workspaceListeners = new CopyOnWriteArrayList<WorkspaceListener>();
    // The JCR event listeners added through addListener(EventListener)
    private final CopyOnWriteArrayList<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();
    // The node change listeners added through addListener(NodeChangeListener)
    private final CopyOnWriteArrayList<NodeChangeListener> nodeChangeListeners = new CopyOnWriteArrayList<NodeChangeListener>();
    // The administrative component returned by getAdministrator()
    private final Administrator administrator = new Administrator();
144    
145        public ObservationService( SessionFactory sessionFactory ) {
146            CheckArg.isNotNull(sessionFactory, "session factory");
147            this.sessionFactory = sessionFactory;
148        }
149    
150        /**
151         * {@inheritDoc}
152         */
153        public ServiceAdministrator getAdministrator() {
154            return this.administrator;
155        }
156    
157        /**
158         * @return sessionFactory
159         */
160        public SessionFactory getSessionFactory() {
161            return this.sessionFactory;
162        }
163    
164        /**
165         * Get the statistics for this system.
166         * 
167         * @return the statistics, which are updated as the system is used
168         */
169        public Statistics getStatistics() {
170            return this.statistics;
171        }
172    
173        /**
174         * Get the logger for this system
175         * 
176         * @return the logger
177         */
178        public Logger getLogger() {
179            return this.logger;
180        }
181    
182        /**
183         * Set the logger for this system.
184         * 
185         * @param logger the logger, or null if the standard logging should be used
186         */
187        public void setLogger( Logger logger ) {
188            this.logger = logger != null ? logger : Logger.getLogger(this.getClass());
189        }
190    
191        /**
192         * @return problemLog
193         */
194        public ProblemLog getProblemLog() {
195            return this.problemLog;
196        }
197    
198        /**
199         * Set the problem log that will be notified of problems handling events. By default, such problems are sent to the log.
200         * 
201         * @param problemLog the new problem log implementation; if null, then the default problem log is used
202         */
203        public void setProblemLog( ProblemLog problemLog ) {
204            this.problemLog = problemLog != null ? problemLog : new DefaultProblemLog();
205        }
206    
207        public boolean addListener( EventListener listener ) {
208            if (listener == null) return false;
209            return this.eventListeners.addIfAbsent(listener);
210        }
211    
212        public boolean removeListener( EventListener listener ) {
213            if (listener == null) return false;
214            return this.eventListeners.remove(listener);
215        }
216    
217        public boolean addListener( NodeChangeListener listener ) {
218            return this.nodeChangeListeners.addIfAbsent(listener);
219        }
220    
221        public boolean removeListener( NodeChangeListener listener ) {
222            if (listener == null) return false;
223            return this.nodeChangeListeners.remove(listener);
224        }
225    
226        protected void shutdownService() {
227            // Unregister all listeners ...
228            for (WorkspaceListener listener : this.workspaceListeners) {
229                try {
230                    listener.unregister();
231                } catch (RepositoryException e) {
232                    this.logger.error(e, RepositoryI18n.errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService);
233                }
234            }
235        }
236    
237        /**
238         * Monitor the supplied workspace for events of the given type on any node at or under the supplied path.
239         * <p>
240         * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
241         * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
242         * repository and workspace name.
243         * </p>
244         * <p>
245         * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
246         * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
247         * workspace and garbage collected. If this service is {@link ServiceAdministrator#shutdown() shutdown} while there are still
248         * active listeners, those listeners will disconnect themselves from this service and the workspace with which they're
249         * registered when they attempt to forward the next events.
250         * </p>
251         * <p>
252         * The set of events that are monitored can be filtered by specifying restrictions based on characteristics of the node
253         * associated with the event. In the case of event types {@link Event#NODE_ADDED NODE_ADDED} and
254         * {@link Event#NODE_REMOVED NODE_REMOVED}, the node associated with an event is the node at (or formerly at) the path
255         * returned by {@link Event#getPath() Event.getPath()}. In the case of event types
256         * {@link Event#PROPERTY_ADDED PROPERTY_ADDED}, {@link Event#PROPERTY_REMOVED PROPERTY_REMOVED} and
257         * {@link Event#PROPERTY_CHANGED PROPERTY_CHANGED}, the node associated with an event is the parent node of the property at
258         * (or formerly at) the path returned by <code>Event.getPath</code>:
259         * <ul>
260         * <li> <code>absolutePath</code>, <code>isDeep</code>: Only events whose associated node is at
261         * <code>absolutePath</code> (or within its subtree, if <code>isDeep</code> is <code>true</code>) will be received. It
262         * is permissible to register a listener for a path where no node currently exists. </li>
263         * <li> <code>uuids</code>: Only events whose associated node has one of the UUIDs in this list will be received. If his
264         * parameter is <code>null</code> then no UUID-related restriction is placed on events received. </li>
265         * <li> <code>nodeTypeNames</code>: Only events whose associated node has one of the node types (or a subtype of one of the
266         * node types) in this list will be received. If this parameter is <code>null</code> then no node type-related restriction
267         * is placed on events received. </li>
268         * </ul>
269         * The restrictions are "ANDed" together. In other words, for a particular node to be "listened to" it must meet all the
270         * restrictions.
271         * </p>
272         * <p>
273         * Additionally, if <code>noLocal</code> is <code>true</code>, then events generated by the session through which the
274         * listener was registered are ignored. Otherwise, they are not ignored.
275         * </p>
276         * <p>
277         * The filters of an already-registered {@link WorkspaceListener} can be changed at runtime by changing the attributes and
278         * {@link WorkspaceListener#reregister() registering}.
279         * </p>
280         * 
281         * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
282         *        workspace that is to be monitored
283         * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
284         *        in the workspace are to be monitored
285         * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
286         * @param isDeep true if events below the node given by the <code>absolutePath</code> or by the <code>uuids</code> are to
287         *        be processed, or false if only the events at the node
288         * @param uuids array of UUIDs of nodes that are to be monitored; may be null or empty if the UUIDs are not known
289         * @param nodeTypeNames array of node type names that are to be monitored; may be null or empty if the monitoring has no node
290         *        type restrictions
291         * @param noLocal true if the events originating in the supplied workspace are to be ignored, or false if they are also to be
292         *        processed.
293         * @return the listener that was created and registered to perform the monitoring
294         * @throws RepositoryException if there is a problem registering the listener
295         */
296        public WorkspaceListener monitor( String repositoryWorkspaceName,
297                                          String absolutePath,
298                                          int eventTypes,
299                                          boolean isDeep,
300                                          String[] uuids,
301                                          String[] nodeTypeNames,
302                                          boolean noLocal ) throws RepositoryException {
303            WorkspaceListener listener = new WorkspaceListener(repositoryWorkspaceName, eventTypes, absolutePath, isDeep, uuids,
304                                                               nodeTypeNames, noLocal);
305            listener.register();
306            this.workspaceListeners.add(listener);
307            return listener;
308        }
309    
310        /**
311         * Monitor the supplied workspace for {@link WorkspaceListener#DEFAULT_EVENT_TYPES default event types} on any node at or
312         * under the supplied path.
313         * <p>
314         * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
315         * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
316         * repository and workspace name.
317         * </p>
318         * <p>
319         * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
320         * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
321         * workspace and garbage collected.
322         * </p>
323         * 
324         * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
325         *        workspace that is to be monitored
326         * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
327         *        in the workspace are to be monitored
328         * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
329         *        node type restrictions
330         * @return the listener that was created and registered to perform the monitoring
331         * @throws RepositoryException if there is a problem registering the listener
332         */
333        public WorkspaceListener monitor( String repositoryWorkspaceName,
334                                          String absolutePath,
335                                          String... nodeTypeNames ) throws RepositoryException {
336            return monitor(repositoryWorkspaceName,
337                           absolutePath,
338                           WorkspaceListener.DEFAULT_EVENT_TYPES,
339                           WorkspaceListener.DEFAULT_IS_DEEP,
340                           null,
341                           nodeTypeNames,
342                           WorkspaceListener.DEFAULT_NO_LOCAL);
343        }
344    
345        /**
346         * Monitor the supplied workspace for the supplied event types on any node in the workspace.
347         * <p>
348         * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
349         * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
350         * repository and workspace name.
351         * </p>
352         * <p>
353         * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
354         * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
355         * workspace and garbage collected.
356         * </p>
357         * 
358         * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
359         *        workspace that is to be monitored
360         * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
361         * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
362         *        node type restrictions
363         * @return the listener that was created and registered to perform the monitoring
364         * @throws RepositoryException if there is a problem registering the listener
365         */
366        public WorkspaceListener monitor( String repositoryWorkspaceName,
367                                          int eventTypes,
368                                          String... nodeTypeNames ) throws RepositoryException {
369            return monitor(repositoryWorkspaceName,
370                           WorkspaceListener.DEFAULT_ABSOLUTE_PATH,
371                           eventTypes,
372                           WorkspaceListener.DEFAULT_IS_DEEP,
373                           null,
374                           nodeTypeNames,
375                           WorkspaceListener.DEFAULT_NO_LOCAL);
376        }
377    
378        protected void unregisterListener( WorkspaceListener listener ) {
379            if (listener != null) this.workspaceListeners.remove(listener);
380        }
381    
382        /**
383         * From section 2.8.8 of the JSR-170 specification:
384         * <p>
385         * On each persistent change, those listeners that are entitled to receive one or more events will have their onEvent method
386         * called and be passed an EventIterator. The EventIterator will contain the event bundle reflecting the persistent changes
387         * made but excluding those to which that particular listener is not entitled, according to the listeners access permissions
388         * and filters.
389         * </p>
390         * 
391         * @param eventIterator
392         * @param listener
393         */
394        protected void processEvents( EventIterator eventIterator,
395                                      WorkspaceListener listener ) {
396            if (eventIterator == null) return;
397            List<Event> events = new ArrayList<Event>();
398            // Copy the events ...
399            while (eventIterator.hasNext()) {
400                events.add((Event)eventIterator.next());
401            }
402            if (!getAdministrator().isStarted()) {
403                this.statistics.recordIgnoredEventSet(events.size());
404                return;
405            }
406    
407            // Notify the event listeners ...
408            boolean notifiedSomebody = false;
409            List<EventListener> eventListeners = this.eventListeners; // use one consistent snapshot
410            if (!eventListeners.isEmpty()) {
411                DelegatingEventIterator eventIter = new DelegatingEventIterator(events.iterator(), events.size());
412                for (EventListener eventListener : eventListeners) {
413                    eventListener.onEvent(eventIter);
414                }
415                notifiedSomebody = true;
416            }
417    
418            // Now create the node change events ...
419            List<NodeChangeListener> nodeChangeListeners = this.nodeChangeListeners; // use one consistent snapshot
420            if (!nodeChangeListeners.isEmpty()) {
421                final String repositoryWorkspaceName = listener.getRepositoryWorkspaceName();
422                try {
423                    NodeChanges nodeChanges = NodeChanges.create(repositoryWorkspaceName, events);
424    
425                    // And notify the node change listeners ...
426                    int nodeChangeCount = nodeChanges.size();
427                    this.statistics.recordNodesChanged(nodeChangeCount);
428                    for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
429                        nodeChangeListener.onNodeChanges(nodeChanges);
430                    }
431                } catch (Throwable t) {
432                    getProblemLog().error(repositoryWorkspaceName, t);
433                }
434                notifiedSomebody = true;
435            }
436    
437            if (notifiedSomebody) {
438                this.statistics.recordEventSet(events.size());
439            } else {
440                this.statistics.recordIgnoredEventSet(events.size());
441            }
442        }
443    
444        protected class DelegatingEventIterator implements EventIterator {
445    
446            private final Iterator<Event> events;
447            private final int size;
448            private int position = 0;
449    
450            protected DelegatingEventIterator( Iterator<Event> events,
451                                               int size ) {
452                this.events = events;
453                this.size = size;
454            }
455    
456            /**
457             * {@inheritDoc}
458             */
459            public Event nextEvent() {
460                ++position;
461                return events.next();
462            }
463    
464            /**
465             * {@inheritDoc}
466             */
467            public long getPosition() {
468                return position;
469            }
470    
471            /**
472             * {@inheritDoc}
473             */
474            public long getSize() {
475                return size;
476            }
477    
478            /**
479             * {@inheritDoc}
480             */
481            public void skip( long skipNum ) {
482                for (int i = 0; i != skipNum; ++i) {
483                    next();
484                }
485            }
486    
487            /**
488             * {@inheritDoc}
489             */
490            public boolean hasNext() {
491                return events.hasNext();
492            }
493    
494            /**
495             * {@inheritDoc}
496             */
497            public Object next() {
498                return events.next();
499            }
500    
501            /**
502             * {@inheritDoc}
503             */
504            public void remove() {
505                // does nothing
506            }
507    
508        }
509    
510        /**
511         * Implementation of the {@link EventListener JCR EventListener} interface, returned by the sequencing system.
512         * 
513         * @author Randall Hauch
514         */
515        @ThreadSafe
516        public class WorkspaceListener implements EventListener {
517    
518            public static final boolean DEFAULT_IS_DEEP = true;
519            public static final boolean DEFAULT_NO_LOCAL = false;
520            public static final int DEFAULT_EVENT_TYPES = Event.NODE_ADDED | /* Event.NODE_REMOVED| */Event.PROPERTY_ADDED
521                                                          | Event.PROPERTY_CHANGED /* |Event.PROPERTY_REMOVED */;
522            public static final String DEFAULT_ABSOLUTE_PATH = "/";
523    
524            private final String repositoryWorkspaceName;
525            private final Set<String> uuids;
526            private final Set<String> nodeTypeNames;
527            private final int eventTypes;
528            private final String absolutePath;
529            private final boolean deep;
530            private final boolean noLocal;
531            @GuardedBy( "this" )
532            private transient Session session;
533    
534            protected WorkspaceListener( String repositoryWorkspaceName,
535                                         int eventTypes,
536                                         String absPath,
537                                         boolean isDeep,
538                                         String[] uuids,
539                                         String[] nodeTypeNames,
540                                         boolean noLocal ) {
541                this.repositoryWorkspaceName = repositoryWorkspaceName;
542                this.eventTypes = eventTypes;
543                this.deep = isDeep;
544                this.noLocal = noLocal;
545                this.absolutePath = absPath != null && absPath.trim().length() != 0 ? absPath.trim() : null;
546                // Set the UUIDs ...
547                Set<String> newUuids = new HashSet<String>();
548                if (uuids != null) {
549                    for (String uuid : uuids) {
550                        if (uuid != null && uuid.trim().length() != 0) newUuids.add(uuid.trim());
551                    }
552                }
553                this.uuids = Collections.unmodifiableSet(newUuids);
554                // Set the node type names
555                Set<String> newNodeTypeNames = new HashSet<String>();
556                if (nodeTypeNames != null) {
557                    for (String nodeTypeName : nodeTypeNames) {
558                        if (nodeTypeName != null && nodeTypeName.trim().length() != 0) newNodeTypeNames.add(nodeTypeName.trim());
559                    }
560                }
561                this.nodeTypeNames = Collections.unmodifiableSet(newNodeTypeNames);
562            }
563    
564            /**
565             * @return repositoryWorkspaceName
566             */
567            public String getRepositoryWorkspaceName() {
568                return this.repositoryWorkspaceName;
569            }
570    
571            /**
572             * @return eventTypes
573             */
574            public int getEventTypes() {
575                return this.eventTypes;
576            }
577    
578            /**
579             * @return absolutePath
580             */
581            public String getAbsolutePath() {
582                return this.absolutePath;
583            }
584    
585            /**
586             * @return deep
587             */
588            public boolean isDeep() {
589                return this.deep;
590            }
591    
592            /**
593             * @return noLocal
594             */
595            public boolean isNoLocal() {
596                return this.noLocal;
597            }
598    
599            /**
600             * @return uuids
601             */
602            public Set<String> getUuids() {
603                return this.uuids;
604            }
605    
606            /**
607             * @return nodeTypeNames
608             */
609            public Set<String> getNodeTypeNames() {
610                return this.nodeTypeNames;
611            }
612    
613            public synchronized boolean isRegistered() {
614                if (this.session != null && getAdministrator().isShutdown()) {
615                    // This sequencing system has been shutdown, so unregister this listener
616                    try {
617                        unregister();
618                    } catch (RepositoryException re) {
619                        String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
620                        Logger.getLogger(this.getClass()).debug(re, msg);
621                    }
622                }
623                return this.session != null;
624            }
625    
626            public synchronized WorkspaceListener register() throws UnsupportedRepositoryOperationException, RepositoryException {
627                if (this.session != null) return this;
628                this.session = ObservationService.this.getSessionFactory().createSession(this.repositoryWorkspaceName);
629                String[] uuids = this.uuids.isEmpty() ? null : this.uuids.toArray(new String[this.uuids.size()]);
630                String[] nodeTypeNames = this.nodeTypeNames.isEmpty() ? null : this.nodeTypeNames.toArray(new String[this.nodeTypeNames.size()]);
631                this.session.getWorkspace().getObservationManager().addEventListener(this,
632                                                                                     eventTypes,
633                                                                                     absolutePath,
634                                                                                     deep,
635                                                                                     uuids,
636                                                                                     nodeTypeNames,
637                                                                                     noLocal);
638                return this;
639            }
640    
641            public synchronized WorkspaceListener unregister() throws UnsupportedRepositoryOperationException, RepositoryException {
642                if (this.session == null) return this;
643                try {
644                    if (this.session.isLive()) {
645                        this.session.getWorkspace().getObservationManager().removeEventListener(this);
646                        this.session.logout();
647                    }
648                } finally {
649                    this.session = null;
650                    unregisterListener(this);
651                }
652                return this;
653            }
654    
655            public synchronized WorkspaceListener reregister() throws UnsupportedRepositoryOperationException, RepositoryException {
656                unregister();
657                register();
658                return this;
659            }
660    
661            /**
662             * {@inheritDoc}
663             */
664            public void onEvent( EventIterator events ) {
665                if (events != null) {
666                    if (getAdministrator().isShutdown()) {
667                        // This sequencing system has been shutdown, so unregister this listener
668                        try {
669                            unregister();
670                        } catch (RepositoryException re) {
671                            String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
672                            Logger.getLogger(this.getClass()).debug(re, msg);
673                        }
674                    } else {
675                        ObservationService.this.processEvents(events, this);
676                    }
677                }
678            }
679        }
680    
681        /**
682         * The statistics for the system. Each sequencing system has an instance of this class that is updated.
683         * 
684         * @author Randall Hauch
685         */
686        @ThreadSafe
687        public class Statistics {
688    
689            @GuardedBy( "lock" )
690            private long numberOfEventsIgnored;
691            @GuardedBy( "lock" )
692            private long numberOfEventsEnqueued;
693            @GuardedBy( "lock" )
694            private long numberOfEventSetsIgnored;
695            @GuardedBy( "lock" )
696            private long numberOfEventSetsEnqueued;
697            private final AtomicLong numberOfNodeChangesEnqueued = new AtomicLong(0);
698            private final ReadWriteLock lock = new ReentrantReadWriteLock();
699            private final AtomicLong startTime;
700    
701            protected Statistics() {
702                startTime = new AtomicLong(System.currentTimeMillis());
703            }
704    
705            public Statistics reset() {
706                try {
707                    lock.writeLock().lock();
708                    this.startTime.set(System.currentTimeMillis());
709                    this.numberOfEventsIgnored = 0;
710                    this.numberOfEventsEnqueued = 0;
711                    this.numberOfEventSetsIgnored = 0;
712                    this.numberOfEventSetsEnqueued = 0;
713                    this.numberOfNodeChangesEnqueued.set(0);
714                } finally {
715                    lock.writeLock().unlock();
716                }
717                return this;
718            }
719    
720            /**
721             * @return the system time when the statistics were started
722             */
723            public long getStartTime() {
724                return this.startTime.get();
725            }
726    
727            /**
728             * @return the number of node changes that were processed
729             */
730            public long getNumberOfNodeChangesEnqueued() {
731                return this.numberOfNodeChangesEnqueued.get();
732            }
733    
734            /**
735             * @return the number of events that were ignored because the system was not running
736             */
737            public long getNumberOfEventsIgnored() {
738                try {
739                    lock.readLock().lock();
740                    return this.numberOfEventsIgnored;
741                } finally {
742                    lock.readLock().unlock();
743                }
744            }
745    
746            /**
747             * @return the number of events that were enqueued for processing
748             */
749            public long getNumberOfEventsEnqueued() {
750                try {
751                    lock.readLock().lock();
752                    return this.numberOfEventsEnqueued;
753                } finally {
754                    lock.readLock().unlock();
755                }
756            }
757    
758            /**
759             * @return the number of event sets (transactions) that were enqueued for processing
760             */
761            public long getNumberOfEventSetsEnqueued() {
762                try {
763                    lock.readLock().lock();
764                    return this.numberOfEventSetsEnqueued;
765                } finally {
766                    lock.readLock().unlock();
767                }
768            }
769    
770            /**
771             * @return the number of event sets (transactions) that were ignored because the system was not running
772             */
773            public long getNumberOfEventSetsIgnored() {
774                try {
775                    lock.readLock().lock();
776                    return this.numberOfEventSetsIgnored;
777                } finally {
778                    lock.readLock().unlock();
779                }
780            }
781    
782            protected void recordNodesChanged( long changeCount ) {
783                this.numberOfNodeChangesEnqueued.addAndGet(changeCount);
784            }
785    
786            protected void recordEventSet( long eventsInSet ) {
787                try {
788                    lock.writeLock().lock();
789                    this.numberOfEventsEnqueued += eventsInSet;
790                    ++this.numberOfEventSetsEnqueued;
791                } finally {
792                    lock.writeLock().unlock();
793                }
794            }
795    
796            protected void recordIgnoredEventSet( long eventsInSet ) {
797                try {
798                    lock.writeLock().lock();
799                    this.numberOfEventsIgnored += eventsInSet;
800                    this.numberOfEventSetsIgnored += 1;
801                    ++this.numberOfEventSetsEnqueued;
802                } finally {
803                    lock.writeLock().unlock();
804                }
805            }
806        }
807    }