1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.modeshape.repository;
25
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.locks.ReadWriteLock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock;
36 import net.jcip.annotations.ThreadSafe;
37 import org.modeshape.common.util.CheckArg;
38 import org.modeshape.graph.ExecutionContext;
39 import org.modeshape.graph.Graph;
40 import org.modeshape.graph.Subgraph;
41 import org.modeshape.graph.connector.RepositoryConnection;
42 import org.modeshape.graph.connector.RepositoryConnectionFactory;
43 import org.modeshape.graph.connector.RepositoryConnectionPool;
44 import org.modeshape.graph.connector.RepositoryContext;
45 import org.modeshape.graph.connector.RepositorySource;
46 import org.modeshape.graph.observe.Observable;
47 import org.modeshape.graph.observe.ObservationBus;
48 import org.modeshape.graph.observe.Observer;
49 import org.modeshape.graph.property.Path;
50 import org.modeshape.repository.service.AbstractServiceAdministrator;
51 import org.modeshape.repository.service.ServiceAdministrator;
52
53
54
55
56
57 @ThreadSafe
58 public class RepositoryLibrary implements RepositoryConnectionFactory, Observable {
59
60
61
62
63
64
65 protected class Administrator extends AbstractServiceAdministrator {
66
67 protected Administrator() {
68 super(RepositoryI18n.federationServiceName, State.STARTED);
69 }
70
71
72
73
74 @Override
75 protected void doStart( State fromState ) {
76 super.doStart(fromState);
77 RepositoryLibrary.this.start();
78 }
79
80
81
82
83 @Override
84 protected void doShutdown( State fromState ) {
85 super.doShutdown(fromState);
86 RepositoryLibrary.this.shutdown();
87 }
88
89
90
91
92 public boolean awaitTermination( long timeout,
93 TimeUnit unit ) throws InterruptedException {
94 return RepositoryLibrary.this.awaitTermination(timeout, unit);
95 }
96
97
98
99
100 @Override
101 protected boolean doCheckIsTerminated() {
102 return RepositoryLibrary.this.isTerminated();
103 }
104
105 }
106
107 private final ServiceAdministrator administrator = new Administrator();
108 private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
109 private final Map<String, RepositoryConnectionPool> pools = new HashMap<String, RepositoryConnectionPool>();
110 private RepositoryConnectionFactory delegate;
111 private final ExecutionContext executionContext;
112 private final ObservationBus observationBus = new ObservationBus();
113 private final RepositorySource configurationSource;
114 private final String configurationWorkspaceName;
115 private final Path pathToConfigurationRoot;
116
117
118
119
120
121
122
123
124
125
126
127
128 public RepositoryLibrary( RepositorySource configurationSource,
129 String configurationWorkspaceName,
130 Path pathToSourcesConfigurationRoot,
131 final ExecutionContext context ) {
132 CheckArg.isNotNull(configurationSource, "configurationSource");
133 CheckArg.isNotNull(context, "context");
134 CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot");
135 this.executionContext = context;
136 this.configurationSource = configurationSource;
137 this.configurationWorkspaceName = configurationWorkspaceName;
138 this.pathToConfigurationRoot = pathToSourcesConfigurationRoot;
139 }
140
141
142
143
144
145
146 protected Path getPathToConfigurationRoot() {
147 return pathToConfigurationRoot;
148 }
149
150
151
152
153 protected RepositorySource getConfigurationSource() {
154 return configurationSource;
155 }
156
157
158
159
160 protected String getConfigurationWorkspaceName() {
161 return configurationWorkspaceName;
162 }
163
164
165
166
167
168
169
170
171
172
173 public boolean register( Observer observer ) {
174 if (observer == null) return false;
175 return observationBus.register(observer);
176 }
177
178
179
180
181
182
183
184
185
186 public boolean unregister( Observer observer ) {
187 return observationBus.unregister(observer);
188 }
189
190
191
192
193 public ExecutionContext getExecutionContext() {
194 return executionContext;
195 }
196
197
198
199
200 public ServiceAdministrator getAdministrator() {
201 return this.administrator;
202 }
203
204
205
206
207 protected void start() {
208
209
210 }
211
212
213
214
215 protected void shutdown() {
216
217 try {
218 this.sourcesLock.readLock().lock();
219 for (RepositoryConnectionPool pool : this.pools.values()) {
220
221 pool.shutdown();
222
223 pool.getRepositorySource().close();
224 }
225 } finally {
226 this.sourcesLock.readLock().unlock();
227 }
228
229 this.observationBus.shutdown();
230 }
231
232
233
234
235
236
237
238
239
240
241 protected boolean awaitTermination( long timeout,
242 TimeUnit unit ) throws InterruptedException {
243
244 try {
245 this.sourcesLock.readLock().lock();
246 for (RepositoryConnectionPool pool : this.pools.values()) {
247 if (!pool.awaitTermination(timeout, unit)) return false;
248 }
249 return true;
250 } finally {
251 this.sourcesLock.readLock().unlock();
252 }
253 }
254
255
256
257
258
259
260
261
262
263
264
265 public boolean isTerminating() {
266 try {
267 this.sourcesLock.readLock().lock();
268 for (RepositoryConnectionPool pool : this.pools.values()) {
269 if (pool.isTerminating()) return true;
270 }
271 return false;
272 } finally {
273 this.sourcesLock.readLock().unlock();
274 }
275 }
276
277
278
279
280
281
282
283 public boolean isTerminated() {
284 try {
285 this.sourcesLock.readLock().lock();
286 for (RepositoryConnectionPool pool : this.pools.values()) {
287 if (!pool.isTerminated()) return false;
288 }
289 return true;
290 } finally {
291 this.sourcesLock.readLock().unlock();
292 }
293 }
294
295
296
297
298
299
300 public Collection<String> getSourceNames() {
301 try {
302 this.sourcesLock.readLock().lock();
303 return Collections.unmodifiableCollection(new HashSet<String>(this.pools.keySet()));
304 } finally {
305 this.sourcesLock.readLock().unlock();
306 }
307 }
308
309
310
311
312
313
314 public Collection<RepositorySource> getSources() {
315 List<RepositorySource> sources = new LinkedList<RepositorySource>();
316 try {
317 this.sourcesLock.readLock().lock();
318 for (RepositoryConnectionPool pool : this.pools.values()) {
319 sources.add(pool.getRepositorySource());
320 }
321 return Collections.unmodifiableCollection(sources);
322 } finally {
323 this.sourcesLock.readLock().unlock();
324 }
325 }
326
327
328
329
330
331
332
333 public RepositorySource getSource( String sourceName ) {
334 try {
335 this.sourcesLock.readLock().lock();
336 RepositoryConnectionPool existingPool = this.pools.get(sourceName);
337 return existingPool == null ? null : existingPool.getRepositorySource();
338 } finally {
339 this.sourcesLock.readLock().unlock();
340 }
341 }
342
343
344
345
346
347
348
349 public RepositoryConnectionPool getConnectionPool( String sourceName ) {
350 try {
351 this.sourcesLock.readLock().lock();
352 return this.pools.get(sourceName);
353 } finally {
354 this.sourcesLock.readLock().unlock();
355 }
356 }
357
358
359
360
361
362
363
364
365 public boolean addSource( RepositorySource source ) {
366 return addSource(source, false);
367 }
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383 public boolean addSource( RepositorySource source,
384 boolean replaceIfExisting ) {
385 if (source == null) return false;
386 final String sourceName = source.getName();
387 if (!replaceIfExisting) {
388
389 try {
390 this.sourcesLock.readLock().lock();
391 if (this.pools.containsKey(sourceName)) return false;
392 } finally {
393 this.sourcesLock.readLock().unlock();
394 }
395 }
396
397 final ObservationBus observationBus = this.observationBus;
398 RepositoryContext repositoryContext = new RepositoryContext() {
399
400
401
402
403
404 public ExecutionContext getExecutionContext() {
405 return RepositoryLibrary.this.getExecutionContext();
406 }
407
408
409
410
411
412
413 public RepositoryConnectionFactory getRepositoryConnectionFactory() {
414 return RepositoryLibrary.this;
415 }
416
417
418
419
420
421
422 public Observer getObserver() {
423 return observationBus.hasObservers() ? observationBus : null;
424 }
425
426
427
428
429
430
431 public Subgraph getConfiguration( int depth ) {
432 Subgraph result = null;
433 RepositorySource configSource = getConfigurationSource();
434 if (configSource != null) {
435 Graph config = Graph.create(configSource, getExecutionContext());
436 String workspaceName = getConfigurationWorkspaceName();
437 if (workspaceName != null) {
438 config.useWorkspace(workspaceName);
439 }
440 Path configPath = getPathToConfigurationRoot();
441 Path sourcePath = getExecutionContext().getValueFactories().getPathFactory().create(configPath, sourceName);
442 result = config.getSubgraphOfDepth(depth).at(sourcePath);
443 }
444 return result;
445 }
446 };
447
448 source.initialize(repositoryContext);
449 RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
450 try {
451 this.sourcesLock.writeLock().lock();
452
453 RepositoryConnectionPool existingPool = this.pools.remove(sourceName);
454 if (existingPool != null) {
455
456 existingPool.shutdown();
457 }
458 this.pools.put(sourceName, pool);
459 return true;
460 } finally {
461 this.sourcesLock.writeLock().unlock();
462 }
463 }
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480 public boolean removeSource( RepositorySource source,
481 long timeToAwait,
482 TimeUnit unit ) throws InterruptedException {
483
484 return removeSource(source.getName(), timeToAwait, unit) != null;
485 }
486
487
488
489
490
491
492
493
494
495
496
497
498 public RepositorySource removeSource( String name ) {
499 try {
500 this.sourcesLock.writeLock().lock();
501 RepositoryConnectionPool existingPool = this.pools.remove(name);
502 if (existingPool != null) {
503
504 existingPool.shutdown();
505 return existingPool.getRepositorySource();
506 }
507 } finally {
508 this.sourcesLock.writeLock().unlock();
509 }
510 return null;
511 }
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527 public RepositorySource removeSource( String name,
528 long timeToAwait,
529 TimeUnit unit ) throws InterruptedException {
530 try {
531 this.sourcesLock.writeLock().lock();
532 RepositoryConnectionPool existingPool = this.pools.remove(name);
533 if (existingPool != null) {
534
535 existingPool.shutdown();
536 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
537 return existingPool.getRepositorySource();
538 }
539 } finally {
540 this.sourcesLock.writeLock().unlock();
541 }
542 return null;
543 }
544
545
546
547
548
549
550 public RepositoryConnection createConnection( String sourceName ) {
551 try {
552 this.sourcesLock.readLock().lock();
553 RepositoryConnectionPool existingPool = this.pools.get(sourceName);
554 if (existingPool != null) return existingPool.getConnection();
555 RepositoryConnectionFactory delegate = this.delegate;
556 if (delegate != null) {
557 return delegate.createConnection(sourceName);
558 }
559 } finally {
560 this.sourcesLock.readLock().unlock();
561 }
562 return null;
563 }
564 }