001 /*
002 * JBoss, Home of Professional Open Source.
003 * Copyright 2008, Red Hat Middleware LLC, and individual contributors
004 * as indicated by the @author tags. See the copyright.txt file in the
005 * distribution for a full listing of individual contributors.
006 *
007 * This is free software; you can redistribute it and/or modify it
008 * under the terms of the GNU Lesser General Public License as
009 * published by the Free Software Foundation; either version 2.1 of
010 * the License, or (at your option) any later version.
011 *
012 * This software is distributed in the hope that it will be useful,
013 * but WITHOUT ANY WARRANTY; without even the implied warranty of
014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015 * Lesser General Public License for more details.
016 *
017 * You should have received a copy of the GNU Lesser General Public
018 * License along with this software; if not, write to the Free
019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021 */
022 package org.jboss.dna.graph.connectors;
023
024 import java.util.Collection;
025 import java.util.HashSet;
026 import java.util.LinkedList;
027 import java.util.Set;
028 import java.util.concurrent.BlockingQueue;
029 import java.util.concurrent.LinkedBlockingQueue;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.atomic.AtomicBoolean;
032 import java.util.concurrent.atomic.AtomicInteger;
033 import java.util.concurrent.atomic.AtomicLong;
034 import java.util.concurrent.locks.Condition;
035 import java.util.concurrent.locks.ReentrantLock;
036 import javax.transaction.xa.XAResource;
037 import net.jcip.annotations.GuardedBy;
038 import net.jcip.annotations.ThreadSafe;
039 import org.jboss.dna.common.util.CheckArg;
040 import org.jboss.dna.common.util.Logger;
041 import org.jboss.dna.graph.ExecutionContext;
042 import org.jboss.dna.graph.GraphI18n;
043 import org.jboss.dna.graph.cache.CachePolicy;
044 import org.jboss.dna.graph.requests.Request;
045
046 /**
047 * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
048 *
049 * @author Randall Hauch
050 */
051 @ThreadSafe
052 public class RepositoryConnectionPool {
053
/**
 * The core pool size used by the single-argument constructor: {@value}. The core size is the number of connections kept in
 * the pool even if they are idle.
 */
public static final int DEFAULT_CORE_POOL_SIZE = 1;

/**
 * The maximum pool size used by the single-argument constructor: {@value}. The maximum size is the largest number of
 * connections allowed in the pool.
 */
public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;

/**
 * The keep-alive time used by the single-argument constructor: {@value} seconds.
 */
public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;

/**
 * The permission that {@link #shutdown()} and {@link #shutdownNow()} check when a security manager is installed.
 */
private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
073
/**
 * The source that this pool uses to create new connections; set once in the constructor.
 */
private final RepositorySource source;

/**
 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, runState and the in-use connection set.
 */
private final ReentrantLock mainLock = new ReentrantLock();

/**
 * Wait condition to support awaitTermination; it is signalled when the run state becomes TERMINATED.
 */
private final Condition termination = mainLock.newCondition();

/**
 * Queue containing all connections that are available for use. Note that {@link #getConnection()} also blocks on this queue
 * without holding the main lock when it has to wait for a connection to be returned.
 */
@GuardedBy( "mainLock" )
private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();

/**
 * The connections that are currently in use (i.e., handed out by {@link #getConnection()} and not yet returned).
 */
@GuardedBy( "mainLock" )
private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
100
/**
 * Timeout in nanoseconds for idle connections waiting to be used. Per the public accessors, this applies to connections in
 * excess of corePoolSize. NOTE(review): the code that enforces this timeout is not visible in this part of the file.
 */
private volatile long keepAliveTime;

/**
 * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
 * updates.
 */
@GuardedBy( "mainLock" )
private volatile int corePoolSize;

/**
 * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
 */
@GuardedBy( "mainLock" )
private volatile int maximumPoolSize;

/**
 * Current pool size (connections in use plus those available), updated only while holding mainLock but volatile to allow
 * concurrent readability even during updates.
 */
@GuardedBy( "mainLock" )
private volatile int poolSize;

/**
 * Lifecycle state (one of the constants below), updated only while holding mainLock but volatile to allow concurrent
 * readability even during updates.
 */
@GuardedBy( "mainLock" )
private volatile int runState;

// Special values for runState
/** Normal, not-shutdown mode; the only state in which connections are handed out */
static final int RUNNING = 0;
/** Controlled shutdown mode, entered via {@link #shutdown()} */
static final int SHUTDOWN = 1;
/** Immediate shutdown mode, entered via {@link #shutdownNow()} */
static final int STOP = 2;
/** Final state */
static final int TERMINATED = 3;
141
/**
 * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
 * Validation is off by default.
 */
private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);

