/*
 * JBoss DNA (http://www.jboss.org/dna)
 * See the COPYRIGHT.txt file distributed with this work for information
 * regarding copyright ownership. Some portions may be licensed
 * to Red Hat, Inc. under one or more contributor license agreements.
 * See the AUTHORS.txt file in the distribution for a full listing of
 * individual contributors.
 *
 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
 * is licensed to you under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * JBoss DNA is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.dna.repository.observation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.UnsupportedRepositoryOperationException;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import javax.jcr.observation.ObservationManager;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.jboss.dna.common.util.CheckArg;
import org.jboss.dna.common.util.Logger;
import org.jboss.dna.repository.RepositoryI18n;
import org.jboss.dna.repository.service.AbstractServiceAdministrator;
import org.jboss.dna.repository.service.AdministeredService;
import org.jboss.dna.repository.service.ServiceAdministrator;
import org.jboss.dna.repository.util.SessionFactory;

/**
 * Service that registers {@link WorkspaceListener JCR event listeners} on workspaces and forwards the events they receive to
 * the {@link EventListener}s and {@link NodeChangeListener}s that have been added to this service.
 *
 * @author Randall Hauch
 */
public class ObservationService implements AdministeredService {

    /**
     * Interface to which problems with particular events are logged.
     *
     * @author Randall Hauch
     */
    public static interface ProblemLog {

        /**
         * Report a problem that occurred while processing events.
         *
         * @param repositoryWorkspaceName the name of the repository workspace whose events were being processed
         * @param t the problem that was encountered
         */
        void error( String repositoryWorkspaceName,
                    Throwable t );
    }

    /**
     * Problem log implementation that records problems in the {@link ObservationService#getLogger() service's logger}.
     *
     * @author Randall Hauch
     */
    public class DefaultProblemLog implements ProblemLog {

        /**
         * {@inheritDoc}
         */
        public void error( String repositoryWorkspaceName,
                           Throwable t ) {
            getLogger().error(t, RepositoryI18n.errorProcessingEvents, repositoryWorkspaceName);
        }
    }

    /**
     * Problem log implementation that discards every problem reported to it.
     */
    protected static class NoOpProblemLog implements ProblemLog {

        /**
         * {@inheritDoc}
         */
        public void error( String repositoryWorkspaceName,
                           Throwable t ) {
            // intentionally ignores the problem
        }
    }

    /** A shared problem log that ignores all problems. */
    public static final ProblemLog NO_OP_PROBLEM_LOG = new NoOpProblemLog();

    /**
     * The administrative component for this service.
     *
     * @author Randall Hauch
     */
    protected class Administrator extends AbstractServiceAdministrator {

        protected Administrator() {
            super(RepositoryI18n.observationServiceName, State.STARTED);
        }

        /**
         * {@inheritDoc}
         */
        @Override
        protected void doShutdown( State fromState ) {
            super.doShutdown(fromState);
            shutdownService();
        }

        /**
         * {@inheritDoc}
         */
        public boolean awaitTermination( long timeout,
                                         TimeUnit unit ) {
            // This implementation never waits; it always reports termination.
            return true;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        protected boolean doCheckIsTerminated() {
            return true;
        }

    }

    private Logger logger = Logger.getLogger(this.getClass());
    private ProblemLog problemLog = new DefaultProblemLog();
    private final Statistics statistics = new Statistics();
    private final SessionFactory sessionFactory;
    private final CopyOnWriteArrayList<WorkspaceListener> workspaceListeners = new CopyOnWriteArrayList<WorkspaceListener>();
    private final CopyOnWriteArrayList<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();
    private final CopyOnWriteArrayList<NodeChangeListener> nodeChangeListeners = new CopyOnWriteArrayList<NodeChangeListener>();
    private final Administrator administrator = new Administrator();

    /**
     * Create an observation service that obtains its sessions from the supplied factory.
     *
     * @param sessionFactory the factory used to create sessions for the workspaces being monitored; may not be null
     */
    public ObservationService( SessionFactory sessionFactory ) {
        CheckArg.isNotNull(sessionFactory, "session factory");
        this.sessionFactory = sessionFactory;
    }

    /**
     * {@inheritDoc}
     */
    public ServiceAdministrator getAdministrator() {
        return this.administrator;
    }

    /**
     * @return the session factory supplied when this service was created
     */
    public SessionFactory getSessionFactory() {
        return this.sessionFactory;
    }

