001 /* 002 * JBoss, Home of Professional Open Source. 003 * Copyright 2008, Red Hat Middleware LLC, and individual contributors 004 * as indicated by the @author tags. See the copyright.txt file in the 005 * distribution for a full listing of individual contributors. 006 * 007 * This is free software; you can redistribute it and/or modify it 008 * under the terms of the GNU Lesser General Public License as 009 * published by the Free Software Foundation; either version 2.1 of 010 * the License, or (at your option) any later version. 011 * 012 * This software is distributed in the hope that it will be useful, 013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 015 * Lesser General Public License for more details. 016 * 017 * You should have received a copy of the GNU Lesser General Public 018 * License along with this software; if not, write to the Free 019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 021 */ 022 package org.jboss.dna.graph.connectors; 023 024 import java.util.Collection; 025 import java.util.HashSet; 026 import java.util.LinkedList; 027 import java.util.Set; 028 import java.util.concurrent.BlockingQueue; 029 import java.util.concurrent.LinkedBlockingQueue; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.atomic.AtomicBoolean; 032 import java.util.concurrent.atomic.AtomicInteger; 033 import java.util.concurrent.atomic.AtomicLong; 034 import java.util.concurrent.locks.Condition; 035 import java.util.concurrent.locks.ReentrantLock; 036 import javax.transaction.xa.XAResource; 037 import net.jcip.annotations.GuardedBy; 038 import net.jcip.annotations.ThreadSafe; 039 import org.jboss.dna.common.util.CheckArg; 040 import org.jboss.dna.common.util.Logger; 041 import org.jboss.dna.graph.ExecutionContext; 042 import org.jboss.dna.graph.GraphI18n; 043 import org.jboss.dna.graph.cache.CachePolicy; 044 import org.jboss.dna.graph.commands.GraphCommand; 045 046 /** 047 * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations. 048 * 049 * @author Randall Hauch 050 */ 051 @ThreadSafe 052 public class RepositoryConnectionPool { 053 054 /** 055 * The core pool size for default-constructed pools is {@value} . 056 */ 057 public static final int DEFAULT_CORE_POOL_SIZE = 1; 058 059 /** 060 * The maximum pool size for default-constructed pools is {@value} . 061 */ 062 public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10; 063 064 /** 065 * The keep-alive time for connections in default-constructed pools is {@value} seconds. 066 */ 067 public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30; 068 069 /** 070 * Permission for checking shutdown 071 */ 072 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); 073 074 /** 075 * The source that this pool uses to create new connections. 076 */ 077 private final RepositorySource source; 078 079 /** 080 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set. 081 */ 082 private final ReentrantLock mainLock = new ReentrantLock(); 083 084 /** 085 * Wait condition to support awaitTermination 086 */ 087 private final Condition termination = mainLock.newCondition(); 088 089 /** 090 * Set containing all connections that are available for use. 091 */ 092 @GuardedBy( "mainLock" ) 093 private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>(); 094 095 /** 096 * The connections that are currently in use. 097 */ 098 @GuardedBy( "mainLock" ) 099 private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>(); 100 101 /** 102 * Timeout in nanoseconds for idle connections waiting to be used. Threads use this timeout only when there are more than 103 * corePoolSize present. Otherwise they wait forever to be used. 104 */ 105 private volatile long keepAliveTime; 106 107 /** 108 * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during 109 * updates. 110 */ 111 @GuardedBy( "mainLock" ) 112 private volatile int corePoolSize; 113 114 /** 115 * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates. 116 */ 117 @GuardedBy( "mainLock" ) 118 private volatile int maximumPoolSize; 119 120 /** 121 * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates. 122 */ 123 @GuardedBy( "mainLock" ) 124 private volatile int poolSize; 125 126 /** 127 * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates. 128 */ 129 @GuardedBy( "mainLock" ) 130 private volatile int runState; 131 132 // Special values for runState 133 /** Normal, not-shutdown mode */ 134 static final int RUNNING = 0; 135 /** Controlled shutdown mode */ 136 static final int SHUTDOWN = 1; 137 /** Immediate shutdown mode */ 138 static final int STOP = 2; 139 /** Final state */ 140 static final int TERMINATED = 3; 141 142 /** 143 * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method. 144 */ 145 private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false); 146 147 /** 148 * The time in nanoseconds that ping should wait before timing out and failing. 149 */ 150 private final AtomicLong pingTimeout = new AtomicLong(0); 151 152 /** 153 * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception. 154 */ 155 private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10); 156 157 private final AtomicLong totalConnectionsCreated = new AtomicLong(0); 158 159 private final AtomicLong totalConnectionsUsed = new AtomicLong(0); 160 161 private final Logger logger = Logger.getLogger(this.getClass()); 162 163 /** 164 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor 165 * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool 166 * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}. 167 * 168 * @param source the source for connections 169 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid 170 */ 171 public RepositoryConnectionPool( RepositorySource source ) { 172 this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS); 173 } 174 175 /** 176 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. 177 * 178 * @param source the source for connections 179 * @param corePoolSize the number of connections to keep in the pool, even if they are idle. 180 * @param maximumPoolSize the maximum number of connections to allow in the pool. 181 * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle 182 * connections will be kept before terminating. 183 * @param unit the time unit for the keepAliveTime argument. 184 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid 185 */ 186 public RepositoryConnectionPool( RepositorySource source, 187 int corePoolSize, 188 int maximumPoolSize, 189 long keepAliveTime, 190 TimeUnit unit ) { 191 CheckArg.isNonNegative(corePoolSize, "corePoolSize"); 192 CheckArg.isPositive(maximumPoolSize, "maximumPoolSize"); 193 CheckArg.isNonNegative(keepAliveTime, "keepAliveTime"); 194 CheckArg.isNotNull(source, "source"); 195 if (maximumPoolSize < corePoolSize) { 196 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text()); 197 } 198 this.source = source; 199 this.corePoolSize = corePoolSize; 200 this.maximumPoolSize = maximumPoolSize; 201 this.keepAliveTime = unit.toNanos(keepAliveTime); 202 this.setPingTimeout(100, TimeUnit.MILLISECONDS); 203 } 204 205 /** 206 * Get the {@link RepositorySource} that's used by this pool. 207 * 208 * @return the repository source; never null 209 */ 210 public final RepositorySource getRepositorySource() { 211 return source; 212 } 213 214 /** 215 * Get the name of this pool, which delegates to the connection factory. 216 * 217 * @return the name of the source 218 */ 219 protected String getSourceName() { 220 return source.getName(); 221 } 222 223 // ------------------------------------------------- 224 // Property settings ... 225 // ------------------------------------------------- 226 227 /** 228 * @return validateConnectionBeforeUse 229 */ 230 public boolean getValidateConnectionBeforeUse() { 231 return this.validateConnectionBeforeUse.get(); 232 } 233 234 /** 235 * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value. 236 */ 237 public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) { 238 this.validateConnectionBeforeUse.set(validateConnectionBeforeUse); 239 } 240 241 /** 242 * @return pingTimeout 243 */ 244 public long getPingTimeoutInNanos() { 245 return this.pingTimeout.get(); 246 } 247 248 /** 249 * @param pingTimeout the time to wait for a ping to complete 250 * @param unit the time unit of the time argument 251 */ 252 public void setPingTimeout( long pingTimeout, 253 TimeUnit unit ) { 254 CheckArg.isNonNegative(pingTimeout, "time"); 255 this.pingTimeout.set(unit.toNanos(pingTimeout)); 256 } 257 258 /** 259 * @return maxFailedAttemptsBeforeError 260 */ 261 public int getMaxFailedAttemptsBeforeError() { 262 return this.maxFailedAttemptsBeforeError.get(); 263 } 264 265 /** 266 * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value. 267 */ 268 public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) { 269 this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError); 270 } 271 272 /** 273 * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of 274 * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated. 275 * This overrides any value set in the constructor. 276 * 277 * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being 278 * returned. 279 * @param unit the time unit of the time argument 280 * @throws IllegalArgumentException if time less than zero 281 * @see #getKeepAliveTime 282 */ 283 public void setKeepAliveTime( long time, 284 TimeUnit unit ) { 285 CheckArg.isNonNegative(time, "time"); 286 this.keepAliveTime = unit.toNanos(time); 287 } 288 289 /** 290 * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may 291 * remain idle before being closed. 292 * 293 * @param unit the desired time unit of the result 294 * @return the time limit 295 * @see #setKeepAliveTime 296 */ 297 public long getKeepAliveTime( TimeUnit unit ) { 298 assert unit != null; 299 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 300 } 301 302 /** 303 * @return maximumPoolSize 304 */ 305 public int getMaximumPoolSize() { 306 return this.maximumPoolSize; 307 } 308 309 /** 310 * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is 311 * smaller than the current value, excess existing but unused connections will be closed. 312 * 313 * @param maximumPoolSize the new maximum 314 * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size} 315 * @see #getMaximumPoolSize 316 */ 317 public void setMaximumPoolSize( int maximumPoolSize ) { 318 CheckArg.isPositive(maximumPoolSize, "maximum pool size"); 319 if (maximumPoolSize < corePoolSize) { 320 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text()); 321 } 322 final ReentrantLock mainLock = this.mainLock; 323 try { 324 mainLock.lock(); 325 int extra = this.maximumPoolSize - maximumPoolSize; 326 this.maximumPoolSize = maximumPoolSize; 327 if (extra > 0 && poolSize > maximumPoolSize) { 328 // Drain the extra connections from those available ... 329 drainUnusedConnections(extra); 330 } 331 } finally { 332 mainLock.unlock(); 333 } 334 } 335 336 /** 337 * Returns the core number of connections. 338 * 339 * @return the core number of connections 340 * @see #setCorePoolSize(int) 341 */ 342 public int getCorePoolSize() { 343 return this.corePoolSize; 344 } 345 346 /** 347 * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the 348 * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be 349 * created. 350 * 351 * @param corePoolSize the new core size 352 * @throws RepositorySourceException if there was an error obtaining the new connection 353 * @throws InterruptedException if the thread was interrupted during the operation 354 * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero 355 * @see #getCorePoolSize() 356 */ 357 public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException { 358 CheckArg.isNonNegative(corePoolSize, "core pool size"); 359 if (maximumPoolSize < corePoolSize) { 360 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text()); 361 } 362 final ReentrantLock mainLock = this.mainLock; 363 try { 364 mainLock.lock(); 365 int extra = this.corePoolSize - corePoolSize; 366 this.corePoolSize = corePoolSize; 367 if (extra < 0) { 368 // Add connections ... 369 addConnectionsIfUnderCorePoolSize(); 370 } else if (extra > 0 && poolSize > corePoolSize) { 371 // Drain the extra connections from those available ... 372 drainUnusedConnections(extra); 373 } 374 } finally { 375 mainLock.unlock(); 376 } 377 } 378 379 // ------------------------------------------------- 380 // Statistics ... 381 // ------------------------------------------------- 382 383 /** 384 * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not 385 * being used. 386 * 387 * @return the number of connections 388 */ 389 public int getPoolSize() { 390 return poolSize; 391 } 392 393 /** 394 * Returns the approximate number of connections that are currently checked out from the pool. 395 * 396 * @return the number of checked-out connections 397 */ 398 public int getInUseCount() { 399 final ReentrantLock mainLock = this.mainLock; 400 try { 401 mainLock.lock(); 402 return this.inUseConnections.size(); 403 } finally { 404 mainLock.unlock(); 405 } 406 } 407 408 /** 409 * Get the total number of connections that have been created by this pool. 410 * 411 * @return the total number of connections created by this pool 412 */ 413 public long getTotalConnectionsCreated() { 414 return this.totalConnectionsCreated.get(); 415 } 416 417 /** 418 * Get the total number of times connections have been {@link #getConnection()} used. 419 * 420 * @return the total number 421 */ 422 public long getTotalConnectionsUsed() { 423 return this.totalConnectionsUsed.get(); 424 } 425 426 // ------------------------------------------------- 427 // State management methods ... 428 // ------------------------------------------------- 429 430 /** 431 * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections 432 * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have 433 * already been started. 434 * 435 * @return true if a connection was started 436 * @throws RepositorySourceException if there was an error obtaining the new connection 437 * @throws InterruptedException if the thread was interrupted during the operation 438 */ 439 public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException { 440 final ReentrantLock mainLock = this.mainLock; 441 try { 442 mainLock.lock(); 443 return addConnectionIfUnderCorePoolSize(); 444 } finally { 445 mainLock.unlock(); 446 } 447 } 448 449 /** 450 * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core 451 * connections only when they are {@link #getConnection() needed}. 452 * 453 * @return the number of connections started. 454 * @throws RepositorySourceException if there was an error obtaining the new connection 455 * @throws InterruptedException if the thread was interrupted during the operation 456 */ 457 public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException { 458 final ReentrantLock mainLock = this.mainLock; 459 try { 460 mainLock.lock(); 461 return addConnectionsIfUnderCorePoolSize(); 462 } finally { 463 mainLock.unlock(); 464 } 465 } 466 467 /** 468 * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed 469 * as normal, but no new connections will be created. Invocation has no additional effect if already shut down. 470 * <p> 471 * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}. 472 * </p> 473 * 474 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller 475 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>, 476 * or the security manager's <tt>checkAccess</tt> method denies access. 477 * @see #shutdownNow() 478 */ 479 public void shutdown() { 480 // Fail if caller doesn't have modifyThread permission. We 481 // explicitly check permissions directly because we can't trust 482 // implementations of SecurityManager to correctly override 483 // the "check access" methods such that our documented 484 // security policy is implemented. 485 SecurityManager security = System.getSecurityManager(); 486 if (security != null) java.security.AccessController.checkPermission(shutdownPerm); 487 488 this.logger.debug("Shutting down repository connection pool for {0}", getSourceName()); 489 boolean fullyTerminated = false; 490 final ReentrantLock mainLock = this.mainLock; 491 try { 492 mainLock.lock(); 493 int state = this.runState; 494 if (state == RUNNING) { 495 // don't override shutdownNow 496 this.runState = SHUTDOWN; 497 } 498 499 // Kill the maintenance thread ... 500 501 // Remove and close all available connections ... 502 if (!this.availableConnections.isEmpty()) { 503 // Drain the extra connections from those available ... 504 drainUnusedConnections(this.availableConnections.size()); 505 } 506 507 // If there are no connections being used, trigger full termination now ... 508 if (this.inUseConnections.isEmpty()) { 509 fullyTerminated = true; 510 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName()); 511 runState = TERMINATED; 512 termination.signalAll(); 513 this.logger.debug("Terminated repository connection pool for {0}", getSourceName()); 514 } 515 // Otherwise the last connection that is closed will transition the runState to TERMINATED ... 516 } finally { 517 mainLock.unlock(); 518 } 519 if (fullyTerminated) terminated(); 520 } 521 522 /** 523 * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other 524 * connections. 525 * 526 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller 527 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>, 528 * or the security manager's <tt>checkAccess</tt> method denies access. 529 * @see #shutdown() 530 */ 531 public void shutdownNow() { 532 // Almost the same code as shutdown() 533 SecurityManager security = System.getSecurityManager(); 534 if (security != null) java.security.AccessController.checkPermission(shutdownPerm); 535 536 this.logger.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName()); 537 boolean fullyTerminated = false; 538 final ReentrantLock mainLock = this.mainLock; 539 try { 540 mainLock.lock(); 541 int state = this.runState; 542 if (state != TERMINATED) { 543 // don't override shutdownNow 544 this.runState = STOP; 545 } 546 547 // Kill the maintenance thread ... 548 549 // Remove and close all available connections ... 550 if (!this.availableConnections.isEmpty()) { 551 // Drain the extra connections from those available ... 552 drainUnusedConnections(this.availableConnections.size()); 553 } 554 555 // If there are connections being used, close them now ... 556 if (!this.inUseConnections.isEmpty()) { 557 for (ConnectionWrapper connectionInUse : this.inUseConnections) { 558 this.logger.trace("Closing repository connection to {0}", getSourceName()); 559 connectionInUse.getOriginal().close(); 560 } 561 this.poolSize -= this.inUseConnections.size(); 562 // The last connection that is closed will transition the runState to TERMINATED ... 563 } else { 564 // There are no connections in use, so trigger full termination now ... 565 fullyTerminated = true; 566 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName()); 567 runState = TERMINATED; 568 termination.signalAll(); 569 this.logger.debug("Terminated repository connection pool for {0}", getSourceName()); 570 } 571 572 } finally { 573 mainLock.unlock(); 574 } 575 if (fullyTerminated) terminated(); 576 } 577 578 /** 579 * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this 580 * method is effectively <code>!isShutdown()</code>. 581 * 582 * @return true if this pool is running, or false otherwise 583 * @see #isShutdown() 584 * @see #isTerminated() 585 * @see #isTerminating() 586 */ 587 public boolean isRunning() { 588 return runState == RUNNING; 589 } 590 591 /** 592 * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of 593 * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively 594 * <code>!isRunning()</code>. 595 * 596 * @return true if this pool has been shut down, or false otherwise 597 * @see #isShutdown() 598 * @see #isTerminated() 599 * @see #isTerminating() 600 */ 601 public boolean isShutdown() { 602 return runState != RUNNING; 603 } 604 605 /** 606 * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been 607 * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a 608 * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this 609 * executor not to properly terminate. 610 * 611 * @return true if terminating but not yet terminated, or false otherwise 612 * @see #isTerminated() 613 */ 614 public boolean isTerminating() { 615 return runState == STOP; 616 } 617 618 /** 619 * Return true if this pool has completed its termination and no longer has any open connections. 620 * 621 * @return true if terminated, or false otherwise 622 * @see #isTerminating() 623 */ 624 public boolean isTerminated() { 625 return runState == TERMINATED; 626 } 627 628 /** 629 * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the 630 * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will 631 * return even if all connections have not been closed. 632 * 633 * @param timeout the maximum time to wait for all connections to be closed and returned to the pool 634 * @param unit the time unit for <code>timeout</code> 635 * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred 636 * before all the connections were closed 637 * @throws InterruptedException if the thread was interrupted 638 */ 639 public boolean awaitTermination( long timeout, 640 TimeUnit unit ) throws InterruptedException { 641 this.logger.trace("Awaiting termination"); 642 long nanos = unit.toNanos(timeout); 643 final ReentrantLock mainLock = this.mainLock; 644 try { 645 mainLock.lock(); 646 for (;;) { 647 // this.logger.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize); 648 if (runState == TERMINATED) return true; 649 if (nanos <= 0) return false; 650 nanos = termination.awaitNanos(nanos); 651 //this.logger.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize) 652 // ; 653 } 654 } finally { 655 mainLock.unlock(); 656 this.logger.trace("Finished awaiting termination"); 657 } 658 } 659 660 /** 661 * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple 662 * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method. 663 */ 664 protected void terminated() { 665 } 666 667 /** 668 * Invokes <tt>shutdown</tt> when this pool is no longer referenced. 669 */ 670 @Override 671 protected void finalize() { 672 shutdown(); 673 } 674 675 // ------------------------------------------------- 676 // Connection management methods ... 677 // ------------------------------------------------- 678 679 /** 680 * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection 681 * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum 682 * number of connections and all connections are currently being used. 683 * 684 * @return a connection 685 * @throws RepositorySourceException if there is a problem obtaining a connection 686 * @throws IllegalStateException if the factory is not in a state to create or return connections 687 */ 688 public RepositoryConnection getConnection() throws RepositorySourceException { 689 int attemptsAllowed = this.maxFailedAttemptsBeforeError.get(); 690 ConnectionWrapper connection = null; 691 // Do this until we get a good connection ... 692 int attemptsRemaining = attemptsAllowed; 693 while (connection == null && attemptsRemaining > 0) { 694 --attemptsRemaining; 695 ReentrantLock mainLock = this.mainLock; 696 try { 697 mainLock.lock(); 698 // If we're shutting down the pool, then just close the connection ... 699 if (this.runState != RUNNING) { 700 throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text()); 701 } 702 // If there are fewer total connections than the core size ... 703 if (this.poolSize < this.corePoolSize) { 704 // Immediately create a wrapped connection and return it ... 705 connection = newWrappedConnection(); 706 } 707 // Peek to see if there is a connection available ... 708 else if (this.availableConnections.peek() != null) { 709 // There is, so take it and return it ... 710 try { 711 connection = this.availableConnections.take(); 712 } catch (InterruptedException e) { 713 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName()); 714 Thread.interrupted(); 715 throw new RepositorySourceException(getSourceName(), e); 716 } 717 } 718 // There is no connection available. If there are fewer total connections than the maximum size ... 719 else if (this.poolSize < this.maximumPoolSize) { 720 // Immediately create a wrapped connection and return it ... 721 connection = newWrappedConnection(); 722 } 723 if (connection != null) { 724 this.inUseConnections.add(connection); 725 } 726 } finally { 727 mainLock.unlock(); 728 } 729 if (connection == null) { 730 // There are not enough connections, so wait in line for the next available connection ... 731 this.logger.trace("Waiting for a repository connection from pool {0}", getSourceName()); 732 try { 733 connection = this.availableConnections.take(); 734 } catch (InterruptedException e) { 735 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName()); 736 Thread.interrupted(); 737 throw new RepositorySourceException(getSourceName(), e); 738 } 739 mainLock = this.mainLock; 740 mainLock.lock(); 741 try { 742 if (connection != null) { 743 this.inUseConnections.add(connection); 744 } 745 } finally { 746 mainLock.unlock(); 747 } 748 this.logger.trace("Recieved a repository connection from pool {0}", getSourceName()); 749 } 750 if (connection != null && this.validateConnectionBeforeUse.get()) { 751 try { 752 connection = validateConnection(connection); 753 } catch (InterruptedException e) { 754 this.logger.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName()); 755 returnConnection(connection); 756 Thread.interrupted(); 757 throw new RepositorySourceException(getSourceName(), e); 758 } 759 } 760 } 761 if (connection == null) { 762 // We were unable to obtain a usable connection, so fail ... 763 throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed)); 764 } 765 this.totalConnectionsUsed.incrementAndGet(); 766 return connection; 767 } 768 769 /** 770 * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}. 771 * 772 * @param wrapper the wrapper to the connection that is being returned to the pool 773 */ 774 protected void returnConnection( ConnectionWrapper wrapper ) { 775 assert wrapper != null; 776 ConnectionWrapper wrapperToClose = null; 777 final ReentrantLock mainLock = this.mainLock; 778 try { 779 mainLock.lock(); 780 // Remove the connection from the in-use set ... 781 boolean removed = this.inUseConnections.remove(wrapper); 782 assert removed; 783 784 // If we're shutting down the pool, then just close the connection ... 785 if (this.runState != RUNNING) { 786 wrapperToClose = wrapper; 787 } 788 // If there are more connections than the maximum size... 789 else if (this.poolSize > this.maximumPoolSize) { 790 // Immediately close this connection ... 791 wrapperToClose = wrapper; 792 } 793 // Attempt to make the connection available (this should generally work, unless there is an upper limit 794 // to the number of available connections) ... 795 else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) { 796 // The pool of available connection is full, so release it ... 797 wrapperToClose = wrapper; 798 } 799 } finally { 800 mainLock.unlock(); 801 } 802 // Close the connection if we're supposed to (do it outside of the main lock)... 803 if (wrapperToClose != null) { 804 closeConnection(wrapperToClose); 805 } 806 } 807 808 /** 809 * Validate the supplied connection, returning the connection if valid or null if the connection is not valid. 810 * 811 * @param connection the connection to be validated; may not be null 812 * @return the validated connection, or null if the connection did not validate and was removed from the pool 813 * @throws InterruptedException if the thread is interrupted while validating the connection 814 */ 815 protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException { 816 assert connection != null; 817 ConnectionWrapper invalidConnection = null; 818 try { 819 if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) { 820 invalidConnection = connection; 821 } 822 } finally { 823 if (invalidConnection != null) { 824 connection = null; 825 returnConnection(invalidConnection); 826 } 827 } 828 return connection; 829 } 830 831 /** 832 * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new 833 * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the 834 * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the 835 * {@link #poolSize pool size}. 836 * 837 * @return the connection wrapper with a new connection 838 * @throws RepositorySourceException if there was an error obtaining the new connection 839 */ 840 @GuardedBy( "mainLock" ) 841 protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException { 842 RepositoryConnection connection = this.source.getConnection(); 843 ++this.poolSize; 844 this.totalConnectionsCreated.incrementAndGet(); 845 return new ConnectionWrapper(connection); 846 } 847 848 /** 849 * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This 850 * method does decrement the {@link #poolSize pool size}. 851 * 852 * @param wrapper the wrapper for the connection to be closed 853 */ 854 protected void closeConnection( ConnectionWrapper wrapper ) { 855 assert wrapper != null; 856 RepositoryConnection original = wrapper.getOriginal(); 857 assert original != null; 858 try { 859 this.logger.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize); 860 original.close(); 861 } finally { 862 final ReentrantLock mainLock = this.mainLock; 863 try { 864 mainLock.lock(); 865 // No matter what reduce the pool size count 866 --this.poolSize; 867 // And if shutting down and this was the last connection being used... 868 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) { 869 // then signal anybody that has called "awaitTermination(...)" 870 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName()); 871 this.runState = TERMINATED; 872 this.termination.signalAll(); 873 this.logger.trace("Terminated repository connection pool for {0}", getSourceName()); 874 875 // fall through to call terminate() outside of lock. 876 } 877 } finally { 878 mainLock.unlock(); 879 } 880 } 881 } 882 883 @GuardedBy( "mainLock" ) 884 protected int drainUnusedConnections( int count ) { 885 if (count <= 0) return 0; 886 this.logger.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName()); 887 // Drain the extra connections from those available ... 888 Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>(); 889 this.availableConnections.drainTo(extraConnections, count); 890 for (ConnectionWrapper connection : extraConnections) { 891 this.logger.trace("Closing repository connection to {0}", getSourceName()); 892 connection.getOriginal().close(); 893 } 894 int numClosed = extraConnections.size(); 895 this.poolSize -= numClosed; 896 this.logger.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize); 897 return numClosed; 898 } 899 900 @GuardedBy( "mainLock" ) 901 protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException { 902 // Add connection ... 903 if (this.poolSize < this.corePoolSize) { 904 this.availableConnections.offer(newWrappedConnection()); 905 this.logger.trace("Added connection to {0} in undersized pool", getSourceName()); 906 return true; 907 } 908 return false; 909 } 910 911 @GuardedBy( "mainLock" ) 912 protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException { 913 // Add connections ... 914 int n = 0; 915 while (this.poolSize < this.corePoolSize) { 916 this.availableConnections.offer(newWrappedConnection()); 917 ++n; 918 } 919 this.logger.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName()); 920 return n; 921 } 922 923 protected class ConnectionWrapper implements RepositoryConnection { 924 925 private final RepositoryConnection original; 926 private final long timeCreated; 927 private long lastUsed; 928 private boolean closed = false; 929 930 protected ConnectionWrapper( RepositoryConnection connection ) { 931 assert connection != null; 932 this.original = connection; 933 this.timeCreated = System.currentTimeMillis(); 934 } 935 936 /** 937 * @return original 938 */ 939 protected RepositoryConnection getOriginal() { 940 return this.original; 941 } 942 943 /** 944 * @return lastUsed 945 */ 946 public long getTimeLastUsed() { 947 return this.lastUsed; 948 } 949 950 /** 951 * @return timeCreated 952 */ 953 public long getTimeCreated() { 954 return this.timeCreated; 955 } 956 957 /** 958 * {@inheritDoc} 959 */ 960 public String getSourceName() { 961 return this.original.getSourceName(); 962 } 963 964 /** 965 * {@inheritDoc} 966 */ 967 public XAResource getXAResource() { 968 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 969 return this.original.getXAResource(); 970 } 971 972 /** 973 * {@inheritDoc} 974 */ 975 public CachePolicy getDefaultCachePolicy() { 976 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 977 return this.original.getDefaultCachePolicy(); 978 } 979 980 /** 981 * {@inheritDoc} 982 */ 983 public void execute( ExecutionContext context, 984 GraphCommand... commands ) throws RepositorySourceException { 985 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 986 this.original.execute(context, commands); 987 } 988 989 /** 990 * {@inheritDoc} 991 */ 992 public boolean ping( long time, 993 TimeUnit unit ) throws InterruptedException { 994 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 995 return this.original.ping(time, unit); 996 } 997 998 /** 999 * {@inheritDoc} 1000 */ 1001 public void close() { 1002 if (!closed) { 1003 this.lastUsed = System.currentTimeMillis(); 1004 this.original.close(); 1005 this.closed = true; 1006 returnConnection(this); 1007 } 1008 } 1009 1010 /** 1011 * {@inheritDoc} 1012 */ 1013 public void setListener( RepositorySourceListener listener ) { 1014 if (!closed) this.original.setListener(listener); 1015 } 1016 1017 } 1018 1019 }