/**
 * The time in nanoseconds that ping should wait before timing out and failing. The constructor replaces the initial value
 * with 100 milliseconds.
 */
private final AtomicLong pingTimeout = new AtomicLong(0);

/**
 * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
 */
private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);

/** Running count of connections created by this pool; incremented each time a new wrapped connection is created. */
private final AtomicLong totalConnectionsCreated = new AtomicLong(0);

/** Running count of successful {@link #getConnection()} calls. */
private final AtomicLong totalConnectionsUsed = new AtomicLong(0);

/** Logger named after the runtime class of this pool. */
private final Logger logger = Logger.getLogger(this.getClass());
162
/**
 * Create a pool around the supplied {@link RepositorySource} using the default settings: a core size of
 * {@link #DEFAULT_CORE_POOL_SIZE}, a maximum size of {@link #DEFAULT_MAXIMUM_POOL_SIZE}, and a keep-alive time of
 * {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS} seconds.
 *
 * @param source the source from which connections are obtained
 * @throws IllegalArgumentException if the source is null
 */
public RepositoryConnectionPool( RepositorySource source ) {
    this(source,
         DEFAULT_CORE_POOL_SIZE,
         DEFAULT_MAXIMUM_POOL_SIZE,
         DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS,
         TimeUnit.SECONDS);
}
174
/**
 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
 *
 * @param source the source for connections; may not be null
 * @param corePoolSize the number of connections to keep in the pool, even if they are idle; may not be negative
 * @param maximumPoolSize the maximum number of connections to allow in the pool; must be positive and may not be smaller
 *        than <code>corePoolSize</code>
 * @param keepAliveTime when the number of connections is greater than the core, this is the maximum time that excess idle
 *        connections will be kept before terminating; may not be negative
 * @param unit the time unit for the keepAliveTime argument; may not be null
 * @throws IllegalArgumentException if the connection factory or time unit is null, or any of the supplied arguments are
 *         invalid
 */
