001 /*
002 * JBoss DNA (http://www.jboss.org/dna)
003 * See the COPYRIGHT.txt file distributed with this work for information
004 * regarding copyright ownership. Some portions may be licensed
005 * to Red Hat, Inc. under one or more contributor license agreements.
006 * See the AUTHORS.txt file in the distribution for a full listing of
007 * individual contributors.
008 *
009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010 * is licensed to you under the terms of the GNU Lesser General Public License as
011 * published by the Free Software Foundation; either version 2.1 of
012 * the License, or (at your option) any later version.
013 *
014 * JBoss DNA is distributed in the hope that it will be useful,
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017 * Lesser General Public License for more details.
018 *
019 * You should have received a copy of the GNU Lesser General Public
020 * License along with this software; if not, write to the Free
021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023 */
024 package org.jboss.dna.graph.connector;
025
026 import java.util.Collection;
027 import java.util.HashSet;
028 import java.util.LinkedList;
029 import java.util.Set;
030 import java.util.concurrent.BlockingQueue;
031 import java.util.concurrent.LinkedBlockingQueue;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.atomic.AtomicBoolean;
034 import java.util.concurrent.atomic.AtomicInteger;
035 import java.util.concurrent.atomic.AtomicLong;
036 import java.util.concurrent.locks.Condition;
037 import java.util.concurrent.locks.ReentrantLock;
038 import javax.transaction.xa.XAResource;
039 import net.jcip.annotations.GuardedBy;
040 import net.jcip.annotations.ThreadSafe;
041 import org.jboss.dna.common.util.CheckArg;
042 import org.jboss.dna.common.util.Logger;
043 import org.jboss.dna.graph.ExecutionContext;
044 import org.jboss.dna.graph.GraphI18n;
045 import org.jboss.dna.graph.cache.CachePolicy;
046 import org.jboss.dna.graph.request.Request;
047
048 /**
049 * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
050 *
051 * @author Randall Hauch
052 */
053 @ThreadSafe
054 public class RepositoryConnectionPool {
055
056 /**
057 * The core pool size for default-constructed pools is {@value} .
058 */
059 public static final int DEFAULT_CORE_POOL_SIZE = 1;
060
061 /**
062 * The maximum pool size for default-constructed pools is {@value} .
063 */
064 public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
065
066 /**
067 * The keep-alive time for connections in default-constructed pools is {@value} seconds.
068 */
069 public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
070
071 /**
072 * Permission for checking shutdown
073 */
074 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
075
076 /**
077 * The source that this pool uses to create new connections.
078 */
079 private final RepositorySource source;
080
081 /**
082 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
083 */
084 private final ReentrantLock mainLock = new ReentrantLock();
085
086 /**
087 * Wait condition to support awaitTermination
088 */
089 private final Condition termination = mainLock.newCondition();
090
091 /**
092 * Set containing all connections that are available for use.
093 */
094 @GuardedBy( "mainLock" )
095 private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();
096
097 /**
098 * The connections that are currently in use.
099 */
100 @GuardedBy( "mainLock" )
101 private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
102
103 /**
104 * Timeout in nanoseconds for idle connections waiting to be used. Threads use this timeout only when there are more than
105 * corePoolSize present. Otherwise they wait forever to be used.
106 */
107 private volatile long keepAliveTime;
108
109 /**
110 * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
111 * updates.
112 */
113 @GuardedBy( "mainLock" )
114 private volatile int corePoolSize;
115
116 /**
117 * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
118 */
119 @GuardedBy( "mainLock" )
120 private volatile int maximumPoolSize;
121
122 /**
123 * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
124 */
125 @GuardedBy( "mainLock" )
126 private volatile int poolSize;
127
128 /**
129 * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
130 */
131 @GuardedBy( "mainLock" )
132 private volatile int runState;
133
134 // Special values for runState
135 /** Normal, not-shutdown mode */
136 static final int RUNNING = 0;
137 /** Controlled shutdown mode */
138 static final int SHUTDOWN = 1;
139 /** Immediate shutdown mode */
140 static final int STOP = 2;
141 /** Final state */
142 static final int TERMINATED = 3;
143
144 /**
145 * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
146 */
147 private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
148
149 /**
150 * The time in nanoseconds that ping should wait before timing out and failing.
151 */
152 private final AtomicLong pingTimeout = new AtomicLong(0);
153
154 /**
155 * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
156 */
157 private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
158
159 private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
160
161 private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
162
163 private final Logger logger = Logger.getLogger(this.getClass());
164
165 /**
166 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor
167 * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
168 * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}.
169 *
170 * @param source the source for connections
171 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
172 */
173 public RepositoryConnectionPool( RepositorySource source ) {
174 this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS);
175 }
176
177 /**
178 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
179 *
180 * @param source the source for connections
181 * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
182 * @param maximumPoolSize the maximum number of connections to allow in the pool.
183 * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle
184 * connections will be kept before terminating.
185 * @param unit the time unit for the keepAliveTime argument.
186 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
187 */
188 public RepositoryConnectionPool( RepositorySource source,
189 int corePoolSize,
190 int maximumPoolSize,
191 long keepAliveTime,
192 TimeUnit unit ) {
193 CheckArg.isNonNegative(corePoolSize, "corePoolSize");
194 CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
195 CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
196 CheckArg.isNotNull(source, "source");
197 if (maximumPoolSize < corePoolSize) {
198 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
199 }
200 this.source = source;
201 this.corePoolSize = corePoolSize;
202 this.maximumPoolSize = maximumPoolSize;
203 this.keepAliveTime = unit.toNanos(keepAliveTime);
204 this.setPingTimeout(100, TimeUnit.MILLISECONDS);
205 }
206
207 /**
208 * Get the {@link RepositorySource} that's used by this pool.
209 *
210 * @return the repository source; never null
211 */
212 public final RepositorySource getRepositorySource() {
213 return source;
214 }
215
216 /**
217 * Get the name of this pool, which delegates to the connection factory.
218 *
219 * @return the name of the source
220 */
221 protected String getSourceName() {
222 return source.getName();
223 }
224
225 // -------------------------------------------------
226 // Property settings ...
227 // -------------------------------------------------
228
229 /**
230 * @return validateConnectionBeforeUse
231 */
232 public boolean getValidateConnectionBeforeUse() {
233 return this.validateConnectionBeforeUse.get();
234 }
235
236 /**
237 * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
238 */
239 public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
240 this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
241 }
242
243 /**
244 * @return pingTimeout
245 */
246 public long getPingTimeoutInNanos() {
247 return this.pingTimeout.get();
248 }
249
250 /**
251 * @param pingTimeout the time to wait for a ping to complete
252 * @param unit the time unit of the time argument
253 */
254 public void setPingTimeout( long pingTimeout,
255 TimeUnit unit ) {
256 CheckArg.isNonNegative(pingTimeout, "time");
257 this.pingTimeout.set(unit.toNanos(pingTimeout));
258 }
259
260 /**
261 * @return maxFailedAttemptsBeforeError
262 */
263 public int getMaxFailedAttemptsBeforeError() {
264 return this.maxFailedAttemptsBeforeError.get();
265 }
266
267 /**
268 * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
269 */
270 public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
271 this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
272 }
273
274 /**
275 * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
276 * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
277 * This overrides any value set in the constructor.
278 *
279 * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
280 * returned.
281 * @param unit the time unit of the time argument
282 * @throws IllegalArgumentException if time less than zero
283 * @see #getKeepAliveTime
284 */
285 public void setKeepAliveTime( long time,
286 TimeUnit unit ) {
287 CheckArg.isNonNegative(time, "time");
288 this.keepAliveTime = unit.toNanos(time);
289 }
290
291 /**
292 * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
293 * remain idle before being closed.
294 *
295 * @param unit the desired time unit of the result
296 * @return the time limit
297 * @see #setKeepAliveTime
298 */
299 public long getKeepAliveTime( TimeUnit unit ) {
300 assert unit != null;
301 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
302 }
303
304 /**
305 * @return maximumPoolSize
306 */
307 public int getMaximumPoolSize() {
308 return this.maximumPoolSize;
309 }
310
311 /**
312 * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
313 * smaller than the current value, excess existing but unused connections will be closed.
314 *
315 * @param maximumPoolSize the new maximum
316 * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
317 * @see #getMaximumPoolSize
318 */
319 public void setMaximumPoolSize( int maximumPoolSize ) {
320 CheckArg.isPositive(maximumPoolSize, "maximum pool size");
321 if (maximumPoolSize < corePoolSize) {
322 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
323 }
324 final ReentrantLock mainLock = this.mainLock;
325 try {
326 mainLock.lock();
327 int extra = this.maximumPoolSize - maximumPoolSize;
328 this.maximumPoolSize = maximumPoolSize;
329 if (extra > 0 && poolSize > maximumPoolSize) {
330 // Drain the extra connections from those available ...
331 drainUnusedConnections(extra);
332 }
333 } finally {
334 mainLock.unlock();
335 }
336 }
337
338 /**
339 * Returns the core number of connections.
340 *
341 * @return the core number of connections
342 * @see #setCorePoolSize(int)
343 */
344 public int getCorePoolSize() {
345 return this.corePoolSize;
346 }
347
348 /**
349 * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
350 * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
351 * created.
352 *
353 * @param corePoolSize the new core size
354 * @throws RepositorySourceException if there was an error obtaining the new connection
355 * @throws InterruptedException if the thread was interrupted during the operation
356 * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
357 * @see #getCorePoolSize()
358 */
359 public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
360 CheckArg.isNonNegative(corePoolSize, "core pool size");
361 if (maximumPoolSize < corePoolSize) {
362 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
363 }
364 final ReentrantLock mainLock = this.mainLock;
365 try {
366 mainLock.lock();
367 int extra = this.corePoolSize - corePoolSize;
368 this.corePoolSize = corePoolSize;
369 if (extra < 0) {
370 // Add connections ...
371 addConnectionsIfUnderCorePoolSize();
372 } else if (extra > 0 && poolSize > corePoolSize) {
373 // Drain the extra connections from those available ...
374 drainUnusedConnections(extra);
375 }
376 } finally {
377 mainLock.unlock();
378 }
379 }
380
381 // -------------------------------------------------
382 // Statistics ...
383 // -------------------------------------------------
384
385 /**
386 * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
387 * being used.
388 *
389 * @return the number of connections
390 */
391 public int getPoolSize() {
392 return poolSize;
393 }
394
395 /**
396 * Returns the approximate number of connections that are currently checked out from the pool.
397 *
398 * @return the number of checked-out connections
399 */
400 public int getInUseCount() {
401 final ReentrantLock mainLock = this.mainLock;
402 try {
403 mainLock.lock();
404 return this.inUseConnections.size();
405 } finally {
406 mainLock.unlock();
407 }
408 }
409
410 /**
411 * Get the total number of connections that have been created by this pool.
412 *
413 * @return the total number of connections created by this pool
414 */
415 public long getTotalConnectionsCreated() {
416 return this.totalConnectionsCreated.get();
417 }
418
419 /**
420 * Get the total number of times connections have been {@link #getConnection()} used.
421 *
422 * @return the total number
423 */
424 public long getTotalConnectionsUsed() {
425 return this.totalConnectionsUsed.get();
426 }
427
428 // -------------------------------------------------
429 // State management methods ...
430 // -------------------------------------------------
431
432 /**
433 * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
434 * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
435 * already been started.
436 *
437 * @return true if a connection was started
438 * @throws RepositorySourceException if there was an error obtaining the new connection
439 * @throws InterruptedException if the thread was interrupted during the operation
440 */
441 public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
442 final ReentrantLock mainLock = this.mainLock;
443 try {
444 mainLock.lock();
445 return addConnectionIfUnderCorePoolSize();
446 } finally {
447 mainLock.unlock();
448 }
449 }
450
451 /**
452 * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
453 * connections only when they are {@link #getConnection() needed}.
454 *
455 * @return the number of connections started.
456 * @throws RepositorySourceException if there was an error obtaining the new connection
457 * @throws InterruptedException if the thread was interrupted during the operation
458 */
459 public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
460 final ReentrantLock mainLock = this.mainLock;
461 try {
462 mainLock.lock();
463 return addConnectionsIfUnderCorePoolSize();
464 } finally {
465 mainLock.unlock();
466 }
467 }
468
469 /**
470 * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
471 * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
472 * <p>
473 * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
474 * </p>
475 *
476 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
477 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
478 * or the security manager's <tt>checkAccess</tt> method denies access.
479 * @see #shutdownNow()
480 */
481 public void shutdown() {
482 // Fail if caller doesn't have modifyThread permission. We
483 // explicitly check permissions directly because we can't trust
484 // implementations of SecurityManager to correctly override
485 // the "check access" methods such that our documented
486 // security policy is implemented.
487 SecurityManager security = System.getSecurityManager();
488 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
489
490 this.logger.debug("Shutting down repository connection pool for {0}", getSourceName());
491 boolean fullyTerminated = false;
492 final ReentrantLock mainLock = this.mainLock;
493 try {
494 mainLock.lock();
495 int state = this.runState;
496 if (state == RUNNING) {
497 // don't override shutdownNow
498 this.runState = SHUTDOWN;
499 }
500
501 // Kill the maintenance thread ...
502
503 // Remove and close all available connections ...
504 if (!this.availableConnections.isEmpty()) {
505 // Drain the extra connections from those available ...
506 drainUnusedConnections(this.availableConnections.size());
507 }
508
509 // If there are no connections being used, trigger full termination now ...
510 if (this.inUseConnections.isEmpty()) {
511 fullyTerminated = true;
512 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
513 runState = TERMINATED;
514 termination.signalAll();
515 this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
516 }
517 // Otherwise the last connection that is closed will transition the runState to TERMINATED ...
518 } finally {
519 mainLock.unlock();
520 }
521 if (fullyTerminated) terminated();
522 }
523
524 /**
525 * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
526 * connections.
527 *
528 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
529 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
530 * or the security manager's <tt>checkAccess</tt> method denies access.
531 * @see #shutdown()
532 */
533 public void shutdownNow() {
534 // Almost the same code as shutdown()
535 SecurityManager security = System.getSecurityManager();
536 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
537
538 this.logger.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
539 boolean fullyTerminated = false;
540 final ReentrantLock mainLock = this.mainLock;
541 try {
542 mainLock.lock();
543 int state = this.runState;
544 if (state != TERMINATED) {
545 // don't override shutdownNow
546 this.runState = STOP;
547 }
548
549 // Kill the maintenance thread ...
550
551 // Remove and close all available connections ...
552 if (!this.availableConnections.isEmpty()) {
553 // Drain the extra connections from those available ...
554 drainUnusedConnections(this.availableConnections.size());
555 }
556
557 // If there are connections being used, close them now ...
558 if (!this.inUseConnections.isEmpty()) {
559 for (ConnectionWrapper connectionInUse : this.inUseConnections) {
560 this.logger.trace("Closing repository connection to {0}", getSourceName());
561 connectionInUse.getOriginal().close();
562 }
563 this.poolSize -= this.inUseConnections.size();
564 // The last connection that is closed will transition the runState to TERMINATED ...
565 } else {
566 // There are no connections in use, so trigger full termination now ...
567 fullyTerminated = true;
568 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
569 runState = TERMINATED;
570 termination.signalAll();
571 this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
572 }
573
574 } finally {
575 mainLock.unlock();
576 }
577 if (fullyTerminated) terminated();
578 }
579
580 /**
581 * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
582 * method is effectively <code>!isShutdown()</code>.
583 *
584 * @return true if this pool is running, or false otherwise
585 * @see #isShutdown()
586 * @see #isTerminated()
587 * @see #isTerminating()
588 */
589 public boolean isRunning() {
590 return runState == RUNNING;
591 }
592
593 /**
594 * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
595 * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
596 * <code>!isRunning()</code>.
597 *
598 * @return true if this pool has been shut down, or false otherwise
599 * @see #isShutdown()
600 * @see #isTerminated()
601 * @see #isTerminating()
602 */
603 public boolean isShutdown() {
604 return runState != RUNNING;
605 }
606
607 /**
608 * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
609 * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
610 * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
611 * executor not to properly terminate.
612 *
613 * @return true if terminating but not yet terminated, or false otherwise
614 * @see #isTerminated()
615 */
616 public boolean isTerminating() {
617 return runState == STOP;
618 }
619
620 /**
621 * Return true if this pool has completed its termination and no longer has any open connections.
622 *
623 * @return true if terminated, or false otherwise
624 * @see #isTerminating()
625 */
626 public boolean isTerminated() {
627 return runState == TERMINATED;
628 }
629
630 /**
631 * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
632 * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
633 * return even if all connections have not been closed.
634 *
635 * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
636 * @param unit the time unit for <code>timeout</code>
637 * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
638 * before all the connections were closed
639 * @throws InterruptedException if the thread was interrupted
640 */
641 public boolean awaitTermination( long timeout,
642 TimeUnit unit ) throws InterruptedException {
643 this.logger.trace("Awaiting termination");
644 long nanos = unit.toNanos(timeout);
645 final ReentrantLock mainLock = this.mainLock;
646 try {
647 mainLock.lock();
648 for (;;) {
649 // this.logger.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize);
650 if (runState == TERMINATED) return true;
651 if (nanos <= 0) return false;
652 nanos = termination.awaitNanos(nanos);
653 //this.logger.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
654 // ;
655 }
656 } finally {
657 mainLock.unlock();
658 this.logger.trace("Finished awaiting termination");
659 }
660 }
661
662 /**
663 * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
664 * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
665 */
666 protected void terminated() {
667 }
668
669 /**
670 * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
671 */
672 @Override
673 protected void finalize() {
674 shutdown();
675 }
676
677 // -------------------------------------------------
678 // Connection management methods ...
679 // -------------------------------------------------
680
681 /**
682 * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
683 * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
684 * number of connections and all connections are currently being used.
685 *
686 * @return a connection
687 * @throws RepositorySourceException if there is a problem obtaining a connection
688 * @throws IllegalStateException if the factory is not in a state to create or return connections
689 */
690 public RepositoryConnection getConnection() throws RepositorySourceException {
691 int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
692 ConnectionWrapper connection = null;
693 // Do this until we get a good connection ...
694 int attemptsRemaining = attemptsAllowed;
695 while (connection == null && attemptsRemaining > 0) {
696 --attemptsRemaining;
697 ReentrantLock mainLock = this.mainLock;
698 try {
699 mainLock.lock();
700 // If we're shutting down the pool, then just close the connection ...
701 if (this.runState != RUNNING) {
702 throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
703 }
704 // If there are fewer total connections than the core size ...
705 if (this.poolSize < this.corePoolSize) {
706 // Immediately create a wrapped connection and return it ...
707 connection = newWrappedConnection();
708 }
709 // Peek to see if there is a connection available ...
710 else if (this.availableConnections.peek() != null) {
711 // There is, so take it and return it ...
712 try {
713 connection = this.availableConnections.take();
714 } catch (InterruptedException e) {
715 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
716 Thread.interrupted();
717 throw new RepositorySourceException(getSourceName(), e);
718 }
719 }
720 // There is no connection available. If there are fewer total connections than the maximum size ...
721 else if (this.poolSize < this.maximumPoolSize) {
722 // Immediately create a wrapped connection and return it ...
723 connection = newWrappedConnection();
724 }
725 if (connection != null) {
726 this.inUseConnections.add(connection);
727 }
728 } finally {
729 mainLock.unlock();
730 }
731 if (connection == null) {
732 // There are not enough connections, so wait in line for the next available connection ...
733 this.logger.trace("Waiting for a repository connection from pool {0}", getSourceName());
734 try {
735 connection = this.availableConnections.take();
736 } catch (InterruptedException e) {
737 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
738 Thread.interrupted();
739 throw new RepositorySourceException(getSourceName(), e);
740 }
741 mainLock = this.mainLock;
742 mainLock.lock();
743 try {
744 if (connection != null) {
745 this.inUseConnections.add(connection);
746 }
747 } finally {
748 mainLock.unlock();
749 }
750 this.logger.trace("Recieved a repository connection from pool {0}", getSourceName());
751 }
752 if (connection != null && this.validateConnectionBeforeUse.get()) {
753 try {
754 connection = validateConnection(connection);
755 } catch (InterruptedException e) {
756 this.logger.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
757 returnConnection(connection);
758 Thread.interrupted();
759 throw new RepositorySourceException(getSourceName(), e);
760 }
761 }
762 }
763 if (connection == null) {
764 // We were unable to obtain a usable connection, so fail ...
765 throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
766 }
767 this.totalConnectionsUsed.incrementAndGet();
768 return connection;
769 }
770
771 /**
772 * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
773 *
774 * @param wrapper the wrapper to the connection that is being returned to the pool
775 */
776 protected void returnConnection( ConnectionWrapper wrapper ) {
777 assert wrapper != null;
778 ConnectionWrapper wrapperToClose = null;
779 final ReentrantLock mainLock = this.mainLock;
780 try {
781 mainLock.lock();
782 // Remove the connection from the in-use set ...
783 boolean removed = this.inUseConnections.remove(wrapper);
784 assert removed;
785
786 // If we're shutting down the pool, then just close the connection ...
787 if (this.runState != RUNNING) {
788 wrapperToClose = wrapper;
789 }
790 // If there are more connections than the maximum size...
791 else if (this.poolSize > this.maximumPoolSize) {
792 // Immediately close this connection ...
793 wrapperToClose = wrapper;
794 }
795 // Attempt to make the connection available (this should generally work, unless there is an upper limit
796 // to the number of available connections) ...
797 else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
798 // The pool of available connection is full, so release it ...
799 wrapperToClose = wrapper;
800 }
801 } finally {
802 mainLock.unlock();
803 }
804 // Close the connection if we're supposed to (do it outside of the main lock)...
805 if (wrapperToClose != null) {
806 closeConnection(wrapperToClose);
807 }
808 }
809
810 /**
811 * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
812 *
813 * @param connection the connection to be validated; may not be null
814 * @return the validated connection, or null if the connection did not validate and was removed from the pool
815 * @throws InterruptedException if the thread is interrupted while validating the connection
816 */
817 protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
818 assert connection != null;
819 ConnectionWrapper invalidConnection = null;
820 try {
821 if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
822 invalidConnection = connection;
823 }
824 } finally {
825 if (invalidConnection != null) {
826 connection = null;
827 returnConnection(invalidConnection);
828 }
829 }
830 return connection;
831 }
832
833 /**
834 * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
835 * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
836 * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
837 * {@link #poolSize pool size}.
838 *
839 * @return the connection wrapper with a new connection
840 * @throws RepositorySourceException if there was an error obtaining the new connection
841 */
842 @GuardedBy( "mainLock" )
843 protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
844 RepositoryConnection connection = this.source.getConnection();
845 ++this.poolSize;
846 this.totalConnectionsCreated.incrementAndGet();
847 return new ConnectionWrapper(connection);
848 }
849
850 /**
851 * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
852 * method does decrement the {@link #poolSize pool size}.
853 *
854 * @param wrapper the wrapper for the connection to be closed
855 */
856 protected void closeConnection( ConnectionWrapper wrapper ) {
857 assert wrapper != null;
858 RepositoryConnection original = wrapper.getOriginal();
859 assert original != null;
860 try {
861 this.logger.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
862 original.close();
863 } finally {
864 final ReentrantLock mainLock = this.mainLock;
865 try {
866 mainLock.lock();
867 // No matter what reduce the pool size count
868 --this.poolSize;
869 // And if shutting down and this was the last connection being used...
870 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
871 // then signal anybody that has called "awaitTermination(...)"
872 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
873 this.runState = TERMINATED;
874 this.termination.signalAll();
875 this.logger.trace("Terminated repository connection pool for {0}", getSourceName());
876
877 // fall through to call terminate() outside of lock.
878 }
879 } finally {
880 mainLock.unlock();
881 }
882 }
883 }
884
885 @GuardedBy( "mainLock" )
886 protected int drainUnusedConnections( int count ) {
887 if (count <= 0) return 0;
888 this.logger.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
889 // Drain the extra connections from those available ...
890 Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
891 this.availableConnections.drainTo(extraConnections, count);
892 for (ConnectionWrapper connection : extraConnections) {
893 this.logger.trace("Closing repository connection to {0}", getSourceName());
894 connection.getOriginal().close();
895 }
896 int numClosed = extraConnections.size();
897 this.poolSize -= numClosed;
898 this.logger.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
899 return numClosed;
900 }
901
902 @GuardedBy( "mainLock" )
903 protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
904 // Add connection ...
905 if (this.poolSize < this.corePoolSize) {
906 this.availableConnections.offer(newWrappedConnection());
907 this.logger.trace("Added connection to {0} in undersized pool", getSourceName());
908 return true;
909 }
910 return false;
911 }
912
913 @GuardedBy( "mainLock" )
914 protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
915 // Add connections ...
916 int n = 0;
917 while (this.poolSize < this.corePoolSize) {
918 this.availableConnections.offer(newWrappedConnection());
919 ++n;
920 }
921 this.logger.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
922 return n;
923 }
924
925 protected class ConnectionWrapper implements RepositoryConnection {
926
927 private final RepositoryConnection original;
928 private final long timeCreated;
929 private long lastUsed;
930 private boolean closed = false;
931
932 protected ConnectionWrapper( RepositoryConnection connection ) {
933 assert connection != null;
934 this.original = connection;
935 this.timeCreated = System.currentTimeMillis();
936 }
937
938 /**
939 * @return original
940 */
941 protected RepositoryConnection getOriginal() {
942 return this.original;
943 }
944
945 /**
946 * @return lastUsed
947 */
948 public long getTimeLastUsed() {
949 return this.lastUsed;
950 }
951
952 /**
953 * @return timeCreated
954 */
955 public long getTimeCreated() {
956 return this.timeCreated;
957 }
958
959 /**
960 * {@inheritDoc}
961 */
962 public String getSourceName() {
963 return this.original.getSourceName();
964 }
965
966 /**
967 * {@inheritDoc}
968 */
969 public XAResource getXAResource() {
970 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
971 return this.original.getXAResource();
972 }
973
974 /**
975 * {@inheritDoc}
976 */
977 public CachePolicy getDefaultCachePolicy() {
978 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
979 return this.original.getDefaultCachePolicy();
980 }
981
982 /**
983 * {@inheritDoc}
984 *
985 * @see org.jboss.dna.graph.connector.RepositoryConnection#execute(org.jboss.dna.graph.ExecutionContext,
986 * org.jboss.dna.graph.request.Request)
987 */
988 public void execute( ExecutionContext context,
989 Request request ) throws RepositorySourceException {
990 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
991 this.original.execute(context, request);
992 }
993
994 /**
995 * {@inheritDoc}
996 */
997 public boolean ping( long time,
998 TimeUnit unit ) throws InterruptedException {
999 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1000 return this.original.ping(time, unit);
1001 }
1002
1003 /**
1004 * {@inheritDoc}
1005 */
1006 public void close() {
1007 if (!closed) {
1008 this.lastUsed = System.currentTimeMillis();
1009 this.original.close();
1010 this.closed = true;
1011 returnConnection(this);
1012 }
1013 }
1014 }
1015
1016 }