1 /*
2 * ModeShape (http://www.modeshape.org)
3 * See the COPYRIGHT.txt file distributed with this work for information
4 * regarding copyright ownership. Some portions may be licensed
5 * to Red Hat, Inc. under one or more contributor license agreements.
6 * See the AUTHORS.txt file in the distribution for a full listing of
7 * individual contributors.
8 *
9 * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10 * is licensed to you under the terms of the GNU Lesser General Public License as
11 * published by the Free Software Foundation; either version 2.1 of
12 * the License, or (at your option) any later version.
13 *
14 * ModeShape is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Lesser General Public License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public
20 * License along with this software; if not, write to the Free
21 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23 */
24 package org.modeshape.repository;
25
26 import java.util.Collection;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.LinkedList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.locks.ReadWriteLock;
35 import java.util.concurrent.locks.ReentrantReadWriteLock;
36 import net.jcip.annotations.ThreadSafe;
37 import org.modeshape.common.util.CheckArg;
38 import org.modeshape.graph.ExecutionContext;
39 import org.modeshape.graph.Graph;
40 import org.modeshape.graph.Subgraph;
41 import org.modeshape.graph.connector.RepositoryConnection;
42 import org.modeshape.graph.connector.RepositoryConnectionFactory;
43 import org.modeshape.graph.connector.RepositoryConnectionPool;
44 import org.modeshape.graph.connector.RepositoryContext;
45 import org.modeshape.graph.connector.RepositorySource;
46 import org.modeshape.graph.observe.Observable;
47 import org.modeshape.graph.observe.ObservationBus;
48 import org.modeshape.graph.observe.Observer;
49 import org.modeshape.graph.property.Path;
50 import org.modeshape.repository.service.AbstractServiceAdministrator;
51 import org.modeshape.repository.service.ServiceAdministrator;
52
53 /**
54 * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for
55 * each.
56 */
57 @ThreadSafe
58 public class RepositoryLibrary implements RepositoryConnectionFactory, Observable {
59
60 /**
61 * The administrative component for this service.
62 *
63 * @author Randall Hauch
64 */
65 protected class Administrator extends AbstractServiceAdministrator {
66
67 protected Administrator() {
68 super(RepositoryI18n.repositoryServiceName, State.STARTED);
69 }
70
71 /**
72 * {@inheritDoc}
73 */
74 @Override
75 protected void doStart( State fromState ) {
76 super.doStart(fromState);
77 RepositoryLibrary.this.start();
78 }
79
80 /**
81 * {@inheritDoc}
82 */
83 @Override
84 protected void doShutdown( State fromState ) {
85 super.doShutdown(fromState);
86 RepositoryLibrary.this.shutdown();
87 }
88
89 /**
90 * {@inheritDoc}
91 */
92 public boolean awaitTermination( long timeout,
93 TimeUnit unit ) throws InterruptedException {
94 return RepositoryLibrary.this.awaitTermination(timeout, unit);
95 }
96
97 /**
98 * {@inheritDoc}
99 */
100 @Override
101 protected boolean doCheckIsTerminated() {
102 return RepositoryLibrary.this.isTerminated();
103 }
104
105 }
106
107 private final ServiceAdministrator administrator = new Administrator();
108 private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
109 private final Map<String, RepositoryConnectionPool> pools = new HashMap<String, RepositoryConnectionPool>();
110 private RepositoryConnectionFactory delegate;
111 private final ExecutionContext executionContext;
112 private final ObservationBus observationBus;
113 private final RepositorySource configurationSource;
114 private final String configurationWorkspaceName;
115 private final Path pathToConfigurationRoot;
116
/**
 * Create a new library instance that initially manages no sources.
 *
 * @param configurationSource the {@link RepositorySource} that is the configuration repository
 * @param configurationWorkspaceName the name of the workspace in the configuration repository, or null if the default
 *        workspace of the source should be used (if there is one)
 * @param pathToSourcesConfigurationRoot the path of the node in the configuration repository that this service should treat
 *        as the root of its configuration
 * @param context the execution context in which this service should run
 * @param observationBus the {@link ObservationBus} instance that should be used for changes in the sources
 * @throws IllegalArgumentException if any of the <code>configurationSource</code>, <code>context</code>,
 *         <code>pathToSourcesConfigurationRoot</code>, or <code>observationBus</code> references are null
 */
public RepositoryLibrary( RepositorySource configurationSource,
                          String configurationWorkspaceName,
                          Path pathToSourcesConfigurationRoot,
                          final ExecutionContext context,
                          ObservationBus observationBus ) {
    // Validate every required reference before any state is recorded (the workspace name alone may be null)
    CheckArg.isNotNull(configurationSource, "configurationSource");
    CheckArg.isNotNull(context, "context");
    CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot");
    CheckArg.isNotNull(observationBus, "observationBus");

    // Where the configuration lives ...
    this.configurationSource = configurationSource;
    this.configurationWorkspaceName = configurationWorkspaceName;
    this.pathToConfigurationRoot = pathToSourcesConfigurationRoot;

    // ... and the environment in which the sources operate
    this.executionContext = context;
    this.observationBus = observationBus;
}
146
147 /**
148 * Get the path to the top-level of the configuration root.
149 *
150 * @return pathToConfigurationRoot
151 */
152 protected Path getPathToConfigurationRoot() {
153 return pathToConfigurationRoot;
154 }
155
156 /**
157 * @return configurationSource
158 */
159 protected RepositorySource getConfigurationSource() {
160 return configurationSource;
161 }
162
163 /**
164 * @return configurationWorkspaceName
165 */
166 protected String getConfigurationWorkspaceName() {
167 return configurationWorkspaceName;
168 }
169
170 /**
171 * {@inheritDoc}
172 * <p>
173 * This can be used to register observers for all of the repository sources managed by this library. The supplied observer
174 * will receive all of the changes originating from these sources.
175 * </p>
176 *
177 * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer)
178 */
179 public boolean register( Observer observer ) {
180 if (observer == null) return false;
181 return observationBus.register(observer);
182 }
183
184 /**
185 * {@inheritDoc}
186 * <p>
187 * This can be used to unregister observers for all of the repository sources managed by this library.
188 * </p>
189 *
190 * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer)
191 */
192 public boolean unregister( Observer observer ) {
193 return observationBus.unregister(observer);
194 }
195
196 /**
197 * @return executionContextFactory
198 */
199 public ExecutionContext getExecutionContext() {
200 return executionContext;
201 }
202
203 /**
204 * @return administrator
205 */
206 public ServiceAdministrator getAdministrator() {
207 return this.administrator;
208 }
209
210 /**
211 * Utility method called by the administrator.
212 */
213 protected void start() {
214 // Do not establish connections to the pools; these will be established as needed
215
216 }
217
218 /**
219 * Utility method called by the administrator.
220 */
221 protected void shutdown() {
222 // Close all connections to the pools. This is done inside the pools write lock.
223 try {
224 this.sourcesLock.readLock().lock();
225 for (RepositoryConnectionPool pool : this.pools.values()) {
226 // Shutdown the pool of connections ...
227 pool.shutdown();
228 // Now close the source (still allows in-use connections to be used) ...
229 pool.getRepositorySource().close();
230 }
231 } finally {
232 this.sourcesLock.readLock().unlock();
233 }
234 }
235
236 /**
237 * Utility method called by the administrator.
238 *
239 * @param timeout
240 * @param unit
241 * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout
242 * occurred before all the connections were closed
243 * @throws InterruptedException
244 */
245 protected boolean awaitTermination( long timeout,
246 TimeUnit unit ) throws InterruptedException {
247 // Check whether all source pools are shut down. This is done inside the pools write lock.
248 try {
249 this.sourcesLock.readLock().lock();
250 for (RepositoryConnectionPool pool : this.pools.values()) {
251 if (!pool.awaitTermination(timeout, unit)) return false;
252 }
253 return true;
254 } finally {
255 this.sourcesLock.readLock().unlock();
256 }
257 }
258
259 /**
260 * Returns true if this library is in the process of terminating after {@link ServiceAdministrator#shutdown()} has been called
261 * on the {@link #getAdministrator() administrator}, but the library has connections that have not yet normally been
262 * {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of <tt>true</tt> reported a
263 * sufficient period after shutdown may indicate that connection users have ignored or suppressed interruption, causing this
264 * repository not to properly terminate.
265 *
266 * @return true if terminating but not yet terminated, or false otherwise
267 * @see #isTerminated()
268 */
269 public boolean isTerminating() {
270 try {
271 this.sourcesLock.readLock().lock();
272 for (RepositoryConnectionPool pool : this.pools.values()) {
273 if (pool.isTerminating()) return true;
274 }
275 return false;
276 } finally {
277 this.sourcesLock.readLock().unlock();
278 }
279 }
280
281 /**
282 * Return true if this library has completed its termination and no longer has any open connections.
283 *
284 * @return true if terminated, or false otherwise
285 * @see #isTerminating()
286 */
287 public boolean isTerminated() {
288 try {
289 this.sourcesLock.readLock().lock();
290 for (RepositoryConnectionPool pool : this.pools.values()) {
291 if (!pool.isTerminated()) return false;
292 }
293 return true;
294 } finally {
295 this.sourcesLock.readLock().unlock();
296 }
297 }
298
299 /**
300 * Get an unmodifiable collection of {@link RepositorySource} names.
301 *
302 * @return the pools
303 */
304 public Collection<String> getSourceNames() {
305 try {
306 this.sourcesLock.readLock().lock();
307 return Collections.unmodifiableCollection(new HashSet<String>(this.pools.keySet()));
308 } finally {
309 this.sourcesLock.readLock().unlock();
310 }
311 }
312
313 /**
314 * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance.
315 *
316 * @return the pools
317 */
318 public Collection<RepositorySource> getSources() {
319 List<RepositorySource> sources = new LinkedList<RepositorySource>();
320 try {
321 this.sourcesLock.readLock().lock();
322 for (RepositoryConnectionPool pool : this.pools.values()) {
323 sources.add(pool.getRepositorySource());
324 }
325 return Collections.unmodifiableCollection(sources);
326 } finally {
327 this.sourcesLock.readLock().unlock();
328 }
329 }
330
331 /**
332 * Get the RepositorySource with the specified name managed by this instance.
333 *
334 * @param sourceName the name of the source
335 * @return the source, or null if no such source exists in this instance
336 */
337 public RepositorySource getSource( String sourceName ) {
338 try {
339 this.sourcesLock.readLock().lock();
340 RepositoryConnectionPool existingPool = this.pools.get(sourceName);
341 return existingPool == null ? null : existingPool.getRepositorySource();
342 } finally {
343 this.sourcesLock.readLock().unlock();
344 }
345 }
346
347 /**
348 * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance.
349 *
350 * @param sourceName the name of the source
351 * @return the pool, or null if no such pool exists in this instance
352 */
353 public RepositoryConnectionPool getConnectionPool( String sourceName ) {
354 try {
355 this.sourcesLock.readLock().lock();
356 return this.pools.get(sourceName);
357 } finally {
358 this.sourcesLock.readLock().unlock();
359 }
360 }
361
362 /**
363 * Add the supplied source. This method returns false if the source is null.
364 *
365 * @param source the source to add
366 * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
367 * supplied name.
368 */
369 public boolean addSource( RepositorySource source ) {
370 return addSource(source, false);
371 }
372
373 /**
374 * Add the supplied source. This method returns false if the source is null.
375 * <p>
376 * If a source with the same name already exists, it will be replaced only if <code>replaceIfExisting</code> is true. If this
377 * is the case, then the existing source will be removed from the connection pool, and that pool will be
378 * {@link RepositoryConnectionPool#shutdown() shutdown} (allowing any in-use connections to be used and finished normally).
379 * </p>
380 *
381 * @param source the source to add
382 * @param replaceIfExisting true if an existing source should be replaced, or false if this method should return false if
383 * there is already an existing source with the supplied name.
384 * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
385 * supplied name.
386 */
387 public boolean addSource( RepositorySource source,
388 boolean replaceIfExisting ) {
389 if (source == null) return false;
390 final String sourceName = source.getName();
391 if (!replaceIfExisting) {
392 // Don't want to replace existing, so make sure there isn't one already ...
393 try {
394 this.sourcesLock.readLock().lock();
395 if (this.pools.containsKey(sourceName)) return false;
396 } finally {
397 this.sourcesLock.readLock().unlock();
398 }
399 }
400 // Create a repository context for this source ...
401 final ObservationBus observationBus = this.observationBus;
402 RepositoryContext repositoryContext = new RepositoryContext() {
403 /**
404 * {@inheritDoc}
405 *
406 * @see org.modeshape.graph.connector.RepositoryContext#getExecutionContext()
407 */
408 public ExecutionContext getExecutionContext() {
409 return RepositoryLibrary.this.getExecutionContext();
410 }
411
412 /**
413 * {@inheritDoc}
414 *
415 * @see org.modeshape.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
416 */
417 public RepositoryConnectionFactory getRepositoryConnectionFactory() {
418 return RepositoryLibrary.this;
419 }
420
421 /**
422 * {@inheritDoc}
423 *
424 * @see org.modeshape.graph.connector.RepositoryContext#getObserver()
425 */
426 public Observer getObserver() {
427 return observationBus.hasObservers() ? observationBus : null;
428 }
429
430 /**
431 * {@inheritDoc}
432 *
433 * @see org.modeshape.graph.connector.RepositoryContext#getConfiguration(int)
434 */
435 public Subgraph getConfiguration( int depth ) {
436 Subgraph result = null;
437 RepositorySource configSource = getConfigurationSource();
438 if (configSource != null) {
439 Graph config = Graph.create(configSource, getExecutionContext());
440 String workspaceName = getConfigurationWorkspaceName();
441 if (workspaceName != null) {
442 config.useWorkspace(workspaceName);
443 }
444 Path configPath = getPathToConfigurationRoot();
445 Path sourcePath = getExecutionContext().getValueFactories().getPathFactory().create(configPath, sourceName);
446 result = config.getSubgraphOfDepth(depth).at(sourcePath);
447 }
448 return result;
449 }
450 };
451 // Do this before we remove the existing pool ...
452 source.initialize(repositoryContext);
453 RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
454 try {
455 this.sourcesLock.writeLock().lock();
456 // Need to first remove any existing one ...
457 RepositoryConnectionPool existingPool = this.pools.remove(sourceName);
458 if (existingPool != null) {
459 // Then shut down the source gracefully (and don't wait) ...
460 existingPool.shutdown();
461 }
462 this.pools.put(sourceName, pool);
463 return true;
464 } finally {
465 this.sourcesLock.writeLock().unlock();
466 }
467 }
468
469 /**
470 * Remove from this library the supplied source (or a source with the same name as that supplied). This call shuts down the
471 * connections in the source in an orderly fashion, allowing those connection currently in use to be used and closed normally,
472 * but preventing further connections from being used.
473 * <p>
474 * This method can safely be called while the federation repository is in use.
475 * </p>
476 *
477 * @param source the source to be removed
478 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
479 * should not wait at all
480 * @param unit the time unit to be used for <code>timeToAwait</code>
481 * @return true if the source was removed, or false if the source was not a source for this repository.
482 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
483 */
484 public boolean removeSource( RepositorySource source,
485 long timeToAwait,
486 TimeUnit unit ) throws InterruptedException {
487 // Use the name; don't use the object equality ...
488 return removeSource(source.getName(), timeToAwait, unit) != null;
489 }
490
491 /**
492 * Remove from this library the source with the supplied name. This call shuts down the connections in the source in an
493 * orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
494 * connections from being used. However, this method never waits until the connections are all closed, and is equivalent to
495 * calling <code>removeSource(name,0,TimeUnit.SECONDS)</code>.
496 *
497 * @param name the name of the source to be removed
498 * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
499 * be found
500 * @see #removeSource(String, long, TimeUnit)
501 */
502 public RepositorySource removeSource( String name ) {
503 try {
504 this.sourcesLock.writeLock().lock();
505 RepositoryConnectionPool existingPool = this.pools.remove(name);
506 if (existingPool != null) {
507 // Then shut down the source gracefully (and don't wait) ...
508 existingPool.shutdown();
509 return existingPool.getRepositorySource();
510 }
511 } finally {
512 this.sourcesLock.writeLock().unlock();
513 }
514 return null;
515 }
516
517 /**
518 * Remove from this library the source with the supplied name. This call shuts down the connections in the source in an
519 * orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
520 * connections from being used.
521 *
522 * @param name the name of the source to be removed
523 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
524 * should not wait at all
525 * @param unit the time unit to be used for <code>timeToAwait</code>
526 * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
527 * be found
528 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
529 * @see #removeSource(String)
530 */
531 public RepositorySource removeSource( String name,
532 long timeToAwait,
533 TimeUnit unit ) throws InterruptedException {
534 try {
535 this.sourcesLock.writeLock().lock();
536 RepositoryConnectionPool existingPool = this.pools.remove(name);
537 if (existingPool != null) {
538 // Then shut down the source gracefully (and don't wait) ...
539 existingPool.shutdown();
540 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
541 return existingPool.getRepositorySource();
542 }
543 } finally {
544 this.sourcesLock.writeLock().unlock();
545 }
546 return null;
547 }
548
549 /**
550 * {@inheritDoc}
551 *
552 * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
553 */
554 public RepositoryConnection createConnection( String sourceName ) {
555 try {
556 this.sourcesLock.readLock().lock();
557 RepositoryConnectionPool existingPool = this.pools.get(sourceName);
558 if (existingPool != null) return existingPool.getConnection();
559 RepositoryConnectionFactory delegate = this.delegate;
560 if (delegate != null) {
561 return delegate.createConnection(sourceName);
562 }
563 } finally {
564 this.sourcesLock.readLock().unlock();
565 }
566 return null;
567 }
568 }