public RepositoryConnectionPool( RepositorySource source,
                                 int corePoolSize,
                                 int maximumPoolSize,
                                 long keepAliveTime,
                                 TimeUnit unit ) {
    CheckArg.isNonNegative(corePoolSize, "corePoolSize");
    CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
    CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
    CheckArg.isNotNull(source, "source");
    // Reject a null unit up front so the caller gets the documented IllegalArgumentException rather than an NPE ...
    CheckArg.isNotNull(unit, "unit");
    if (maximumPoolSize < corePoolSize) {
        throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
    }
    this.source = source;
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    // Default ping timeout of 100ms. Set the field directly rather than calling the overridable setter, so a subclass
    // override is never invoked on a partially-constructed pool ...
    this.pingTimeout.set(TimeUnit.MILLISECONDS.toNanos(100));
}
204
205 /**
206 * Get the {@link RepositorySource} that's used by this pool.
207 *
208 * @return the repository source; never null
209 */
210 public final RepositorySource getRepositorySource() {
211 return source;
212 }
213
214 /**
215 * Get the name of this pool, which delegates to the connection factory.
216 *
217 * @return the name of the source
218 */
219 protected String getSourceName() {
220 return source.getName();
221 }
222
223 // -------------------------------------------------
224 // Property settings ...
225 // -------------------------------------------------
226
227 /**
228 * @return validateConnectionBeforeUse
229 */
230 public boolean getValidateConnectionBeforeUse() {
231 return this.validateConnectionBeforeUse.get();
232 }
233
234 /**
235 * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
236 */
237 public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
238 this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
239 }
240
241 /**
242 * @return pingTimeout
243 */
244 public long getPingTimeoutInNanos() {
245 return this.pingTimeout.get();
246 }
247
248 /**
249 * @param pingTimeout the time to wait for a ping to complete
250 * @param unit the time unit of the time argument
251 */
252 public void setPingTimeout( long pingTimeout,
253 TimeUnit unit ) {
254 CheckArg.isNonNegative(pingTimeout, "time");
255 this.pingTimeout.set(unit.toNanos(pingTimeout));
256 }
257
258 /**
259 * @return maxFailedAttemptsBeforeError
260 */
261 public int getMaxFailedAttemptsBeforeError() {
262 return this.maxFailedAttemptsBeforeError.get();
263 }
264
265 /**
266 * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
267 */
268 public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
269 this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
270 }
271
272 /**
273 * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
274 * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
275 * This overrides any value set in the constructor.
276 *
277 * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
278 * returned.
279 * @param unit the time unit of the time argument
280 * @throws IllegalArgumentException if time less than zero
281 * @see #getKeepAliveTime
282 */
283 public void setKeepAliveTime( long time,
284 TimeUnit unit ) {
285 CheckArg.isNonNegative(time, "time");
286 this.keepAliveTime = unit.toNanos(time);
287 }
288
289 /**
290 * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
291 * remain idle before being closed.
292 *
293 * @param unit the desired time unit of the result
294 * @return the time limit
295 * @see #setKeepAliveTime
296 */
297 public long getKeepAliveTime( TimeUnit unit ) {
298 assert unit != null;
299 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
300 }
301
302 /**
303 * @return maximumPoolSize
304 */
305 public int getMaximumPoolSize() {
306 return this.maximumPoolSize;
307 }
308
309 /**
310 * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
311 * smaller than the current value, excess existing but unused connections will be closed.
312 *
313 * @param maximumPoolSize the new maximum
314 * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
315 * @see #getMaximumPoolSize
316 */
317 public void setMaximumPoolSize( int maximumPoolSize ) {
318 CheckArg.isPositive(maximumPoolSize, "maximum pool size");
319 if (maximumPoolSize < corePoolSize) {
320 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
321 }
322 final ReentrantLock mainLock = this.mainLock;
323 try {
324 mainLock.lock();
325 int extra = this.maximumPoolSize - maximumPoolSize;
326 this.maximumPoolSize = maximumPoolSize;
327 if (extra > 0 && poolSize > maximumPoolSize) {
328 // Drain the extra connections from those available ...
329 drainUnusedConnections(extra);
330 }
331 } finally {
332 mainLock.unlock();
333 }
334 }
335
336 /**
337 * Returns the core number of connections.
338 *
339 * @return the core number of connections
340 * @see #setCorePoolSize(int)
341 */
342 public int getCorePoolSize() {
343 return this.corePoolSize;
344 }
345
346 /**
347 * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
348 * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
349 * created.
350 *
351 * @param corePoolSize the new core size
352 * @throws RepositorySourceException if there was an error obtaining the new connection
353 * @throws InterruptedException if the thread was interrupted during the operation
354 * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
355 * @see #getCorePoolSize()
356 */
357 public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
358 CheckArg.isNonNegative(corePoolSize, "core pool size");
359 if (maximumPoolSize < corePoolSize) {
360 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
361 }
362 final ReentrantLock mainLock = this.mainLock;
363 try {
364 mainLock.lock();
365 int extra = this.corePoolSize - corePoolSize;
366 this.corePoolSize = corePoolSize;
367 if (extra < 0) {
368 // Add connections ...
369 addConnectionsIfUnderCorePoolSize();
370 } else if (extra > 0 && poolSize > corePoolSize) {
371 // Drain the extra connections from those available ...
372 drainUnusedConnections(extra);
373 }
374 } finally {
375 mainLock.unlock();
376 }
377 }
378
379 // -------------------------------------------------
380 // Statistics ...
381 // -------------------------------------------------
382
383 /**
384 * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
385 * being used.
386 *
387 * @return the number of connections
388 */
389 public int getPoolSize() {
390 return poolSize;
391 }
392
393 /**
394 * Returns the approximate number of connections that are currently checked out from the pool.
395 *
396 * @return the number of checked-out connections
397 */
398 public int getInUseCount() {
399 final ReentrantLock mainLock = this.mainLock;
400 try {
401 mainLock.lock();
402 return this.inUseConnections.size();
403 } finally {
404 mainLock.unlock();
405 }
406 }
407
408 /**
409 * Get the total number of connections that have been created by this pool.
410 *
411 * @return the total number of connections created by this pool
412 */
413 public long getTotalConnectionsCreated() {
414 return this.totalConnectionsCreated.get();
415 }
416
417 /**
418 * Get the total number of times connections have been {@link #getConnection()} used.
419 *
420 * @return the total number
421 */
422 public long getTotalConnectionsUsed() {
423 return this.totalConnectionsUsed.get();
424 }
425
426 // -------------------------------------------------
427 // State management methods ...
428 // -------------------------------------------------
429
430 /**
431 * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
432 * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
433 * already been started.
434 *
435 * @return true if a connection was started
436 * @throws RepositorySourceException if there was an error obtaining the new connection
437 * @throws InterruptedException if the thread was interrupted during the operation
438 */
439 public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
440 final ReentrantLock mainLock = this.mainLock;
441 try {
442 mainLock.lock();
443 return addConnectionIfUnderCorePoolSize();
444 } finally {
445 mainLock.unlock();
446 }
447 }
448
449 /**
450 * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
451 * connections only when they are {@link #getConnection() needed}.
452 *
453 * @return the number of connections started.
454 * @throws RepositorySourceException if there was an error obtaining the new connection
455 * @throws InterruptedException if the thread was interrupted during the operation
456 */
457 public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
458 final ReentrantLock mainLock = this.mainLock;
459 try {
460 mainLock.lock();
461 return addConnectionsIfUnderCorePoolSize();
462 } finally {
463 mainLock.unlock();
464 }
465 }
466
467 /**
468 * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
469 * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
470 * <p>
471 * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
472 * </p>
473 *
474 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
475 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
476 * or the security manager's <tt>checkAccess</tt> method denies access.
477 * @see #shutdownNow()
478 */
479 public void shutdown() {
480 // Fail if caller doesn't have modifyThread permission. We
481 // explicitly check permissions directly because we can't trust
482 // implementations of SecurityManager to correctly override
483 // the "check access" methods such that our documented
484 // security policy is implemented.
485 SecurityManager security = System.getSecurityManager();
486 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
487
488 this.logger.debug("Shutting down repository connection pool for {0}", getSourceName());
489 boolean fullyTerminated = false;
490 final ReentrantLock mainLock = this.mainLock;
491 try {
492 mainLock.lock();
493 int state = this.runState;
494 if (state == RUNNING) {
495 // don't override shutdownNow
496 this.runState = SHUTDOWN;
497 }
498
499 // Kill the maintenance thread ...
500
501 // Remove and close all available connections ...
502 if (!this.availableConnections.isEmpty()) {
503 // Drain the extra connections from those available ...
504 drainUnusedConnections(this.availableConnections.size());
505 }
506
507 // If there are no connections being used, trigger full termination now ...
508 if (this.inUseConnections.isEmpty()) {
509 fullyTerminated = true;
510 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
511 runState = TERMINATED;
512 termination.signalAll();
513 this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
514 }
515 // Otherwise the last connection that is closed will transition the runState to TERMINATED ...
516 } finally {
517 mainLock.unlock();
518 }
519 if (fullyTerminated) terminated();
520 }
521
522 /**
523 * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
524 * connections.
525 *
526 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
527 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
528 * or the security manager's <tt>checkAccess</tt> method denies access.
529 * @see #shutdown()
530 */
531 public void shutdownNow() {
532 // Almost the same code as shutdown()
533 SecurityManager security = System.getSecurityManager();
534 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
535
536 this.logger.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
537 boolean fullyTerminated = false;
538 final ReentrantLock mainLock = this.mainLock;
539 try {
540 mainLock.lock();
541 int state = this.runState;
542 if (state != TERMINATED) {
543 // don't override shutdownNow
544 this.runState = STOP;
545 }
546
547 // Kill the maintenance thread ...
548
549 // Remove and close all available connections ...
550 if (!this.availableConnections.isEmpty()) {
551 // Drain the extra connections from those available ...
552 drainUnusedConnections(this.availableConnections.size());
553 }
554
555 // If there are connections being used, close them now ...
556 if (!this.inUseConnections.isEmpty()) {
557 for (ConnectionWrapper connectionInUse : this.inUseConnections) {
558 this.logger.trace("Closing repository connection to {0}", getSourceName());
559 connectionInUse.getOriginal().close();
560 }
561 this.poolSize -= this.inUseConnections.size();
562 // The last connection that is closed will transition the runState to TERMINATED ...
563 } else {
564 // There are no connections in use, so trigger full termination now ...
565 fullyTerminated = true;
566 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
567 runState = TERMINATED;
568 termination.signalAll();
569 this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
570 }
571
572 } finally {
573 mainLock.unlock();
574 }
575 if (fullyTerminated) terminated();
576 }
577
578 /**
579 * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
580 * method is effectively <code>!isShutdown()</code>.
581 *
582 * @return true if this pool is running, or false otherwise
583 * @see #isShutdown()
584 * @see #isTerminated()
585 * @see #isTerminating()
586 */
587 public boolean isRunning() {
588 return runState == RUNNING;
589 }
590
591 /**
592 * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
593 * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
594 * <code>!isRunning()</code>.
595 *
596 * @return true if this pool has been shut down, or false otherwise
597 * @see #isShutdown()
598 * @see #isTerminated()
599 * @see #isTerminating()
600 */
601 public boolean isShutdown() {
602 return runState != RUNNING;
603 }
604
605 /**
606 * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
607 * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
608 * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
609 * executor not to properly terminate.
610 *
611 * @return true if terminating but not yet terminated, or false otherwise
612 * @see #isTerminated()
613 */
614 public boolean isTerminating() {
615 return runState == STOP;
616 }
617
618 /**
619 * Return true if this pool has completed its termination and no longer has any open connections.
620 *
621 * @return true if terminated, or false otherwise
622 * @see #isTerminating()
623 */
624 public boolean isTerminated() {
625 return runState == TERMINATED;
626 }
627
628 /**
629 * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
630 * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
631 * return even if all connections have not been closed.
632 *
633 * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
634 * @param unit the time unit for <code>timeout</code>
635 * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
636 * before all the connections were closed
637 * @throws InterruptedException if the thread was interrupted
638 */
639 public boolean awaitTermination( long timeout,
640 TimeUnit unit ) throws InterruptedException {
641 this.logger.trace("Awaiting termination");
642 long nanos = unit.toNanos(timeout);
643 final ReentrantLock mainLock = this.mainLock;
644 try {
645 mainLock.lock();
646 for (;;) {
647 // this.logger.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize);
648 if (runState == TERMINATED) return true;
649 if (nanos <= 0) return false;
650 nanos = termination.awaitNanos(nanos);
651 //this.logger.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
652 // ;
653 }
654 } finally {
655 mainLock.unlock();
656 this.logger.trace("Finished awaiting termination");
657 }
658 }
659
660 /**
661 * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
662 * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
663 */
664 protected void terminated() {
665 }
666
667 /**
668 * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
669 */
670 @Override
671 protected void finalize() {
672 shutdown();
673 }
674
675 // -------------------------------------------------
676 // Connection management methods ...
677 // -------------------------------------------------
678
679 /**
680 * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
681 * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
682 * number of connections and all connections are currently being used.
683 *
684 * @return a connection
685 * @throws RepositorySourceException if there is a problem obtaining a connection
686 * @throws IllegalStateException if the factory is not in a state to create or return connections
687 */
688 public RepositoryConnection getConnection() throws RepositorySourceException {
689 int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
690 ConnectionWrapper connection = null;
691 // Do this until we get a good connection ...
692 int attemptsRemaining = attemptsAllowed;
693 while (connection == null && attemptsRemaining > 0) {
694 --attemptsRemaining;
695 ReentrantLock mainLock = this.mainLock;
696 try {
697 mainLock.lock();
698 // If we're shutting down the pool, then just close the connection ...
699 if (this.runState != RUNNING) {
700 throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
701 }
702 // If there are fewer total connections than the core size ...
703 if (this.poolSize < this.corePoolSize) {
704 // Immediately create a wrapped connection and return it ...
705 connection = newWrappedConnection();
706 }
707 // Peek to see if there is a connection available ...
708 else if (this.availableConnections.peek() != null) {
709 // There is, so take it and return it ...
710 try {
711 connection = this.availableConnections.take();
712 } catch (InterruptedException e) {
713 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
714 Thread.interrupted();
715 throw new RepositorySourceException(getSourceName(), e);
716 }
717 }
718 // There is no connection available. If there are fewer total connections than the maximum size ...
719 else if (this.poolSize < this.maximumPoolSize) {
720 // Immediately create a wrapped connection and return it ...
721 connection = newWrappedConnection();
722 }
723 if (connection != null) {
724 this.inUseConnections.add(connection);
725 }
726 } finally {
727 mainLock.unlock();
728 }
729 if (connection == null) {
730 // There are not enough connections, so wait in line for the next available connection ...
731 this.logger.trace("Waiting for a repository connection from pool {0}", getSourceName());
732 try {
733 connection = this.availableConnections.take();
734 } catch (InterruptedException e) {
735 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
736 Thread.interrupted();
737 throw new RepositorySourceException(getSourceName(), e);
738 }
739 mainLock = this.mainLock;
740 mainLock.lock();
741 try {
742 if (connection != null) {
743 this.inUseConnections.add(connection);
744 }
745 } finally {
746 mainLock.unlock();
747 }
748 this.logger.trace("Recieved a repository connection from pool {0}", getSourceName());
749 }
750 if (connection != null && this.validateConnectionBeforeUse.get()) {
751 try {
752 connection = validateConnection(connection);
753 } catch (InterruptedException e) {
754 this.logger.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
755 returnConnection(connection);
756 Thread.interrupted();
757 throw new RepositorySourceException(getSourceName(), e);
758 }
759 }
760 }
761 if (connection == null) {
762 // We were unable to obtain a usable connection, so fail ...
763 throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
764 }
765 this.totalConnectionsUsed.incrementAndGet();
766 return connection;
767 }
768
769 /**
770 * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
771 *
772 * @param wrapper the wrapper to the connection that is being returned to the pool
773 */
774 protected void returnConnection( ConnectionWrapper wrapper ) {
775 assert wrapper != null;
776 ConnectionWrapper wrapperToClose = null;
777 final ReentrantLock mainLock = this.mainLock;
778 try {
779 mainLock.lock();
780 // Remove the connection from the in-use set ...
781 boolean removed = this.inUseConnections.remove(wrapper);
782 assert removed;
783
784 // If we're shutting down the pool, then just close the connection ...
785 if (this.runState != RUNNING) {
786 wrapperToClose = wrapper;
787 }
788 // If there are more connections than the maximum size...
789 else if (this.poolSize > this.maximumPoolSize) {
790 // Immediately close this connection ...
791 wrapperToClose = wrapper;
792 }
793 // Attempt to make the connection available (this should generally work, unless there is an upper limit
794 // to the number of available connections) ...
795 else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
796 // The pool of available connection is full, so release it ...
797 wrapperToClose = wrapper;
798 }
799 } finally {
800 mainLock.unlock();
801 }
802 // Close the connection if we're supposed to (do it outside of the main lock)...
803 if (wrapperToClose != null) {
804 closeConnection(wrapperToClose);
805 }
806 }
807
808 /**
809 * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
810 *
811 * @param connection the connection to be validated; may not be null
812 * @return the validated connection, or null if the connection did not validate and was removed from the pool
813 * @throws InterruptedException if the thread is interrupted while validating the connection
814 */
815 protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
816 assert connection != null;
817 ConnectionWrapper invalidConnection = null;
818 try {
819 if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
820 invalidConnection = connection;
821 }
822 } finally {
823 if (invalidConnection != null) {
824 connection = null;
825 returnConnection(invalidConnection);
826 }
827 }
828 return connection;
829 }
830
831 /**
832 * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
833 * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
834 * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
835 * {@link #poolSize pool size}.
836 *
837 * @return the connection wrapper with a new connection
838 * @throws RepositorySourceException if there was an error obtaining the new connection
839 */
840 @GuardedBy( "mainLock" )
841 protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
842 RepositoryConnection connection = this.source.getConnection();
843 ++this.poolSize;
844 this.totalConnectionsCreated.incrementAndGet();
845 return new ConnectionWrapper(connection);
846 }
847
848 /**
849 * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
850 * method does decrement the {@link #poolSize pool size}.
851 *
852 * @param wrapper the wrapper for the connection to be closed
853 */
854 protected void closeConnection( ConnectionWrapper wrapper ) {
855 assert wrapper != null;
856 RepositoryConnection original = wrapper.getOriginal();
857 assert original != null;
858 try {
859 this.logger.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
860 original.close();
861 } finally {
862 final ReentrantLock mainLock = this.mainLock;
863 try {
864 mainLock.lock();
865 // No matter what reduce the pool size count
866 --this.poolSize;
867 // And if shutting down and this was the last connection being used...
868 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
869 // then signal anybody that has called "awaitTermination(...)"
870 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
871 this.runState = TERMINATED;
872 this.termination.signalAll();
873 this.logger.trace("Terminated repository connection pool for {0}", getSourceName());
874
875 // fall through to call terminate() outside of lock.
876 }
877 } finally {
878 mainLock.unlock();
879 }
880 }
881 }
882
883 @GuardedBy( "mainLock" )
884 protected int drainUnusedConnections( int count ) {
885 if (count <= 0) return 0;
886 this.logger.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
887 // Drain the extra connections from those available ...
888 Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
889 this.availableConnections.drainTo(extraConnections, count);
890 for (ConnectionWrapper connection : extraConnections) {
891 this.logger.trace("Closing repository connection to {0}", getSourceName());
892 connection.getOriginal().close();
893 }
894 int numClosed = extraConnections.size();
895 this.poolSize -= numClosed;
896 this.logger.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
897 return numClosed;
898 }
899
900 @GuardedBy( "mainLock" )
901 protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
902 // Add connection ...
903 if (this.poolSize < this.corePoolSize) {
904 this.availableConnections.offer(newWrappedConnection());
905 this.logger.trace("Added connection to {0} in undersized pool", getSourceName());
906 return true;
907 }
908 return false;
909 }
910
911 @GuardedBy( "mainLock" )
912 protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
913 // Add connections ...
914 int n = 0;
915 while (this.poolSize < this.corePoolSize) {
916 this.availableConnections.offer(newWrappedConnection());
917 ++n;
918 }
919 this.logger.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
920 return n;
921 }
922
923 protected class ConnectionWrapper implements RepositoryConnection {
924
925 private final RepositoryConnection original;
926 private final long timeCreated;
927 private long lastUsed;
928 private boolean closed = false;
929
930 protected ConnectionWrapper( RepositoryConnection connection ) {
931 assert connection != null;
932 this.original = connection;
933 this.timeCreated = System.currentTimeMillis();
934 }
935
936 /**
937 * @return original
938 */
939 protected RepositoryConnection getOriginal() {
940 return this.original;
941 }
942
943 /**
944 * @return lastUsed
945 */
946 public long getTimeLastUsed() {
947 return this.lastUsed;
948 }
949
950 /**
951 * @return timeCreated
952 */
953 public long getTimeCreated() {
954 return this.timeCreated;
955 }
956
957 /**
958 * {@inheritDoc}
959 */
960 public String getSourceName() {
961 return this.original.getSourceName();
962 }
963
964 /**
965 * {@inheritDoc}
966 */
967 public XAResource getXAResource() {
968 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
969 return this.original.getXAResource();
970 }
971
972 /**
973 * {@inheritDoc}
974 */
975 public CachePolicy getDefaultCachePolicy() {
976 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
977 return this.original.getDefaultCachePolicy();
978 }
979
980 /**
981 * {@inheritDoc}
982 *
983 * @see org.jboss.dna.graph.connectors.RepositoryConnection#execute(org.jboss.dna.graph.ExecutionContext,
984 * org.jboss.dna.graph.requests.Request)
985 */
986 public void execute( ExecutionContext context,
987 Request request ) throws RepositorySourceException {
988 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
989 this.original.execute(context, request);
990 }
991
992 /**
993 * {@inheritDoc}
994 */
995 public boolean ping( long time,
996 TimeUnit unit ) throws InterruptedException {
997 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
998 return this.original.ping(time, unit);
999 }
1000
1001 /**
1002 * {@inheritDoc}
1003 */
1004 public void close() {
1005 if (!closed) {
1006 this.lastUsed = System.currentTimeMillis();
1007 this.original.close();
1008 this.closed = true;
1009 returnConnection(this);
1010 }
1011 }
1012
1013 /**
1014 * {@inheritDoc}
1015 */
1016 public void setListener( RepositorySourceListener listener ) {
1017 if (!closed) this.original.setListener(listener);
1018 }
1019
1020 }
1021
1022 }