    /**
     * Get the statistics for this system.
     *
     * @return the statistics, which are updated as the system is used
     */
    public Statistics getStatistics() {
        return this.statistics;
    }

    /**
     * Get the logger for this system
     *
     * @return the logger
     */
    public Logger getLogger() {
        return this.logger;
    }

    /**
     * Set the logger for this system.
     *
     * @param logger the logger, or null if the standard logging should be used
     */
    public void setLogger( Logger logger ) {
        this.logger = logger != null ? logger : Logger.getLogger(this.getClass());
    }

    /**
     * @return the problem log that is notified of problems handling events
     */
    public ProblemLog getProblemLog() {
        return this.problemLog;
    }

    /**
     * Set the problem log that will be notified of problems handling events. By default, such problems are sent to the log.
     *
     * @param problemLog the new problem log implementation; if null, then the default problem log is used
     */
    public void setProblemLog( ProblemLog problemLog ) {
        this.problemLog = problemLog != null ? problemLog : new DefaultProblemLog();
    }

    /**
     * Add a listener that is to receive the raw JCR events observed by this service.
     *
     * @param listener the listener to add; may be null
     * @return true if the listener was added, or false if it was null or already registered
     */
    public boolean addListener( EventListener listener ) {
        if (listener == null) return false;
        return this.eventListeners.addIfAbsent(listener);
    }

    /**
     * Remove a listener previously added with {@link #addListener(EventListener)}.
     *
     * @param listener the listener to remove; may be null
     * @return true if the listener was removed, or false if it was null or not registered
     */
    public boolean removeListener( EventListener listener ) {
        if (listener == null) return false;
        return this.eventListeners.remove(listener);
    }

    /**
     * Add a listener that is to receive the {@link NodeChanges} computed from the events observed by this service.
     *
     * @param listener the listener to add; may be null
     * @return true if the listener was added, or false if it was null or already registered
     */
    public boolean addListener( NodeChangeListener listener ) {
        // Guard against null like the other add/remove methods; a null entry would fail later during dispatch.
        if (listener == null) return false;
        return this.nodeChangeListeners.addIfAbsent(listener);
    }

    /**
     * Remove a listener previously added with {@link #addListener(NodeChangeListener)}.
     *
     * @param listener the listener to remove; may be null
     * @return true if the listener was removed, or false if it was null or not registered
     */
    public boolean removeListener( NodeChangeListener listener ) {
        if (listener == null) return false;
        return this.nodeChangeListeners.remove(listener);
    }

    /**
     * Unregister every workspace listener tracked by this service. A failure to unregister one listener is logged and does not
     * prevent the remaining listeners from being unregistered.
     */
    protected void shutdownService() {
        // Unregister all listeners ...
        for (WorkspaceListener listener : this.workspaceListeners) {
            try {
                listener.unregister();
            } catch (RepositoryException e) {
                this.logger.error(e, RepositoryI18n.errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService);
            }
        }
    }

