001 /*
002 * JBoss DNA (http://www.jboss.org/dna)
003 * See the COPYRIGHT.txt file distributed with this work for information
004 * regarding copyright ownership. Some portions may be licensed
005 * to Red Hat, Inc. under one or more contributor license agreements.
006 * See the AUTHORS.txt file in the distribution for a full listing of
007 * individual contributors.
008 *
009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010 * is licensed to you under the terms of the GNU Lesser General Public License as
011 * published by the Free Software Foundation; either version 2.1 of
012 * the License, or (at your option) any later version.
013 *
014 * JBoss DNA is distributed in the hope that it will be useful,
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017 * Lesser General Public License for more details.
018 *
019 * You should have received a copy of the GNU Lesser General Public
020 * License along with this software; if not, write to the Free
021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023 */
024 package org.jboss.dna.repository;
025
026 import java.util.Collection;
027 import java.util.Collections;
028 import java.util.HashSet;
029 import java.util.LinkedList;
030 import java.util.List;
031 import java.util.Set;
032 import java.util.concurrent.CopyOnWriteArrayList;
033 import java.util.concurrent.TimeUnit;
034 import java.util.concurrent.locks.ReadWriteLock;
035 import java.util.concurrent.locks.ReentrantReadWriteLock;
036 import net.jcip.annotations.ThreadSafe;
037 import org.jboss.dna.common.util.CheckArg;
038 import org.jboss.dna.graph.ExecutionContext;
039 import org.jboss.dna.graph.Graph;
040 import org.jboss.dna.graph.Subgraph;
041 import org.jboss.dna.graph.connector.RepositoryConnection;
042 import org.jboss.dna.graph.connector.RepositoryConnectionFactory;
043 import org.jboss.dna.graph.connector.RepositoryConnectionPool;
044 import org.jboss.dna.graph.connector.RepositoryContext;
045 import org.jboss.dna.graph.connector.RepositorySource;
046 import org.jboss.dna.graph.observe.ChangeObserver;
047 import org.jboss.dna.graph.observe.ChangeObservers;
048 import org.jboss.dna.graph.observe.Changes;
049 import org.jboss.dna.graph.observe.Observable;
050 import org.jboss.dna.graph.observe.Observer;
051 import org.jboss.dna.graph.property.Path;
052 import org.jboss.dna.repository.service.AbstractServiceAdministrator;
053 import org.jboss.dna.repository.service.ServiceAdministrator;
054
055 /**
056 * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for
057 * each.
058 *
059 * @author Randall Hauch
060 */
061 @ThreadSafe
062 public class RepositoryLibrary implements RepositoryConnectionFactory, Observable {
063
064 /**
065 * The administrative component for this service.
066 *
067 * @author Randall Hauch
068 */
069 protected class Administrator extends AbstractServiceAdministrator {
070
071 protected Administrator() {
072 super(RepositoryI18n.federationServiceName, State.STARTED);
073 }
074
075 /**
076 * {@inheritDoc}
077 */
078 @Override
079 protected void doStart( State fromState ) {
080 super.doStart(fromState);
081 RepositoryLibrary.this.start();
082 }
083
084 /**
085 * {@inheritDoc}
086 */
087 @Override
088 protected void doShutdown( State fromState ) {
089 super.doShutdown(fromState);
090 RepositoryLibrary.this.shutdown();
091 }
092
093 /**
094 * {@inheritDoc}
095 */
096 public boolean awaitTermination( long timeout,
097 TimeUnit unit ) throws InterruptedException {
098 return RepositoryLibrary.this.awaitTermination(timeout, unit);
099 }
100
101 /**
102 * {@inheritDoc}
103 */
104 @Override
105 protected boolean doCheckIsTerminated() {
106 return RepositoryLibrary.this.isTerminated();
107 }
108
109 }
110
111 private final ServiceAdministrator administrator = new Administrator();
112 private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
113 private final CopyOnWriteArrayList<RepositoryConnectionPool> pools = new CopyOnWriteArrayList<RepositoryConnectionPool>();
114 private RepositoryConnectionFactory delegate;
115 private final ExecutionContext executionContext;
116 private final ObservationBus observationBus = new InMemoryObservationBus();
117 private final RepositorySource configurationSource;
118 private final String configurationWorkspaceName;
119 private final Path pathToConfigurationRoot;
120
/**
 * Create a new library instance that reads its configuration from the supplied source.
 * 
 * @param configurationSource the {@link RepositorySource} that is the configuration repository; may not be null
 * @param configurationWorkspaceName the name of the workspace in the configuration source, or null if no specific workspace
 *        should be selected when the configuration is read
 * @param pathToSourcesConfigurationRoot the path of the node in the configuration source that is treated as the root of
 *        this service's configuration; may not be null
 * @param context the execution context in which this service should run; may not be null
 * @throws IllegalArgumentException if <code>configurationSource</code>, <code>context</code> or
 *         <code>pathToSourcesConfigurationRoot</code> is null
 */
public RepositoryLibrary( RepositorySource configurationSource,
                          String configurationWorkspaceName,
                          Path pathToSourcesConfigurationRoot,
                          final ExecutionContext context ) {
    // Validate in a fixed order so the first offending argument is the one reported
    CheckArg.isNotNull(configurationSource, "configurationSource");
    CheckArg.isNotNull(context, "context");
    CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot");

    // Where the configuration lives ...
    this.configurationSource = configurationSource;
    this.configurationWorkspaceName = configurationWorkspaceName;
    this.pathToConfigurationRoot = pathToSourcesConfigurationRoot;

    // ... and the context used to read it
    this.executionContext = context;
}
144
145 /**
146 * Get the path to the top-level of the configuration root.
147 *
148 * @return pathToConfigurationRoot
149 */
150 protected Path getPathToConfigurationRoot() {
151 return pathToConfigurationRoot;
152 }
153
154 /**
155 * @return configurationSource
156 */
157 protected RepositorySource getConfigurationSource() {
158 return configurationSource;
159 }
160
161 /**
162 * @return configurationWorkspaceName
163 */
164 protected String getConfigurationWorkspaceName() {
165 return configurationWorkspaceName;
166 }
167
168 /**
169 * {@inheritDoc}
170 *
171 * @see org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver)
172 */
173 public boolean register( ChangeObserver observer ) {
174 return observationBus.register(observer);
175 }
176
177 /**
178 * {@inheritDoc}
179 *
180 * @see org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver)
181 */
182 public boolean unregister( ChangeObserver observer ) {
183 return observationBus.unregister(observer);
184 }
185
186 /**
187 * @return executionContextFactory
188 */
189 public ExecutionContext getExecutionContext() {
190 return executionContext;
191 }
192
193 /**
194 * @return administrator
195 */
196 public ServiceAdministrator getAdministrator() {
197 return this.administrator;
198 }
199
200 /**
201 * Utility method called by the administrator.
202 */
203 protected void start() {
204 // Do not establish connections to the pools; these will be established as needed
205
206 }
207
208 /**
209 * Utility method called by the administrator.
210 */
211 protected void shutdown() {
212 // Close all connections to the pools. This is done inside the pools write lock.
213 try {
214 this.sourcesLock.readLock().lock();
215 for (RepositoryConnectionPool pool : this.pools) {
216 pool.shutdown();
217 }
218 } finally {
219 this.sourcesLock.readLock().unlock();
220 }
221 // Remove all listeners ...
222 this.observationBus.shutdown();
223 }
224
225 /**
226 * Utility method called by the administrator.
227 *
228 * @param timeout
229 * @param unit
230 * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout
231 * occurred before all the connections were closed
232 * @throws InterruptedException
233 */
234 protected boolean awaitTermination( long timeout,
235 TimeUnit unit ) throws InterruptedException {
236 // Check whether all source pools are shut down. This is done inside the pools write lock.
237 try {
238 this.sourcesLock.readLock().lock();
239 for (RepositoryConnectionPool pool : this.pools) {
240 if (!pool.awaitTermination(timeout, unit)) return false;
241 }
242 return true;
243 } finally {
244 this.sourcesLock.readLock().unlock();
245 }
246 }
247
248 /**
249 * Returns true if this federated repository is in the process of terminating after {@link ServiceAdministrator#shutdown()}
250 * has been called on the {@link #getAdministrator() administrator}, but the federated repository has connections that have
251 * not yet normally been {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of
252 * <tt>true</tt> reported a sufficient period after shutdown may indicate that connection users have ignored or suppressed
253 * interruption, causing this repository not to properly terminate.
254 *
255 * @return true if terminating but not yet terminated, or false otherwise
256 * @see #isTerminated()
257 */
258 public boolean isTerminating() {
259 try {
260 this.sourcesLock.readLock().lock();
261 for (RepositoryConnectionPool pool : this.pools) {
262 if (pool.isTerminating()) return true;
263 }
264 return false;
265 } finally {
266 this.sourcesLock.readLock().unlock();
267 }
268 }
269
270 /**
271 * Return true if this federated repository has completed its termination and no longer has any open connections.
272 *
273 * @return true if terminated, or false otherwise
274 * @see #isTerminating()
275 */
276 public boolean isTerminated() {
277 try {
278 this.sourcesLock.readLock().lock();
279 for (RepositoryConnectionPool pool : this.pools) {
280 if (!pool.isTerminated()) return false;
281 }
282 return true;
283 } finally {
284 this.sourcesLock.readLock().unlock();
285 }
286 }
287
288 /**
289 * Get an unmodifiable collection of {@link RepositorySource} names.
290 *
291 * @return the pools
292 */
293 public Collection<String> getSourceNames() {
294 Set<String> sourceNames = new HashSet<String>();
295 for (RepositoryConnectionPool pool : this.pools) {
296 sourceNames.add(pool.getRepositorySource().getName());
297 }
298 return Collections.unmodifiableCollection(sourceNames);
299 }
300
301 /**
302 * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance.
303 *
304 * @return the pools
305 */
306 public Collection<RepositorySource> getSources() {
307 List<RepositorySource> sources = new LinkedList<RepositorySource>();
308 for (RepositoryConnectionPool pool : this.pools) {
309 sources.add(pool.getRepositorySource());
310 }
311 return Collections.unmodifiableCollection(sources);
312 }
313
314 /**
315 * Get the RepositorySource with the specified name managed by this instance.
316 *
317 * @param sourceName the name of the source
318 * @return the source, or null if no such source exists in this instance
319 */
320 public RepositorySource getSource( String sourceName ) {
321 try {
322 this.sourcesLock.readLock().lock();
323 for (RepositoryConnectionPool existingPool : this.pools) {
324 RepositorySource source = existingPool.getRepositorySource();
325 if (source.getName().equals(sourceName)) return source;
326 }
327 } finally {
328 this.sourcesLock.readLock().unlock();
329 }
330 return null;
331 }
332
333 /**
334 * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance.
335 *
336 * @param sourceName the name of the source
337 * @return the pool, or null if no such pool exists in this instance
338 */
339 public RepositoryConnectionPool getConnectionPool( String sourceName ) {
340 try {
341 this.sourcesLock.readLock().lock();
342 for (RepositoryConnectionPool existingPool : this.pools) {
343 RepositorySource source = existingPool.getRepositorySource();
344 if (source.getName().equals(sourceName)) return existingPool;
345 }
346 } finally {
347 this.sourcesLock.readLock().unlock();
348 }
349 return null;
350 }
351
352 /**
353 * Add the supplied federated source. This method returns false if the source is null.
354 *
355 * @param source the source to add
356 * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
357 * supplied name.
358 */
359 public boolean addSource( RepositorySource source ) {
360 if (source == null) return false;
361 try {
362 this.sourcesLock.writeLock().lock();
363 final String sourceName = source.getName();
364 for (RepositoryConnectionPool existingPool : this.pools) {
365 if (existingPool.getRepositorySource().getName().equals(sourceName)) return false;
366 }
367 // Create a repository context for this source ...
368 final ObservationBus observationBus = this.observationBus;
369 RepositoryContext repositoryContext = new RepositoryContext() {
370 /**
371 * {@inheritDoc}
372 *
373 * @see org.jboss.dna.graph.connector.RepositoryContext#getExecutionContext()
374 */
375 public ExecutionContext getExecutionContext() {
376 return RepositoryLibrary.this.getExecutionContext();
377 }
378
379 /**
380 * {@inheritDoc}
381 *
382 * @see org.jboss.dna.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
383 */
384 public RepositoryConnectionFactory getRepositoryConnectionFactory() {
385 return RepositoryLibrary.this;
386 }
387
388 /**
389 * {@inheritDoc}
390 *
391 * @see org.jboss.dna.graph.connector.RepositoryContext#getObserver()
392 */
393 public Observer getObserver() {
394 return observationBus.hasObservers() ? observationBus : null;
395 }
396
397 /**
398 * {@inheritDoc}
399 *
400 * @see org.jboss.dna.graph.connector.RepositoryContext#getConfiguration(int)
401 */
402 public Subgraph getConfiguration( int depth ) {
403 Subgraph result = null;
404 RepositorySource configSource = getConfigurationSource();
405 if (configSource != null) {
406 Graph config = Graph.create(configSource, getExecutionContext());
407 String workspaceName = getConfigurationWorkspaceName();
408 if (workspaceName != null) {
409 config.useWorkspace(workspaceName);
410 }
411 Path configPath = getPathToConfigurationRoot();
412 Path sourcePath = getExecutionContext().getValueFactories().getPathFactory().create(configPath,
413 sourceName);
414 result = config.getSubgraphOfDepth(depth).at(sourcePath);
415 }
416 return result;
417 }
418 };
419 source.initialize(repositoryContext);
420 RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
421 this.pools.add(pool);
422 return true;
423 } finally {
424 this.sourcesLock.writeLock().unlock();
425 }
426 }
427
428 /**
429 * Remove from this federated repository the supplied source (or a source with the same name as that supplied). This call
430 * shuts down the connections in the source in an orderly fashion, allowing those connection currently in use to be used and
431 * closed normally, but preventing further connections from being used.
432 * <p>
433 * This method can safely be called while the federation repository is in use.
434 * </p>
435 *
436 * @param source the source to be removed
437 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
438 * should not wait at all
439 * @param unit the time unit to be used for <code>timeToAwait</code>
440 * @return true if the source was removed, or false if the source was not a source for this repository.
441 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
442 */
443 public boolean removeSource( RepositorySource source,
444 long timeToAwait,
445 TimeUnit unit ) throws InterruptedException {
446 // Use the name; don't use the object equality ...
447 return removeSource(source.getName(), timeToAwait, unit) != null;
448 }
449
450 /**
451 * Remove from this federated repository the source with the supplied name. This call shuts down the connections in the source
452 * in an orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
453 * connections from being used.
454 *
455 * @param name the name of the source to be removed
456 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
457 * should not wait at all
458 * @param unit the time unit to be used for <code>timeToAwait</code>
459 * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
460 * be found
461 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
462 */
463 public RepositorySource removeSource( String name,
464 long timeToAwait,
465 TimeUnit unit ) throws InterruptedException {
466 try {
467 this.sourcesLock.writeLock().lock();
468 for (RepositoryConnectionPool existingPool : this.pools) {
469 if (existingPool.getRepositorySource().getName().equals(name)) {
470 // Shut down the source ...
471 existingPool.shutdown();
472 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
473 }
474 return existingPool.getRepositorySource();
475 }
476 } finally {
477 this.sourcesLock.writeLock().unlock();
478 }
479 return null;
480 }
481
482 /**
483 * {@inheritDoc}
484 *
485 * @see org.jboss.dna.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
486 */
487 public RepositoryConnection createConnection( String sourceName ) {
488 try {
489 this.sourcesLock.readLock().lock();
490 for (RepositoryConnectionPool existingPool : this.pools) {
491 RepositorySource source = existingPool.getRepositorySource();
492 if (source.getName().equals(sourceName)) return existingPool.getConnection();
493 }
494 RepositoryConnectionFactory delegate = this.delegate;
495 if (delegate != null) {
496 return delegate.createConnection(sourceName);
497 }
498 } finally {
499 this.sourcesLock.readLock().unlock();
500 }
501 return null;
502 }
503
504 protected interface ObservationBus extends Observable, Observer {
505 boolean hasObservers();
506
507 void shutdown();
508 }
509
510 protected class InMemoryObservationBus implements ObservationBus {
511 private final ChangeObservers observers = new ChangeObservers();
512
513 protected InMemoryObservationBus() {
514 }
515
516 /**
517 * {@inheritDoc}
518 *
519 * @see org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver)
520 */
521 public boolean register( ChangeObserver observer ) {
522 return observers.register(observer);
523 }
524
525 /**
526 * {@inheritDoc}
527 *
528 * @see org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver)
529 */
530 public boolean unregister( ChangeObserver observer ) {
531 return observers.unregister(observer);
532 }
533
534 /**
535 * {@inheritDoc}
536 *
537 * @see org.jboss.dna.graph.observe.Observer#notify(org.jboss.dna.graph.observe.Changes)
538 */
539 public void notify( Changes changes ) {
540 if (changes != null) {
541 // Broadcast the changes to the registered observers ...
542 observers.broadcast(changes);
543 }
544 }
545
546 /**
547 * {@inheritDoc}
548 *
549 * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#hasObservers()
550 */
551 public boolean hasObservers() {
552 return !observers.isEmpty();
553 }
554
555 /**
556 * {@inheritDoc}
557 *
558 * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#shutdown()
559 */
560 public void shutdown() {
561 observers.shutdown();
562 }
563 }
564 }