View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors. 
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   *
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.graph.connector;
25  
26  import java.util.Collection;
27  import java.util.HashSet;
28  import java.util.LinkedList;
29  import java.util.Set;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.TimeUnit;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  import java.util.concurrent.locks.Condition;
37  import java.util.concurrent.locks.ReentrantLock;
38  import javax.transaction.xa.XAResource;
39  import net.jcip.annotations.GuardedBy;
40  import net.jcip.annotations.ThreadSafe;
41  import org.modeshape.common.annotation.Category;
42  import org.modeshape.common.annotation.Description;
43  import org.modeshape.common.annotation.Label;
44  import org.modeshape.common.util.CheckArg;
45  import org.modeshape.common.util.Logger;
46  import org.modeshape.graph.ExecutionContext;
47  import org.modeshape.graph.GraphI18n;
48  import org.modeshape.graph.cache.CachePolicy;
49  import org.modeshape.graph.request.Request;
50  
51  /**
52   * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
53   */
54  @ThreadSafe
55  public class RepositoryConnectionPool {
56  
57      /**
58       * The default time to wait for ping when called from a client.
59       */
60      public static final long DEFAULT_TIME_TO_WAIT = 10;
61  
62      /**
63       * The default TimeUnit to wait for ping when called from a client.
64       */
65      public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
66  
67      /**
68       * The core pool size for default-constructed pools is {@value} .
69       */
70      public static final int DEFAULT_CORE_POOL_SIZE = 1;
71  
72      /**
73       * The maximum pool size for default-constructed pools is {@value} .
74       */
75      public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
76  
77      /**
     * The keep-alive time for connections in default-constructed pools is {@value} seconds.
79       */
80      public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
81  
82      /**
83       * Permission for checking shutdown
84       */
85      private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
86  
87      /**
88       * The source that this pool uses to create new connections.
89       */
90      private final RepositorySource source;
91  
92      /**
     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, runState, and the available and in-use connection collections.
94       */
95      private final ReentrantLock mainLock = new ReentrantLock();
96  
97      /**
98       * Wait condition to support awaitTermination
99       */
100     private final Condition termination = mainLock.newCondition();
101 
102     /**
     * Queue containing all connections that are available for use.
104      */
105     @GuardedBy( "mainLock" )
106     private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();
107 
108     /**
109      * The connections that are currently in use.
110      */
111     @GuardedBy( "mainLock" )
112     private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
113 
114     /**
115      * Timeout in nanoseconds for idle connections waiting to be used. Threads use this timeout only when there are more than
116      * corePoolSize present. Otherwise they wait forever to be used.
117      */
118     @Description( i18n = GraphI18n.class, value = "poolKeepAliveTimeDescription" )
119     @Label( i18n = GraphI18n.class, value = "poolKeepAliveTimeLabel" )
120     @Category( i18n = GraphI18n.class, value = "poolKeepAliveTimeCategory" )
121     private volatile long keepAliveTime;
122 
123     /**
124      * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
125      * updates.
126      */
127     @Description( i18n = GraphI18n.class, value = "poolCorePoolSizeDescription" )
128     @Label( i18n = GraphI18n.class, value = "poolCorePoolSizeLabel" )
129     @Category( i18n = GraphI18n.class, value = "poolCorePoolSizeCategory" )
130     @GuardedBy( "mainLock" )
131     private volatile int corePoolSize;
132 
133     /**
134      * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
135      */
136     @Description( i18n = GraphI18n.class, value = "poolMaxiumumPoolSizeDescription" )
137     @Label( i18n = GraphI18n.class, value = "poolMaxiumumPoolSizeLabel" )
138     @Category( i18n = GraphI18n.class, value = "poolMaxiumumPoolSizeCategory" )
139     @GuardedBy( "mainLock" )
140     private volatile int maximumPoolSize;
141 
142     /**
143      * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
144      */
145     @GuardedBy( "mainLock" )
146     private volatile int poolSize;
147 
148     /**
149      * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
150      */
151     @GuardedBy( "mainLock" )
152     private volatile int runState;
153 
154     // Special values for runState
155     /** Normal, not-shutdown mode */
156     static final int RUNNING = 0;
157     /** Controlled shutdown mode */
158     static final int SHUTDOWN = 1;
159     /** Immediate shutdown mode */
160     static final int STOP = 2;
161     /** Final state */
162     static final int TERMINATED = 3;
163 
164     /**
165      * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
166      */
167     @Description( i18n = GraphI18n.class, value = "poolValidateConnectionBeforeUseDescription" )
168     @Label( i18n = GraphI18n.class, value = "poolValidateConnectionBeforeUseLabel" )
169     @Category( i18n = GraphI18n.class, value = "poolValidateConnectionBeforeUseCategory" )
170     private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
171 
172     /**
173      * The time in nanoseconds that ping should wait before timing out and failing.
174      */
175     @Description( i18n = GraphI18n.class, value = "poolPingTimeoutDescription" )
176     @Label( i18n = GraphI18n.class, value = "poolPingTimeoutLabel" )
177     @Category( i18n = GraphI18n.class, value = "poolPingTimeoutCategory" )
178     private final AtomicLong pingTimeout = new AtomicLong(0);
179 
180     /**
181      * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
182      */
183     @Description( i18n = GraphI18n.class, value = "poolMaximumFailedAttemptsBeforeErrorDescription" )
184     @Label( i18n = GraphI18n.class, value = "poolMaximumFailedAttemptsBeforeErrorLabel" )
185     @Category( i18n = GraphI18n.class, value = "poolMaximumFailedAttemptsBeforeErrorCategory" )
186     private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
187 
188     private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
189 
190     private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
191 
192     private static final Logger LOGGER = Logger.getLogger(RepositoryConnectionPool.class);
193 
194     /**
195      * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor
196      * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
197      * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}.
198      * 
199      * @param source the source for connections
200      * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
201      */
202     public RepositoryConnectionPool( RepositorySource source ) {
203         this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS);
204     }
205 
206     /**
207      * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
208      * 
209      * @param source the source for connections
210      * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
211      * @param maximumPoolSize the maximum number of connections to allow in the pool.
212      * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle
213      *        connections will be kept before terminating.
214      * @param unit the time unit for the keepAliveTime argument.
215      * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
216      */
217     public RepositoryConnectionPool( RepositorySource source,
218                                      int corePoolSize,
219                                      int maximumPoolSize,
220                                      long keepAliveTime,
221                                      TimeUnit unit ) {
222         CheckArg.isNonNegative(corePoolSize, "corePoolSize");
223         CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
224         CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
225         CheckArg.isNotNull(source, "source");
226         if (maximumPoolSize < corePoolSize) {
227             throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
228         }
229         this.source = source;
230         this.corePoolSize = corePoolSize;
231         this.maximumPoolSize = maximumPoolSize;
232         this.keepAliveTime = unit.toNanos(keepAliveTime);
233         this.setPingTimeout(100, TimeUnit.MILLISECONDS);
234     }
235 
236     /**
237      * Used to get and test a connection in the pool
238      * 
239      * @param time
240      * @param unit
241      * @return boolean
242      * @throws InterruptedException
243      */
244     public boolean ping( long time,
245                          TimeUnit unit ) throws InterruptedException {
246 
247         if (isRunning()) {
248             RepositoryConnection connection = null;
249             Throwable error = null;
250             try {
251                 connection = getConnection();
252                 return connection.ping(time, unit);
253             } catch (RuntimeException t) {
254                 error = t;
255                 throw t;
256             } catch (InterruptedException t) {
257                 error = t;
258                 throw t;
259             } finally {
260                 if (connection != null) {
261                     try {
262                         connection.close();
263                     } catch (RuntimeException e) {
264                         if (error == null) throw e;
265                     }
266                 }
267             }
268         }
269         return false;
270     }
271 
272     /**
273      * Used to get and test a connection in the pool with default wait and time unit values
274      * 
275      * @return boolean
276      * @throws InterruptedException
277      */
278     public boolean ping() throws InterruptedException {
279         return ping(DEFAULT_TIME_TO_WAIT, DEFAULT_TIME_UNIT);
280     }
281 
282     /**
283      * Get the {@link RepositorySource} that's used by this pool.
284      * 
285      * @return the repository source; never null
286      */
287     public final RepositorySource getRepositorySource() {
288         return source;
289     }
290 
291     /**
292      * Get the name of this pool, which delegates to the connection factory.
293      * 
294      * @return the name of the source
295      */
296     protected String getSourceName() {
297         return source.getName();
298     }
299 
300     // -------------------------------------------------
301     // Property settings ...
302     // -------------------------------------------------
303 
304     /**
305      * @return validateConnectionBeforeUse
306      */
307     public boolean getValidateConnectionBeforeUse() {
308         return this.validateConnectionBeforeUse.get();
309     }
310 
311     /**
312      * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
313      */
314     public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
315         this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
316     }
317 
318     /**
319      * @return pingTimeout
320      */
321     public long getPingTimeoutInNanos() {
322         return this.pingTimeout.get();
323     }
324 
325     /**
326      * @param pingTimeout the time to wait for a ping to complete
327      * @param unit the time unit of the time argument
328      */
329     public void setPingTimeout( long pingTimeout,
330                                 TimeUnit unit ) {
331         CheckArg.isNonNegative(pingTimeout, "time");
332         this.pingTimeout.set(unit.toNanos(pingTimeout));
333     }
334 
335     /**
336      * The ping timeout, in seconds.
337      * 
338      * @return pingTimeout
339      */
340     public long getPingTimeout() {
341         return TimeUnit.NANOSECONDS.toSeconds(this.pingTimeout.get());
342     }
343 
344     /**
345      * Sets the ping timeout, in seconds.
346      * 
347      * @param pingTimeoutInSeconds the time to wait for a ping to complete
348      */
349     public void setPingTimeout( long pingTimeoutInSeconds ) {
350         setPingTimeout(pingTimeoutInSeconds, TimeUnit.SECONDS);
351     }
352 
353     /**
354      * @return maxFailedAttemptsBeforeError
355      */
356     public int getMaxFailedAttemptsBeforeError() {
357         return this.maxFailedAttemptsBeforeError.get();
358     }
359 
360     /**
361      * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
362      */
363     public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
364         this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
365     }
366 
367     /**
368      * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
369      * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
370      * This overrides any value set in the constructor.
371      * 
372      * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
373      *        returned.
374      * @param unit the time unit of the time argument
375      * @throws IllegalArgumentException if time less than zero
376      * @see #getKeepAliveTime
377      */
378     public void setKeepAliveTime( long time,
379                                   TimeUnit unit ) {
380         CheckArg.isNonNegative(time, "time");
381         this.keepAliveTime = unit.toNanos(time);
382     }
383 
384     /**
385      * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
386      * remain idle before being closed.
387      * 
388      * @param unit the desired time unit of the result
389      * @return the time limit
390      * @see #setKeepAliveTime
391      */
392     public long getKeepAliveTime( TimeUnit unit ) {
393         assert unit != null;
394         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
395     }
396 
397     /**
398      * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
399      * remain idle before being closed.
400      * 
401      * @return the time limit in seconds
402      * @see #setKeepAliveTime
403      */
404     public long getKeepAliveTime() {
405         return getKeepAliveTime(TimeUnit.SECONDS);
406     }
407 
408     /**
409      * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
410      * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
411      * This overrides any value set in the constructor.
412      * 
413      * @param time the time to wait, in seconds. A time value of zero will cause excess connections to terminate immediately after
414      *        being returned.
415      * @throws IllegalArgumentException if time less than zero
416      * @see #getKeepAliveTime
417      */
418     public void setKeepAliveTime( long time ) {
419         setKeepAliveTime(time, TimeUnit.SECONDS);
420     }
421 
422     /**
423      * @return maximumPoolSize
424      */
425     public int getMaximumPoolSize() {
426         return this.maximumPoolSize;
427     }
428 
429     /**
430      * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
431      * smaller than the current value, excess existing but unused connections will be closed.
432      * 
433      * @param maximumPoolSize the new maximum
434      * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
435      * @see #getMaximumPoolSize
436      */
437     public void setMaximumPoolSize( int maximumPoolSize ) {
438         CheckArg.isPositive(maximumPoolSize, "maximum pool size");
439         if (maximumPoolSize < corePoolSize) {
440             throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
441         }
442         final ReentrantLock mainLock = this.mainLock;
443         try {
444             mainLock.lock();
445             int extra = this.maximumPoolSize - maximumPoolSize;
446             this.maximumPoolSize = maximumPoolSize;
447             if (extra > 0 && poolSize > maximumPoolSize) {
448                 // Drain the extra connections from those available ...
449                 drainUnusedConnections(extra);
450             }
451         } finally {
452             mainLock.unlock();
453         }
454     }
455 
456     /**
457      * Returns the core number of connections.
458      * 
459      * @return the core number of connections
460      * @see #setCorePoolSize(int)
461      */
462     public int getCorePoolSize() {
463         return this.corePoolSize;
464     }
465 
466     /**
467      * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
468      * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
469      * created.
470      * 
471      * @param corePoolSize the new core size
472      * @throws RepositorySourceException if there was an error obtaining the new connection
473      * @throws InterruptedException if the thread was interrupted during the operation
474      * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
475      * @see #getCorePoolSize()
476      */
477     public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
478         CheckArg.isNonNegative(corePoolSize, "core pool size");
479         if (maximumPoolSize < corePoolSize) {
480             throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
481         }
482         final ReentrantLock mainLock = this.mainLock;
483         try {
484             mainLock.lock();
485             int extra = this.corePoolSize - corePoolSize;
486             this.corePoolSize = corePoolSize;
487             if (extra < 0) {
488                 // Add connections ...
489                 addConnectionsIfUnderCorePoolSize();
490             } else if (extra > 0 && poolSize > corePoolSize) {
491                 // Drain the extra connections from those available ...
492                 drainUnusedConnections(extra);
493             }
494         } finally {
495             mainLock.unlock();
496         }
497     }
498 
499     // -------------------------------------------------
500     // Statistics ...
501     // -------------------------------------------------
502 
503     /**
504      * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
505      * being used.
506      * 
507      * @return the number of connections
508      */
509     public int getPoolSize() {
510         return poolSize;
511     }
512 
513     /**
514      * Returns the approximate number of connections that are currently checked out from the pool.
515      * 
516      * @return the number of checked-out connections
517      */
518     public int getInUseCount() {
519         final ReentrantLock mainLock = this.mainLock;
520         try {
521             mainLock.lock();
522             return this.inUseConnections.size();
523         } finally {
524             mainLock.unlock();
525         }
526     }
527 
528     /**
529      * Get the total number of connections that have been created by this pool.
530      * 
531      * @return the total number of connections created by this pool
532      */
533     public long getTotalConnectionsCreated() {
534         return this.totalConnectionsCreated.get();
535     }
536 
537     /**
538      * Get the total number of times connections have been {@link #getConnection()} used.
539      * 
540      * @return the total number
541      */
542     public long getTotalConnectionsUsed() {
543         return this.totalConnectionsUsed.get();
544     }
545 
546     // -------------------------------------------------
547     // State management methods ...
548     // -------------------------------------------------
549 
550     /**
551      * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
552      * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
553      * already been started.
554      * 
555      * @return true if a connection was started
556      * @throws RepositorySourceException if there was an error obtaining the new connection
557      * @throws InterruptedException if the thread was interrupted during the operation
558      */
559     public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
560         final ReentrantLock mainLock = this.mainLock;
561         try {
562             mainLock.lock();
563             return addConnectionIfUnderCorePoolSize();
564         } finally {
565             mainLock.unlock();
566         }
567     }
568 
569     /**
570      * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
571      * connections only when they are {@link #getConnection() needed}.
572      * 
573      * @return the number of connections started.
574      * @throws RepositorySourceException if there was an error obtaining the new connection
575      * @throws InterruptedException if the thread was interrupted during the operation
576      */
577     public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
578         final ReentrantLock mainLock = this.mainLock;
579         try {
580             mainLock.lock();
581             return addConnectionsIfUnderCorePoolSize();
582         } finally {
583             mainLock.unlock();
584         }
585     }
586 
587     /**
588      * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
589      * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
590      * <p>
591      * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
592      * </p>
593      * 
594      * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
595      *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission} <tt>("modifyThread")</tt>,
596      *         or the security manager's <tt>checkAccess</tt> method denies access.
597      * @see #shutdownNow()
598      */
599     public void shutdown() {
600         // Fail if caller doesn't have modifyThread permission. We
601         // explicitly check permissions directly because we can't trust
602         // implementations of SecurityManager to correctly override
603         // the "check access" methods such that our documented
604         // security policy is implemented.
605         SecurityManager security = System.getSecurityManager();
606         if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
607 
608         LOGGER.debug("Shutting down repository connection pool for {0}", getSourceName());
609         boolean fullyTerminated = false;
610         final ReentrantLock mainLock = this.mainLock;
611         try {
612             mainLock.lock();
613             int state = this.runState;
614             if (state == RUNNING) {
615                 // don't override shutdownNow
616                 this.runState = SHUTDOWN;
617             }
618 
619             // Kill the maintenance thread ...
620 
621             // Remove and close all available connections ...
622             if (!this.availableConnections.isEmpty()) {
623                 // Drain the extra connections from those available ...
624                 drainUnusedConnections(this.availableConnections.size());
625             }
626 
627             // If there are no connections being used, trigger full termination
628             // now ...
629             if (this.inUseConnections.isEmpty()) {
630                 fullyTerminated = true;
631                 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
632                 runState = TERMINATED;
633                 termination.signalAll();
634                 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
635             }
636             // Otherwise the last connection that is closed will transition the
637             // runState to TERMINATED ...
638         } finally {
639             mainLock.unlock();
640         }
641         if (fullyTerminated) terminated();
642     }
643 
644     /**
645      * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
646      * connections.
647      * 
648      * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
649      *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission} <tt>("modifyThread")</tt>,
650      *         or the security manager's <tt>checkAccess</tt> method denies access.
651      * @see #shutdown()
652      */
653     public void shutdownNow() {
654         // Almost the same code as shutdown()
655         SecurityManager security = System.getSecurityManager();
656         if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
657 
658         LOGGER.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
659         boolean fullyTerminated = false;
660         final ReentrantLock mainLock = this.mainLock;
661         try {
662             mainLock.lock();
663             int state = this.runState;
664             if (state != TERMINATED) {
665                 // don't override shutdownNow
666                 this.runState = STOP;
667             }
668 
669             // Kill the maintenance thread ...
670 
671             // Remove and close all available connections ...
672             if (!this.availableConnections.isEmpty()) {
673                 // Drain the extra connections from those available ...
674                 drainUnusedConnections(this.availableConnections.size());
675             }
676 
677             // If there are connections being used, close them now ...
678             if (!this.inUseConnections.isEmpty()) {
679                 for (ConnectionWrapper connectionInUse : this.inUseConnections) {
680                     LOGGER.trace("Closing repository connection to {0}", getSourceName());
681                     connectionInUse.getOriginal().close();
682                 }
683                 this.poolSize -= this.inUseConnections.size();
684                 // The last connection that is closed will transition the
685                 // runState to TERMINATED ...
686             } else {
687                 // There are no connections in use, so trigger full termination
688                 // now ...
689                 fullyTerminated = true;
690                 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
691                 runState = TERMINATED;
692                 termination.signalAll();
693                 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
694             }
695 
696         } finally {
697             mainLock.unlock();
698         }
699         if (fullyTerminated) terminated();
700     }
701 
702     /**
703      * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
704      * method is effectively <code>!isShutdown()</code>.
705      * 
706      * @return true if this pool is running, or false otherwise
707      * @see #isShutdown()
708      * @see #isTerminated()
709      * @see #isTerminating()
710      */
711     public boolean isRunning() {
712         return runState == RUNNING;
713     }
714 
715     /**
716      * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
717      * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
718      * <code>!isRunning()</code>.
719      * 
720      * @return true if this pool has been shut down, or false otherwise
721      * @see #isShutdown()
722      * @see #isTerminated()
723      * @see #isTerminating()
724      */
725     public boolean isShutdown() {
726         return runState != RUNNING;
727     }
728 
729     /**
730      * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
731      * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
732      * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
733      * executor not to properly terminate.
734      * 
735      * @return true if terminating but not yet terminated, or false otherwise
736      * @see #isTerminated()
737      */
738     public boolean isTerminating() {
739         return runState == STOP;
740     }
741 
742     /**
743      * Return true if this pool has completed its termination and no longer has any open connections.
744      * 
745      * @return true if terminated, or false otherwise
746      * @see #isTerminating()
747      */
748     public boolean isTerminated() {
749         return runState == TERMINATED;
750     }
751 
752     /**
753      * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
754      * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
755      * return even if all connections have not been closed.
756      * 
757      * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
758      * @param unit the time unit for <code>timeout</code>
759      * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
760      *         before all the connections were closed
761      * @throws InterruptedException if the thread was interrupted
762      */
763     public boolean awaitTermination( long timeout,
764                                      TimeUnit unit ) throws InterruptedException {
765         LOGGER.trace("Awaiting termination");
766         long nanos = unit.toNanos(timeout);
767         final ReentrantLock mainLock = this.mainLock;
768         try {
769             mainLock.lock();
770             for (;;) {
771                 // LOGGER.trace("---> Run state = {0}; condition = {1}, {2} open",
772                 // runState, termination, poolSize);
773                 if (runState == TERMINATED) return true;
774                 if (nanos <= 0) return false;
775                 nanos = termination.awaitNanos(nanos);
776                 // LOGGER.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
777                 // ;
778             }
779         } finally {
780             mainLock.unlock();
781             LOGGER.trace("Finished awaiting termination");
782         }
783     }
784 
785     /**
786      * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
787      * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
788      */
789     protected void terminated() {
790     }
791 
792     /**
793      * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
794      */
795     @Override
796     protected void finalize() {
797         shutdown();
798     }
799 
800     // -------------------------------------------------
801     // Connection management methods ...
802     // -------------------------------------------------
803 
804     /**
805      * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
806      * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
807      * number of connections and all connections are currently being used.
808      * 
809      * @return a connection
810      * @throws RepositorySourceException if there is a problem obtaining a connection
811      * @throws IllegalStateException if the factory is not in a state to create or return connections
812      */
813     public RepositoryConnection getConnection() throws RepositorySourceException {
814         int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
815         ConnectionWrapper connection = null;
816         // Do this until we get a good connection ...
817         int attemptsRemaining = attemptsAllowed;
818         while (connection == null && attemptsRemaining > 0) {
819             --attemptsRemaining;
820             ReentrantLock mainLock = this.mainLock;
821             try {
822                 mainLock.lock();
823                 // If we're shutting down the pool, then just close the
824                 // connection ...
825                 if (this.runState != RUNNING) {
826                     throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
827                 }
828                 // If there are fewer total connections than the core size ...
829                 if (this.poolSize < this.corePoolSize) {
830                     // Immediately create a wrapped connection and return it ...
831                     connection = newWrappedConnection();
832                 }
833                 // Peek to see if there is a connection available ...
834                 else if (this.availableConnections.peek() != null) {
835                     // There is, so take it and return it ...
836                     try {
837                         connection = this.availableConnections.take();
838                     } catch (InterruptedException e) {
839                         LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
840                         Thread.interrupted();
841                         throw new RepositorySourceException(getSourceName(), e);
842                     }
843                 }
844                 // There is no connection available. If there are fewer total
845                 // connections than the maximum size ...
846                 else if (this.poolSize < this.maximumPoolSize) {
847                     // Immediately create a wrapped connection and return it ...
848                     connection = newWrappedConnection();
849                 }
850                 if (connection != null) {
851                     this.inUseConnections.add(connection);
852                 }
853             } finally {
854                 mainLock.unlock();
855             }
856             if (connection == null) {
857                 // There are not enough connections, so wait in line for the
858                 // next available connection ...
859                 LOGGER.trace("Waiting for a repository connection from pool {0}", getSourceName());
860                 try {
861                     connection = this.availableConnections.take();
862                 } catch (InterruptedException e) {
863                     LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
864                     Thread.interrupted();
865                     throw new RepositorySourceException(getSourceName(), e);
866                 }
867                 mainLock = this.mainLock;
868                 mainLock.lock();
869                 try {
870                     if (connection != null) {
871                         this.inUseConnections.add(connection);
872                     }
873                 } finally {
874                     mainLock.unlock();
875                 }
876                 LOGGER.trace("Recieved a repository connection from pool {0}", getSourceName());
877             }
878             if (connection != null && this.validateConnectionBeforeUse.get()) {
879                 try {
880                     connection = validateConnection(connection);
881                 } catch (InterruptedException e) {
882                     LOGGER.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
883                     returnConnection(connection);
884                     Thread.interrupted();
885                     throw new RepositorySourceException(getSourceName(), e);
886                 }
887             }
888         }
889         if (connection == null) {
890             // We were unable to obtain a usable connection, so fail ...
891             throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
892         }
893         this.totalConnectionsUsed.incrementAndGet();
894         return connection;
895     }
896 
897     /**
898      * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
899      * 
900      * @param wrapper the wrapper to the connection that is being returned to the pool
901      */
902     protected void returnConnection( ConnectionWrapper wrapper ) {
903         assert wrapper != null;
904         ConnectionWrapper wrapperToClose = null;
905         final ReentrantLock mainLock = this.mainLock;
906         try {
907             mainLock.lock();
908             // Remove the connection from the in-use set ...
909             boolean removed = this.inUseConnections.remove(wrapper);
910             assert removed;
911 
912             // If we're shutting down the pool, then just close the connection
913             // ...
914             if (this.runState != RUNNING) {
915                 wrapperToClose = wrapper;
916             }
917             // If there are more connections than the maximum size...
918             else if (this.poolSize > this.maximumPoolSize) {
919                 // Immediately close this connection ...
920                 wrapperToClose = wrapper;
921             }
922             // Attempt to make the connection available (this should generally
923             // work, unless there is an upper limit
924             // to the number of available connections) ...
925             else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
926                 // The pool of available connection is full, so release it ...
927                 wrapperToClose = wrapper;
928             }
929         } finally {
930             mainLock.unlock();
931         }
932         // Close the connection if we're supposed to (do it outside of the main
933         // lock)...
934         if (wrapperToClose != null) {
935             closeConnection(wrapperToClose);
936         }
937     }
938 
939     /**
940      * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
941      * 
942      * @param connection the connection to be validated; may not be null
943      * @return the validated connection, or null if the connection did not validate and was removed from the pool
944      * @throws InterruptedException if the thread is interrupted while validating the connection
945      */
946     protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
947         assert connection != null;
948         ConnectionWrapper invalidConnection = null;
949         try {
950             if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
951                 invalidConnection = connection;
952             }
953         } finally {
954             if (invalidConnection != null) {
955                 try {
956                     mainLock.lock();
957                     connection = null;
958                     this.poolSize--;
959                     this.inUseConnections.remove(connection);
960                     // returnConnection(invalidConnection);
961                 } finally {
962                     mainLock.unlock();
963                 }
964             }
965         }
966         return connection;
967     }
968 
969     /**
970      * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
971      * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
972      * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
973      * {@link #poolSize pool size}.
974      * 
975      * @return the connection wrapper with a new connection
976      * @throws RepositorySourceException if there was an error obtaining the new connection
977      */
978     @GuardedBy( "mainLock" )
979     protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
980         RepositoryConnection connection = this.source.getConnection();
981         ++this.poolSize;
982         this.totalConnectionsCreated.incrementAndGet();
983         return new ConnectionWrapper(connection);
984     }
985 
986     /**
987      * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
988      * method does decrement the {@link #poolSize pool size}.
989      * 
990      * @param wrapper the wrapper for the connection to be closed
991      */
992     protected void closeConnection( ConnectionWrapper wrapper ) {
993         assert wrapper != null;
994         RepositoryConnection original = wrapper.getOriginal();
995         assert original != null;
996         try {
997             LOGGER.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
998             original.close();
999         } finally {
1000             final ReentrantLock mainLock = this.mainLock;
1001             try {
1002                 mainLock.lock();
1003                 // No matter what reduce the pool size count
1004                 --this.poolSize;
1005                 // And if shutting down and this was the last connection being
1006                 // used...
1007                 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
1008                     // then signal anybody that has called
1009                     // "awaitTermination(...)"
1010                     LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
1011                     this.runState = TERMINATED;
1012                     this.termination.signalAll();
1013                     LOGGER.trace("Terminated repository connection pool for {0}", getSourceName());
1014 
1015                     // fall through to call terminate() outside of lock.
1016                 }
1017             } finally {
1018                 mainLock.unlock();
1019             }
1020         }
1021     }
1022 
1023     @GuardedBy( "mainLock" )
1024     protected int drainUnusedConnections( int count ) {
1025         if (count <= 0) return 0;
1026         LOGGER.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
1027         // Drain the extra connections from those available ...
1028         Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
1029         this.availableConnections.drainTo(extraConnections, count);
1030         for (ConnectionWrapper connection : extraConnections) {
1031             LOGGER.trace("Closing repository connection to {0}", getSourceName());
1032             connection.getOriginal().close();
1033         }
1034         int numClosed = extraConnections.size();
1035         this.poolSize -= numClosed;
1036         LOGGER.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
1037         return numClosed;
1038     }
1039 
1040     @GuardedBy( "mainLock" )
1041     protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
1042         // Add connection ...
1043         if (this.poolSize < this.corePoolSize) {
1044             this.availableConnections.offer(newWrappedConnection());
1045             LOGGER.trace("Added connection to {0} in undersized pool", getSourceName());
1046             return true;
1047         }
1048         return false;
1049     }
1050 
1051     @GuardedBy( "mainLock" )
1052     protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
1053         // Add connections ...
1054         int n = 0;
1055         while (this.poolSize < this.corePoolSize) {
1056             this.availableConnections.offer(newWrappedConnection());
1057             ++n;
1058         }
1059         LOGGER.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
1060         return n;
1061     }
1062 
1063     protected class ConnectionWrapper implements RepositoryConnection {
1064 
1065         private final RepositoryConnection original;
1066         private final long timeCreated;
1067         private long lastUsed;
1068         private boolean closed = false;
1069 
1070         protected ConnectionWrapper( RepositoryConnection connection ) {
1071             assert connection != null;
1072             this.original = connection;
1073             this.timeCreated = System.currentTimeMillis();
1074         }
1075 
1076         /**
1077          * @return original
1078          */
1079         protected RepositoryConnection getOriginal() {
1080             return this.original;
1081         }
1082 
1083         /**
1084          * @return lastUsed
1085          */
1086         public long getTimeLastUsed() {
1087             return this.lastUsed;
1088         }
1089 
1090         /**
1091          * @return timeCreated
1092          */
1093         public long getTimeCreated() {
1094             return this.timeCreated;
1095         }
1096 
1097         /**
1098          * {@inheritDoc}
1099          */
1100         public String getSourceName() {
1101             return this.original.getSourceName();
1102         }
1103 
1104         /**
1105          * {@inheritDoc}
1106          */
1107         public XAResource getXAResource() {
1108             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1109             return this.original.getXAResource();
1110         }
1111 
1112         /**
1113          * {@inheritDoc}
1114          */
1115         public CachePolicy getDefaultCachePolicy() {
1116             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1117             return this.original.getDefaultCachePolicy();
1118         }
1119 
1120         /**
1121          * {@inheritDoc}
1122          * 
1123          * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
1124          *      org.modeshape.graph.request.Request)
1125          */
1126         public void execute( ExecutionContext context,
1127                              Request request ) throws RepositorySourceException {
1128             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1129             this.original.execute(context, request);
1130         }
1131 
1132         /**
1133          * {@inheritDoc}
1134          */
1135         public boolean ping( long time,
1136                              TimeUnit unit ) throws InterruptedException {
1137             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1138             return this.original.ping(time, unit);
1139         }
1140 
1141         /**
1142          * {@inheritDoc}
1143          */
1144         public void close() {
1145             if (!closed) {
1146                 this.lastUsed = System.currentTimeMillis();
1147                 this.closed = true;
1148                 returnConnection(this);
1149             }
1150         }
1151     }
1152 
1153 }