    /**
     * Monitor the supplied workspace for events of the given type on any node at or under the supplied path.
     * <p>
     * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
     * information that is visible to the session created by the {@link #getSessionFactory() session factory} for the given
     * repository and workspace name.
     * </p>
     * <p>
     * The listener returned from this method is tracked by this service. If the listener is no longer needed, it should be
     * {@link WorkspaceListener#unregister() unregistered}, which removes it from the workspace's
     * {@link ObservationManager#removeEventListener(EventListener) observation manager} and from this service. When this
     * service is {@link ServiceAdministrator#shutdown() shutdown}, all tracked listeners are unregistered; a listener that
     * still receives events after shutdown unregisters itself rather than forwarding them.
     * </p>
     * <p>
     * The set of events that are monitored can be filtered by specifying restrictions based on characteristics of the node
     * associated with the event. In the case of event types {@link Event#NODE_ADDED NODE_ADDED} and
     * {@link Event#NODE_REMOVED NODE_REMOVED}, the node associated with an event is the node at (or formerly at) the path
     * returned by {@link Event#getPath() Event.getPath()}. In the case of event types
     * {@link Event#PROPERTY_ADDED PROPERTY_ADDED}, {@link Event#PROPERTY_REMOVED PROPERTY_REMOVED} and
     * {@link Event#PROPERTY_CHANGED PROPERTY_CHANGED}, the node associated with an event is the parent node of the property at
     * (or formerly at) the path returned by <code>Event.getPath</code>:
     * <ul>
     * <li> <code>absolutePath</code>, <code>isDeep</code>: Only events whose associated node is at
     * <code>absolutePath</code> (or within its subtree, if <code>isDeep</code> is <code>true</code>) will be received. It
     * is permissible to register a listener for a path where no node currently exists. </li>
     * <li> <code>uuids</code>: Only events whose associated node has one of the UUIDs in this list will be received. If this
     * parameter is <code>null</code> then no UUID-related restriction is placed on events received. </li>
     * <li> <code>nodeTypeNames</code>: Only events whose associated node has one of the node types (or a subtype of one of the
     * node types) in this list will be received. If this parameter is <code>null</code> then no node type-related restriction
     * is placed on events received. </li>
     * </ul>
     * The restrictions are "ANDed" together. In other words, for a particular node to be "listened to" it must meet all the
     * restrictions.
     * </p>
     * <p>
     * Additionally, if <code>noLocal</code> is <code>true</code>, then events generated by the session through which the
     * listener was registered are ignored. Otherwise, they are not ignored.
     * </p>
     * <p>
     * The filters of a {@link WorkspaceListener} are fixed when it is created. {@link WorkspaceListener#reregister()
     * Reregistering} a listener unregisters it and registers it again with those same filters.
     * </p>
     *
     * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
     *        workspace that is to be monitored
     * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
     *        in the workspace are to be monitored
     * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
     * @param isDeep true if events below the node given by the <code>absolutePath</code> or by the <code>uuids</code> are to
     *        be processed, or false if only the events at the node
     * @param uuids array of UUIDs of nodes that are to be monitored; may be null or empty if the UUIDs are not known
     * @param nodeTypeNames array of node type names that are to be monitored; may be null or empty if the monitoring has no node
     *        type restrictions
     * @param noLocal true if the events originating in the supplied workspace are to be ignored, or false if they are also to be
     *        processed.
     * @return the listener that was created and registered to perform the monitoring
     * @throws RepositoryException if there is a problem registering the listener
     */
    public WorkspaceListener monitor( String repositoryWorkspaceName,
                                      String absolutePath,
                                      int eventTypes,
                                      boolean isDeep,
                                      String[] uuids,
                                      String[] nodeTypeNames,
                                      boolean noLocal ) throws RepositoryException {
        WorkspaceListener listener = new WorkspaceListener(repositoryWorkspaceName, eventTypes, absolutePath, isDeep, uuids,
                                                           nodeTypeNames, noLocal);
        // Only track the listener once it has been successfully registered with the workspace.
        listener.register();
        this.workspaceListeners.add(listener);
        return listener;
    }

    /**
     * Monitor the supplied workspace for {@link WorkspaceListener#DEFAULT_EVENT_TYPES default event types} on any node at or
     * under the supplied path.
     * <p>
     * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
     * information that is visible to the session created by the {@link #getSessionFactory() session factory} for the given
     * repository and workspace name.
     * </p>
     * <p>
     * The listener returned from this method is tracked by this service. If the listener is no longer needed, it should be
     * {@link WorkspaceListener#unregister() unregistered}, which removes it from the workspace's
     * {@link ObservationManager#removeEventListener(EventListener) observation manager} and from this service.
     * </p>
     *
     * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
     *        workspace that is to be monitored
     * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
     *        in the workspace are to be monitored
     * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
     *        node type restrictions
     * @return the listener that was created and registered to perform the monitoring
     * @throws RepositoryException if there is a problem registering the listener
     */
    public WorkspaceListener monitor( String repositoryWorkspaceName,
                                      String absolutePath,
                                      String... nodeTypeNames ) throws RepositoryException {
        return monitor(repositoryWorkspaceName,
                       absolutePath,
                       WorkspaceListener.DEFAULT_EVENT_TYPES,
                       WorkspaceListener.DEFAULT_IS_DEEP,
                       null,
                       nodeTypeNames,
                       WorkspaceListener.DEFAULT_NO_LOCAL);
    }

