View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors. 
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   *
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.repository.sequencer;
25  
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Set;
32  import java.util.concurrent.ExecutorService;
33  import java.util.concurrent.Executors;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicLong;
37  import net.jcip.annotations.Immutable;
38  import net.jcip.annotations.ThreadSafe;
39  import org.modeshape.common.collection.SimpleProblems;
40  import org.modeshape.common.component.ClassLoaderFactory;
41  import org.modeshape.common.component.ComponentLibrary;
42  import org.modeshape.common.component.StandardClassLoaderFactory;
43  import org.modeshape.common.util.CheckArg;
44  import org.modeshape.common.util.HashCode;
45  import org.modeshape.common.util.Logger;
46  import org.modeshape.common.util.NamedThreadFactory;
47  import org.modeshape.graph.ExecutionContext;
48  import org.modeshape.graph.Graph;
49  import org.modeshape.graph.Node;
50  import org.modeshape.graph.connector.RepositorySource;
51  import org.modeshape.graph.observe.ChangeObserver;
52  import org.modeshape.graph.observe.NetChangeObserver;
53  import org.modeshape.graph.observe.NetChangeObserver.ChangeType;
54  import org.modeshape.graph.observe.NetChangeObserver.NetChange;
55  import org.modeshape.graph.observe.NetChangeObserver.NetChanges;
56  import org.modeshape.graph.property.Name;
57  import org.modeshape.graph.property.Path;
58  import org.modeshape.graph.property.Property;
59  import org.modeshape.repository.RepositoryI18n;
60  import org.modeshape.repository.RepositoryLibrary;
61  import org.modeshape.repository.service.AbstractServiceAdministrator;
62  import org.modeshape.repository.service.AdministeredService;
63  import org.modeshape.repository.service.ServiceAdministrator;
64  import org.modeshape.repository.util.RepositoryNodePath;
65  
66  /**
67   * A sequencing system is used to monitor changes in the content of ModeShape repositories and to sequence the content to extract or to
68   * generate structured information.
69   */
70  public class SequencingService implements AdministeredService {
71  
72      /**
73       * Interface used to select the set of {@link Sequencer} instances that should be run.
74       * 
75       * @author Randall Hauch
76       */
77      public static interface Selector {
78  
79          /**
80           * Select the sequencers that should be used to sequence the supplied node.
81           * 
82           * @param sequencers the list of all sequencers available at the moment; never null
83           * @param node the node to be sequenced; never null
84           * @param nodeChange the set of node changes; never null
85           * @return the list of sequencers that should be used; may not be null
86           */
87          List<Sequencer> selectSequencers( List<Sequencer> sequencers,
88                                            Node node,
89                                            NetChange nodeChange );
90      }
91  
92      /**
93       * The default {@link Selector} implementation that selects every sequencer every time it's called, regardless of the node (or
94       * logger) supplied.
95       * 
96       * @author Randall Hauch
97       */
98      protected static class DefaultSelector implements Selector {
99  
100         public List<Sequencer> selectSequencers( List<Sequencer> sequencers,
101                                                  Node node,
102                                                  NetChange nodeChange ) {
103             return sequencers;
104         }
105     }
106 
107     /**
108      * The default {@link Selector} that considers every {@link Sequencer} to be used for every node.
109      * 
110      * @see SequencingService#setSequencerSelector(org.modeshape.repository.sequencer.SequencingService.Selector)
111      */
112     public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector();
113 
114     /**
115      * Class loader factory instance that always returns the {@link Thread#getContextClassLoader() current thread's context class
116      * loader} (if not null) or component library's class loader.
117      */
118     protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory(
119                                                                                                            SequencingService.class.getClassLoader());
120 
121     /**
122      * The administrative component for this service.
123      * 
124      * @author Randall Hauch
125      */
126     protected class Administrator extends AbstractServiceAdministrator {
127 
128         protected Administrator() {
129             super(RepositoryI18n.sequencingServiceName, State.PAUSED);
130         }
131 
132         /**
133          * {@inheritDoc}
134          */
135         @Override
136         protected void doStart( State fromState ) {
137             super.doStart(fromState);
138             startService();
139         }
140 
141         /**
142          * {@inheritDoc}
143          */
144         @Override
145         protected void doShutdown( State fromState ) {
146             super.doShutdown(fromState);
147             shutdownService();
148         }
149 
150         /**
151          * {@inheritDoc}
152          */
153         @Override
154         protected boolean doCheckIsTerminated() {
155             return isServiceTerminated();
156         }
157 
158         /**
159          * {@inheritDoc}
160          */
161         public boolean awaitTermination( long timeout,
162                                          TimeUnit unit ) throws InterruptedException {
163             return doAwaitTermination(timeout, unit);
164         }
165 
166     }
167 
168     private ExecutionContext executionContext;
169     private SequencerLibrary sequencerLibrary = new SequencerLibrary();
170     private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR;
171     private ExecutorService executorService;
172     private RepositoryLibrary repositoryLibrary;
173     private ChangeObserver repositoryObserver;
174     private final Statistics statistics = new Statistics();
175     private final Administrator administrator = new Administrator();
176 
177     /**
178      * Create a new sequencing system, configured with no sequencers and not monitoring any workspaces. Upon construction, the
179      * system is {@link ServiceAdministrator#isPaused() paused} and must be configured and then
180      * {@link ServiceAdministrator#start() started}.
181      */
182     public SequencingService() {
183         this.sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY);
184     }
185 
186     /**
187      * Return the administrative component for this service.
188      * 
189      * @return the administrative component; never null
190      */
191     public ServiceAdministrator getAdministrator() {
192         return this.administrator;
193     }
194 
195     /**
196      * Get the statistics for this system.
197      * 
198      * @return statistics
199      */
200     public Statistics getStatistics() {
201         return this.statistics;
202     }
203 
204     /**
205      * @return sequencerLibrary
206      */
207     protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() {
208         return this.sequencerLibrary;
209     }
210 
211     /**
212      * Add the configuration for a sequencer, or update any existing one that represents the
213      * {@link SequencerConfig#equals(Object) same configuration}
214      * 
215      * @param config the new configuration
216      * @return true if the sequencer was added, or false if there already was an existing and
217      *         {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
218      * @throws IllegalArgumentException if <code>config</code> is null
219      * @see #updateSequencer(SequencerConfig)
220      * @see #removeSequencer(SequencerConfig)
221      */
222     public boolean addSequencer( SequencerConfig config ) {
223         return this.sequencerLibrary.add(config);
224     }
225 
226     /**
227      * Update the configuration for a sequencer, or add it if there is no {@link SequencerConfig#equals(Object) matching
228      * configuration}.
229      * 
230      * @param config the updated (or new) configuration
231      * @return true if the sequencer was updated, or false if there already was an existing and
232      *         {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
233      * @throws IllegalArgumentException if <code>config</code> is null
234      * @see #addSequencer(SequencerConfig)
235      * @see #removeSequencer(SequencerConfig)
236      */
237     public boolean updateSequencer( SequencerConfig config ) {
238         return this.sequencerLibrary.update(config);
239     }
240 
241     /**
242      * Remove the configuration for a sequencer.
243      * 
244      * @param config the configuration to be removed
245      * @return true if the sequencer was removed, or false if there was no existing sequencer
246      * @throws IllegalArgumentException if <code>config</code> is null
247      * @see #addSequencer(SequencerConfig)
248      * @see #updateSequencer(SequencerConfig)
249      */
250     public boolean removeSequencer( SequencerConfig config ) {
251         return this.sequencerLibrary.remove(config);
252     }
253 
254     /**
255      * @return executionContext
256      */
257     public ExecutionContext getExecutionContext() {
258         return this.executionContext;
259     }
260 
261     /**
262      * @param executionContext Sets executionContext to the specified value.
263      */
264     public void setExecutionContext( ExecutionContext executionContext ) {
265         CheckArg.isNotNull(executionContext, "execution context");
266         if (this.getAdministrator().isStarted()) {
267             throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
268         }
269         this.executionContext = executionContext;
270         this.sequencerLibrary.setClassLoaderFactory(executionContext);
271     }
272 
273     /**
274      * Get the repository library to be used for repository lookup
275      * 
276      * @return the repository library
277      */
278     public RepositoryLibrary getRepositoryLibrary() {
279         return this.repositoryLibrary;
280     }
281 
282     public void setRepositoryLibrary( RepositoryLibrary repositoryLibrary ) {
283         this.repositoryLibrary = repositoryLibrary;
284     }
285 
286     /**
287      * Get the executor service used to run the sequencers.
288      * 
289      * @return the executor service
290      * @see #setExecutorService(ExecutorService)
291      */
292     public ExecutorService getExecutorService() {
293         return this.executorService;
294     }
295 
296     /**
297      * Set the executor service that should be used by this system. By default, the system is set up with a
298      * {@link Executors#newSingleThreadExecutor() executor that uses a single thread}.
299      * 
300      * @param executorService the executor service
301      * @see #getExecutorService()
302      * @see Executors#newCachedThreadPool()
303      * @see Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
304      * @see Executors#newFixedThreadPool(int)
305      * @see Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)
306      * @see Executors#newScheduledThreadPool(int)
307      * @see Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)
308      * @see Executors#newSingleThreadExecutor()
309      * @see Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)
310      * @see Executors#newSingleThreadScheduledExecutor()
311      * @see Executors#newSingleThreadScheduledExecutor(java.util.concurrent.ThreadFactory)
312      */
313     public void setExecutorService( ExecutorService executorService ) {
314         CheckArg.isNotNull(executorService, "executor service");
315         if (this.getAdministrator().isStarted()) {
316             throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
317         }
318         this.executorService = executorService;
319     }
320 
321     /**
322      * Override this method to creates a different kind of default executor service. This method is called when the system is
323      * {@link #startService() started} without an executor service being {@link #setExecutorService(ExecutorService) set}.
324      * <p>
325      * This method creates a {@link Executors#newSingleThreadExecutor() single-threaded executor}.
326      * </p>
327      * 
328      * @return the executor service
329      */
330     protected ExecutorService createDefaultExecutorService() {
331         return Executors.newSingleThreadExecutor(new NamedThreadFactory("sequencing"));
332     }
333 
334     protected void startService() {
335         if (this.getExecutionContext() == null) {
336             throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text());
337         }
338         if (this.executorService == null) {
339             this.executorService = createDefaultExecutorService();
340         }
341         assert this.executorService != null;
342         assert this.sequencerSelector != null;
343         assert this.sequencerLibrary != null;
344         assert this.repositoryLibrary != null;
345         this.repositoryObserver = new RepositoryObserver();
346         // Register the observer ...
347         this.repositoryLibrary.register(this.repositoryObserver);
348     }
349 
350     protected void shutdownService() {
351         // Unregister our observer ...
352         if (this.repositoryObserver != null) this.repositoryObserver.unregister();
353         // And shut down the executor service ..
354         if (this.executorService != null) {
355             this.executorService.shutdown();
356         }
357     }
358 
359     protected boolean isServiceTerminated() {
360         if (this.executorService != null) {
361             return this.executorService.isTerminated();
362         }
363         return true;
364     }
365 
366     protected boolean doAwaitTermination( long timeout,
367                                           TimeUnit unit ) throws InterruptedException {
368         if (this.executorService == null || this.executorService.isTerminated()) return true;
369         return this.executorService.awaitTermination(timeout, unit);
370     }
371 
372     /**
373      * Get the sequencing selector used by this system.
374      * 
375      * @return the sequencing selector
376      */
377     public Selector getSequencerSelector() {
378         return this.sequencerSelector;
379     }
380 
381     /**
382      * Set the sequencer selector, or null if the {@link #DEFAULT_SEQUENCER_SELECTOR default sequencer selector} should be used.
383      * 
384      * @param sequencerSelector the selector
385      */
386     public void setSequencerSelector( Selector sequencerSelector ) {
387         this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR;
388     }
389 
390     /**
391      * Do the work of processing by sequencing the node. This method is called by the {@link #executorService executor service}
392      * when it performs it's work on the enqueued {@link NetChange NetChange runnable objects}.
393      * 
394      * @param changes the change describing the node to be processed.
395      */
396     protected void processChange( NetChanges changes ) {
397         final ExecutionContext context = this.getExecutionContext();
398         final Logger logger = context.getLogger(getClass());
399         assert logger != null;
400 
401         try {
402             List<Sequencer> allSequencers = null;
403             final String repositorySourceName = changes.getSourceName();
404             for (NetChange change : changes.getNetChanges()) {
405                 // Go through each net change, and only process node/property adds and property changes ...
406                 if (change.includes(ChangeType.NODE_ADDED, ChangeType.PROPERTY_ADDED, ChangeType.PROPERTY_CHANGED)) {
407                     final String repositoryWorkspaceName = change.getRepositoryWorkspaceName();
408 
409                     // Figure out which sequencers accept this path,
410                     // and track which output nodes should be passed to each sequencer...
411                     final Path nodePath = change.getPath();
412                     final String nodePathStr = context.getValueFactories().getStringFactory().create(nodePath);
413                     Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>();
414                     if (allSequencers == null) {
415                         allSequencers = this.sequencerLibrary.getInstances();
416                     }
417                     List<Sequencer> sequencers = new LinkedList<Sequencer>();
418                     for (Sequencer sequencer : allSequencers) {
419                         final SequencerConfig config = sequencer.getConfiguration();
420                         for (SequencerPathExpression pathExpression : config.getPathExpressions()) {
421                             for (Property property : change.getAddedOrModifiedProperties()) {
422                                 Name propertyName = property.getName();
423                                 String propertyNameStr = context.getValueFactories().getStringFactory().create(propertyName);
424                                 String path = nodePathStr + "/@" + propertyNameStr;
425                                 SequencerPathExpression.Matcher matcher = pathExpression.matcher(path);
426                                 if (matcher.matches()) {
427                                     // String selectedPath = matcher.getSelectedPath();
428                                     RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(),
429                                                                                              repositorySourceName,
430                                                                                              repositoryWorkspaceName);
431                                     SequencerCall call = new SequencerCall(sequencer, propertyNameStr);
432                                     // Record the output path ...
433                                     Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call);
434                                     if (outputPaths == null) {
435                                         outputPaths = new HashSet<RepositoryNodePath>();
436                                         sequencerCalls.put(call, outputPaths);
437                                     }
438                                     outputPaths.add(outputPath);
439                                     sequencers.add(sequencer);
440                                     break;
441                                 }
442                             }
443                         }
444                     }
445 
446                     RepositorySource source = repositoryLibrary.getSource(repositorySourceName);
447                     Graph graph = Graph.create(source, context);
448                     Node node = null;
449                     if (!sequencers.isEmpty()) {
450 
451                         // Find the changed node ...
452                         node = graph.getNodeAt(nodePath);
453 
454                         // Figure out which sequencers should run ...
455                         sequencers = this.sequencerSelector.selectSequencers(sequencers, node, change);
456                     }
457                     if (sequencers.isEmpty()) {
458                         this.statistics.recordNodeSkipped();
459                         if (logger.isDebugEnabled()) {
460                             logger.trace("Skipping '{0}': no sequencers matched this condition", change);
461                         }
462                     } else {
463                         // Run each of those sequencers ...
464                         for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) {
465 
466                             final SequencerCall sequencerCall = entry.getKey();
467                             final Set<RepositoryNodePath> outputPaths = entry.getValue();
468                             final Sequencer sequencer = sequencerCall.getSequencer();
469                             final String sequencerName = sequencer.getConfiguration().getName();
470                             final String propertyName = sequencerCall.getSequencedPropertyName();
471 
472                             // Get the paths to the nodes where the sequencer should write it's output ...
473                             assert outputPaths != null && outputPaths.size() != 0;
474 
475                             // Create a new execution context for each sequencer
476                             final SimpleProblems problems = new SimpleProblems();
477                             SequencerContext sequencerContext = new SequencerContext(context, graph);
478                             try {
479                                 sequencer.execute(node, propertyName, change, outputPaths, sequencerContext, problems);
480                                 sequencerContext.getDestination().submit();
481                             } catch (SequencerException e) {
482                                 logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change);
483                             }
484                         }
485                         this.statistics.recordNodeSequenced();
486                     }
487                 }
488             }
489         } catch (Throwable e) {
490             logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, changes);
491         }
492     }
493 
494     /**
495      * The statistics for the system. Each sequencing system has an instance of this class that is updated.
496      * 
497      * @author Randall Hauch
498      */
499     @ThreadSafe
500     public class Statistics {
501 
502         private final AtomicLong numberOfNodesSequenced = new AtomicLong(0);
503         private final AtomicLong numberOfNodesSkipped = new AtomicLong(0);
504         private final AtomicLong startTime;
505 
506         protected Statistics() {
507             startTime = new AtomicLong(System.currentTimeMillis());
508         }
509 
510         public Statistics reset() {
511             this.startTime.set(System.currentTimeMillis());
512             this.numberOfNodesSequenced.set(0);
513             this.numberOfNodesSkipped.set(0);
514             return this;
515         }
516 
517         /**
518          * @return the system time when the statistics were started
519          */
520         public long getStartTime() {
521             return this.startTime.get();
522         }
523 
524         /**
525          * @return the number of nodes that were sequenced
526          */
527         public long getNumberOfNodesSequenced() {
528             return this.numberOfNodesSequenced.get();
529         }
530 
531         /**
532          * @return the number of nodes that were skipped because no sequencers applied
533          */
534         public long getNumberOfNodesSkipped() {
535             return this.numberOfNodesSkipped.get();
536         }
537 
538         protected void recordNodeSequenced() {
539             this.numberOfNodesSequenced.incrementAndGet();
540         }
541 
542         protected void recordNodeSkipped() {
543             this.numberOfNodesSkipped.incrementAndGet();
544         }
545     }
546 
547     @Immutable
548     protected class SequencerCall {
549 
550         private final Sequencer sequencer;
551         private final String sequencerName;
552         private final String sequencedPropertyName;
553         private final int hc;
554 
555         protected SequencerCall( Sequencer sequencer,
556                                  String sequencedPropertyName ) {
557             this.sequencer = sequencer;
558             this.sequencerName = sequencer.getConfiguration().getName();
559             this.sequencedPropertyName = sequencedPropertyName;
560             this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName);
561         }
562 
563         /**
564          * @return sequencer
565          */
566         public Sequencer getSequencer() {
567             return this.sequencer;
568         }
569 
570         /**
571          * @return sequencedPropertyName
572          */
573         public String getSequencedPropertyName() {
574             return this.sequencedPropertyName;
575         }
576 
577         /**
578          * {@inheritDoc}
579          */
580         @Override
581         public int hashCode() {
582             return this.hc;
583         }
584 
585         /**
586          * {@inheritDoc}
587          */
588         @Override
589         public boolean equals( Object obj ) {
590             if (obj == this) return true;
591             if (obj instanceof SequencerCall) {
592                 SequencerCall that = (SequencerCall)obj;
593                 if (!this.sequencerName.equals(that.sequencerName)) return false;
594                 if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false;
595                 return true;
596             }
597             return false;
598         }
599     }
600 
601     protected class RepositoryObserver extends NetChangeObserver {
602         protected RepositoryObserver() {
603         }
604 
605         /**
606          * {@inheritDoc}
607          * 
608          * @see org.modeshape.graph.observe.NetChangeObserver#notify(org.modeshape.graph.observe.NetChangeObserver.NetChanges)
609          */
610         @Override
611         protected void notify( final NetChanges netChanges ) {
612             try {
613                 getExecutorService().execute(new Runnable() {
614 
615                     public void run() {
616                         processChange(netChanges);
617                     }
618                 });
619             } catch (RejectedExecutionException e) {
620                 // The executor service has been shut down, so do nothing with this set of changes
621             }
622         }
623     }
624 }