001 /*
002 * JBoss, Home of Professional Open Source.
003 * Copyright 2008, Red Hat Middleware LLC, and individual contributors
004 * as indicated by the @author tags. See the copyright.txt file in the
005 * distribution for a full listing of individual contributors.
006 *
007 * This is free software; you can redistribute it and/or modify it
008 * under the terms of the GNU Lesser General Public License as
009 * published by the Free Software Foundation; either version 2.1 of
010 * the License, or (at your option) any later version.
011 *
012 * This software is distributed in the hope that it will be useful,
013 * but WITHOUT ANY WARRANTY; without even the implied warranty of
014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015 * Lesser General Public License for more details.
016 *
017 * You should have received a copy of the GNU Lesser General Public
018 * License along with this software; if not, write to the Free
019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021 */
022 package org.jboss.dna.repository;
023
024 import java.util.Collection;
025 import java.util.Collections;
026 import java.util.HashSet;
027 import java.util.LinkedList;
028 import java.util.List;
029 import java.util.Set;
030 import java.util.concurrent.CopyOnWriteArrayList;
031 import java.util.concurrent.TimeUnit;
032 import java.util.concurrent.locks.ReadWriteLock;
033 import java.util.concurrent.locks.ReentrantReadWriteLock;
034 import net.jcip.annotations.ThreadSafe;
035 import org.jboss.dna.common.util.CheckArg;
036 import org.jboss.dna.graph.BasicExecutionContextFactory;
037 import org.jboss.dna.graph.ExecutionContext;
038 import org.jboss.dna.graph.ExecutionContextFactory;
039 import org.jboss.dna.graph.connectors.RepositoryConnection;
040 import org.jboss.dna.graph.connectors.RepositoryConnectionFactory;
041 import org.jboss.dna.graph.connectors.RepositoryConnectionPool;
042 import org.jboss.dna.graph.connectors.RepositoryContext;
043 import org.jboss.dna.graph.connectors.RepositorySource;
044 import org.jboss.dna.repository.services.AbstractServiceAdministrator;
045 import org.jboss.dna.repository.services.ServiceAdministrator;
046
047 /**
048 * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for
049 * each.
050 *
051 * @author Randall Hauch
052 */
053 @ThreadSafe
054 public class RepositoryLibrary implements RepositoryConnectionFactory {
055
056 /**
057 * The administrative component for this service.
058 *
059 * @author Randall Hauch
060 */
061 protected class Administrator extends AbstractServiceAdministrator {
062
063 protected Administrator() {
064 super(RepositoryI18n.federationServiceName, State.STARTED);
065 }
066
067 /**
068 * {@inheritDoc}
069 */
070 @Override
071 protected void doStart( State fromState ) {
072 super.doStart(fromState);
073 RepositoryLibrary.this.start();
074 }
075
076 /**
077 * {@inheritDoc}
078 */
079 @Override
080 protected void doShutdown( State fromState ) {
081 super.doShutdown(fromState);
082 RepositoryLibrary.this.shutdown();
083 }
084
085 /**
086 * {@inheritDoc}
087 */
088 public boolean awaitTermination( long timeout,
089 TimeUnit unit ) throws InterruptedException {
090 return RepositoryLibrary.this.awaitTermination(timeout, unit);
091 }
092
093 /**
094 * {@inheritDoc}
095 */
096 @Override
097 protected boolean doCheckIsTerminated() {
098 return RepositoryLibrary.this.isTerminated();
099 }
100
101 }
102
103 private final ServiceAdministrator administrator = new Administrator();
104 private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
105 private final CopyOnWriteArrayList<RepositoryConnectionPool> pools = new CopyOnWriteArrayList<RepositoryConnectionPool>();
106 private RepositoryConnectionFactory delegate;
107 private final ExecutionContextFactory executionContextFactory;
108 private final RepositoryContext repositoryContext;
109
110 /**
111 * Create a new manager instance.
112 */
113 public RepositoryLibrary() {
114 this(new BasicExecutionContextFactory(), null);
115 }
116
117 /**
118 * Create a new manager instance.
119 *
120 * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in
121 * this manager; may be null if there is no delegate
122 */
123 public RepositoryLibrary( RepositoryConnectionFactory delegate ) {
124 this(new BasicExecutionContextFactory(), delegate);
125 }
126
127 /**
128 * Create a new manager instance.
129 *
130 * @param executionContextFactory the execution context factory, used by sources to create {@link ExecutionContext} instances
131 * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null
132 */
133 public RepositoryLibrary( ExecutionContextFactory executionContextFactory ) {
134 this(executionContextFactory, null);
135 }
136
137 /**
138 * Create a new manager instance.
139 *
140 * @param executionContextFactory the execution context factory, used by sources to create {@link ExecutionContext} instances
141 * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in
142 * this manager; may be null if there is no delegate
143 * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null
144 */
145 public RepositoryLibrary( ExecutionContextFactory executionContextFactory,
146 RepositoryConnectionFactory delegate ) {
147 CheckArg.isNotNull(executionContextFactory, "executionContextFactory");
148 this.delegate = delegate;
149 this.executionContextFactory = executionContextFactory;
150 this.repositoryContext = new RepositoryContext() {
151 /**
152 * {@inheritDoc}
153 *
154 * @see org.jboss.dna.graph.connectors.RepositoryContext#getExecutionContextFactory()
155 */
156 public ExecutionContextFactory getExecutionContextFactory() {
157 return RepositoryLibrary.this.getExecutionContextFactory();
158 }
159
160 /**
161 * {@inheritDoc}
162 *
163 * @see org.jboss.dna.graph.connectors.RepositoryContext#getRepositoryConnectionFactory()
164 */
165 public RepositoryConnectionFactory getRepositoryConnectionFactory() {
166 return RepositoryLibrary.this;
167 }
168 };
169 }
170
171 /**
172 * @return executionContextFactory
173 */
174 public ExecutionContextFactory getExecutionContextFactory() {
175 return executionContextFactory;
176 }
177
178 /**
179 * Get the delegate connection factory.
180 *
181 * @return the connection factory to which this instance should delegate in the event that a source is not found in this
182 * manager, or null if there is no delegate
183 */
184 public RepositoryConnectionFactory getDelegate() {
185 return delegate;
186 }
187
188 /**
189 * Set the delegate connection factory.
190 *
191 * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in
192 * this manager; may be null if there is no delegate
193 */
194 public void setDelegate( RepositoryConnectionFactory delegate ) {
195 this.delegate = delegate;
196 }
197
198 /**
199 * @return administrator
200 */
201 public ServiceAdministrator getAdministrator() {
202 return this.administrator;
203 }
204
205 /**
206 * Utility method called by the administrator.
207 */
208 protected void start() {
209 // Do not establish connections to the pools; these will be established as needed
210
211 }
212
213 /**
214 * Utility method called by the administrator.
215 */
216 protected void shutdown() {
217 // Close all connections to the pools. This is done inside the pools write lock.
218 try {
219 this.sourcesLock.readLock().lock();
220 for (RepositoryConnectionPool pool : this.pools) {
221 pool.shutdown();
222 }
223 } finally {
224 this.sourcesLock.readLock().unlock();
225 }
226 }
227
228 /**
229 * Utility method called by the administrator.
230 *
231 * @param timeout
232 * @param unit
233 * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout
234 * occurred before all the connections were closed
235 * @throws InterruptedException
236 */
237 protected boolean awaitTermination( long timeout,
238 TimeUnit unit ) throws InterruptedException {
239 // Check whether all source pools are shut down. This is done inside the pools write lock.
240 try {
241 this.sourcesLock.readLock().lock();
242 for (RepositoryConnectionPool pool : this.pools) {
243 if (!pool.awaitTermination(timeout, unit)) return false;
244 }
245 return true;
246 } finally {
247 this.sourcesLock.readLock().unlock();
248 }
249 }
250
251 /**
252 * Returns true if this federated repository is in the process of terminating after {@link ServiceAdministrator#shutdown()}
253 * has been called on the {@link #getAdministrator() administrator}, but the federated repository has connections that have
254 * not yet normally been {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of
255 * <tt>true</tt> reported a sufficient period after shutdown may indicate that connection users have ignored or suppressed
256 * interruption, causing this repository not to properly terminate.
257 *
258 * @return true if terminating but not yet terminated, or false otherwise
259 * @see #isTerminated()
260 */
261 public boolean isTerminating() {
262 try {
263 this.sourcesLock.readLock().lock();
264 for (RepositoryConnectionPool pool : this.pools) {
265 if (pool.isTerminating()) return true;
266 }
267 return false;
268 } finally {
269 this.sourcesLock.readLock().unlock();
270 }
271 }
272
273 /**
274 * Return true if this federated repository has completed its termination and no longer has any open connections.
275 *
276 * @return true if terminated, or false otherwise
277 * @see #isTerminating()
278 */
279 public boolean isTerminated() {
280 try {
281 this.sourcesLock.readLock().lock();
282 for (RepositoryConnectionPool pool : this.pools) {
283 if (!pool.isTerminated()) return false;
284 }
285 return true;
286 } finally {
287 this.sourcesLock.readLock().unlock();
288 }
289 }
290
291 /**
292 * Get an unmodifiable collection of {@link RepositorySource} names.
293 *
294 * @return the pools
295 */
296 public Collection<String> getSourceNames() {
297 Set<String> sourceNames = new HashSet<String>();
298 for (RepositoryConnectionPool pool : this.pools) {
299 sourceNames.add(pool.getRepositorySource().getName());
300 }
301 return Collections.unmodifiableCollection(sourceNames);
302 }
303
304 /**
305 * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance.
306 *
307 * @return the pools
308 */
309 public Collection<RepositorySource> getSources() {
310 List<RepositorySource> sources = new LinkedList<RepositorySource>();
311 for (RepositoryConnectionPool pool : this.pools) {
312 sources.add(pool.getRepositorySource());
313 }
314 return Collections.unmodifiableCollection(sources);
315 }
316
317 /**
318 * Get the RepositorySource with the specified name managed by this instance.
319 *
320 * @param sourceName the name of the source
321 * @return the source, or null if no such source exists in this instance
322 */
323 public RepositorySource getSource( String sourceName ) {
324 try {
325 this.sourcesLock.readLock().lock();
326 for (RepositoryConnectionPool existingPool : this.pools) {
327 RepositorySource source = existingPool.getRepositorySource();
328 if (source.getName().equals(sourceName)) return source;
329 }
330 } finally {
331 this.sourcesLock.readLock().unlock();
332 }
333 return null;
334 }
335
336 /**
337 * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance.
338 *
339 * @param sourceName the name of the source
340 * @return the pool, or null if no such pool exists in this instance
341 */
342 public RepositoryConnectionPool getConnectionPool( String sourceName ) {
343 try {
344 this.sourcesLock.readLock().lock();
345 for (RepositoryConnectionPool existingPool : this.pools) {
346 RepositorySource source = existingPool.getRepositorySource();
347 if (source.getName().equals(sourceName)) return existingPool;
348 }
349 } finally {
350 this.sourcesLock.readLock().unlock();
351 }
352 return null;
353 }
354
355 /**
356 * Add the supplied federated source. This method returns false if the source is null.
357 *
358 * @param source the source to add
359 * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
360 * supplied name.
361 */
362 public boolean addSource( RepositorySource source ) {
363 if (source == null) return false;
364 try {
365 this.sourcesLock.writeLock().lock();
366 for (RepositoryConnectionPool existingPool : this.pools) {
367 if (existingPool.getRepositorySource().getName().equals(source.getName())) return false;
368 }
369 source.initialize(repositoryContext);
370 RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
371 this.pools.add(pool);
372 return true;
373 } finally {
374 this.sourcesLock.writeLock().unlock();
375 }
376 }
377
378 /**
379 * Remove from this federated repository the supplied source (or a source with the same name as that supplied). This call
380 * shuts down the connections in the source in an orderly fashion, allowing those connection currently in use to be used and
381 * closed normally, but preventing further connections from being used.
382 * <p>
383 * This method can safely be called while the federation repository is in use.
384 * </p>
385 *
386 * @param source the source to be removed
387 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
388 * should not wait at all
389 * @param unit the time unit to be used for <code>timeToAwait</code>
390 * @return true if the source was removed, or false if the source was not a source for this repository.
391 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
392 */
393 public boolean removeSource( RepositorySource source,
394 long timeToAwait,
395 TimeUnit unit ) throws InterruptedException {
396 // Use the name; don't use the object equality ...
397 return removeSource(source.getName(), timeToAwait, unit) != null;
398 }
399
400 /**
401 * Remove from this federated repository the source with the supplied name. This call shuts down the connections in the source
402 * in an orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
403 * connections from being used.
404 *
405 * @param name the name of the source to be removed
406 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
407 * should not wait at all
408 * @param unit the time unit to be used for <code>timeToAwait</code>
409 * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
410 * be found
411 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
412 */
413 public RepositorySource removeSource( String name,
414 long timeToAwait,
415 TimeUnit unit ) throws InterruptedException {
416 try {
417 this.sourcesLock.writeLock().lock();
418 for (RepositoryConnectionPool existingPool : this.pools) {
419 if (existingPool.getRepositorySource().getName().equals(name)) {
420 // Shut down the source ...
421 existingPool.shutdown();
422 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
423 }
424 return existingPool.getRepositorySource();
425 }
426 } finally {
427 this.sourcesLock.writeLock().unlock();
428 }
429 return null;
430 }
431
432 /**
433 * {@inheritDoc}
434 *
435 * @see org.jboss.dna.graph.connectors.RepositoryConnectionFactory#createConnection(java.lang.String)
436 */
437 public RepositoryConnection createConnection( String sourceName ) {
438 try {
439 this.sourcesLock.readLock().lock();
440 for (RepositoryConnectionPool existingPool : this.pools) {
441 RepositorySource source = existingPool.getRepositorySource();
442 if (source.getName().equals(sourceName)) return existingPool.getConnection();
443 }
444 RepositoryConnectionFactory delegate = this.delegate;
445 if (delegate != null) {
446 return delegate.createConnection(sourceName);
447 }
448 } finally {
449 this.sourcesLock.readLock().unlock();
450 }
451 return null;
452 }
453 }