    /**
     * Monitor the supplied workspace for the supplied event types on any node in the workspace.
     * <p>
     * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
     * information that is visible to the session created by the {@link #getSessionFactory() session factory} for the given
     * repository and workspace name.
     * </p>
     * <p>
     * The listener returned from this method is tracked by this service. If the listener is no longer needed, it should be
     * {@link WorkspaceListener#unregister() unregistered}, which removes it from the workspace's
     * {@link ObservationManager#removeEventListener(EventListener) observation manager} and from this service.
     * </p>
     *
     * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
     *        workspace that is to be monitored
     * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
     * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
     *        node type restrictions
     * @return the listener that was created and registered to perform the monitoring
     * @throws RepositoryException if there is a problem registering the listener
     */
    public WorkspaceListener monitor( String repositoryWorkspaceName,
                                      int eventTypes,
                                      String... nodeTypeNames ) throws RepositoryException {
        return monitor(repositoryWorkspaceName,
                       WorkspaceListener.DEFAULT_ABSOLUTE_PATH,
                       eventTypes,
                       WorkspaceListener.DEFAULT_IS_DEEP,
                       null,
                       nodeTypeNames,
                       WorkspaceListener.DEFAULT_NO_LOCAL);
    }

    /**
     * Stop tracking the supplied workspace listener. This does not unregister the listener from its workspace.
     *
     * @param listener the listener to stop tracking; may be null
     */
    protected void unregisterListener( WorkspaceListener listener ) {
        if (listener != null) this.workspaceListeners.remove(listener);
    }

    /**
     * Forward a bundle of events received by a workspace listener to the listeners added to this service, and record the
     * outcome in the {@link #getStatistics() statistics}. If this service is not started, or if no listeners have been added,
     * the events are recorded as ignored.
     * <p>
     * From section 2.8.8 of the JSR-170 specification:
     * </p>
     * <p>
     * On each persistent change, those listeners that are entitled to receive one or more events will have their onEvent method
     * called and be passed an EventIterator. The EventIterator will contain the event bundle reflecting the persistent changes
     * made but excluding those to which that particular listener is not entitled, according to the listeners access permissions
     * and filters.
     * </p>
     *
     * @param eventIterator the events that were received; if null, this method does nothing
     * @param listener the workspace listener that received the events; its workspace name is used when building the node
     *        changes and when reporting problems
     */
    protected void processEvents( EventIterator eventIterator,
                                  WorkspaceListener listener ) {
        if (eventIterator == null) return;
        List<Event> events = new ArrayList<Event>();
        // Copy the events, since the supplied iterator can only be traversed once ...
        while (eventIterator.hasNext()) {
            events.add((Event)eventIterator.next());
        }
        if (!getAdministrator().isStarted()) {
            this.statistics.recordIgnoredEventSet(events.size());
            return;
        }

        // Notify the event listeners ...
        boolean notifiedSomebody = false;
        if (!this.eventListeners.isEmpty()) {
            for (EventListener eventListener : this.eventListeners) {
                // Each listener gets its own iterator; a shared one would be exhausted by the first listener.
                eventListener.onEvent(new DelegatingEventIterator(events.iterator(), events.size()));
            }
            notifiedSomebody = true;
        }

        // Now create the node change events ...
        if (!this.nodeChangeListeners.isEmpty()) {
            final String repositoryWorkspaceName = listener.getRepositoryWorkspaceName();
            try {
                NodeChanges nodeChanges = NodeChanges.create(repositoryWorkspaceName, events);

                // And notify the node change listeners ...
                int nodeChangeCount = nodeChanges.size();
                this.statistics.recordNodesChanged(nodeChangeCount);
                for (NodeChangeListener nodeChangeListener : this.nodeChangeListeners) {
                    nodeChangeListener.onNodeChanges(nodeChanges);
                }
            } catch (Throwable t) {
                getProblemLog().error(repositoryWorkspaceName, t);
            }
            notifiedSomebody = true;
        }

        if (notifiedSomebody) {
            this.statistics.recordEventSet(events.size());
        } else {
            this.statistics.recordIgnoredEventSet(events.size());
        }
    }

    /**
     * An {@link EventIterator} that wraps a plain iterator over events and keeps track of the position within it.
     */
    protected class DelegatingEventIterator implements EventIterator {

        private final Iterator<Event> events;
        private final int size;
        private int position = 0;

        /**
         * @param events the iterator over the events to be returned
         * @param size the total number of events that the supplied iterator will return
         */
        protected DelegatingEventIterator( Iterator<Event> events,
                                           int size ) {
            this.events = events;
            this.size = size;
        }

