001 /*
002 * JBoss DNA (http://www.jboss.org/dna)
003 * See the COPYRIGHT.txt file distributed with this work for information
004 * regarding copyright ownership. Some portions may be licensed
005 * to Red Hat, Inc. under one or more contributor license agreements.
006 * See the AUTHORS.txt file in the distribution for a full listing of
007 * individual contributors.
008 *
009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010 * is licensed to you under the terms of the GNU Lesser General Public License as
011 * published by the Free Software Foundation; either version 2.1 of
012 * the License, or (at your option) any later version.
013 *
014 * JBoss DNA is distributed in the hope that it will be useful,
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017 * Lesser General Public License for more details.
018 *
019 * You should have received a copy of the GNU Lesser General Public
020 * License along with this software; if not, write to the Free
021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023 */
024 package org.jboss.dna.repository.sequencer;
025
026 import java.util.ArrayList;
027 import java.util.HashMap;
028 import java.util.HashSet;
029 import java.util.List;
030 import java.util.Map;
031 import java.util.Set;
032 import java.util.concurrent.ExecutorService;
033 import java.util.concurrent.Executors;
034 import java.util.concurrent.RejectedExecutionException;
035 import java.util.concurrent.TimeUnit;
036 import java.util.concurrent.atomic.AtomicLong;
037 import net.jcip.annotations.Immutable;
038 import net.jcip.annotations.ThreadSafe;
039 import org.jboss.dna.common.collection.SimpleProblems;
040 import org.jboss.dna.common.component.ClassLoaderFactory;
041 import org.jboss.dna.common.component.ComponentLibrary;
042 import org.jboss.dna.common.component.StandardClassLoaderFactory;
043 import org.jboss.dna.common.util.CheckArg;
044 import org.jboss.dna.common.util.HashCode;
045 import org.jboss.dna.common.util.Logger;
046 import org.jboss.dna.graph.ExecutionContext;
047 import org.jboss.dna.graph.Graph;
048 import org.jboss.dna.graph.Node;
049 import org.jboss.dna.graph.connector.RepositorySource;
050 import org.jboss.dna.graph.observe.ChangeObserver;
051 import org.jboss.dna.graph.observe.NetChangeObserver;
052 import org.jboss.dna.graph.observe.NetChangeObserver.NetChange;
053 import org.jboss.dna.graph.property.Name;
054 import org.jboss.dna.graph.property.Path;
055 import org.jboss.dna.graph.property.Property;
056 import org.jboss.dna.repository.RepositoryI18n;
057 import org.jboss.dna.repository.RepositoryLibrary;
058 import org.jboss.dna.repository.service.AbstractServiceAdministrator;
059 import org.jboss.dna.repository.service.AdministeredService;
060 import org.jboss.dna.repository.service.ServiceAdministrator;
061 import org.jboss.dna.repository.util.RepositoryNodePath;
062
063 /**
064 * A sequencing system is used to monitor changes in the content of DNA repositories and to sequence the content to extract or to
065 * generate structured information.
066 *
067 * @author Randall Hauch
068 * @author John Verhaeg
069 */
070 public class SequencingService implements AdministeredService {
071
072 /**
073 * Interface used to select the set of {@link Sequencer} instances that should be run.
074 *
075 * @author Randall Hauch
076 */
077 public static interface Selector {
078
079 /**
080 * Select the sequencers that should be used to sequence the supplied node.
081 *
082 * @param sequencers the list of all sequencers available at the moment; never null
083 * @param node the node to be sequenced; never null
084 * @param nodeChange the set of node changes; never null
085 * @return the list of sequencers that should be used; may not be null
086 */
087 List<Sequencer> selectSequencers( List<Sequencer> sequencers,
088 Node node,
089 NetChange nodeChange );
090 }
091
092 /**
093 * The default {@link Selector} implementation that selects every sequencer every time it's called, regardless of the node (or
094 * logger) supplied.
095 *
096 * @author Randall Hauch
097 */
098 protected static class DefaultSelector implements Selector {
099
100 public List<Sequencer> selectSequencers( List<Sequencer> sequencers,
101 Node node,
102 NetChange nodeChange ) {
103 return sequencers;
104 }
105 }
106
107 /**
108 * The default {@link Selector} that considers every {@link Sequencer} to be used for every node.
109 *
110 * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector)
111 */
112 public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector();
113
114 /**
115 * Class loader factory instance that always returns the {@link Thread#getContextClassLoader() current thread's context class
116 * loader} (if not null) or component library's class loader.
117 */
118 protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory(
119 SequencingService.class.getClassLoader());
120
121 /**
122 * The administrative component for this service.
123 *
124 * @author Randall Hauch
125 */
126 protected class Administrator extends AbstractServiceAdministrator {
127
128 protected Administrator() {
129 super(RepositoryI18n.sequencingServiceName, State.PAUSED);
130 }
131
132 /**
133 * {@inheritDoc}
134 */
135 @Override
136 protected void doStart( State fromState ) {
137 super.doStart(fromState);
138 startService();
139 }
140
141 /**
142 * {@inheritDoc}
143 */
144 @Override
145 protected void doShutdown( State fromState ) {
146 super.doShutdown(fromState);
147 shutdownService();
148 }
149
150 /**
151 * {@inheritDoc}
152 */
153 @Override
154 protected boolean doCheckIsTerminated() {
155 return isServiceTerminated();
156 }
157
158 /**
159 * {@inheritDoc}
160 */
161 public boolean awaitTermination( long timeout,
162 TimeUnit unit ) throws InterruptedException {
163 return doAwaitTermination(timeout, unit);
164 }
165
166 }
167
168 private ExecutionContext executionContext;
169 private SequencerLibrary sequencerLibrary = new SequencerLibrary();
170 private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR;
171 private ExecutorService executorService;
172 private RepositoryLibrary repositoryLibrary;
173 private ChangeObserver repositoryObserver;
174 private final Statistics statistics = new Statistics();
175 private final Administrator administrator = new Administrator();
176
/**
 * Create a new sequencing system that has no sequencers configured and is not yet observing any repository. The new
 * system is {@link ServiceAdministrator#isPaused() paused}; it has to be configured and then
 * {@link ServiceAdministrator#start() started} before it does any work.
 * <p>
 * The sequencer library initially uses the {@link #DEFAULT_CLASSLOADER_FACTORY default class loader factory}.
 * </p>
 */
public SequencingService() {
    sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY);
}
185
186 /**
187 * Return the administrative component for this service.
188 *
189 * @return the administrative component; never null
190 */
191 public ServiceAdministrator getAdministrator() {
192 return this.administrator;
193 }
194
195 /**
196 * Get the statistics for this system.
197 *
198 * @return statistics
199 */
200 public Statistics getStatistics() {
201 return this.statistics;
202 }
203
204 /**
205 * @return sequencerLibrary
206 */
207 protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() {
208 return this.sequencerLibrary;
209 }
210
211 /**
212 * Add the configuration for a sequencer, or update any existing one that represents the
213 * {@link SequencerConfig#equals(Object) same configuration}
214 *
215 * @param config the new configuration
216 * @return true if the sequencer was added, or false if there already was an existing and
217 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
218 * @throws IllegalArgumentException if <code>config</code> is null
219 * @see #updateSequencer(SequencerConfig)
220 * @see #removeSequencer(SequencerConfig)
221 */
222 public boolean addSequencer( SequencerConfig config ) {
223 return this.sequencerLibrary.add(config);
224 }
225
226 /**
227 * Update the configuration for a sequencer, or add it if there is no {@link SequencerConfig#equals(Object) matching
228 * configuration}.
229 *
230 * @param config the updated (or new) configuration
231 * @return true if the sequencer was updated, or false if there already was an existing and
232 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
233 * @throws IllegalArgumentException if <code>config</code> is null
234 * @see #addSequencer(SequencerConfig)
235 * @see #removeSequencer(SequencerConfig)
236 */
237 public boolean updateSequencer( SequencerConfig config ) {
238 return this.sequencerLibrary.update(config);
239 }
240
241 /**
242 * Remove the configuration for a sequencer.
243 *
244 * @param config the configuration to be removed
245 * @return true if the sequencer was removed, or false if there was no existing sequencer
246 * @throws IllegalArgumentException if <code>config</code> is null
247 * @see #addSequencer(SequencerConfig)
248 * @see #updateSequencer(SequencerConfig)
249 */
250 public boolean removeSequencer( SequencerConfig config ) {
251 return this.sequencerLibrary.remove(config);
252 }
253
254 /**
255 * @return executionContext
256 */
257 public ExecutionContext getExecutionContext() {
258 return this.executionContext;
259 }
260
261 /**
262 * @param executionContext Sets executionContext to the specified value.
263 */
264 public void setExecutionContext( ExecutionContext executionContext ) {
265 CheckArg.isNotNull(executionContext, "execution context");
266 if (this.getAdministrator().isStarted()) {
267 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
268 }
269 this.executionContext = executionContext;
270 this.sequencerLibrary.setClassLoaderFactory(executionContext);
271 }
272
273 /**
274 * Get the repository library to be used for repository lookup
275 *
276 * @return the repository library
277 */
278 public RepositoryLibrary getRepositoryLibrary() {
279 return this.repositoryLibrary;
280 }
281
282 public void setRepositoryLibrary( RepositoryLibrary repositoryLibrary ) {
283 this.repositoryLibrary = repositoryLibrary;
284 }
285
286 /**
287 * Get the executor service used to run the sequencers.
288 *
289 * @return the executor service
290 * @see #setExecutorService(ExecutorService)
291 */
292 public ExecutorService getExecutorService() {
293 return this.executorService;
294 }
295
296 /**
297 * Set the executor service that should be used by this system. By default, the system is set up with a
298 * {@link Executors#newSingleThreadExecutor() executor that uses a single thread}.
299 *
300 * @param executorService the executor service
301 * @see #getExecutorService()
302 * @see Executors#newCachedThreadPool()
303 * @see Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
304 * @see Executors#newFixedThreadPool(int)
305 * @see Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)
306 * @see Executors#newScheduledThreadPool(int)
307 * @see Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)
308 * @see Executors#newSingleThreadExecutor()
309 * @see Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)
310 * @see Executors#newSingleThreadScheduledExecutor()
311 * @see Executors#newSingleThreadScheduledExecutor(java.util.concurrent.ThreadFactory)
312 */
313 public void setExecutorService( ExecutorService executorService ) {
314 CheckArg.isNotNull(executorService, "executor service");
315 if (this.getAdministrator().isStarted()) {
316 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
317 }
318 this.executorService = executorService;
319 }
320
321 /**
322 * Override this method to creates a different kind of default executor service. This method is called when the system is
323 * {@link #startService() started} without an executor service being {@link #setExecutorService(ExecutorService) set}.
324 * <p>
325 * This method creates a {@link Executors#newSingleThreadExecutor() single-threaded executor}.
326 * </p>
327 *
328 * @return the executor service
329 */
330 protected ExecutorService createDefaultExecutorService() {
331 return Executors.newSingleThreadExecutor();
332 }
333
334 protected void startService() {
335 if (this.getExecutionContext() == null) {
336 throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text());
337 }
338 if (this.executorService == null) {
339 this.executorService = createDefaultExecutorService();
340 }
341 assert this.executorService != null;
342 assert this.sequencerSelector != null;
343 assert this.sequencerLibrary != null;
344 assert this.repositoryLibrary != null;
345 this.repositoryObserver = new RepositoryObserver();
346 // Register the observer ...
347 this.repositoryLibrary.register(this.repositoryObserver);
348 }
349
350 protected void shutdownService() {
351 // Unregister our observer ...
352 if (this.repositoryObserver != null) this.repositoryObserver.unregister();
353 // And shut down the executor service ..
354 if (this.executorService != null) {
355 this.executorService.shutdown();
356 }
357 }
358
359 protected boolean isServiceTerminated() {
360 if (this.executorService != null) {
361 return this.executorService.isTerminated();
362 }
363 return true;
364 }
365
366 protected boolean doAwaitTermination( long timeout,
367 TimeUnit unit ) throws InterruptedException {
368 if (this.executorService == null || this.executorService.isTerminated()) return true;
369 return this.executorService.awaitTermination(timeout, unit);
370 }
371
372 /**
373 * Get the sequencing selector used by this system.
374 *
375 * @return the sequencing selector
376 */
377 public Selector getSequencerSelector() {
378 return this.sequencerSelector;
379 }
380
381 /**
382 * Set the sequencer selector, or null if the {@link #DEFAULT_SEQUENCER_SELECTOR default sequencer selector} should be used.
383 *
384 * @param sequencerSelector the selector
385 */
386 public void setSequencerSelector( Selector sequencerSelector ) {
387 this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR;
388 }
389
390 /**
391 * Do the work of processing by sequencing the node. This method is called by the {@link #executorService executor service}
392 * when it performs it's work on the enqueued {@link NetChange NetChange runnable objects}.
393 *
394 * @param change the change describing the node to be processed.
395 */
396 protected void processChange( NetChange change ) {
397 final ExecutionContext context = this.getExecutionContext();
398 final Logger logger = context.getLogger(getClass());
399 assert logger != null;
400
401 try {
402 final String repositorySourceName = change.getRepositorySourceName();
403 final String repositoryWorkspaceName = change.getRepositoryWorkspaceName();
404
405 // Figure out which sequencers accept this path,
406 // and track which output nodes should be passed to each sequencer...
407 final Path nodePath = change.getPath();
408 final String nodePathStr = context.getValueFactories().getStringFactory().create(nodePath);
409 Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>();
410 List<Sequencer> allSequencers = this.sequencerLibrary.getInstances();
411 List<Sequencer> sequencers = new ArrayList<Sequencer>(allSequencers.size());
412 for (Sequencer sequencer : allSequencers) {
413 final SequencerConfig config = sequencer.getConfiguration();
414 for (SequencerPathExpression pathExpression : config.getPathExpressions()) {
415 for (Property property : change.getModifiedProperties()) {
416 Name propertyName = property.getName();
417 String propertyNameStr = context.getValueFactories().getStringFactory().create(propertyName);
418 String path = nodePathStr + "/@" + propertyNameStr;
419 SequencerPathExpression.Matcher matcher = pathExpression.matcher(path);
420 if (matcher.matches()) {
421 // String selectedPath = matcher.getSelectedPath();
422 RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(),
423 repositorySourceName,
424 repositoryWorkspaceName);
425 SequencerCall call = new SequencerCall(sequencer, propertyNameStr);
426 // Record the output path ...
427 Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call);
428 if (outputPaths == null) {
429 outputPaths = new HashSet<RepositoryNodePath>();
430 sequencerCalls.put(call, outputPaths);
431 }
432 outputPaths.add(outputPath);
433 sequencers.add(sequencer);
434 break;
435 }
436 }
437 }
438 }
439
440 RepositorySource source = repositoryLibrary.getSource(repositorySourceName);
441 Graph graph = Graph.create(source, context);
442 Node node = null;
443 if (!sequencers.isEmpty()) {
444
445 // Find the changed node ...
446 node = graph.getNodeAt(nodePath);
447
448 // Figure out which sequencers should run ...
449 sequencers = this.sequencerSelector.selectSequencers(sequencers, node, change);
450 }
451 if (sequencers.isEmpty()) {
452 this.statistics.recordNodeSkipped();
453 if (logger.isDebugEnabled()) {
454 logger.trace("Skipping '{0}': no sequencers matched this condition", change);
455 }
456 } else {
457 // Run each of those sequencers ...
458 for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) {
459
460 final SequencerCall sequencerCall = entry.getKey();
461 final Set<RepositoryNodePath> outputPaths = entry.getValue();
462 final Sequencer sequencer = sequencerCall.getSequencer();
463 final String sequencerName = sequencer.getConfiguration().getName();
464 final String propertyName = sequencerCall.getSequencedPropertyName();
465
466 // Get the paths to the nodes where the sequencer should write it's output ...
467 assert outputPaths != null && outputPaths.size() != 0;
468
469 // Create a new execution context for each sequencer
470 final SimpleProblems problems = new SimpleProblems();
471 SequencerContext sequencerContext = new SequencerContext(context, graph);
472 try {
473 sequencer.execute(node, propertyName, change, outputPaths, sequencerContext, problems);
474 sequencerContext.getDestination().submit();
475 } catch (SequencerException e) {
476 logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change);
477 }
478 }
479 this.statistics.recordNodeSequenced();
480 }
481 } catch (Throwable e) {
482 logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, change);
483 }
484 }
485
486 /**
487 * The statistics for the system. Each sequencing system has an instance of this class that is updated.
488 *
489 * @author Randall Hauch
490 */
491 @ThreadSafe
492 public class Statistics {
493
494 private final AtomicLong numberOfNodesSequenced = new AtomicLong(0);
495 private final AtomicLong numberOfNodesSkipped = new AtomicLong(0);
496 private final AtomicLong startTime;
497
498 protected Statistics() {
499 startTime = new AtomicLong(System.currentTimeMillis());
500 }
501
502 public Statistics reset() {
503 this.startTime.set(System.currentTimeMillis());
504 this.numberOfNodesSequenced.set(0);
505 this.numberOfNodesSkipped.set(0);
506 return this;
507 }
508
509 /**
510 * @return the system time when the statistics were started
511 */
512 public long getStartTime() {
513 return this.startTime.get();
514 }
515
516 /**
517 * @return the number of nodes that were sequenced
518 */
519 public long getNumberOfNodesSequenced() {
520 return this.numberOfNodesSequenced.get();
521 }
522
523 /**
524 * @return the number of nodes that were skipped because no sequencers applied
525 */
526 public long getNumberOfNodesSkipped() {
527 return this.numberOfNodesSkipped.get();
528 }
529
530 protected void recordNodeSequenced() {
531 this.numberOfNodesSequenced.incrementAndGet();
532 }
533
534 protected void recordNodeSkipped() {
535 this.numberOfNodesSkipped.incrementAndGet();
536 }
537 }
538
539 @Immutable
540 protected class SequencerCall {
541
542 private final Sequencer sequencer;
543 private final String sequencerName;
544 private final String sequencedPropertyName;
545 private final int hc;
546
547 protected SequencerCall( Sequencer sequencer,
548 String sequencedPropertyName ) {
549 this.sequencer = sequencer;
550 this.sequencerName = sequencer.getConfiguration().getName();
551 this.sequencedPropertyName = sequencedPropertyName;
552 this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName);
553 }
554
555 /**
556 * @return sequencer
557 */
558 public Sequencer getSequencer() {
559 return this.sequencer;
560 }
561
562 /**
563 * @return sequencedPropertyName
564 */
565 public String getSequencedPropertyName() {
566 return this.sequencedPropertyName;
567 }
568
569 /**
570 * {@inheritDoc}
571 */
572 @Override
573 public int hashCode() {
574 return this.hc;
575 }
576
577 /**
578 * {@inheritDoc}
579 */
580 @Override
581 public boolean equals( Object obj ) {
582 if (obj == this) return true;
583 if (obj instanceof SequencerCall) {
584 SequencerCall that = (SequencerCall)obj;
585 if (!this.sequencerName.equals(that.sequencerName)) return false;
586 if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false;
587 return true;
588 }
589 return false;
590 }
591 }
592
593 protected class RepositoryObserver extends NetChangeObserver {
594 protected RepositoryObserver() {
595 }
596
597 /**
598 * {@inheritDoc}
599 *
600 * @see org.jboss.dna.graph.observe.NetChangeObserver#notify(org.jboss.dna.graph.observe.NetChangeObserver.NetChange)
601 */
602 @Override
603 protected void notify( final NetChange change ) {
604 // Only care about new nodes or nodes that have new/changed properies ...
605 if (change.includes(ChangeType.NODE_ADDED, ChangeType.PROPERTY_ADDED, ChangeType.PROPERTY_CHANGED)) {
606 try {
607 getExecutorService().execute(new Runnable() {
608
609 public void run() {
610 processChange(change);
611 }
612 });
613 } catch (RejectedExecutionException e) {
614 // The executor service has been shut down, so do nothing with this set of changes
615 }
616 }
617 }
618 }
619 }