View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors. 
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   *
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.repository;
25  
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.locks.ReadWriteLock;
35  import java.util.concurrent.locks.ReentrantReadWriteLock;
36  import net.jcip.annotations.ThreadSafe;
37  import org.modeshape.common.util.CheckArg;
38  import org.modeshape.graph.ExecutionContext;
39  import org.modeshape.graph.Graph;
40  import org.modeshape.graph.Subgraph;
41  import org.modeshape.graph.connector.RepositoryConnection;
42  import org.modeshape.graph.connector.RepositoryConnectionFactory;
43  import org.modeshape.graph.connector.RepositoryConnectionPool;
44  import org.modeshape.graph.connector.RepositoryContext;
45  import org.modeshape.graph.connector.RepositorySource;
46  import org.modeshape.graph.observe.Observable;
47  import org.modeshape.graph.observe.ObservationBus;
48  import org.modeshape.graph.observe.Observer;
49  import org.modeshape.graph.property.Path;
50  import org.modeshape.repository.service.AbstractServiceAdministrator;
51  import org.modeshape.repository.service.ServiceAdministrator;
52  
53  /**
54   * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for
55   * each.
56   */
57  @ThreadSafe
58  public class RepositoryLibrary implements RepositoryConnectionFactory, Observable {
59  
60      /**
61       * The administrative component for this service.
62       * 
63       * @author Randall Hauch
64       */
65      protected class Administrator extends AbstractServiceAdministrator {
66  
67          protected Administrator() {
68              super(RepositoryI18n.repositoryServiceName, State.STARTED);
69          }
70  
71          /**
72           * {@inheritDoc}
73           */
74          @Override
75          protected void doStart( State fromState ) {
76              super.doStart(fromState);
77              RepositoryLibrary.this.start();
78          }
79  
80          /**
81           * {@inheritDoc}
82           */
83          @Override
84          protected void doShutdown( State fromState ) {
85              super.doShutdown(fromState);
86              RepositoryLibrary.this.shutdown();
87          }
88  
89          /**
90           * {@inheritDoc}
91           */
92          public boolean awaitTermination( long timeout,
93                                           TimeUnit unit ) throws InterruptedException {
94              return RepositoryLibrary.this.awaitTermination(timeout, unit);
95          }
96  
97          /**
98           * {@inheritDoc}
99           */
100         @Override
101         protected boolean doCheckIsTerminated() {
102             return RepositoryLibrary.this.isTerminated();
103         }
104 
105     }
106 
107     private final ServiceAdministrator administrator = new Administrator();
108     private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
109     private final Map<String, RepositoryConnectionPool> pools = new HashMap<String, RepositoryConnectionPool>();
110     private RepositoryConnectionFactory delegate;
111     private final ExecutionContext executionContext;
112     private final ObservationBus observationBus;
113     private final RepositorySource configurationSource;
114     private final String configurationWorkspaceName;
115     private final Path pathToConfigurationRoot;
116 
117     /**
118      * Create a new manager instance.
119      * 
120      * @param configurationSource the {@link RepositorySource} that is the configuration repository
121      * @param configurationWorkspaceName the name of the workspace in the {@link RepositorySource} that is the configuration
122      *        repository, or null if the default workspace of the source should be used (if there is one)
123      * @param pathToSourcesConfigurationRoot the path of the node in the configuration source repository that should be treated by
124      *        this service as the root of the service's configuration
125      * @param context the execution context in which this service should run
126      * @param observationBus the {@link ObservationBus} instance that should be used for changes in the sources
127      * @throws IllegalArgumentException if any of the <code>configurationSource</code>,
128      *         <code>pathToSourcesConfigurationRoot</code>, <code>observationBus</code>, or <code>context</code> references are
129      *         null
130      */
131     public RepositoryLibrary( RepositorySource configurationSource,
132                               String configurationWorkspaceName,
133                               Path pathToSourcesConfigurationRoot,
134                               final ExecutionContext context,
135                               ObservationBus observationBus ) {
136         CheckArg.isNotNull(configurationSource, "configurationSource");
137         CheckArg.isNotNull(context, "context");
138         CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot");
139         CheckArg.isNotNull(observationBus, "observationBus");
140         this.executionContext = context;
141         this.configurationSource = configurationSource;
142         this.configurationWorkspaceName = configurationWorkspaceName;
143         this.pathToConfigurationRoot = pathToSourcesConfigurationRoot;
144         this.observationBus = observationBus;
145     }
146 
147     /**
148      * Get the path to the top-level of the configuration root.
149      * 
150      * @return pathToConfigurationRoot
151      */
152     protected Path getPathToConfigurationRoot() {
153         return pathToConfigurationRoot;
154     }
155 
156     /**
157      * @return configurationSource
158      */
159     protected RepositorySource getConfigurationSource() {
160         return configurationSource;
161     }
162 
163     /**
164      * @return configurationWorkspaceName
165      */
166     protected String getConfigurationWorkspaceName() {
167         return configurationWorkspaceName;
168     }
169 
170     /**
171      * {@inheritDoc}
172      * <p>
173      * This can be used to register observers for all of the repository sources managed by this library. The supplied observer
174      * will receive all of the changes originating from these sources.
175      * </p>
176      * 
177      * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer)
178      */
179     public boolean register( Observer observer ) {
180         if (observer == null) return false;
181         return observationBus.register(observer);
182     }
183 
184     /**
185      * {@inheritDoc}
186      * <p>
187      * This can be used to unregister observers for all of the repository sources managed by this library.
188      * </p>
189      * 
190      * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer)
191      */
192     public boolean unregister( Observer observer ) {
193         return observationBus.unregister(observer);
194     }
195 
196     /**
197      * @return executionContextFactory
198      */
199     public ExecutionContext getExecutionContext() {
200         return executionContext;
201     }
202 
203     /**
204      * @return administrator
205      */
206     public ServiceAdministrator getAdministrator() {
207         return this.administrator;
208     }
209 
210     /**
211      * Utility method called by the administrator.
212      */
213     protected void start() {
214         // Do not establish connections to the pools; these will be established as needed
215 
216     }
217 
218     /**
219      * Utility method called by the administrator.
220      */
221     protected void shutdown() {
222         // Close all connections to the pools. This is done inside the pools write lock.
223         try {
224             this.sourcesLock.readLock().lock();
225             for (RepositoryConnectionPool pool : this.pools.values()) {
226                 // Shutdown the pool of connections ...
227                 pool.shutdown();
228                 // Now close the source (still allows in-use connections to be used) ...
229                 pool.getRepositorySource().close();
230             }
231         } finally {
232             this.sourcesLock.readLock().unlock();
233         }
234     }
235 
236     /**
237      * Utility method called by the administrator.
238      * 
239      * @param timeout
240      * @param unit
241      * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout
242      *         occurred before all the connections were closed
243      * @throws InterruptedException
244      */
245     protected boolean awaitTermination( long timeout,
246                                         TimeUnit unit ) throws InterruptedException {
247         // Check whether all source pools are shut down. This is done inside the pools write lock.
248         try {
249             this.sourcesLock.readLock().lock();
250             for (RepositoryConnectionPool pool : this.pools.values()) {
251                 if (!pool.awaitTermination(timeout, unit)) return false;
252             }
253             return true;
254         } finally {
255             this.sourcesLock.readLock().unlock();
256         }
257     }
258 
259     /**
260      * Returns true if this library is in the process of terminating after {@link ServiceAdministrator#shutdown()} has been called
261      * on the {@link #getAdministrator() administrator}, but the library has connections that have not yet normally been
262      * {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of <tt>true</tt> reported a
263      * sufficient period after shutdown may indicate that connection users have ignored or suppressed interruption, causing this
264      * repository not to properly terminate.
265      * 
266      * @return true if terminating but not yet terminated, or false otherwise
267      * @see #isTerminated()
268      */
269     public boolean isTerminating() {
270         try {
271             this.sourcesLock.readLock().lock();
272             for (RepositoryConnectionPool pool : this.pools.values()) {
273                 if (pool.isTerminating()) return true;
274             }
275             return false;
276         } finally {
277             this.sourcesLock.readLock().unlock();
278         }
279     }
280 
281     /**
282      * Return true if this library has completed its termination and no longer has any open connections.
283      * 
284      * @return true if terminated, or false otherwise
285      * @see #isTerminating()
286      */
287     public boolean isTerminated() {
288         try {
289             this.sourcesLock.readLock().lock();
290             for (RepositoryConnectionPool pool : this.pools.values()) {
291                 if (!pool.isTerminated()) return false;
292             }
293             return true;
294         } finally {
295             this.sourcesLock.readLock().unlock();
296         }
297     }
298 
299     /**
300      * Get an unmodifiable collection of {@link RepositorySource} names.
301      * 
302      * @return the pools
303      */
304     public Collection<String> getSourceNames() {
305         try {
306             this.sourcesLock.readLock().lock();
307             return Collections.unmodifiableCollection(new HashSet<String>(this.pools.keySet()));
308         } finally {
309             this.sourcesLock.readLock().unlock();
310         }
311     }
312 
313     /**
314      * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance.
315      * 
316      * @return the pools
317      */
318     public Collection<RepositorySource> getSources() {
319         List<RepositorySource> sources = new LinkedList<RepositorySource>();
320         try {
321             this.sourcesLock.readLock().lock();
322             for (RepositoryConnectionPool pool : this.pools.values()) {
323                 sources.add(pool.getRepositorySource());
324             }
325             return Collections.unmodifiableCollection(sources);
326         } finally {
327             this.sourcesLock.readLock().unlock();
328         }
329     }
330 
331     /**
332      * Get the RepositorySource with the specified name managed by this instance.
333      * 
334      * @param sourceName the name of the source
335      * @return the source, or null if no such source exists in this instance
336      */
337     public RepositorySource getSource( String sourceName ) {
338         try {
339             this.sourcesLock.readLock().lock();
340             RepositoryConnectionPool existingPool = this.pools.get(sourceName);
341             return existingPool == null ? null : existingPool.getRepositorySource();
342         } finally {
343             this.sourcesLock.readLock().unlock();
344         }
345     }
346 
347     /**
348      * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance.
349      * 
350      * @param sourceName the name of the source
351      * @return the pool, or null if no such pool exists in this instance
352      */
353     public RepositoryConnectionPool getConnectionPool( String sourceName ) {
354         try {
355             this.sourcesLock.readLock().lock();
356             return this.pools.get(sourceName);
357         } finally {
358             this.sourcesLock.readLock().unlock();
359         }
360     }
361 
362     /**
363      * Add the supplied source. This method returns false if the source is null.
364      * 
365      * @param source the source to add
366      * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
367      *         supplied name.
368      */
369     public boolean addSource( RepositorySource source ) {
370         return addSource(source, false);
371     }
372 
373     /**
374      * Add the supplied source. This method returns false if the source is null.
375      * <p>
376      * If a source with the same name already exists, it will be replaced only if <code>replaceIfExisting</code> is true. If this
377      * is the case, then the existing source will be removed from the connection pool, and that pool will be
378      * {@link RepositoryConnectionPool#shutdown() shutdown} (allowing any in-use connections to be used and finished normally).
379      * </p>
380      * 
381      * @param source the source to add
382      * @param replaceIfExisting true if an existing source should be replaced, or false if this method should return false if
383      *        there is already an existing source with the supplied name.
384      * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
385      *         supplied name.
386      */
387     public boolean addSource( RepositorySource source,
388                               boolean replaceIfExisting ) {
389         if (source == null) return false;
390         final String sourceName = source.getName();
391         if (!replaceIfExisting) {
392             // Don't want to replace existing, so make sure there isn't one already ...
393             try {
394                 this.sourcesLock.readLock().lock();
395                 if (this.pools.containsKey(sourceName)) return false;
396             } finally {
397                 this.sourcesLock.readLock().unlock();
398             }
399         }
400         // Create a repository context for this source ...
401         final ObservationBus observationBus = this.observationBus;
402         RepositoryContext repositoryContext = new RepositoryContext() {
403             /**
404              * {@inheritDoc}
405              * 
406              * @see org.modeshape.graph.connector.RepositoryContext#getExecutionContext()
407              */
408             public ExecutionContext getExecutionContext() {
409                 return RepositoryLibrary.this.getExecutionContext();
410             }
411 
412             /**
413              * {@inheritDoc}
414              * 
415              * @see org.modeshape.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
416              */
417             public RepositoryConnectionFactory getRepositoryConnectionFactory() {
418                 return RepositoryLibrary.this;
419             }
420 
421             /**
422              * {@inheritDoc}
423              * 
424              * @see org.modeshape.graph.connector.RepositoryContext#getObserver()
425              */
426             public Observer getObserver() {
427                 return observationBus.hasObservers() ? observationBus : null;
428             }
429 
430             /**
431              * {@inheritDoc}
432              * 
433              * @see org.modeshape.graph.connector.RepositoryContext#getConfiguration(int)
434              */
435             public Subgraph getConfiguration( int depth ) {
436                 Subgraph result = null;
437                 RepositorySource configSource = getConfigurationSource();
438                 if (configSource != null) {
439                     Graph config = Graph.create(configSource, getExecutionContext());
440                     String workspaceName = getConfigurationWorkspaceName();
441                     if (workspaceName != null) {
442                         config.useWorkspace(workspaceName);
443                     }
444                     Path configPath = getPathToConfigurationRoot();
445                     Path sourcePath = getExecutionContext().getValueFactories().getPathFactory().create(configPath, sourceName);
446                     result = config.getSubgraphOfDepth(depth).at(sourcePath);
447                 }
448                 return result;
449             }
450         };
451         // Do this before we remove the existing pool ...
452         source.initialize(repositoryContext);
453         RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
454         try {
455             this.sourcesLock.writeLock().lock();
456             // Need to first remove any existing one ...
457             RepositoryConnectionPool existingPool = this.pools.remove(sourceName);
458             if (existingPool != null) {
459                 // Then shut down the source gracefully (and don't wait) ...
460                 existingPool.shutdown();
461             }
462             this.pools.put(sourceName, pool);
463             return true;
464         } finally {
465             this.sourcesLock.writeLock().unlock();
466         }
467     }
468 
469     /**
470      * Remove from this library the supplied source (or a source with the same name as that supplied). This call shuts down the
471      * connections in the source in an orderly fashion, allowing those connection currently in use to be used and closed normally,
472      * but preventing further connections from being used.
473      * <p>
474      * This method can safely be called while the federation repository is in use.
475      * </p>
476      * 
477      * @param source the source to be removed
478      * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
479      *        should not wait at all
480      * @param unit the time unit to be used for <code>timeToAwait</code>
481      * @return true if the source was removed, or false if the source was not a source for this repository.
482      * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
483      */
484     public boolean removeSource( RepositorySource source,
485                                  long timeToAwait,
486                                  TimeUnit unit ) throws InterruptedException {
487         // Use the name; don't use the object equality ...
488         return removeSource(source.getName(), timeToAwait, unit) != null;
489     }
490 
491     /**
492      * Remove from this library the source with the supplied name. This call shuts down the connections in the source in an
493      * orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
494      * connections from being used. However, this method never waits until the connections are all closed, and is equivalent to
495      * calling <code>removeSource(name,0,TimeUnit.SECONDS)</code>.
496      * 
497      * @param name the name of the source to be removed
498      * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
499      *         be found
500      * @see #removeSource(String, long, TimeUnit)
501      */
502     public RepositorySource removeSource( String name ) {
503         try {
504             this.sourcesLock.writeLock().lock();
505             RepositoryConnectionPool existingPool = this.pools.remove(name);
506             if (existingPool != null) {
507                 // Then shut down the source gracefully (and don't wait) ...
508                 existingPool.shutdown();
509                 return existingPool.getRepositorySource();
510             }
511         } finally {
512             this.sourcesLock.writeLock().unlock();
513         }
514         return null;
515     }
516 
517     /**
518      * Remove from this library the source with the supplied name. This call shuts down the connections in the source in an
519      * orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
520      * connections from being used.
521      * 
522      * @param name the name of the source to be removed
523      * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
524      *        should not wait at all
525      * @param unit the time unit to be used for <code>timeToAwait</code>
526      * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
527      *         be found
528      * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
529      * @see #removeSource(String)
530      */
531     public RepositorySource removeSource( String name,
532                                           long timeToAwait,
533                                           TimeUnit unit ) throws InterruptedException {
534         try {
535             this.sourcesLock.writeLock().lock();
536             RepositoryConnectionPool existingPool = this.pools.remove(name);
537             if (existingPool != null) {
538                 // Then shut down the source gracefully (and don't wait) ...
539                 existingPool.shutdown();
540                 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
541                 return existingPool.getRepositorySource();
542             }
543         } finally {
544             this.sourcesLock.writeLock().unlock();
545         }
546         return null;
547     }
548 
549     /**
550      * {@inheritDoc}
551      * 
552      * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
553      */
554     public RepositoryConnection createConnection( String sourceName ) {
555         try {
556             this.sourcesLock.readLock().lock();
557             RepositoryConnectionPool existingPool = this.pools.get(sourceName);
558             if (existingPool != null) return existingPool.getConnection();
559             RepositoryConnectionFactory delegate = this.delegate;
560             if (delegate != null) {
561                 return delegate.createConnection(sourceName);
562             }
563         } finally {
564             this.sourcesLock.readLock().unlock();
565         }
566         return null;
567     }
568 }