        /**
         * {@inheritDoc}
         */
        public Event nextEvent() {
            Event event = events.next();
            // Advance only after the delegate succeeded, so the position stays correct if it throws.
            ++position;
            return event;
        }

        /**
         * {@inheritDoc}
         */
        public long getPosition() {
            return position;
        }

        /**
         * {@inheritDoc}
         */
        public long getSize() {
            return size;
        }

        /**
         * {@inheritDoc}
         * <p>
         * A value that is zero or negative skips nothing. Skipping past the last event results in the exception thrown by the
         * underlying iterator's <code>next()</code> method.
         * </p>
         */
        public void skip( long skipNum ) {
            for (long i = 0; i < skipNum; ++i) {
                nextEvent();
            }
        }

        /**
         * {@inheritDoc}
         */
        public boolean hasNext() {
            return events.hasNext();
        }

        /**
         * {@inheritDoc}
         */
        public Object next() {
            // Delegate to nextEvent() so that the position is maintained no matter which method is used.
            return nextEvent();
        }

        /**
         * This implementation does not remove anything; the call is silently ignored.
         */
        public void remove() {
            // does nothing
        }

    }

    /**
     * Implementation of the {@link EventListener JCR EventListener} interface, returned by the
     * {@link ObservationService#monitor(String, String, int, boolean, String[], String[], boolean) monitor} methods. Each
     * registered instance holds its own session to the monitored workspace.
     *
     * @author Randall Hauch
     */
    @ThreadSafe
    public class WorkspaceListener implements EventListener {

        public static final boolean DEFAULT_IS_DEEP = true;
        public static final boolean DEFAULT_NO_LOCAL = false;
        /** The default event types; NODE_REMOVED and PROPERTY_REMOVED are not included. */
        public static final int DEFAULT_EVENT_TYPES = Event.NODE_ADDED | Event.PROPERTY_ADDED | Event.PROPERTY_CHANGED;
        public static final String DEFAULT_ABSOLUTE_PATH = "/";

        private final String repositoryWorkspaceName;
        private final Set<String> uuids;
        private final Set<String> nodeTypeNames;
        private final int eventTypes;
        private final String absolutePath;
        private final boolean deep;
        private final boolean noLocal;
        @GuardedBy( "this" )
        private transient Session session;

        /**
         * Create a listener with the supplied filters. The listener is not registered until {@link #register()} is called.
         *
         * @param repositoryWorkspaceName the name passed to the session factory to obtain the session
         * @param eventTypes the bitmask of {@link Event} types
         * @param absPath the absolute path; surrounding whitespace is trimmed, and a null or blank value is stored as null
         * @param isDeep true if the subtree below the path is included
         * @param uuids the UUIDs to restrict to; may be null, and null or blank entries are skipped
         * @param nodeTypeNames the node type names to restrict to; may be null, and null or blank entries are skipped
         * @param noLocal the value passed as the <code>noLocal</code> flag when registering
         */
        protected WorkspaceListener( String repositoryWorkspaceName,
                                     int eventTypes,
                                     String absPath,
                                     boolean isDeep,
                                     String[] uuids,
                                     String[] nodeTypeNames,
                                     boolean noLocal ) {
            this.repositoryWorkspaceName = repositoryWorkspaceName;
            this.eventTypes = eventTypes;
            this.deep = isDeep;
            this.noLocal = noLocal;
            this.absolutePath = absPath != null && absPath.trim().length() != 0 ? absPath.trim() : null;
            // Set the UUIDs ...
            Set<String> newUuids = new HashSet<String>();
            if (uuids != null) {
                for (String uuid : uuids) {
                    if (uuid != null && uuid.trim().length() != 0) newUuids.add(uuid.trim());
                }
            }
            this.uuids = Collections.unmodifiableSet(newUuids);
            // Set the node type names
            Set<String> newNodeTypeNames = new HashSet<String>();
            if (nodeTypeNames != null) {
                for (String nodeTypeName : nodeTypeNames) {
                    if (nodeTypeName != null && nodeTypeName.trim().length() != 0) newNodeTypeNames.add(nodeTypeName.trim());
                }
            }
            this.nodeTypeNames = Collections.unmodifiableSet(newNodeTypeNames);
        }

        /**
         * @return the name used with the session factory to obtain the session for this listener
         */
        public String getRepositoryWorkspaceName() {
            return this.repositoryWorkspaceName;
        }

