001 /*
002 * JBoss DNA (http://www.jboss.org/dna)
003 * See the COPYRIGHT.txt file distributed with this work for information
004 * regarding copyright ownership. Some portions may be licensed
005 * to Red Hat, Inc. under one or more contributor license agreements.
006 * See the AUTHORS.txt file in the distribution for a full listing of
007 * individual contributors.
008 *
009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010 * is licensed to you under the terms of the GNU Lesser General Public License as
011 * published by the Free Software Foundation; either version 2.1 of
012 * the License, or (at your option) any later version.
013 *
014 * JBoss DNA is distributed in the hope that it will be useful,
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017 * Lesser General Public License for more details.
018 *
019 * You should have received a copy of the GNU Lesser General Public
020 * License along with this software; if not, write to the Free
021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023 */
024 package org.jboss.dna.repository.sequencer;
025
026 import java.util.ArrayList;
027 import java.util.HashMap;
028 import java.util.HashSet;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032 import java.util.concurrent.ExecutorService;
033 import java.util.concurrent.Executors;
034 import java.util.concurrent.RejectedExecutionException;
035 import java.util.concurrent.TimeUnit;
036 import java.util.concurrent.atomic.AtomicLong;
037 import javax.jcr.Node;
038 import javax.jcr.Repository;
039 import javax.jcr.RepositoryException;
040 import javax.jcr.Session;
041 import javax.jcr.observation.Event;
042 import net.jcip.annotations.Immutable;
043 import net.jcip.annotations.ThreadSafe;
044 import org.jboss.dna.common.collection.SimpleProblems;
045 import org.jboss.dna.common.component.ClassLoaderFactory;
046 import org.jboss.dna.common.component.ComponentLibrary;
047 import org.jboss.dna.common.component.StandardClassLoaderFactory;
048 import org.jboss.dna.common.util.CheckArg;
049 import org.jboss.dna.common.util.HashCode;
050 import org.jboss.dna.common.util.Logger;
051 import org.jboss.dna.repository.RepositoryI18n;
052 import org.jboss.dna.repository.observation.NodeChange;
053 import org.jboss.dna.repository.observation.NodeChangeListener;
054 import org.jboss.dna.repository.observation.NodeChanges;
055 import org.jboss.dna.repository.service.AbstractServiceAdministrator;
056 import org.jboss.dna.repository.service.AdministeredService;
057 import org.jboss.dna.repository.service.ServiceAdministrator;
058 import org.jboss.dna.repository.util.JcrExecutionContext;
059 import org.jboss.dna.repository.util.RepositoryNodePath;
060
061 /**
062 * A sequencing system is used to monitor changes in the content of {@link Repository JCR repositories} and to sequence the
063 * content to extract or to generate structured information.
064 *
065 * @author Randall Hauch
066 * @author John Verhaeg
067 */
068 public class SequencingService implements AdministeredService, NodeChangeListener {
069
070 /**
071 * Interface used to select the set of {@link Sequencer} instances that should be run.
072 *
073 * @author Randall Hauch
074 */
075 public static interface Selector {
076
077 /**
078 * Select the sequencers that should be used to sequence the supplied node.
079 *
080 * @param sequencers the list of all sequencers available at the moment; never null
081 * @param node the node to be sequenced; never null
082 * @param nodeChange the set of node changes; never null
083 * @return the list of sequencers that should be used; may not be null
084 */
085 List<Sequencer> selectSequencers( List<Sequencer> sequencers,
086 Node node,
087 NodeChange nodeChange );
088 }
089
/**
 * The default {@link Selector} implementation. It hands back the supplied list unchanged, so every candidate sequencer is
 * selected on every call, regardless of the node or node change supplied.
 *
 * @author Randall Hauch
 */
protected static class DefaultSelector implements Selector {

    /**
     * Select all of the supplied sequencers.
     *
     * @param sequencers the candidate sequencers
     * @param node the node to be sequenced; not used by this implementation
     * @param nodeChange the change recorded for the node; not used by this implementation
     * @return the very same list instance that was passed in as <code>sequencers</code>
     */
    public List<Sequencer> selectSequencers( List<Sequencer> sequencers,
                                             Node node,
                                             NodeChange nodeChange ) {
        return sequencers;
    }
}
104
105 /**
106 * Interface used to determine whether a {@link NodeChange} should be processed.
107 *
108 * @author Randall Hauch
109 */
110 public static interface NodeFilter {
111
112 /**
113 * Determine whether the node represented by the supplied change should be submitted for sequencing.
114 *
115 * @param nodeChange the node change event
116 * @return true if the node should be submitted for sequencing, or false if the change should be ignored
117 */
118 boolean accept( NodeChange nodeChange );
119 }
120
121 /**
122 * The default filter implementation, which accepts only new nodes or nodes that have new or changed properties.
123 *
124 * @author Randall Hauch
125 */
126 protected static class DefaultNodeFilter implements NodeFilter {
127
128 public boolean accept( NodeChange nodeChange ) {
129 // Only care about new nodes or nodes that have new/changed properies ...
130 return nodeChange.includesEventTypes(Event.NODE_ADDED, Event.PROPERTY_ADDED, Event.PROPERTY_CHANGED);
131 }
132 }
133
/**
 * The default {@link Selector} that considers every {@link Sequencer} to be used for every node.
 *
 * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector)
 */
public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector();
/**
 * The default {@link NodeFilter} that accepts new nodes or nodes that have new/changed properties.
 *
 * @see SequencingService#setNodeFilter(org.jboss.dna.repository.sequencer.SequencingService.NodeFilter)
 */
public static final NodeFilter DEFAULT_NODE_FILTER = new DefaultNodeFilter();

/**
 * Class loader factory that is installed on the sequencer library by the constructor. It is a
 * {@link StandardClassLoaderFactory} created with the class loader that loaded {@link SequencingService}. According to the
 * original documentation it returns the {@link Thread#getContextClassLoader() current thread's context class loader} (if not
 * null) and otherwise falls back to that class loader; that behavior lives in {@link StandardClassLoaderFactory} and is not
 * visible here.
 */
protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory(
                                                                                                       SequencingService.class.getClassLoader());
153
/**
 * The administrative component for this service. It is created in the {@link State#PAUSED paused} state and forwards the
 * start, shutdown and termination callbacks to the corresponding methods of the enclosing {@link SequencingService}.
 *
 * @author Randall Hauch
 */
protected class Administrator extends AbstractServiceAdministrator {

    /**
     * Create the administrator with the sequencing service's internationalized name and an initial state of
     * {@link State#PAUSED}.
     */
    protected Administrator() {
        super(RepositoryI18n.sequencingServiceName, State.PAUSED);
    }

    /**
     * {@inheritDoc}
     * <p>
     * After the superclass has done its work, this calls {@link SequencingService#startService()}.
     * </p>
     */
    @Override
    protected void doStart( State fromState ) {
        super.doStart(fromState);
        startService();
    }

    /**
     * {@inheritDoc}
     * <p>
     * After the superclass has done its work, this calls {@link SequencingService#shutdownService()}.
     * </p>
     */
    @Override
    protected void doShutdown( State fromState ) {
        super.doShutdown(fromState);
        shutdownService();
    }

    /**
     * {@inheritDoc}
     * <p>
     * The answer comes from {@link SequencingService#isServiceTerminated()}.
     * </p>
     */
    @Override
    protected boolean doCheckIsTerminated() {
        return isServiceTerminated();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Delegates to {@link SequencingService#doAwaitTermination(long, TimeUnit)}.
     * </p>
     */
    public boolean awaitTermination( long timeout,
                                     TimeUnit unit ) throws InterruptedException {
        return doAwaitTermination(timeout, unit);
    }

}
200
// Context used to obtain loggers and sessions; null until setExecutionContext(...) is called, and required by startService()
private JcrExecutionContext executionContext;
// Library holding the sequencer configurations and the sequencer instances created from them
private SequencerLibrary sequencerLibrary = new SequencerLibrary();
// Strategy that picks the sequencers to run for a node; the setter never lets this become null
private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR;
// Filter that decides which node changes are submitted for sequencing; the setter never lets this become null
private NodeFilter nodeFilter = DEFAULT_NODE_FILTER;
// Executor that runs the sequencing work; null until set explicitly or created by startService()
private ExecutorService executorService;
// Counters of sequenced and skipped nodes
private final Statistics statistics = new Statistics();
// Administrative component exposed through getAdministrator()
private final Administrator administrator = new Administrator();
208
/**
 * Create a new sequencing system that has no sequencers configured and is not monitoring any workspaces. The new system
 * is {@link ServiceAdministrator#isPaused() paused}; configure it and then {@link ServiceAdministrator#start() start} it.
 * The sequencer library initially uses the {@link #DEFAULT_CLASSLOADER_FACTORY default class loader factory}.
 */
public SequencingService() {
    sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY);
}
217
218 /**
219 * Return the administrative component for this service.
220 *
221 * @return the administrative component; never null
222 */
223 public ServiceAdministrator getAdministrator() {
224 return this.administrator;
225 }
226
227 /**
228 * Get the statistics for this system.
229 *
230 * @return statistics
231 */
232 public Statistics getStatistics() {
233 return this.statistics;
234 }
235
236 /**
237 * @return sequencerLibrary
238 */
239 protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() {
240 return this.sequencerLibrary;
241 }
242
243 /**
244 * Add the configuration for a sequencer, or update any existing one that represents the
245 * {@link SequencerConfig#equals(Object) same configuration}
246 *
247 * @param config the new configuration
248 * @return true if the sequencer was added, or false if there already was an existing and
249 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
250 * @throws IllegalArgumentException if <code>config</code> is null
251 * @see #updateSequencer(SequencerConfig)
252 * @see #removeSequencer(SequencerConfig)
253 */
254 public boolean addSequencer( SequencerConfig config ) {
255 return this.sequencerLibrary.add(config);
256 }
257
258 /**
259 * Update the configuration for a sequencer, or add it if there is no {@link SequencerConfig#equals(Object) matching
260 * configuration}.
261 *
262 * @param config the updated (or new) configuration
263 * @return true if the sequencer was updated, or false if there already was an existing and
264 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
265 * @throws IllegalArgumentException if <code>config</code> is null
266 * @see #addSequencer(SequencerConfig)
267 * @see #removeSequencer(SequencerConfig)
268 */
269 public boolean updateSequencer( SequencerConfig config ) {
270 return this.sequencerLibrary.update(config);
271 }
272
273 /**
274 * Remove the configuration for a sequencer.
275 *
276 * @param config the configuration to be removed
277 * @return true if the sequencer was removed, or false if there was no existing sequencer
278 * @throws IllegalArgumentException if <code>config</code> is null
279 * @see #addSequencer(SequencerConfig)
280 * @see #updateSequencer(SequencerConfig)
281 */
282 public boolean removeSequencer( SequencerConfig config ) {
283 return this.sequencerLibrary.remove(config);
284 }
285
286 /**
287 * @return executionContext
288 */
289 public JcrExecutionContext getExecutionContext() {
290 return this.executionContext;
291 }
292
293 /**
294 * @param executionContext Sets executionContext to the specified value.
295 */
296 public void setExecutionContext( JcrExecutionContext executionContext ) {
297 CheckArg.isNotNull(executionContext, "execution context");
298 if (this.getAdministrator().isStarted()) {
299 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
300 }
301 this.executionContext = executionContext;
302 this.sequencerLibrary.setClassLoaderFactory(executionContext);
303 }
304
305 /**
306 * Get the executor service used to run the sequencers.
307 *
308 * @return the executor service
309 * @see #setExecutorService(ExecutorService)
310 */
311 public ExecutorService getExecutorService() {
312 return this.executorService;
313 }
314
315 /**
316 * Set the executor service that should be used by this system. By default, the system is set up with a
317 * {@link Executors#newSingleThreadExecutor() executor that uses a single thread}.
318 *
319 * @param executorService the executor service
320 * @see #getExecutorService()
321 * @see Executors#newCachedThreadPool()
322 * @see Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
323 * @see Executors#newFixedThreadPool(int)
324 * @see Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)
325 * @see Executors#newScheduledThreadPool(int)
326 * @see Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)
327 * @see Executors#newSingleThreadExecutor()
328 * @see Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)
329 * @see Executors#newSingleThreadScheduledExecutor()
330 * @see Executors#newSingleThreadScheduledExecutor(java.util.concurrent.ThreadFactory)
331 */
332 public void setExecutorService( ExecutorService executorService ) {
333 CheckArg.isNotNull(executorService, "executor service");
334 if (this.getAdministrator().isStarted()) {
335 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
336 }
337 this.executorService = executorService;
338 }
339
340 /**
341 * Override this method to creates a different kind of default executor service. This method is called when the system is
342 * {@link #startService() started} without an executor service being {@link #setExecutorService(ExecutorService) set}.
343 * <p>
344 * This method creates a {@link Executors#newSingleThreadExecutor() single-threaded executor}.
345 * </p>
346 *
347 * @return the executor service
348 */
349 protected ExecutorService createDefaultExecutorService() {
350 return Executors.newSingleThreadExecutor();
351 }
352
353 protected void startService() {
354 if (this.getExecutionContext() == null) {
355 throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text());
356 }
357 if (this.executorService == null) {
358 this.executorService = createDefaultExecutorService();
359 }
360 assert this.executorService != null;
361 assert this.sequencerSelector != null;
362 assert this.nodeFilter != null;
363 assert this.sequencerLibrary != null;
364 }
365
366 protected void shutdownService() {
367 if (this.executorService != null) {
368 this.executorService.shutdown();
369 }
370 }
371
372 protected boolean isServiceTerminated() {
373 if (this.executorService != null) {
374 return this.executorService.isTerminated();
375 }
376 return true;
377 }
378
379 protected boolean doAwaitTermination( long timeout,
380 TimeUnit unit ) throws InterruptedException {
381 if (this.executorService == null || this.executorService.isTerminated()) return true;
382 return this.executorService.awaitTermination(timeout, unit);
383 }
384
385 /**
386 * Get the sequencing selector used by this system.
387 *
388 * @return the sequencing selector
389 */
390 public Selector getSequencerSelector() {
391 return this.sequencerSelector;
392 }
393
394 /**
395 * Set the sequencer selector, or null if the {@link #DEFAULT_SEQUENCER_SELECTOR default sequencer selector} should be used.
396 *
397 * @param sequencerSelector the selector
398 */
399 public void setSequencerSelector( Selector sequencerSelector ) {
400 this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR;
401 }
402
403 /**
404 * Get the node filter used by this system.
405 *
406 * @return the node filter
407 */
408 public NodeFilter getNodeFilter() {
409 return this.nodeFilter;
410 }
411
412 /**
413 * Set the filter that checks which nodes are to be sequenced, or null if the {@link #DEFAULT_NODE_FILTER default node filter}
414 * should be used.
415 *
416 * @param nodeFilter the new node filter
417 */
418 public void setNodeFilter( NodeFilter nodeFilter ) {
419 this.nodeFilter = nodeFilter != null ? nodeFilter : DEFAULT_NODE_FILTER;
420 }
421
422 /**
423 * {@inheritDoc}
424 */
425 public void onNodeChanges( NodeChanges changes ) {
426 NodeFilter filter = this.getNodeFilter();
427 for (final NodeChange changedNode : changes) {
428 // Only care about new nodes or nodes that have new/changed properies ...
429 if (filter.accept(changedNode)) {
430 try {
431 this.executorService.execute(new Runnable() {
432
433 public void run() {
434 processChangedNode(changedNode);
435 }
436 });
437 } catch (RejectedExecutionException e) {
438 // The executor service has been shut down, so do nothing with this set of changes
439 }
440 }
441 }
442 }
443
444 /**
445 * Do the work of processing by sequencing the node. This method is called by the {@link #executorService executor service}
446 * when it performs it's work on the enqueued {@link NodeChange NodeChange runnable objects}.
447 *
448 * @param changedNode the node to be processed.
449 */
450 protected void processChangedNode( NodeChange changedNode ) {
451 final JcrExecutionContext context = this.getExecutionContext();
452 final Logger logger = context.getLogger(getClass());
453 assert logger != null;
454 try {
455 final String repositoryWorkspaceName = changedNode.getRepositoryWorkspaceName();
456 Session session = null;
457 try {
458 // Figure out which sequencers accept this path,
459 // and track which output nodes should be passed to each sequencer...
460 final String nodePath = changedNode.getAbsolutePath();
461 Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>();
462 List<Sequencer> allSequencers = this.sequencerLibrary.getInstances();
463 List<Sequencer> sequencers = new ArrayList<Sequencer>(allSequencers.size());
464 for (Sequencer sequencer : allSequencers) {
465 final SequencerConfig config = sequencer.getConfiguration();
466 for (SequencerPathExpression pathExpression : config.getPathExpressions()) {
467 for (String propertyName : changedNode.getModifiedProperties()) {
468 String path = nodePath + "/@" + propertyName;
469 SequencerPathExpression.Matcher matcher = pathExpression.matcher(path);
470 if (matcher.matches()) {
471 // String selectedPath = matcher.getSelectedPath();
472 RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(),
473 repositoryWorkspaceName);
474 SequencerCall call = new SequencerCall(sequencer, propertyName);
475 // Record the output path ...
476 Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call);
477 if (outputPaths == null) {
478 outputPaths = new HashSet<RepositoryNodePath>();
479 sequencerCalls.put(call, outputPaths);
480 }
481 outputPaths.add(outputPath);
482 sequencers.add(sequencer);
483 break;
484 }
485 }
486 }
487 }
488
489 Node node = null;
490 if (!sequencers.isEmpty()) {
491 // Create a session that we'll use for all sequencing ...
492 session = context.getSessionFactory().createSession(repositoryWorkspaceName);
493
494 // Find the changed node ...
495 String relPath = changedNode.getAbsolutePath().replaceAll("^/+", "");
496 node = session.getRootNode().getNode(relPath);
497
498 // Figure out which sequencers should run ...
499 sequencers = this.sequencerSelector.selectSequencers(sequencers, node, changedNode);
500 }
501 if (sequencers.isEmpty()) {
502 this.statistics.recordNodeSkipped();
503 if (logger.isDebugEnabled()) {
504 logger.trace("Skipping '{0}': no sequencers matched this condition", changedNode);
505 }
506 } else {
507 // Run each of those sequencers ...
508 for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) {
509 final SequencerCall sequencerCall = entry.getKey();
510 final Set<RepositoryNodePath> outputPaths = entry.getValue();
511 final Sequencer sequencer = sequencerCall.getSequencer();
512 final String sequencerName = sequencer.getConfiguration().getName();
513 final String propertyName = sequencerCall.getSequencedPropertyName();
514
515 // Get the paths to the nodes where the sequencer should write it's output ...
516 assert outputPaths != null && outputPaths.size() != 0;
517
518 // Create a new execution context for each sequencer
519 final SimpleProblems problems = new SimpleProblems();
520 JcrExecutionContext sequencerContext = context.clone();
521 try {
522 sequencer.execute(node, propertyName, changedNode, outputPaths, sequencerContext, problems);
523 } catch (RepositoryException e) {
524 logger.error(e, RepositoryI18n.errorInRepositoryWhileSequencingNode, sequencerName, changedNode);
525 } catch (SequencerException e) {
526 logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, changedNode);
527 } finally {
528 try {
529 // Save the changes made by each sequencer ...
530 if (session != null) session.save();
531 } finally {
532 // And always close the context.
533 // This closes all sessions that may have been created by the sequencer.
534 sequencerContext.close();
535 }
536 }
537 }
538 this.statistics.recordNodeSequenced();
539 }
540 } finally {
541 if (session != null) session.logout();
542 }
543 } catch (RepositoryException e) {
544 logger.error(e, RepositoryI18n.errorInRepositoryWhileFindingSequencersToRunAgainstNode, changedNode);
545 } catch (Throwable e) {
546 logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, changedNode);
547 }
548 }
549
550 /**
551 * The statistics for the system. Each sequencing system has an instance of this class that is updated.
552 *
553 * @author Randall Hauch
554 */
555 @ThreadSafe
556 public class Statistics {
557
558 private final AtomicLong numberOfNodesSequenced = new AtomicLong(0);
559 private final AtomicLong numberOfNodesSkipped = new AtomicLong(0);
560 private final AtomicLong startTime;
561
562 protected Statistics() {
563 startTime = new AtomicLong(System.currentTimeMillis());
564 }
565
566 public Statistics reset() {
567 this.startTime.set(System.currentTimeMillis());
568 this.numberOfNodesSequenced.set(0);
569 this.numberOfNodesSkipped.set(0);
570 return this;
571 }
572
573 /**
574 * @return the system time when the statistics were started
575 */
576 public long getStartTime() {
577 return this.startTime.get();
578 }
579
580 /**
581 * @return the number of nodes that were sequenced
582 */
583 public long getNumberOfNodesSequenced() {
584 return this.numberOfNodesSequenced.get();
585 }
586
587 /**
588 * @return the number of nodes that were skipped because no sequencers applied
589 */
590 public long getNumberOfNodesSkipped() {
591 return this.numberOfNodesSkipped.get();
592 }
593
594 protected void recordNodeSequenced() {
595 this.numberOfNodesSequenced.incrementAndGet();
596 }
597
598 protected void recordNodeSkipped() {
599 this.numberOfNodesSkipped.incrementAndGet();
600 }
601 }
602
603 @Immutable
604 protected class SequencerCall {
605
606 private final Sequencer sequencer;
607 private final String sequencerName;
608 private final String sequencedPropertyName;
609 private final int hc;
610
611 protected SequencerCall( Sequencer sequencer,
612 String sequencedPropertyName ) {
613 this.sequencer = sequencer;
614 this.sequencerName = sequencer.getConfiguration().getName();
615 this.sequencedPropertyName = sequencedPropertyName;
616 this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName);
617 }
618
619 /**
620 * @return sequencer
621 */
622 public Sequencer getSequencer() {
623 return this.sequencer;
624 }
625
626 /**
627 * @return sequencedPropertyName
628 */
629 public String getSequencedPropertyName() {
630 return this.sequencedPropertyName;
631 }
632
633 /**
634 * {@inheritDoc}
635 */
636 @Override
637 public int hashCode() {
638 return this.hc;
639 }
640
641 /**
642 * {@inheritDoc}
643 */
644 @Override
645 public boolean equals( Object obj ) {
646 if (obj == this) return true;
647 if (obj instanceof SequencerCall) {
648 SequencerCall that = (SequencerCall)obj;
649 if (!this.sequencerName.equals(that.sequencerName)) return false;
650 if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false;
651 return true;
652 }
653 return false;
654 }
655 }
656 }