001 /* 002 * JBoss DNA (http://www.jboss.org/dna) 003 * See the COPYRIGHT.txt file distributed with this work for information 004 * regarding copyright ownership. Some portions may be licensed 005 * to Red Hat, Inc. under one or more contributor license agreements. 006 * See the AUTHORS.txt file in the distribution for a full listing of 007 * individual contributors. 008 * 009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA 010 * is licensed to you under the terms of the GNU Lesser General Public License as 011 * published by the Free Software Foundation; either version 2.1 of 012 * the License, or (at your option) any later version. 013 * 014 * JBoss DNA is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 017 * Lesser General Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser General Public 020 * License along with this software; if not, write to the Free 021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 023 */ 024 package org.jboss.dna.repository; 025 026 import java.util.Collection; 027 import java.util.Collections; 028 import java.util.HashSet; 029 import java.util.LinkedList; 030 import java.util.List; 031 import java.util.Set; 032 import java.util.concurrent.CopyOnWriteArrayList; 033 import java.util.concurrent.TimeUnit; 034 import java.util.concurrent.locks.ReadWriteLock; 035 import java.util.concurrent.locks.ReentrantReadWriteLock; 036 import net.jcip.annotations.ThreadSafe; 037 import org.jboss.dna.common.util.CheckArg; 038 import org.jboss.dna.graph.ExecutionContext; 039 import org.jboss.dna.graph.Graph; 040 import org.jboss.dna.graph.Subgraph; 041 import org.jboss.dna.graph.connector.RepositoryConnection; 042 import org.jboss.dna.graph.connector.RepositoryConnectionFactory; 043 import org.jboss.dna.graph.connector.RepositoryConnectionPool; 044 import org.jboss.dna.graph.connector.RepositoryContext; 045 import org.jboss.dna.graph.connector.RepositorySource; 046 import org.jboss.dna.graph.observe.ChangeObserver; 047 import org.jboss.dna.graph.observe.ChangeObservers; 048 import org.jboss.dna.graph.observe.Changes; 049 import org.jboss.dna.graph.observe.Observable; 050 import org.jboss.dna.graph.observe.Observer; 051 import org.jboss.dna.graph.property.Path; 052 import org.jboss.dna.repository.service.AbstractServiceAdministrator; 053 import org.jboss.dna.repository.service.ServiceAdministrator; 054 055 /** 056 * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for 057 * each. 058 * 059 * @author Randall Hauch 060 */ 061 @ThreadSafe 062 public class RepositoryLibrary implements RepositoryConnectionFactory, Observable { 063 064 /** 065 * The administrative component for this service. 066 * 067 * @author Randall Hauch 068 */ 069 protected class Administrator extends AbstractServiceAdministrator { 070 071 protected Administrator() { 072 super(RepositoryI18n.federationServiceName, State.STARTED); 073 } 074 075 /** 076 * {@inheritDoc} 077 */ 078 @Override 079 protected void doStart( State fromState ) { 080 super.doStart(fromState); 081 RepositoryLibrary.this.start(); 082 } 083 084 /** 085 * {@inheritDoc} 086 */ 087 @Override 088 protected void doShutdown( State fromState ) { 089 super.doShutdown(fromState); 090 RepositoryLibrary.this.shutdown(); 091 } 092 093 /** 094 * {@inheritDoc} 095 */ 096 public boolean awaitTermination( long timeout, 097 TimeUnit unit ) throws InterruptedException { 098 return RepositoryLibrary.this.awaitTermination(timeout, unit); 099 } 100 101 /** 102 * {@inheritDoc} 103 */ 104 @Override 105 protected boolean doCheckIsTerminated() { 106 return RepositoryLibrary.this.isTerminated(); 107 } 108 109 } 110 111 private final ServiceAdministrator administrator = new Administrator(); 112 private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock(); 113 private final CopyOnWriteArrayList<RepositoryConnectionPool> pools = new CopyOnWriteArrayList<RepositoryConnectionPool>(); 114 private RepositoryConnectionFactory delegate; 115 private final ExecutionContext executionContext; 116 private final ObservationBus observationBus = new InMemoryObservationBus(); 117 private final RepositorySource configurationSource; 118 private final String configurationWorkspaceName; 119 private final Path pathToConfigurationRoot; 120 121 /** 122 * Create a new manager instance. 123 * 124 * @param configurationSource the {@link RepositorySource} that is the configuration repository 125 * @param configurationWorkspaceName the name of the workspace in the {@link RepositorySource} that is the configuration 126 * repository, or null if the default workspace of the source should be used (if there is one) 127 * @param pathToSourcesConfigurationRoot the path of the node in the configuration source repository that should be treated by 128 * this service as the root of the service's configuration; if null, then "/dna:system" is used 129 * @param context the execution context in which this service should run 130 * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null 131 */ 132 public RepositoryLibrary( RepositorySource configurationSource, 133 String configurationWorkspaceName, 134 Path pathToSourcesConfigurationRoot, 135 final ExecutionContext context ) { 136 CheckArg.isNotNull(configurationSource, "configurationSource"); 137 CheckArg.isNotNull(context, "context"); 138 CheckArg.isNotNull(pathToSourcesConfigurationRoot, "pathToSourcesConfigurationRoot"); 139 this.executionContext = context; 140 this.configurationSource = configurationSource; 141 this.configurationWorkspaceName = configurationWorkspaceName; 142 this.pathToConfigurationRoot = pathToSourcesConfigurationRoot; 143 } 144 145 /** 146 * Get the path to the top-level of the configuration root. 147 * 148 * @return pathToConfigurationRoot 149 */ 150 protected Path getPathToConfigurationRoot() { 151 return pathToConfigurationRoot; 152 } 153 154 /** 155 * @return configurationSource 156 */ 157 protected RepositorySource getConfigurationSource() { 158 return configurationSource; 159 } 160 161 /** 162 * @return configurationWorkspaceName 163 */ 164 protected String getConfigurationWorkspaceName() { 165 return configurationWorkspaceName; 166 } 167 168 /** 169 * {@inheritDoc} 170 * 171 * @see org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver) 172 */ 173 public boolean register( ChangeObserver observer ) { 174 return observationBus.register(observer); 175 } 176 177 /** 178 * {@inheritDoc} 179 * 180 * @see org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver) 181 */ 182 public boolean unregister( ChangeObserver observer ) { 183 return observationBus.unregister(observer); 184 } 185 186 /** 187 * @return executionContextFactory 188 */ 189 public ExecutionContext getExecutionContext() { 190 return executionContext; 191 } 192 193 /** 194 * @return administrator 195 */ 196 public ServiceAdministrator getAdministrator() { 197 return this.administrator; 198 } 199 200 /** 201 * Utility method called by the administrator. 202 */ 203 protected void start() { 204 // Do not establish connections to the pools; these will be established as needed 205 206 } 207 208 /** 209 * Utility method called by the administrator. 210 */ 211 protected void shutdown() { 212 // Close all connections to the pools. This is done inside the pools write lock. 213 try { 214 this.sourcesLock.readLock().lock(); 215 for (RepositoryConnectionPool pool : this.pools) { 216 pool.shutdown(); 217 } 218 } finally { 219 this.sourcesLock.readLock().unlock(); 220 } 221 // Remove all listeners ... 222 this.observationBus.shutdown(); 223 } 224 225 /** 226 * Utility method called by the administrator. 227 * 228 * @param timeout 229 * @param unit 230 * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout 231 * occurred before all the connections were closed 232 * @throws InterruptedException 233 */ 234 protected boolean awaitTermination( long timeout, 235 TimeUnit unit ) throws InterruptedException { 236 // Check whether all source pools are shut down. This is done inside the pools write lock. 237 try { 238 this.sourcesLock.readLock().lock(); 239 for (RepositoryConnectionPool pool : this.pools) { 240 if (!pool.awaitTermination(timeout, unit)) return false; 241 } 242 return true; 243 } finally { 244 this.sourcesLock.readLock().unlock(); 245 } 246 } 247 248 /** 249 * Returns true if this federated repository is in the process of terminating after {@link ServiceAdministrator#shutdown()} 250 * has been called on the {@link #getAdministrator() administrator}, but the federated repository has connections that have 251 * not yet normally been {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of 252 * <tt>true</tt> reported a sufficient period after shutdown may indicate that connection users have ignored or suppressed 253 * interruption, causing this repository not to properly terminate. 254 * 255 * @return true if terminating but not yet terminated, or false otherwise 256 * @see #isTerminated() 257 */ 258 public boolean isTerminating() { 259 try { 260 this.sourcesLock.readLock().lock(); 261 for (RepositoryConnectionPool pool : this.pools) { 262 if (pool.isTerminating()) return true; 263 } 264 return false; 265 } finally { 266 this.sourcesLock.readLock().unlock(); 267 } 268 } 269 270 /** 271 * Return true if this federated repository has completed its termination and no longer has any open connections. 272 * 273 * @return true if terminated, or false otherwise 274 * @see #isTerminating() 275 */ 276 public boolean isTerminated() { 277 try { 278 this.sourcesLock.readLock().lock(); 279 for (RepositoryConnectionPool pool : this.pools) { 280 if (!pool.isTerminated()) return false; 281 } 282 return true; 283 } finally { 284 this.sourcesLock.readLock().unlock(); 285 } 286 } 287 288 /** 289 * Get an unmodifiable collection of {@link RepositorySource} names. 290 * 291 * @return the pools 292 */ 293 public Collection<String> getSourceNames() { 294 Set<String> sourceNames = new HashSet<String>(); 295 for (RepositoryConnectionPool pool : this.pools) { 296 sourceNames.add(pool.getRepositorySource().getName()); 297 } 298 return Collections.unmodifiableCollection(sourceNames); 299 } 300 301 /** 302 * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance. 303 * 304 * @return the pools 305 */ 306 public Collection<RepositorySource> getSources() { 307 List<RepositorySource> sources = new LinkedList<RepositorySource>(); 308 for (RepositoryConnectionPool pool : this.pools) { 309 sources.add(pool.getRepositorySource()); 310 } 311 return Collections.unmodifiableCollection(sources); 312 } 313 314 /** 315 * Get the RepositorySource with the specified name managed by this instance. 316 * 317 * @param sourceName the name of the source 318 * @return the source, or null if no such source exists in this instance 319 */ 320 public RepositorySource getSource( String sourceName ) { 321 try { 322 this.sourcesLock.readLock().lock(); 323 for (RepositoryConnectionPool existingPool : this.pools) { 324 RepositorySource source = existingPool.getRepositorySource(); 325 if (source.getName().equals(sourceName)) return source; 326 } 327 } finally { 328 this.sourcesLock.readLock().unlock(); 329 } 330 return null; 331 } 332 333 /** 334 * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance. 335 * 336 * @param sourceName the name of the source 337 * @return the pool, or null if no such pool exists in this instance 338 */ 339 public RepositoryConnectionPool getConnectionPool( String sourceName ) { 340 try { 341 this.sourcesLock.readLock().lock(); 342 for (RepositoryConnectionPool existingPool : this.pools) { 343 RepositorySource source = existingPool.getRepositorySource(); 344 if (source.getName().equals(sourceName)) return existingPool; 345 } 346 } finally { 347 this.sourcesLock.readLock().unlock(); 348 } 349 return null; 350 } 351 352 /** 353 * Add the supplied federated source. This method returns false if the source is null. 354 * 355 * @param source the source to add 356 * @return true if the source is added, or false if the reference is null or if there is already an existing source with the 357 * supplied name. 358 */ 359 public boolean addSource( RepositorySource source ) { 360 if (source == null) return false; 361 try { 362 this.sourcesLock.writeLock().lock(); 363 final String sourceName = source.getName(); 364 for (RepositoryConnectionPool existingPool : this.pools) { 365 if (existingPool.getRepositorySource().getName().equals(sourceName)) return false; 366 } 367 // Create a repository context for this source ... 368 final ObservationBus observationBus = this.observationBus; 369 RepositoryContext repositoryContext = new RepositoryContext() { 370 /** 371 * {@inheritDoc} 372 * 373 * @see org.jboss.dna.graph.connector.RepositoryContext#getExecutionContext() 374 */ 375 public ExecutionContext getExecutionContext() { 376 return RepositoryLibrary.this.getExecutionContext(); 377 } 378 379 /** 380 * {@inheritDoc} 381 * 382 * @see org.jboss.dna.graph.connector.RepositoryContext#getRepositoryConnectionFactory() 383 */ 384 public RepositoryConnectionFactory getRepositoryConnectionFactory() { 385 return RepositoryLibrary.this; 386 } 387 388 /** 389 * {@inheritDoc} 390 * 391 * @see org.jboss.dna.graph.connector.RepositoryContext#getObserver() 392 */ 393 public Observer getObserver() { 394 return observationBus.hasObservers() ? observationBus : null; 395 } 396 397 /** 398 * {@inheritDoc} 399 * 400 * @see org.jboss.dna.graph.connector.RepositoryContext#getConfiguration(int) 401 */ 402 public Subgraph getConfiguration( int depth ) { 403 Subgraph result = null; 404 RepositorySource configSource = getConfigurationSource(); 405 if (configSource != null) { 406 Graph config = Graph.create(configSource, getExecutionContext()); 407 String workspaceName = getConfigurationWorkspaceName(); 408 if (workspaceName != null) { 409 config.useWorkspace(workspaceName); 410 } 411 Path configPath = getPathToConfigurationRoot(); 412 Path sourcePath = getExecutionContext().getValueFactories().getPathFactory().create(configPath, 413 sourceName); 414 result = config.getSubgraphOfDepth(depth).at(sourcePath); 415 } 416 return result; 417 } 418 }; 419 source.initialize(repositoryContext); 420 RepositoryConnectionPool pool = new RepositoryConnectionPool(source); 421 this.pools.add(pool); 422 return true; 423 } finally { 424 this.sourcesLock.writeLock().unlock(); 425 } 426 } 427 428 /** 429 * Remove from this federated repository the supplied source (or a source with the same name as that supplied). This call 430 * shuts down the connections in the source in an orderly fashion, allowing those connection currently in use to be used and 431 * closed normally, but preventing further connections from being used. 432 * <p> 433 * This method can safely be called while the federation repository is in use. 434 * </p> 435 * 436 * @param source the source to be removed 437 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call 438 * should not wait at all 439 * @param unit the time unit to be used for <code>timeToAwait</code> 440 * @return true if the source was removed, or false if the source was not a source for this repository. 441 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections 442 */ 443 public boolean removeSource( RepositorySource source, 444 long timeToAwait, 445 TimeUnit unit ) throws InterruptedException { 446 // Use the name; don't use the object equality ... 447 return removeSource(source.getName(), timeToAwait, unit) != null; 448 } 449 450 /** 451 * Remove from this federated repository the source with the supplied name. This call shuts down the connections in the source 452 * in an orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further 453 * connections from being used. 454 * 455 * @param name the name of the source to be removed 456 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call 457 * should not wait at all 458 * @param unit the time unit to be used for <code>timeToAwait</code> 459 * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could 460 * be found 461 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections 462 */ 463 public RepositorySource removeSource( String name, 464 long timeToAwait, 465 TimeUnit unit ) throws InterruptedException { 466 try { 467 this.sourcesLock.writeLock().lock(); 468 for (RepositoryConnectionPool existingPool : this.pools) { 469 if (existingPool.getRepositorySource().getName().equals(name)) { 470 // Shut down the source ... 471 existingPool.shutdown(); 472 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit); 473 } 474 return existingPool.getRepositorySource(); 475 } 476 } finally { 477 this.sourcesLock.writeLock().unlock(); 478 } 479 return null; 480 } 481 482 /** 483 * {@inheritDoc} 484 * 485 * @see org.jboss.dna.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String) 486 */ 487 public RepositoryConnection createConnection( String sourceName ) { 488 try { 489 this.sourcesLock.readLock().lock(); 490 for (RepositoryConnectionPool existingPool : this.pools) { 491 RepositorySource source = existingPool.getRepositorySource(); 492 if (source.getName().equals(sourceName)) return existingPool.getConnection(); 493 } 494 RepositoryConnectionFactory delegate = this.delegate; 495 if (delegate != null) { 496 return delegate.createConnection(sourceName); 497 } 498 } finally { 499 this.sourcesLock.readLock().unlock(); 500 } 501 return null; 502 } 503 504 protected interface ObservationBus extends Observable, Observer { 505 boolean hasObservers(); 506 507 void shutdown(); 508 } 509 510 protected class InMemoryObservationBus implements ObservationBus { 511 private final ChangeObservers observers = new ChangeObservers(); 512 513 protected InMemoryObservationBus() { 514 } 515 516 /** 517 * {@inheritDoc} 518 * 519 * @see org.jboss.dna.graph.observe.Observable#register(org.jboss.dna.graph.observe.ChangeObserver) 520 */ 521 public boolean register( ChangeObserver observer ) { 522 return observers.register(observer); 523 } 524 525 /** 526 * {@inheritDoc} 527 * 528 * @see org.jboss.dna.graph.observe.Observable#unregister(org.jboss.dna.graph.observe.ChangeObserver) 529 */ 530 public boolean unregister( ChangeObserver observer ) { 531 return observers.unregister(observer); 532 } 533 534 /** 535 * {@inheritDoc} 536 * 537 * @see org.jboss.dna.graph.observe.Observer#notify(org.jboss.dna.graph.observe.Changes) 538 */ 539 public void notify( Changes changes ) { 540 if (changes != null) { 541 // Broadcast the changes to the registered observers ... 542 observers.broadcast(changes); 543 } 544 } 545 546 /** 547 * {@inheritDoc} 548 * 549 * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#hasObservers() 550 */ 551 public boolean hasObservers() { 552 return !observers.isEmpty(); 553 } 554 555 /** 556 * {@inheritDoc} 557 * 558 * @see org.jboss.dna.repository.RepositoryLibrary.ObservationBus#shutdown() 559 */ 560 public void shutdown() { 561 observers.shutdown(); 562 } 563 } 564 }