        /**
         * @return the bitmask of {@link Event} types for this listener
         */
        public int getEventTypes() {
            return this.eventTypes;
        }

        /**
         * @return the trimmed absolute path, or null if none was supplied
         */
        public String getAbsolutePath() {
            return this.absolutePath;
        }

        /**
         * @return the <code>isDeep</code> flag supplied when this listener was created
         */
        public boolean isDeep() {
            return this.deep;
        }

        /**
         * @return the <code>noLocal</code> flag supplied when this listener was created
         */
        public boolean isNoLocal() {
            return this.noLocal;
        }

        /**
         * @return the unmodifiable set of UUIDs; never null but possibly empty
         */
        public Set<String> getUuids() {
            return this.uuids;
        }

        /**
         * @return the unmodifiable set of node type names; never null but possibly empty
         */
        public Set<String> getNodeTypeNames() {
            return this.nodeTypeNames;
        }

        /**
         * Determine whether this listener currently holds a session. If the service has been shut down while this listener
         * still holds a session, the listener is first unregistered.
         *
         * @return true if this listener holds a session after the check, or false otherwise
         */
        public synchronized boolean isRegistered() {
            if (this.session != null && getAdministrator().isShutdown()) {
                // This sequencing system has been shutdown, so unregister this listener
                try {
                    unregister();
                } catch (RepositoryException re) {
                    String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
                    Logger.getLogger(this.getClass()).debug(re, msg);
                }
            }
            return this.session != null;
        }

        /**
         * Register this listener with the observation manager of its workspace, using a newly created session. This method
         * does nothing if the listener already holds a session.
         *
         * @return this listener, for method chaining
         * @throws UnsupportedRepositoryOperationException if thrown while obtaining the observation manager
         * @throws RepositoryException if the session cannot be created or the listener cannot be added; in the latter case the
         *         new session is logged out and this listener remains unregistered
         */
        public synchronized WorkspaceListener register() throws UnsupportedRepositoryOperationException, RepositoryException {
            if (this.session != null) return this;
            Session newSession = ObservationService.this.getSessionFactory().createSession(this.repositoryWorkspaceName);
            // Empty filter sets are passed as null arrays
            String[] uuids = this.uuids.isEmpty() ? null : this.uuids.toArray(new String[this.uuids.size()]);
            String[] nodeTypeNames = this.nodeTypeNames.isEmpty() ? null : this.nodeTypeNames.toArray(new String[this.nodeTypeNames.size()]);
            boolean added = false;
            try {
                newSession.getWorkspace().getObservationManager().addEventListener(this,
                                                                                   eventTypes,
                                                                                   absolutePath,
                                                                                   deep,
                                                                                   uuids,
                                                                                   nodeTypeNames,
                                                                                   noLocal);
                added = true;
            } finally {
                // Don't leak the session (or appear registered) when the listener could not be added
                if (!added) newSession.logout();
            }
            this.session = newSession;
            return this;
        }

        /**
         * Remove this listener from the observation manager of its workspace, log out of its session, and stop the service
         * from tracking it. This method does nothing if the listener holds no session. The session reference is always
         * cleared and the listener always removed from the service, even if an exception is thrown.
         *
         * @return this listener, for method chaining
         * @throws UnsupportedRepositoryOperationException if thrown while obtaining the observation manager
         * @throws RepositoryException if there is a problem removing the listener
         */
        public synchronized WorkspaceListener unregister() throws UnsupportedRepositoryOperationException, RepositoryException {
            if (this.session == null) return this;
            try {
                if (this.session.isLive()) {
                    try {
                        this.session.getWorkspace().getObservationManager().removeEventListener(this);
                    } finally {
                        // Always release the session, even if the listener could not be removed
                        this.session.logout();
                    }
                }
            } finally {
                this.session = null;
                unregisterListener(this);
            }
            return this;
        }

        /**
         * Unregister and then register this listener again.
         *
         * @return this listener, for method chaining
         * @throws UnsupportedRepositoryOperationException if thrown while obtaining the observation manager
         * @throws RepositoryException if there is a problem unregistering or registering the listener
         */
        public synchronized WorkspaceListener reregister() throws UnsupportedRepositoryOperationException, RepositoryException {
            unregister();
            register();
            return this;
        }

