View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors. 
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   *
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.repository;
25  
26  import java.util.Collection;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.LinkedList;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.locks.ReadWriteLock;
35  import java.util.concurrent.locks.ReentrantReadWriteLock;
36  import net.jcip.annotations.ThreadSafe;
37  import org.modeshape.common.util.CheckArg;
38  import org.modeshape.graph.ExecutionContext;
39  import org.modeshape.graph.Graph;
40  import org.modeshape.graph.Subgraph;
41  import org.modeshape.graph.connector.RepositoryConnection;
42  import org.modeshape.graph.connector.RepositoryConnectionFactory;
43  import org.modeshape.graph.connector.RepositoryConnectionPool;
44  import org.modeshape.graph.connector.RepositoryContext;
45  import org.modeshape.graph.connector.RepositorySource;
46  import org.modeshape.graph.observe.Observable;
47  import org.modeshape.graph.observe.ObservationBus;
48  import org.modeshape.graph.observe.Observer;
49  import org.modeshape.graph.property.Path;
50  import org.modeshape.repository.service.AbstractServiceAdministrator;
51  import org.modeshape.repository.service.ServiceAdministrator;
52  
53  /**
54   * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for
55   * each.
56   */
57  @ThreadSafe
58  public class RepositoryLibrary implements RepositoryConnectionFactory, Observable {
59  
60      /**
61       * The administrative component for this service.
62       * 
63       * @author Randall Hauch
64       */
65      protected class Administrator extends AbstractServiceAdministrator {
66  
67          protected Administrator() {
68              super(RepositoryI18n.federationServiceName, State.STARTED);
69          }
70  
71          /**
72           * {@inheritDoc}
73           */
74          @Override
75          protected void doStart( State fromState ) {
76              super.doStart(fromState);
77              RepositoryLibrary.this.start();
78          }
79  
80          /**
81           * {@inheritDoc}
82           */
83          @Override
84          protected void doShutdown( State fromState ) {
85              super.doShutdown(fromState);
86              RepositoryLibrary.this.shutdown();
87          }
88  
89          /**
90           * {@inheritDoc}
91           */
92          public boolean awaitTermination( long timeout,
93                                           TimeUnit unit ) throws InterruptedException {
94              return RepositoryLibrary.this.awaitTermination(timeout, unit);
95          }
96  
97          /**
98           * {@inheritDoc}
99           */
100         @Override
101         protected boolean doCheckIsTerminated() {
102             return RepositoryLibrary.this.isTerminated();
103         }
104 
105     }
106 
107     private final ServiceAdministrator administrator = new Administrator();
108     private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
109     private final Map<String, RepositoryConnectionPool> pools = new HashMap<String, RepositoryConnectionPool>();
110     private RepositoryConnectionFactory delegate;
111     private final ExecutionContext executionContext;
112     private final ObservationBus observationBus = new ObservationBus();
113     private final RepositorySource configurationSource;
114     private final String configurationWorkspaceName;
115     private final Path pathToConfigurationRoot;
116 
117     /**
118      * Create a new manager instance.
119      * 
120      * @param configurationSource the {@link RepositorySource} that is the configuration repository
121      * @param configurationWorkspaceName the name of the workspace in the {@link RepositorySource} that is the configuration
122      *        repository, or null if the default workspace of the source should be used (if there is one)
123      * @param pathToSourcesConfigurationRoot the path of the node in the configuration source repository that should be treated by
124      *        this service as the root of the service's configuration; if null, then "/dna:system" is used
125      * @param context the execution context in which this service should run
126      * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null
127      */
128     public RepositoryLibrary( RepositorySource configurationSource,
129                               String configurationWorkspaceName,
130                               Path pathToSourcesConfigurationRoot,
131                               final ExecutionContext context ) {
132         CheckArg.isNotNull(configurationSource, "configurationSource");
133         CheckArg.isNotNull(context, "context");
134         CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot");
135         this.executionContext = context;
136         this.configurationSource = configurationSource;
137         this.configurationWorkspaceName = configurationWorkspaceName;
138         this.pathToConfigurationRoot = pathToSourcesConfigurationRoot;
139     }
140 
141     /**
142      * Get the path to the top-level of the configuration root.
143      * 
144      * @return pathToConfigurationRoot
145      */
146     protected Path getPathToConfigurationRoot() {
147         return pathToConfigurationRoot;
148     }
149 
150     /**
151      * @return configurationSource
152      */
153     protected RepositorySource getConfigurationSource() {
154         return configurationSource;
155     }
156 
157     /**
158      * @return configurationWorkspaceName
159      */
160     protected String getConfigurationWorkspaceName() {
161         return configurationWorkspaceName;
162     }
163 
164     /**
165      * {@inheritDoc}
166      * <p>
167      * This can be used to register observers for all of the repository sources managed by this library. The supplied observer
168      * will receive all of the changes originating from these sources.
169      * </p>
170      * 
171      * @see org.modeshape.graph.observe.Observable#register(org.modeshape.graph.observe.Observer)
172      */
173     public boolean register( Observer observer ) {
174         if (observer == null) return false;
175         return observationBus.register(observer);
176     }
177 
178     /**
179      * {@inheritDoc}
180      * <p>
181      * This can be used to unregister observers for all of the repository sources managed by this library.
182      * </p>
183      * 
184      * @see org.modeshape.graph.observe.Observable#unregister(org.modeshape.graph.observe.Observer)
185      */
186     public boolean unregister( Observer observer ) {
187         return observationBus.unregister(observer);
188     }
189 
190     /**
191      * @return executionContextFactory
192      */
193     public ExecutionContext getExecutionContext() {
194         return executionContext;
195     }
196 
197     /**
198      * @return administrator
199      */
200     public ServiceAdministrator getAdministrator() {
201         return this.administrator;
202     }
203 
204     /**
205      * Utility method called by the administrator.
206      */
207     protected void start() {
208         // Do not establish connections to the pools; these will be established as needed
209 
210     }
211 
212     /**
213      * Utility method called by the administrator.
214      */
215     protected void shutdown() {
216         // Close all connections to the pools. This is done inside the pools write lock.
217         try {
218             this.sourcesLock.readLock().lock();
219             for (RepositoryConnectionPool pool : this.pools.values()) {
220                 // Shutdown the pool of connections ...
221                 pool.shutdown();
222                 // Now close the source (still allows in-use connections to be used) ...
223                 pool.getRepositorySource().close();
224             }
225         } finally {
226             this.sourcesLock.readLock().unlock();
227         }
228         // Remove all listeners ...
229         this.observationBus.shutdown();
230     }
231 
232     /**
233      * Utility method called by the administrator.
234      * 
235      * @param timeout
236      * @param unit
237      * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout
238      *         occurred before all the connections were closed
239      * @throws InterruptedException
240      */
241     protected boolean awaitTermination( long timeout,
242                                         TimeUnit unit ) throws InterruptedException {
243         // Check whether all source pools are shut down. This is done inside the pools write lock.
244         try {
245             this.sourcesLock.readLock().lock();
246             for (RepositoryConnectionPool pool : this.pools.values()) {
247                 if (!pool.awaitTermination(timeout, unit)) return false;
248             }
249             return true;
250         } finally {
251             this.sourcesLock.readLock().unlock();
252         }
253     }
254 
255     /**
256      * Returns true if this library is in the process of terminating after {@link ServiceAdministrator#shutdown()} has been called
257      * on the {@link #getAdministrator() administrator}, but the library has connections that have not yet normally been
258      * {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of <tt>true</tt> reported a
259      * sufficient period after shutdown may indicate that connection users have ignored or suppressed interruption, causing this
260      * repository not to properly terminate.
261      * 
262      * @return true if terminating but not yet terminated, or false otherwise
263      * @see #isTerminated()
264      */
265     public boolean isTerminating() {
266         try {
267             this.sourcesLock.readLock().lock();
268             for (RepositoryConnectionPool pool : this.pools.values()) {
269                 if (pool.isTerminating()) return true;
270             }
271             return false;
272         } finally {
273             this.sourcesLock.readLock().unlock();
274         }
275     }
276 
277     /**
278      * Return true if this library has completed its termination and no longer has any open connections.
279      * 
280      * @return true if terminated, or false otherwise
281      * @see #isTerminating()
282      */
283     public boolean isTerminated() {
284         try {
285             this.sourcesLock.readLock().lock();
286             for (RepositoryConnectionPool pool : this.pools.values()) {
287                 if (!pool.isTerminated()) return false;
288             }
289             return true;
290         } finally {
291             this.sourcesLock.readLock().unlock();
292         }
293     }
294 
295     /**
296      * Get an unmodifiable collection of {@link RepositorySource} names.
297      * 
298      * @return the pools
299      */
300     public Collection<String> getSourceNames() {
301         try {
302             this.sourcesLock.readLock().lock();
303             return Collections.unmodifiableCollection(new HashSet<String>(this.pools.keySet()));
304         } finally {
305             this.sourcesLock.readLock().unlock();
306         }
307     }
308 
309     /**
310      * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance.
311      * 
312      * @return the pools
313      */
314     public Collection<RepositorySource> getSources() {
315         List<RepositorySource> sources = new LinkedList<RepositorySource>();
316         try {
317             this.sourcesLock.readLock().lock();
318             for (RepositoryConnectionPool pool : this.pools.values()) {
319                 sources.add(pool.getRepositorySource());
320             }
321             return Collections.unmodifiableCollection(sources);
322         } finally {
323             this.sourcesLock.readLock().unlock();
324         }
325     }
326 
327     /**
328      * Get the RepositorySource with the specified name managed by this instance.
329      * 
330      * @param sourceName the name of the source
331      * @return the source, or null if no such source exists in this instance
332      */
333     public RepositorySource getSource( String sourceName ) {
334         try {
335             this.sourcesLock.readLock().lock();
336             RepositoryConnectionPool existingPool = this.pools.get(sourceName);
337             return existingPool == null ? null : existingPool.getRepositorySource();
338         } finally {
339             this.sourcesLock.readLock().unlock();
340         }
341     }
342 
343     /**
344      * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance.
345      * 
346      * @param sourceName the name of the source
347      * @return the pool, or null if no such pool exists in this instance
348      */
349     public RepositoryConnectionPool getConnectionPool( String sourceName ) {
350         try {
351             this.sourcesLock.readLock().lock();
352             return this.pools.get(sourceName);
353         } finally {
354             this.sourcesLock.readLock().unlock();
355         }
356     }
357 
358     /**
359      * Add the supplied source. This method returns false if the source is null.
360      * 
361      * @param source the source to add
362      * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
363      *         supplied name.
364      */
365     public boolean addSource( RepositorySource source ) {
366         return addSource(source, false);
367     }
368 
369     /**
370      * Add the supplied source. This method returns false if the source is null.
371      * <p>
372      * If a source with the same name already exists, it will be replaced only if <code>replaceIfExisting</code> is true. If this
373      * is the case, then the existing source will be removed from the connection pool, and that pool will be
374      * {@link RepositoryConnectionPool#shutdown() shutdown} (allowing any in-use connections to be used and finished normally).
375      * </p>
376      * 
377      * @param source the source to add
378      * @param replaceIfExisting true if an existing source should be replaced, or false if this method should return false if
379      *        there is already an existing source with the supplied name.
380      * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
381      *         supplied name.
382      */
383     public boolean addSource( RepositorySource source,
384                               boolean replaceIfExisting ) {
385         if (source == null) return false;
386         final String sourceName = source.getName();
387         if (!replaceIfExisting) {
388             // Don't want to replace existing, so make sure there isn't one already ...
389             try {
390                 this.sourcesLock.readLock().lock();
391                 if (this.pools.containsKey(sourceName)) return false;
392             } finally {
393                 this.sourcesLock.readLock().unlock();
394             }
395         }
396         // Create a repository context for this source ...
397         final ObservationBus observationBus = this.observationBus;
398         RepositoryContext repositoryContext = new RepositoryContext() {
399             /**
400              * {@inheritDoc}
401              * 
402              * @see org.modeshape.graph.connector.RepositoryContext#getExecutionContext()
403              */
404             public ExecutionContext getExecutionContext() {
405                 return RepositoryLibrary.this.getExecutionContext();
406             }
407 
408             /**
409              * {@inheritDoc}
410              * 
411              * @see org.modeshape.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
412              */
413             public RepositoryConnectionFactory getRepositoryConnectionFactory() {
414                 return RepositoryLibrary.this;
415             }
416 
417             /**
418              * {@inheritDoc}
419              * 
420              * @see org.modeshape.graph.connector.RepositoryContext#getObserver()
421              */
422             public Observer getObserver() {
423                 return observationBus.hasObservers() ? observationBus : null;
424             }
425 
426             /**
427              * {@inheritDoc}
428              * 
429              * @see org.modeshape.graph.connector.RepositoryContext#getConfiguration(int)
430              */
431             public Subgraph getConfiguration( int depth ) {
432                 Subgraph result = null;
433                 RepositorySource configSource = getConfigurationSource();
434                 if (configSource != null) {
435                     Graph config = Graph.create(configSource, getExecutionContext());
436                     String workspaceName = getConfigurationWorkspaceName();
437                     if (workspaceName != null) {
438                         config.useWorkspace(workspaceName);
439                     }
440                     Path configPath = getPathToConfigurationRoot();
441                     Path sourcePath = getExecutionContext().getValueFactories().getPathFactory().create(configPath, sourceName);
442                     result = config.getSubgraphOfDepth(depth).at(sourcePath);
443                 }
444                 return result;
445             }
446         };
447         // Do this before we remove the existing pool ...
448         source.initialize(repositoryContext);
449         RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
450         try {
451             this.sourcesLock.writeLock().lock();
452             // Need to first remove any existing one ...
453             RepositoryConnectionPool existingPool = this.pools.remove(sourceName);
454             if (existingPool != null) {
455                 // Then shut down the source gracefully (and don't wait) ...
456                 existingPool.shutdown();
457             }
458             this.pools.put(sourceName, pool);
459             return true;
460         } finally {
461             this.sourcesLock.writeLock().unlock();
462         }
463     }
464 
465     /**
466      * Remove from this library the supplied source (or a source with the same name as that supplied). This call shuts down the
467      * connections in the source in an orderly fashion, allowing those connection currently in use to be used and closed normally,
468      * but preventing further connections from being used.
469      * <p>
470      * This method can safely be called while the federation repository is in use.
471      * </p>
472      * 
473      * @param source the source to be removed
474      * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
475      *        should not wait at all
476      * @param unit the time unit to be used for <code>timeToAwait</code>
477      * @return true if the source was removed, or false if the source was not a source for this repository.
478      * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
479      */
480     public boolean removeSource( RepositorySource source,
481                                  long timeToAwait,
482                                  TimeUnit unit ) throws InterruptedException {
483         // Use the name; don't use the object equality ...
484         return removeSource(source.getName(), timeToAwait, unit) != null;
485     }
486 
487     /**
488      * Remove from this library the source with the supplied name. This call shuts down the connections in the source in an
489      * orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
490      * connections from being used. However, this method never waits until the connections are all closed, and is equivalent to
491      * calling <code>removeSource(name,0,TimeUnit.SECONDS)</code>.
492      * 
493      * @param name the name of the source to be removed
494      * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
495      *         be found
496      * @see #removeSource(String, long, TimeUnit)
497      */
498     public RepositorySource removeSource( String name ) {
499         try {
500             this.sourcesLock.writeLock().lock();
501             RepositoryConnectionPool existingPool = this.pools.remove(name);
502             if (existingPool != null) {
503                 // Then shut down the source gracefully (and don't wait) ...
504                 existingPool.shutdown();
505                 return existingPool.getRepositorySource();
506             }
507         } finally {
508             this.sourcesLock.writeLock().unlock();
509         }
510         return null;
511     }
512 
513     /**
514      * Remove from this library the source with the supplied name. This call shuts down the connections in the source in an
515      * orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
516      * connections from being used.
517      * 
518      * @param name the name of the source to be removed
519      * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
520      *        should not wait at all
521      * @param unit the time unit to be used for <code>timeToAwait</code>
522      * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
523      *         be found
524      * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
525      * @see #removeSource(String)
526      */
527     public RepositorySource removeSource( String name,
528                                           long timeToAwait,
529                                           TimeUnit unit ) throws InterruptedException {
530         try {
531             this.sourcesLock.writeLock().lock();
532             RepositoryConnectionPool existingPool = this.pools.remove(name);
533             if (existingPool != null) {
534                 // Then shut down the source gracefully (and don't wait) ...
535                 existingPool.shutdown();
536                 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
537                 return existingPool.getRepositorySource();
538             }
539         } finally {
540             this.sourcesLock.writeLock().unlock();
541         }
542         return null;
543     }
544 
545     /**
546      * {@inheritDoc}
547      * 
548      * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
549      */
550     public RepositoryConnection createConnection( String sourceName ) {
551         try {
552             this.sourcesLock.readLock().lock();
553             RepositoryConnectionPool existingPool = this.pools.get(sourceName);
554             if (existingPool != null) return existingPool.getConnection();
555             RepositoryConnectionFactory delegate = this.delegate;
556             if (delegate != null) {
557                 return delegate.createConnection(sourceName);
558             }
559         } finally {
560             this.sourcesLock.readLock().unlock();
561         }
562         return null;
563     }
564 }