1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.modeshape.repository.sequencer;
25
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicLong;
37 import net.jcip.annotations.Immutable;
38 import net.jcip.annotations.ThreadSafe;
39 import org.modeshape.common.collection.SimpleProblems;
40 import org.modeshape.common.component.ClassLoaderFactory;
41 import org.modeshape.common.component.ComponentLibrary;
42 import org.modeshape.common.component.StandardClassLoaderFactory;
43 import org.modeshape.common.util.CheckArg;
44 import org.modeshape.common.util.HashCode;
45 import org.modeshape.common.util.Logger;
46 import org.modeshape.common.util.NamedThreadFactory;
47 import org.modeshape.graph.ExecutionContext;
48 import org.modeshape.graph.Graph;
49 import org.modeshape.graph.Node;
50 import org.modeshape.graph.connector.RepositorySource;
51 import org.modeshape.graph.observe.ChangeObserver;
52 import org.modeshape.graph.observe.NetChangeObserver;
53 import org.modeshape.graph.observe.NetChangeObserver.ChangeType;
54 import org.modeshape.graph.observe.NetChangeObserver.NetChange;
55 import org.modeshape.graph.observe.NetChangeObserver.NetChanges;
56 import org.modeshape.graph.property.Name;
57 import org.modeshape.graph.property.Path;
58 import org.modeshape.graph.property.Property;
59 import org.modeshape.repository.RepositoryI18n;
60 import org.modeshape.repository.RepositoryLibrary;
61 import org.modeshape.repository.service.AbstractServiceAdministrator;
62 import org.modeshape.repository.service.AdministeredService;
63 import org.modeshape.repository.service.ServiceAdministrator;
64 import org.modeshape.repository.util.RepositoryNodePath;
65
66
67
68
69
70 public class SequencingService implements AdministeredService {
71
72
73
74
75
76
77 public static interface Selector {
78
79
80
81
82
83
84
85
86
87 List<Sequencer> selectSequencers( List<Sequencer> sequencers,
88 Node node,
89 NetChange nodeChange );
90 }
91
92
93
94
95
96
97
98 protected static class DefaultSelector implements Selector {
99
100 public List<Sequencer> selectSequencers( List<Sequencer> sequencers,
101 Node node,
102 NetChange nodeChange ) {
103 return sequencers;
104 }
105 }
106
107
108
109
110
111
112 public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector();
113
114
115
116
117
118 protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory(
119 SequencingService.class.getClassLoader());
120
121
122
123
124
125
126 protected class Administrator extends AbstractServiceAdministrator {
127
128 protected Administrator() {
129 super(RepositoryI18n.sequencingServiceName, State.PAUSED);
130 }
131
132
133
134
135 @Override
136 protected void doStart( State fromState ) {
137 super.doStart(fromState);
138 startService();
139 }
140
141
142
143
144 @Override
145 protected void doShutdown( State fromState ) {
146 super.doShutdown(fromState);
147 shutdownService();
148 }
149
150
151
152
153 @Override
154 protected boolean doCheckIsTerminated() {
155 return isServiceTerminated();
156 }
157
158
159
160
161 public boolean awaitTermination( long timeout,
162 TimeUnit unit ) throws InterruptedException {
163 return doAwaitTermination(timeout, unit);
164 }
165
166 }
167
168 private ExecutionContext executionContext;
169 private SequencerLibrary sequencerLibrary = new SequencerLibrary();
170 private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR;
171 private ExecutorService executorService;
172 private RepositoryLibrary repositoryLibrary;
173 private ChangeObserver repositoryObserver;
174 private final Statistics statistics = new Statistics();
175 private final Administrator administrator = new Administrator();
176
177
178
179
180
181
/**
 * Create a sequencing service whose sequencer library uses {@link #DEFAULT_CLASSLOADER_FACTORY} until an execution
 * context is supplied.
 */
public SequencingService() {
    sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY);
}
185
186
187
188
189
190
191 public ServiceAdministrator getAdministrator() {
192 return this.administrator;
193 }
194
195
196
197
198
199
200 public Statistics getStatistics() {
201 return this.statistics;
202 }
203
204
205
206
207 protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() {
208 return this.sequencerLibrary;
209 }
210
211
212
213
214
215
216
217
218
219
220
221
222 public boolean addSequencer( SequencerConfig config ) {
223 return this.sequencerLibrary.add(config);
224 }
225
226
227
228
229
230
231
232
233
234
235
236
237 public boolean updateSequencer( SequencerConfig config ) {
238 return this.sequencerLibrary.update(config);
239 }
240
241
242
243
244
245
246
247
248
249
250 public boolean removeSequencer( SequencerConfig config ) {
251 return this.sequencerLibrary.remove(config);
252 }
253
254
255
256
257 public ExecutionContext getExecutionContext() {
258 return this.executionContext;
259 }
260
261
262
263
264 public void setExecutionContext( ExecutionContext executionContext ) {
265 CheckArg.isNotNull(executionContext, "execution context");
266 if (this.getAdministrator().isStarted()) {
267 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
268 }
269 this.executionContext = executionContext;
270 this.sequencerLibrary.setClassLoaderFactory(executionContext);
271 }
272
273
274
275
276
277
278 public RepositoryLibrary getRepositoryLibrary() {
279 return this.repositoryLibrary;
280 }
281
282 public void setRepositoryLibrary( RepositoryLibrary repositoryLibrary ) {
283 this.repositoryLibrary = repositoryLibrary;
284 }
285
286
287
288
289
290
291
292 public ExecutorService getExecutorService() {
293 return this.executorService;
294 }
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313 public void setExecutorService( ExecutorService executorService ) {
314 CheckArg.isNotNull(executorService, "executor service");
315 if (this.getAdministrator().isStarted()) {
316 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
317 }
318 this.executorService = executorService;
319 }
320
321
322
323
324
325
326
327
328
329
330 protected ExecutorService createDefaultExecutorService() {
331 return Executors.newSingleThreadExecutor(new NamedThreadFactory("sequencing"));
332 }
333
334 protected void startService() {
335 if (this.getExecutionContext() == null) {
336 throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text());
337 }
338 if (this.executorService == null) {
339 this.executorService = createDefaultExecutorService();
340 }
341 assert this.executorService != null;
342 assert this.sequencerSelector != null;
343 assert this.sequencerLibrary != null;
344 assert this.repositoryLibrary != null;
345 this.repositoryObserver = new RepositoryObserver();
346
347 this.repositoryLibrary.register(this.repositoryObserver);
348 }
349
350 protected void shutdownService() {
351
352 if (this.repositoryObserver != null) this.repositoryObserver.unregister();
353
354 if (this.executorService != null) {
355 this.executorService.shutdown();
356 }
357 }
358
359 protected boolean isServiceTerminated() {
360 if (this.executorService != null) {
361 return this.executorService.isTerminated();
362 }
363 return true;
364 }
365
366 protected boolean doAwaitTermination( long timeout,
367 TimeUnit unit ) throws InterruptedException {
368 if (this.executorService == null || this.executorService.isTerminated()) return true;
369 return this.executorService.awaitTermination(timeout, unit);
370 }
371
372
373
374
375
376
377 public Selector getSequencerSelector() {
378 return this.sequencerSelector;
379 }
380
381
382
383
384
385
386 public void setSequencerSelector( Selector sequencerSelector ) {
387 this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR;
388 }
389
390
391
392
393
394
395
396 protected void processChange( NetChanges changes ) {
397 final ExecutionContext context = this.getExecutionContext();
398 final Logger logger = context.getLogger(getClass());
399 assert logger != null;
400
401 try {
402 List<Sequencer> allSequencers = null;
403 final String repositorySourceName = changes.getSourceName();
404 for (NetChange change : changes.getNetChanges()) {
405
406 if (change.includes(ChangeType.NODE_ADDED, ChangeType.PROPERTY_ADDED, ChangeType.PROPERTY_CHANGED)) {
407 final String repositoryWorkspaceName = change.getRepositoryWorkspaceName();
408
409
410
411 final Path nodePath = change.getPath();
412 final String nodePathStr = context.getValueFactories().getStringFactory().create(nodePath);
413 Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>();
414 if (allSequencers == null) {
415 allSequencers = this.sequencerLibrary.getInstances();
416 }
417 List<Sequencer> sequencers = new LinkedList<Sequencer>();
418 for (Sequencer sequencer : allSequencers) {
419 final SequencerConfig config = sequencer.getConfiguration();
420 for (SequencerPathExpression pathExpression : config.getPathExpressions()) {
421 for (Property property : change.getAddedOrModifiedProperties()) {
422 Name propertyName = property.getName();
423 String propertyNameStr = context.getValueFactories().getStringFactory().create(propertyName);
424 String path = nodePathStr + "/@" + propertyNameStr;
425 SequencerPathExpression.Matcher matcher = pathExpression.matcher(path);
426 if (matcher.matches()) {
427
428 RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(),
429 repositorySourceName,
430 repositoryWorkspaceName);
431 SequencerCall call = new SequencerCall(sequencer, propertyNameStr);
432
433 Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call);
434 if (outputPaths == null) {
435 outputPaths = new HashSet<RepositoryNodePath>();
436 sequencerCalls.put(call, outputPaths);
437 }
438 outputPaths.add(outputPath);
439 sequencers.add(sequencer);
440 break;
441 }
442 }
443 }
444 }
445
446 RepositorySource source = repositoryLibrary.getSource(repositorySourceName);
447 Graph graph = Graph.create(source, context);
448 Node node = null;
449 if (!sequencers.isEmpty()) {
450
451
452 node = graph.getNodeAt(nodePath);
453
454
455 sequencers = this.sequencerSelector.selectSequencers(sequencers, node, change);
456 }
457 if (sequencers.isEmpty()) {
458 this.statistics.recordNodeSkipped();
459 if (logger.isDebugEnabled()) {
460 logger.trace("Skipping '{0}': no sequencers matched this condition", change);
461 }
462 } else {
463
464 for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) {
465
466 final SequencerCall sequencerCall = entry.getKey();
467 final Set<RepositoryNodePath> outputPaths = entry.getValue();
468 final Sequencer sequencer = sequencerCall.getSequencer();
469 final String sequencerName = sequencer.getConfiguration().getName();
470 final String propertyName = sequencerCall.getSequencedPropertyName();
471
472
473 assert outputPaths != null && outputPaths.size() != 0;
474
475
476 final SimpleProblems problems = new SimpleProblems();
477 SequencerContext sequencerContext = new SequencerContext(context, graph);
478 try {
479 sequencer.execute(node, propertyName, change, outputPaths, sequencerContext, problems);
480 sequencerContext.getDestination().submit();
481 } catch (SequencerException e) {
482 logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change);
483 }
484 }
485 this.statistics.recordNodeSequenced();
486 }
487 }
488 }
489 } catch (Throwable e) {
490 logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, changes);
491 }
492 }
493
494
495
496
497
498
499 @ThreadSafe
500 public class Statistics {
501
502 private final AtomicLong numberOfNodesSequenced = new AtomicLong(0);
503 private final AtomicLong numberOfNodesSkipped = new AtomicLong(0);
504 private final AtomicLong startTime;
505
506 protected Statistics() {
507 startTime = new AtomicLong(System.currentTimeMillis());
508 }
509
510 public Statistics reset() {
511 this.startTime.set(System.currentTimeMillis());
512 this.numberOfNodesSequenced.set(0);
513 this.numberOfNodesSkipped.set(0);
514 return this;
515 }
516
517
518
519
520 public long getStartTime() {
521 return this.startTime.get();
522 }
523
524
525
526
527 public long getNumberOfNodesSequenced() {
528 return this.numberOfNodesSequenced.get();
529 }
530
531
532
533
534 public long getNumberOfNodesSkipped() {
535 return this.numberOfNodesSkipped.get();
536 }
537
538 protected void recordNodeSequenced() {
539 this.numberOfNodesSequenced.incrementAndGet();
540 }
541
542 protected void recordNodeSkipped() {
543 this.numberOfNodesSkipped.incrementAndGet();
544 }
545 }
546
547 @Immutable
548 protected class SequencerCall {
549
550 private final Sequencer sequencer;
551 private final String sequencerName;
552 private final String sequencedPropertyName;
553 private final int hc;
554
555 protected SequencerCall( Sequencer sequencer,
556 String sequencedPropertyName ) {
557 this.sequencer = sequencer;
558 this.sequencerName = sequencer.getConfiguration().getName();
559 this.sequencedPropertyName = sequencedPropertyName;
560 this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName);
561 }
562
563
564
565
566 public Sequencer getSequencer() {
567 return this.sequencer;
568 }
569
570
571
572
573 public String getSequencedPropertyName() {
574 return this.sequencedPropertyName;
575 }
576
577
578
579
580 @Override
581 public int hashCode() {
582 return this.hc;
583 }
584
585
586
587
588 @Override
589 public boolean equals( Object obj ) {
590 if (obj == this) return true;
591 if (obj instanceof SequencerCall) {
592 SequencerCall that = (SequencerCall)obj;
593 if (!this.sequencerName.equals(that.sequencerName)) return false;
594 if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false;
595 return true;
596 }
597 return false;
598 }
599 }
600
601 protected class RepositoryObserver extends NetChangeObserver {
602 protected RepositoryObserver() {
603 }
604
605
606
607
608
609
610 @Override
611 protected void notify( final NetChanges netChanges ) {
612 try {
613 getExecutorService().execute(new Runnable() {
614
615 public void run() {
616 processChange(netChanges);
617 }
618 });
619 } catch (RejectedExecutionException e) {
620
621 }
622 }
623 }
624 }