001    /*
002     * JBoss DNA (http://www.jboss.org/dna)
003     * See the COPYRIGHT.txt file distributed with this work for information
004     * regarding copyright ownership.  Some portions may be licensed
005     * to Red Hat, Inc. under one or more contributor license agreements.
006     * See the AUTHORS.txt file in the distribution for a full listing of 
007     * individual contributors. 
008     *
009     * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010     * is licensed to you under the terms of the GNU Lesser General Public License as
011     * published by the Free Software Foundation; either version 2.1 of
012     * the License, or (at your option) any later version.
013     *
014     * JBoss DNA is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017     * Lesser General Public License for more details.
018     *
019     * You should have received a copy of the GNU Lesser General Public
020     * License along with this software; if not, write to the Free
021     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023     */
024    package org.jboss.dna.repository;
025    
026    import java.util.Collection;
027    import java.util.Collections;
028    import java.util.HashSet;
029    import java.util.LinkedList;
030    import java.util.List;
031    import java.util.Set;
032    import java.util.concurrent.CopyOnWriteArrayList;
033    import java.util.concurrent.TimeUnit;
034    import java.util.concurrent.locks.ReadWriteLock;
035    import java.util.concurrent.locks.ReentrantReadWriteLock;
036    import net.jcip.annotations.ThreadSafe;
037    import org.jboss.dna.common.util.CheckArg;
038    import org.jboss.dna.graph.ExecutionContext;
039    import org.jboss.dna.graph.Graph;
040    import org.jboss.dna.graph.Subgraph;
041    import org.jboss.dna.graph.connector.RepositoryConnection;
042    import org.jboss.dna.graph.connector.RepositoryConnectionFactory;
043    import org.jboss.dna.graph.connector.RepositoryConnectionPool;
044    import org.jboss.dna.graph.connector.RepositoryContext;
045    import org.jboss.dna.graph.connector.RepositorySource;
046    import org.jboss.dna.graph.observe.ChangeObserver;
047    import org.jboss.dna.graph.observe.ChangeObservers;
048    import org.jboss.dna.graph.observe.Changes;
049    import org.jboss.dna.graph.observe.Observable;
050    import org.jboss.dna.graph.observe.Observer;
051    import org.jboss.dna.graph.property.Path;
052    import org.jboss.dna.repository.service.AbstractServiceAdministrator;
053    import org.jboss.dna.repository.service.ServiceAdministrator;
054    
055    /**
056     * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for
057     * each.
058     * 
059     * @author Randall Hauch
060     */
061    @ThreadSafe
062    public class RepositoryLibrary implements RepositoryConnectionFactory, Observable {
063    
064        /**
065         * The administrative component for this service.
066         * 
067         * @author Randall Hauch
068         */
069        protected class Administrator extends AbstractServiceAdministrator {
070    
071            protected Administrator() {
072                super(RepositoryI18n.federationServiceName, State.STARTED);
073            }
074    
075            /**
076             * {@inheritDoc}
077             */
078            @Override
079            protected void doStart( State fromState ) {
080                super.doStart(fromState);
081                RepositoryLibrary.this.start();
082            }
083    
084            /**
085             * {@inheritDoc}
086             */
087            @Override
088            protected void doShutdown( State fromState ) {
089                super.doShutdown(fromState);
090                RepositoryLibrary.this.shutdown();
091            }
092    
093            /**
094             * {@inheritDoc}
095             */
096            public boolean awaitTermination( long timeout,
097                                             TimeUnit unit ) throws InterruptedException {
098                return RepositoryLibrary.this.awaitTermination(timeout, unit);
099            }
100    
101            /**
102             * {@inheritDoc}
103             */
104            @Override
105            protected boolean doCheckIsTerminated() {
106                return RepositoryLibrary.this.isTerminated();
107            }
108    
109        }
110    
111        private final ServiceAdministrator administrator = new Administrator();
112        private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
113        private final CopyOnWriteArrayList<RepositoryConnectionPool> pools = new CopyOnWriteArrayList<RepositoryConnectionPool>();
114        private RepositoryConnectionFactory delegate;
115        private final ExecutionContext executionContext;
116        private final ObservationBus observationBus = new InMemoryObservationBus();
117        private final RepositorySource configurationSource;
118        private final String configurationWorkspaceName;
119        private final Path pathToConfigurationRoot;
120    
121        /**
122         * Create a new manager instance.
123         * 
124         * @param configurationSource the {@link RepositorySource} that is the configuration repository
125         * @param configurationWorkspaceName the name of the workspace in the {@link RepositorySource} that is the configuration
126         *        repository, or null if the default workspace of the source should be used (if there is one)
127         * @param pathToSourcesConfigurationRoot the path of the node in the configuration source repository that should be treated by
128         *        this service as the root of the service's configuration; if null, then "/dna:system" is used
129         * @param context the execution context in which this service should run
130         * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null
131         */
132        public RepositoryLibrary( RepositorySource configurationSource,
133                                  String configurationWorkspaceName,
134                                  Path pathToSourcesConfigurationRoot,
135                                  final ExecutionContext context ) {
136            CheckArg.isNotNull(configurationSource, "configurationSource");
137            CheckArg.isNotNull(context, "context");
138            CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot");
139            this.executionContext = context;
140            this.configurationSource = configurationSource;
141            this.configurationWorkspaceName = configurationWorkspaceName;
142            this.pathToConfigurationRoot = pathToSourcesConfigurationRoot;
143        }
144    
145        /**
146         * Get the path to the top-level of the configuration root.
147         * 
148         * @return pathToConfigurationRoot
149         */
150        protected Path getPathToConfigurationRoot() {
151            return pathToConfigurationRoot;
152        }
153    
154        /**
155         * @return configurationSource
156         */
157        protected RepositorySource getConfigurationSource() {
158            return configurationSource;
159        }
160    
161        /**
162         * @return configurationWorkspaceName
163         */
164        protected String getConfigurationWorkspaceName() {
165            return configurationWorkspaceName;
166        }
167    
168        /**
169         * {@inheritDoc}
170         * 
171         * @see org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver)
172         */
173        public boolean register( ChangeObserver observer ) {
174            return observationBus.register(observer);
175        }
176    
177        /**
178         * {@inheritDoc}
179         * 
180         * @see org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver)
181         */
182        public boolean unregister( ChangeObserver observer ) {
183            return observationBus.unregister(observer);
184        }
185    
186        /**
187         * @return executionContextFactory
188         */
189        public ExecutionContext getExecutionContext() {
190            return executionContext;
191        }
192    
193        /**
194         * @return administrator
195         */
196        public ServiceAdministrator getAdministrator() {
197            return this.administrator;
198        }
199    
200        /**
201         * Utility method called by the administrator.
202         */
203        protected void start() {
204            // Do not establish connections to the pools; these will be established as needed
205    
206        }
207    
208        /**
209         * Utility method called by the administrator.
210         */
211        protected void shutdown() {
212            // Close all connections to the pools. This is done inside the pools write lock.
213            try {
214                this.sourcesLock.readLock().lock();
215                for (RepositoryConnectionPool pool : this.pools) {
216                    pool.shutdown();
217                }
218            } finally {
219                this.sourcesLock.readLock().unlock();
220            }
221            // Remove all listeners ...
222            this.observationBus.shutdown();
223        }
224    
225        /**
226         * Utility method called by the administrator.
227         * 
228         * @param timeout
229         * @param unit
230         * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout
231         *         occurred before all the connections were closed
232         * @throws InterruptedException
233         */
234        protected boolean awaitTermination( long timeout,
235                                            TimeUnit unit ) throws InterruptedException {
236            // Check whether all source pools are shut down. This is done inside the pools write lock.
237            try {
238                this.sourcesLock.readLock().lock();
239                for (RepositoryConnectionPool pool : this.pools) {
240                    if (!pool.awaitTermination(timeout, unit)) return false;
241                }
242                return true;
243            } finally {
244                this.sourcesLock.readLock().unlock();
245            }
246        }
247    
248        /**
249         * Returns true if this federated repository is in the process of terminating after {@link ServiceAdministrator#shutdown()}
250         * has been called on the {@link #getAdministrator() administrator}, but the federated repository has connections that have
251         * not yet normally been {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of
252         * <tt>true</tt> reported a sufficient period after shutdown may indicate that connection users have ignored or suppressed
253         * interruption, causing this repository not to properly terminate.
254         * 
255         * @return true if terminating but not yet terminated, or false otherwise
256         * @see #isTerminated()
257         */
258        public boolean isTerminating() {
259            try {
260                this.sourcesLock.readLock().lock();
261                for (RepositoryConnectionPool pool : this.pools) {
262                    if (pool.isTerminating()) return true;
263                }
264                return false;
265            } finally {
266                this.sourcesLock.readLock().unlock();
267            }
268        }
269    
270        /**
271         * Return true if this federated repository has completed its termination and no longer has any open connections.
272         * 
273         * @return true if terminated, or false otherwise
274         * @see #isTerminating()
275         */
276        public boolean isTerminated() {
277            try {
278                this.sourcesLock.readLock().lock();
279                for (RepositoryConnectionPool pool : this.pools) {
280                    if (!pool.isTerminated()) return false;
281                }
282                return true;
283            } finally {
284                this.sourcesLock.readLock().unlock();
285            }
286        }
287    
288        /**
289         * Get an unmodifiable collection of {@link RepositorySource} names.
290         * 
291         * @return the pools
292         */
293        public Collection<String> getSourceNames() {
294            Set<String> sourceNames = new HashSet<String>();
295            for (RepositoryConnectionPool pool : this.pools) {
296                sourceNames.add(pool.getRepositorySource().getName());
297            }
298            return Collections.unmodifiableCollection(sourceNames);
299        }
300    
301        /**
302         * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance.
303         * 
304         * @return the pools
305         */
306        public Collection<RepositorySource> getSources() {
307            List<RepositorySource> sources = new LinkedList<RepositorySource>();
308            for (RepositoryConnectionPool pool : this.pools) {
309                sources.add(pool.getRepositorySource());
310            }
311            return Collections.unmodifiableCollection(sources);
312        }
313    
314        /**
315         * Get the RepositorySource with the specified name managed by this instance.
316         * 
317         * @param sourceName the name of the source
318         * @return the source, or null if no such source exists in this instance
319         */
320        public RepositorySource getSource( String sourceName ) {
321            try {
322                this.sourcesLock.readLock().lock();
323                for (RepositoryConnectionPool existingPool : this.pools) {
324                    RepositorySource source = existingPool.getRepositorySource();
325                    if (source.getName().equals(sourceName)) return source;
326                }
327            } finally {
328                this.sourcesLock.readLock().unlock();
329            }
330            return null;
331        }
332    
333        /**
334         * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance.
335         * 
336         * @param sourceName the name of the source
337         * @return the pool, or null if no such pool exists in this instance
338         */
339        public RepositoryConnectionPool getConnectionPool( String sourceName ) {
340            try {
341                this.sourcesLock.readLock().lock();
342                for (RepositoryConnectionPool existingPool : this.pools) {
343                    RepositorySource source = existingPool.getRepositorySource();
344                    if (source.getName().equals(sourceName)) return existingPool;
345                }
346            } finally {
347                this.sourcesLock.readLock().unlock();
348            }
349            return null;
350        }
351    
352        /**
353         * Add the supplied federated source. This method returns false if the source is null.
354         * 
355         * @param source the source to add
356         * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
357         *         supplied name.
358         */
359        public boolean addSource( RepositorySource source ) {
360            if (source == null) return false;
361            try {
362                this.sourcesLock.writeLock().lock();
363                final String sourceName = source.getName();
364                for (RepositoryConnectionPool existingPool : this.pools) {
365                    if (existingPool.getRepositorySource().getName().equals(sourceName)) return false;
366                }
367                // Create a repository context for this source ...
368                final ObservationBus observationBus = this.observationBus;
369                RepositoryContext repositoryContext = new RepositoryContext() {
370                    /**
371                     * {@inheritDoc}
372                     * 
373                     * @see org.jboss.dna.graph.connector.RepositoryContext#getExecutionContext()
374                     */
375                    public ExecutionContext getExecutionContext() {
376                        return RepositoryLibrary.this.getExecutionContext();
377                    }
378    
379                    /**
380                     * {@inheritDoc}
381                     * 
382                     * @see org.jboss.dna.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
383                     */
384                    public RepositoryConnectionFactory getRepositoryConnectionFactory() {
385                        return RepositoryLibrary.this;
386                    }
387    
388                    /**
389                     * {@inheritDoc}
390                     * 
391                     * @see org.jboss.dna.graph.connector.RepositoryContext#getObserver()
392                     */
393                    public Observer getObserver() {
394                        return observationBus.hasObservers() ? observationBus : null;
395                    }
396    
397                    /**
398                     * {@inheritDoc}
399                     * 
400                     * @see org.jboss.dna.graph.connector.RepositoryContext#getConfiguration(int)
401                     */
402                    public Subgraph getConfiguration( int depth ) {
403                        Subgraph result = null;
404                        RepositorySource configSource = getConfigurationSource();
405                        if (configSource != null) {
406                            Graph config = Graph.create(configSource, getExecutionContext());
407                            String workspaceName = getConfigurationWorkspaceName();
408                            if (workspaceName != null) {
409                                config.useWorkspace(workspaceName);
410                            }
411                            Path configPath = getPathToConfigurationRoot();
412                            Path sourcePath = getExecutionContext().getValueFactories().getPathFactory().create(configPath,
413                                                                                                                sourceName);
414                            result = config.getSubgraphOfDepth(depth).at(sourcePath);
415                        }
416                        return result;
417                    }
418                };
419                source.initialize(repositoryContext);
420                RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
421                this.pools.add(pool);
422                return true;
423            } finally {
424                this.sourcesLock.writeLock().unlock();
425            }
426        }
427    
428        /**
429         * Remove from this federated repository the supplied source (or a source with the same name as that supplied). This call
430         * shuts down the connections in the source in an orderly fashion, allowing those connection currently in use to be used and
431         * closed normally, but preventing further connections from being used.
432         * <p>
433         * This method can safely be called while the federation repository is in use.
434         * </p>
435         * 
436         * @param source the source to be removed
437         * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
438         *        should not wait at all
439         * @param unit the time unit to be used for <code>timeToAwait</code>
440         * @return true if the source was removed, or false if the source was not a source for this repository.
441         * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
442         */
443        public boolean removeSource( RepositorySource source,
444                                     long timeToAwait,
445                                     TimeUnit unit ) throws InterruptedException {
446            // Use the name; don't use the object equality ...
447            return removeSource(source.getName(), timeToAwait, unit) != null;
448        }
449    
450        /**
451         * Remove from this federated repository the source with the supplied name. This call shuts down the connections in the source
452         * in an orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
453         * connections from being used.
454         * 
455         * @param name the name of the source to be removed
456         * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
457         *        should not wait at all
458         * @param unit the time unit to be used for <code>timeToAwait</code>
459         * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
460         *         be found
461         * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
462         */
463        public RepositorySource removeSource( String name,
464                                              long timeToAwait,
465                                              TimeUnit unit ) throws InterruptedException {
466            try {
467                this.sourcesLock.writeLock().lock();
468                for (RepositoryConnectionPool existingPool : this.pools) {
469                    if (existingPool.getRepositorySource().getName().equals(name)) {
470                        // Shut down the source ...
471                        existingPool.shutdown();
472                        if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
473                    }
474                    return existingPool.getRepositorySource();
475                }
476            } finally {
477                this.sourcesLock.writeLock().unlock();
478            }
479            return null;
480        }
481    
482        /**
483         * {@inheritDoc}
484         * 
485         * @see org.jboss.dna.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
486         */
487        public RepositoryConnection createConnection( String sourceName ) {
488            try {
489                this.sourcesLock.readLock().lock();
490                for (RepositoryConnectionPool existingPool : this.pools) {
491                    RepositorySource source = existingPool.getRepositorySource();
492                    if (source.getName().equals(sourceName)) return existingPool.getConnection();
493                }
494                RepositoryConnectionFactory delegate = this.delegate;
495                if (delegate != null) {
496                    return delegate.createConnection(sourceName);
497                }
498            } finally {
499                this.sourcesLock.readLock().unlock();
500            }
501            return null;
502        }
503    
504        protected interface ObservationBus extends Observable, Observer {
505            boolean hasObservers();
506    
507            void shutdown();
508        }
509    
510        protected class InMemoryObservationBus implements ObservationBus {
511            private final ChangeObservers observers = new ChangeObservers();
512    
513            protected InMemoryObservationBus() {
514            }
515    
516            /**
517             * {@inheritDoc}
518             * 
519             * @see org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver)
520             */
521            public boolean register( ChangeObserver observer ) {
522                return observers.register(observer);
523            }
524    
525            /**
526             * {@inheritDoc}
527             * 
528             * @see org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver)
529             */
530            public boolean unregister( ChangeObserver observer ) {
531                return observers.unregister(observer);
532            }
533    
534            /**
535             * {@inheritDoc}
536             * 
537             * @see org.jboss.dna.graph.observe.Observer#notify(org.jboss.dna.graph.observe.Changes)
538             */
539            public void notify( Changes changes ) {
540                if (changes != null) {
541                    // Broadcast the changes to the registered observers ...
542                    observers.broadcast(changes);
543                }
544            }
545    
546            /**
547             * {@inheritDoc}
548             * 
549             * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#hasObservers()
550             */
551            public boolean hasObservers() {
552                return !observers.isEmpty();
553            }
554    
555            /**
556             * {@inheritDoc}
557             * 
558             * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#shutdown()
559             */
560            public void shutdown() {
561                observers.shutdown();
562            }
563        }
564    }