        /**
         * Forward the supplied events to the service for processing, unless the service has been shut down, in which case
         * this listener unregisters itself and the events are dropped.
         *
         * @param events the events; if null, nothing is done
         */
        public void onEvent( EventIterator events ) {
            if (events != null) {
                if (getAdministrator().isShutdown()) {
                    // This sequencing system has been shutdown, so unregister this listener
                    try {
                        unregister();
                    } catch (RepositoryException re) {
                        String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
                        Logger.getLogger(this.getClass()).debug(re, msg);
                    }
                } else {
                    ObservationService.this.processEvents(events, this);
                }
            }
        }
    }

    /**
     * The statistics for the system. Each observation service has an instance of this class that is updated.
     *
     * @author Randall Hauch
     */
    @ThreadSafe
    public class Statistics {

        @GuardedBy( "lock" )
        private long numberOfEventsIgnored;
        @GuardedBy( "lock" )
        private long numberOfEventsEnqueued;
        @GuardedBy( "lock" )
        private long numberOfEventSetsIgnored;
        @GuardedBy( "lock" )
        private long numberOfEventSetsEnqueued;
        private final AtomicLong numberOfNodeChangesEnqueued = new AtomicLong(0);
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final AtomicLong startTime;

        protected Statistics() {
            startTime = new AtomicLong(System.currentTimeMillis());
        }

        /**
         * Reset all counters to zero and set the start time to the current time.
         *
         * @return this object, for method chaining
         */
        public Statistics reset() {
            // Acquire the lock before the try block so that it is only released if it was actually acquired
            lock.writeLock().lock();
            try {
                this.startTime.set(System.currentTimeMillis());
                this.numberOfEventsIgnored = 0;
                this.numberOfEventsEnqueued = 0;
                this.numberOfEventSetsIgnored = 0;
                this.numberOfEventSetsEnqueued = 0;
                this.numberOfNodeChangesEnqueued.set(0);
            } finally {
                lock.writeLock().unlock();
            }
            return this;
        }

        /**
         * @return the system time when the statistics were started
         */
        public long getStartTime() {
            return this.startTime.get();
        }

        /**
         * @return the number of node changes that were processed
         */
        public long getNumberOfNodeChangesEnqueued() {
            return this.numberOfNodeChangesEnqueued.get();
        }

        /**
         * @return the number of events that were ignored, either because the system was not running or because there were no
         *         listeners to notify
         */
        public long getNumberOfEventsIgnored() {
            lock.readLock().lock();
            try {
                return this.numberOfEventsIgnored;
            } finally {
                lock.readLock().unlock();
            }
        }

        /**
         * @return the number of events that were enqueued for processing
         */
        public long getNumberOfEventsEnqueued() {
            lock.readLock().lock();
            try {
                return this.numberOfEventsEnqueued;
            } finally {
                lock.readLock().unlock();
            }
        }

        /**
         * @return the number of event sets (transactions) that were enqueued for processing
         */
        public long getNumberOfEventSetsEnqueued() {
            lock.readLock().lock();
            try {
                return this.numberOfEventSetsEnqueued;
            } finally {
                lock.readLock().unlock();
            }
        }

        /**
         * @return the number of event sets (transactions) that were ignored, either because the system was not running or
         *         because there were no listeners to notify
         */
        public long getNumberOfEventSetsIgnored() {
            lock.readLock().lock();
            try {
                return this.numberOfEventSetsIgnored;
            } finally {
                lock.readLock().unlock();
            }
        }

        /**
         * Record that node changes were computed and sent to the node change listeners.
         *
         * @param changeCount the number of node changes
         */
        protected void recordNodesChanged( long changeCount ) {
            this.numberOfNodeChangesEnqueued.addAndGet(changeCount);
        }

        /**
         * Record that a set of events was forwarded to listeners.
         *
         * @param eventsInSet the number of events in the set
         */
        protected void recordEventSet( long eventsInSet ) {
            lock.writeLock().lock();
            try {
                this.numberOfEventsEnqueued += eventsInSet;
                ++this.numberOfEventSetsEnqueued;
            } finally {
                lock.writeLock().unlock();
            }
        }

        /**
         * Record that a set of events was ignored. Only the "ignored" counters are updated; an ignored set is not counted as
         * enqueued.
         *
         * @param eventsInSet the number of events in the set
         */
        protected void recordIgnoredEventSet( long eventsInSet ) {
            lock.writeLock().lock();
            try {
                this.numberOfEventsIgnored += eventsInSet;
                ++this.numberOfEventSetsIgnored;
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}