View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors. 
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   *
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.graph.connector;
25  
26  import net.jcip.annotations.GuardedBy;
27  import net.jcip.annotations.ThreadSafe;
28  import org.modeshape.common.util.CheckArg;
29  import org.modeshape.common.util.Logger;
30  import org.modeshape.graph.ExecutionContext;
31  import org.modeshape.graph.GraphI18n;
32  import org.modeshape.graph.cache.CachePolicy;
33  import org.modeshape.graph.request.Request;
34  
35  import javax.transaction.xa.XAResource;
36  import java.util.Collection;
37  import java.util.HashSet;
38  import java.util.LinkedList;
39  import java.util.Set;
40  import java.util.concurrent.BlockingQueue;
41  import java.util.concurrent.LinkedBlockingQueue;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.atomic.AtomicInteger;
45  import java.util.concurrent.atomic.AtomicLong;
46  import java.util.concurrent.locks.Condition;
47  import java.util.concurrent.locks.ReentrantLock;
48  
49  /**
50   * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
51   */
52  @ThreadSafe
53  public class RepositoryConnectionPool {
54  
55      /**
56       * The core pool size for default-constructed pools is {@value} .
57       */
58      public static final int DEFAULT_CORE_POOL_SIZE = 1;
59  
60      /**
61       * The maximum pool size for default-constructed pools is {@value} .
62       */
63      public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
64  
65      /**
66       * The keep-alive time for connections in default-constructed pools is {@value} seconds.
67       */
68      public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
69  
70      /**
71       * Permission for checking shutdown
72       */
73      private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
74  
75      /**
76       * The source that this pool uses to create new connections.
77       */
78      private final RepositorySource source;
79  
80      /**
81       * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
82       */
83      private final ReentrantLock mainLock = new ReentrantLock();
84  
85      /**
86       * Wait condition to support awaitTermination
87       */
88      private final Condition termination = mainLock.newCondition();
89  
90      /**
91       * Set containing all connections that are available for use.
92       */
93      @GuardedBy( "mainLock" )
94      private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();
95  
96      /**
97       * The connections that are currently in use.
98       */
99      @GuardedBy( "mainLock" )
100     private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
101 
102     /**
103      * Timeout in nanoseconds for idle connections waiting to be used. Threads use this timeout only when there are more than
104      * corePoolSize present. Otherwise they wait forever to be used.
105      */
106     private volatile long keepAliveTime;
107 
108     /**
109      * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
110      * updates.
111      */
112     @GuardedBy( "mainLock" )
113     private volatile int corePoolSize;
114 
115     /**
116      * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
117      */
118     @GuardedBy( "mainLock" )
119     private volatile int maximumPoolSize;
120 
121     /**
122      * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
123      */
124     @GuardedBy( "mainLock" )
125     private volatile int poolSize;
126 
127     /**
128      * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
129      */
130     @GuardedBy( "mainLock" )
131     private volatile int runState;
132 
133     // Special values for runState
134     /** Normal, not-shutdown mode */
135     static final int RUNNING = 0;
136     /** Controlled shutdown mode */
137     static final int SHUTDOWN = 1;
138     /** Immediate shutdown mode */
139     static final int STOP = 2;
140     /** Final state */
141     static final int TERMINATED = 3;
142 
143     /**
144      * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
145      */
146     private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
147 
148     /**
149      * The time in nanoseconds that ping should wait before timing out and failing.
150      */
151     private final AtomicLong pingTimeout = new AtomicLong(0);
152 
153     /**
154      * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
155      */
156     private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
157 
158     private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
159 
160     private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
161 
162     private static final Logger LOGGER = Logger.getLogger(RepositoryConnectionPool.class);
163 
164     /**
165      * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor
166      * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
167      * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}.
168      * 
169      * @param source the source for connections
170      * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
171      */
172     public RepositoryConnectionPool( RepositorySource source ) {
173         this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS);
174     }
175 
176     /**
177      * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
178      * 
179      * @param source the source for connections
180      * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
181      * @param maximumPoolSize the maximum number of connections to allow in the pool.
182      * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle
183      *        connections will be kept before terminating.
184      * @param unit the time unit for the keepAliveTime argument.
185      * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
186      */
187     public RepositoryConnectionPool( RepositorySource source,
188                                      int corePoolSize,
189                                      int maximumPoolSize,
190                                      long keepAliveTime,
191                                      TimeUnit unit ) {
192         CheckArg.isNonNegative(corePoolSize, "corePoolSize");
193         CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
194         CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
195         CheckArg.isNotNull(source, "source");
196         if (maximumPoolSize < corePoolSize) {
197             throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
198         }
199         this.source = source;
200         this.corePoolSize = corePoolSize;
201         this.maximumPoolSize = maximumPoolSize;
202         this.keepAliveTime = unit.toNanos(keepAliveTime);
203         this.setPingTimeout(100, TimeUnit.MILLISECONDS);
204     }
205 
206     /**
207      * Get the {@link RepositorySource} that's used by this pool.
208      * 
209      * @return the repository source; never null
210      */
211     public final RepositorySource getRepositorySource() {
212         return source;
213     }
214 
215     /**
216      * Get the name of this pool, which delegates to the connection factory.
217      * 
218      * @return the name of the source
219      */
220     protected String getSourceName() {
221         return source.getName();
222     }
223 
224     // -------------------------------------------------
225     // Property settings ...
226     // -------------------------------------------------
227 
228     /**
229      * @return validateConnectionBeforeUse
230      */
231     public boolean getValidateConnectionBeforeUse() {
232         return this.validateConnectionBeforeUse.get();
233     }
234 
235     /**
236      * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
237      */
238     public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
239         this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
240     }
241 
242     /**
243      * @return pingTimeout
244      */
245     public long getPingTimeoutInNanos() {
246         return this.pingTimeout.get();
247     }
248 
249     /**
250      * @param pingTimeout the time to wait for a ping to complete
251      * @param unit the time unit of the time argument
252      */
253     public void setPingTimeout( long pingTimeout,
254                                 TimeUnit unit ) {
255         CheckArg.isNonNegative(pingTimeout, "time");
256         this.pingTimeout.set(unit.toNanos(pingTimeout));
257     }
258 
259     /**
260      * @return maxFailedAttemptsBeforeError
261      */
262     public int getMaxFailedAttemptsBeforeError() {
263         return this.maxFailedAttemptsBeforeError.get();
264     }
265 
266     /**
267      * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
268      */
269     public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
270         this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
271     }
272 
273     /**
274      * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
275      * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
276      * This overrides any value set in the constructor.
277      * 
278      * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
279      *        returned.
280      * @param unit the time unit of the time argument
281      * @throws IllegalArgumentException if time less than zero
282      * @see #getKeepAliveTime
283      */
284     public void setKeepAliveTime( long time,
285                                   TimeUnit unit ) {
286         CheckArg.isNonNegative(time, "time");
287         this.keepAliveTime = unit.toNanos(time);
288     }
289 
290     /**
291      * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
292      * remain idle before being closed.
293      * 
294      * @param unit the desired time unit of the result
295      * @return the time limit
296      * @see #setKeepAliveTime
297      */
298     public long getKeepAliveTime( TimeUnit unit ) {
299         assert unit != null;
300         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
301     }
302 
303     /**
304      * @return maximumPoolSize
305      */
306     public int getMaximumPoolSize() {
307         return this.maximumPoolSize;
308     }
309 
310     /**
311      * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
312      * smaller than the current value, excess existing but unused connections will be closed.
313      * 
314      * @param maximumPoolSize the new maximum
315      * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
316      * @see #getMaximumPoolSize
317      */
318     public void setMaximumPoolSize( int maximumPoolSize ) {
319         CheckArg.isPositive(maximumPoolSize, "maximum pool size");
320         if (maximumPoolSize < corePoolSize) {
321             throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
322         }
323         final ReentrantLock mainLock = this.mainLock;
324         try {
325             mainLock.lock();
326             int extra = this.maximumPoolSize - maximumPoolSize;
327             this.maximumPoolSize = maximumPoolSize;
328             if (extra > 0 && poolSize > maximumPoolSize) {
329                 // Drain the extra connections from those available ...
330                 drainUnusedConnections(extra);
331             }
332         } finally {
333             mainLock.unlock();
334         }
335     }
336 
337     /**
338      * Returns the core number of connections.
339      * 
340      * @return the core number of connections
341      * @see #setCorePoolSize(int)
342      */
343     public int getCorePoolSize() {
344         return this.corePoolSize;
345     }
346 
347     /**
348      * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
349      * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
350      * created.
351      * 
352      * @param corePoolSize the new core size
353      * @throws RepositorySourceException if there was an error obtaining the new connection
354      * @throws InterruptedException if the thread was interrupted during the operation
355      * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
356      * @see #getCorePoolSize()
357      */
358     public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
359         CheckArg.isNonNegative(corePoolSize, "core pool size");
360         if (maximumPoolSize < corePoolSize) {
361             throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
362         }
363         final ReentrantLock mainLock = this.mainLock;
364         try {
365             mainLock.lock();
366             int extra = this.corePoolSize - corePoolSize;
367             this.corePoolSize = corePoolSize;
368             if (extra < 0) {
369                 // Add connections ...
370                 addConnectionsIfUnderCorePoolSize();
371             } else if (extra > 0 && poolSize > corePoolSize) {
372                 // Drain the extra connections from those available ...
373                 drainUnusedConnections(extra);
374             }
375         } finally {
376             mainLock.unlock();
377         }
378     }
379 
380     // -------------------------------------------------
381     // Statistics ...
382     // -------------------------------------------------
383 
384     /**
385      * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
386      * being used.
387      * 
388      * @return the number of connections
389      */
390     public int getPoolSize() {
391         return poolSize;
392     }
393 
394     /**
395      * Returns the approximate number of connections that are currently checked out from the pool.
396      * 
397      * @return the number of checked-out connections
398      */
399     public int getInUseCount() {
400         final ReentrantLock mainLock = this.mainLock;
401         try {
402             mainLock.lock();
403             return this.inUseConnections.size();
404         } finally {
405             mainLock.unlock();
406         }
407     }
408 
409     /**
410      * Get the total number of connections that have been created by this pool.
411      * 
412      * @return the total number of connections created by this pool
413      */
414     public long getTotalConnectionsCreated() {
415         return this.totalConnectionsCreated.get();
416     }
417 
418     /**
419      * Get the total number of times connections have been {@link #getConnection()} used.
420      * 
421      * @return the total number
422      */
423     public long getTotalConnectionsUsed() {
424         return this.totalConnectionsUsed.get();
425     }
426 
427     // -------------------------------------------------
428     // State management methods ...
429     // -------------------------------------------------
430 
431     /**
432      * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
433      * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
434      * already been started.
435      * 
436      * @return true if a connection was started
437      * @throws RepositorySourceException if there was an error obtaining the new connection
438      * @throws InterruptedException if the thread was interrupted during the operation
439      */
440     public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
441         final ReentrantLock mainLock = this.mainLock;
442         try {
443             mainLock.lock();
444             return addConnectionIfUnderCorePoolSize();
445         } finally {
446             mainLock.unlock();
447         }
448     }
449 
450     /**
451      * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
452      * connections only when they are {@link #getConnection() needed}.
453      * 
454      * @return the number of connections started.
455      * @throws RepositorySourceException if there was an error obtaining the new connection
456      * @throws InterruptedException if the thread was interrupted during the operation
457      */
458     public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
459         final ReentrantLock mainLock = this.mainLock;
460         try {
461             mainLock.lock();
462             return addConnectionsIfUnderCorePoolSize();
463         } finally {
464             mainLock.unlock();
465         }
466     }
467 
468     /**
469      * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
470      * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
471      * <p>
472      * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
473      * </p>
474      * 
475      * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
476      *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
477      *         or the security manager's <tt>checkAccess</tt> method denies access.
478      * @see #shutdownNow()
479      */
480     public void shutdown() {
481         // Fail if caller doesn't have modifyThread permission. We
482         // explicitly check permissions directly because we can't trust
483         // implementations of SecurityManager to correctly override
484         // the "check access" methods such that our documented
485         // security policy is implemented.
486         SecurityManager security = System.getSecurityManager();
487         if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
488 
489         LOGGER.debug("Shutting down repository connection pool for {0}", getSourceName());
490         boolean fullyTerminated = false;
491         final ReentrantLock mainLock = this.mainLock;
492         try {
493             mainLock.lock();
494             int state = this.runState;
495             if (state == RUNNING) {
496                 // don't override shutdownNow
497                 this.runState = SHUTDOWN;
498             }
499 
500             // Kill the maintenance thread ...
501 
502             // Remove and close all available connections ...
503             if (!this.availableConnections.isEmpty()) {
504                 // Drain the extra connections from those available ...
505                 drainUnusedConnections(this.availableConnections.size());
506             }
507 
508             // If there are no connections being used, trigger full termination now ...
509             if (this.inUseConnections.isEmpty()) {
510                 fullyTerminated = true;
511                 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
512                 runState = TERMINATED;
513                 termination.signalAll();
514                 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
515             }
516             // Otherwise the last connection that is closed will transition the runState to TERMINATED ...
517         } finally {
518             mainLock.unlock();
519         }
520         if (fullyTerminated) terminated();
521     }
522 
523     /**
524      * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
525      * connections.
526      * 
527      * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
528      *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
529      *         or the security manager's <tt>checkAccess</tt> method denies access.
530      * @see #shutdown()
531      */
532     public void shutdownNow() {
533         // Almost the same code as shutdown()
534         SecurityManager security = System.getSecurityManager();
535         if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
536 
537         LOGGER.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
538         boolean fullyTerminated = false;
539         final ReentrantLock mainLock = this.mainLock;
540         try {
541             mainLock.lock();
542             int state = this.runState;
543             if (state != TERMINATED) {
544                 // don't override shutdownNow
545                 this.runState = STOP;
546             }
547 
548             // Kill the maintenance thread ...
549 
550             // Remove and close all available connections ...
551             if (!this.availableConnections.isEmpty()) {
552                 // Drain the extra connections from those available ...
553                 drainUnusedConnections(this.availableConnections.size());
554             }
555 
556             // If there are connections being used, close them now ...
557             if (!this.inUseConnections.isEmpty()) {
558                 for (ConnectionWrapper connectionInUse : this.inUseConnections) {
559                     LOGGER.trace("Closing repository connection to {0}", getSourceName());
560                     connectionInUse.getOriginal().close();
561                 }
562                 this.poolSize -= this.inUseConnections.size();
563                 // The last connection that is closed will transition the runState to TERMINATED ...
564             } else {
565                 // There are no connections in use, so trigger full termination now ...
566                 fullyTerminated = true;
567                 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
568                 runState = TERMINATED;
569                 termination.signalAll();
570                 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
571             }
572 
573         } finally {
574             mainLock.unlock();
575         }
576         if (fullyTerminated) terminated();
577     }
578 
579     /**
580      * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
581      * method is effectively <code>!isShutdown()</code>.
582      * 
583      * @return true if this pool is running, or false otherwise
584      * @see #isShutdown()
585      * @see #isTerminated()
586      * @see #isTerminating()
587      */
588     public boolean isRunning() {
589         return runState == RUNNING;
590     }
591 
592     /**
593      * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
594      * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
595      * <code>!isRunning()</code>.
596      * 
597      * @return true if this pool has been shut down, or false otherwise
598      * @see #isShutdown()
599      * @see #isTerminated()
600      * @see #isTerminating()
601      */
602     public boolean isShutdown() {
603         return runState != RUNNING;
604     }
605 
606     /**
607      * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
608      * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
609      * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
610      * executor not to properly terminate.
611      * 
612      * @return true if terminating but not yet terminated, or false otherwise
613      * @see #isTerminated()
614      */
615     public boolean isTerminating() {
616         return runState == STOP;
617     }
618 
619     /**
620      * Return true if this pool has completed its termination and no longer has any open connections.
621      * 
622      * @return true if terminated, or false otherwise
623      * @see #isTerminating()
624      */
625     public boolean isTerminated() {
626         return runState == TERMINATED;
627     }
628 
629     /**
630      * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
631      * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
632      * return even if all connections have not been closed.
633      * 
634      * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
635      * @param unit the time unit for <code>timeout</code>
636      * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
637      *         before all the connections were closed
638      * @throws InterruptedException if the thread was interrupted
639      */
640     public boolean awaitTermination( long timeout,
641                                      TimeUnit unit ) throws InterruptedException {
642         LOGGER.trace("Awaiting termination");
643         long nanos = unit.toNanos(timeout);
644         final ReentrantLock mainLock = this.mainLock;
645         try {
646             mainLock.lock();
647             for (;;) {
648                 // LOGGER.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize);
649                 if (runState == TERMINATED) return true;
650                 if (nanos <= 0) return false;
651                 nanos = termination.awaitNanos(nanos);
652                 // LOGGER.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
653                 // ;
654             }
655         } finally {
656             mainLock.unlock();
657             LOGGER.trace("Finished awaiting termination");
658         }
659     }
660 
661     /**
662      * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
663      * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
664      */
665     protected void terminated() {
666     }
667 
668     /**
669      * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
670      */
671     @Override
672     protected void finalize() {
673         shutdown();
674     }
675 
676     // -------------------------------------------------
677     // Connection management methods ...
678     // -------------------------------------------------
679 
680     /**
681      * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
682      * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
683      * number of connections and all connections are currently being used.
684      * 
685      * @return a connection
686      * @throws RepositorySourceException if there is a problem obtaining a connection
687      * @throws IllegalStateException if the factory is not in a state to create or return connections
688      */
689     public RepositoryConnection getConnection() throws RepositorySourceException {
690         int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
691         ConnectionWrapper connection = null;
692         // Do this until we get a good connection ...
693         int attemptsRemaining = attemptsAllowed;
694         while (connection == null && attemptsRemaining > 0) {
695             --attemptsRemaining;
696             ReentrantLock mainLock = this.mainLock;
697             try {
698                 mainLock.lock();
699                 // If we're shutting down the pool, then just close the connection ...
700                 if (this.runState != RUNNING) {
701                     throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
702                 }
703                 // If there are fewer total connections than the core size ...
704                 if (this.poolSize < this.corePoolSize) {
705                     // Immediately create a wrapped connection and return it ...
706                     connection = newWrappedConnection();
707                 }
708                 // Peek to see if there is a connection available ...
709                 else if (this.availableConnections.peek() != null) {
710                     // There is, so take it and return it ...
711                     try {
712                         connection = this.availableConnections.take();
713                     } catch (InterruptedException e) {
714                         LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
715                         Thread.interrupted();
716                         throw new RepositorySourceException(getSourceName(), e);
717                     }
718                 }
719                 // There is no connection available. If there are fewer total connections than the maximum size ...
720                 else if (this.poolSize < this.maximumPoolSize) {
721                     // Immediately create a wrapped connection and return it ...
722                     connection = newWrappedConnection();
723                 }
724                 if (connection != null) {
725                     this.inUseConnections.add(connection);
726                 }
727             } finally {
728                 mainLock.unlock();
729             }
730             if (connection == null) {
731                 // There are not enough connections, so wait in line for the next available connection ...
732                 LOGGER.trace("Waiting for a repository connection from pool {0}", getSourceName());
733                 try {
734                     connection = this.availableConnections.take();
735                 } catch (InterruptedException e) {
736                     LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
737                     Thread.interrupted();
738                     throw new RepositorySourceException(getSourceName(), e);
739                 }
740                 mainLock = this.mainLock;
741                 mainLock.lock();
742                 try {
743                     if (connection != null) {
744                         this.inUseConnections.add(connection);
745                     }
746                 } finally {
747                     mainLock.unlock();
748                 }
749                 LOGGER.trace("Recieved a repository connection from pool {0}", getSourceName());
750             }
751             if (connection != null && this.validateConnectionBeforeUse.get()) {
752                 try {
753                     connection = validateConnection(connection);
754                 } catch (InterruptedException e) {
755                     LOGGER.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
756                     returnConnection(connection);
757                     Thread.interrupted();
758                     throw new RepositorySourceException(getSourceName(), e);
759                 }
760             }
761         }
762         if (connection == null) {
763             // We were unable to obtain a usable connection, so fail ...
764             throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
765         }
766         this.totalConnectionsUsed.incrementAndGet();
767         return connection;
768     }
769 
770     /**
771      * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
772      * 
773      * @param wrapper the wrapper to the connection that is being returned to the pool
774      */
775     protected void returnConnection( ConnectionWrapper wrapper ) {
776         assert wrapper != null;
777         ConnectionWrapper wrapperToClose = null;
778         final ReentrantLock mainLock = this.mainLock;
779         try {
780             mainLock.lock();
781             // Remove the connection from the in-use set ...
782             boolean removed = this.inUseConnections.remove(wrapper);
783             assert removed;
784 
785             // If we're shutting down the pool, then just close the connection ...
786             if (this.runState != RUNNING) {
787                 wrapperToClose = wrapper;
788             }
789             // If there are more connections than the maximum size...
790             else if (this.poolSize > this.maximumPoolSize) {
791                 // Immediately close this connection ...
792                 wrapperToClose = wrapper;
793             }
794             // Attempt to make the connection available (this should generally work, unless there is an upper limit
795             // to the number of available connections) ...
796             else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
797                 // The pool of available connection is full, so release it ...
798                 wrapperToClose = wrapper;
799             }
800         } finally {
801             mainLock.unlock();
802         }
803         // Close the connection if we're supposed to (do it outside of the main lock)...
804         if (wrapperToClose != null) {
805             closeConnection(wrapperToClose);
806         }
807     }
808 
809     /**
810      * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
811      * 
812      * @param connection the connection to be validated; may not be null
813      * @return the validated connection, or null if the connection did not validate and was removed from the pool
814      * @throws InterruptedException if the thread is interrupted while validating the connection
815      */
816     protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
817         assert connection != null;
818         ConnectionWrapper invalidConnection = null;
819         try {
820             if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
821                 invalidConnection = connection;
822             }
823         } finally {
824             if (invalidConnection != null) {
825                 try {
826                     mainLock.lock();
827                     connection = null;
828                     this.poolSize--;
829                     this.inUseConnections.remove(connection);
830                     // returnConnection(invalidConnection);
831                 } finally {
832                     mainLock.unlock();
833                 }
834             }
835         }
836         return connection;
837     }
838 
839     /**
840      * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
841      * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
842      * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
843      * {@link #poolSize pool size}.
844      * 
845      * @return the connection wrapper with a new connection
846      * @throws RepositorySourceException if there was an error obtaining the new connection
847      */
848     @GuardedBy( "mainLock" )
849     protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
850         RepositoryConnection connection = this.source.getConnection();
851         ++this.poolSize;
852         this.totalConnectionsCreated.incrementAndGet();
853         return new ConnectionWrapper(connection);
854     }
855 
856     /**
857      * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
858      * method does decrement the {@link #poolSize pool size}.
859      * 
860      * @param wrapper the wrapper for the connection to be closed
861      */
862     protected void closeConnection( ConnectionWrapper wrapper ) {
863         assert wrapper != null;
864         RepositoryConnection original = wrapper.getOriginal();
865         assert original != null;
866         try {
867             LOGGER.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
868             original.close();
869         } finally {
870             final ReentrantLock mainLock = this.mainLock;
871             try {
872                 mainLock.lock();
873                 // No matter what reduce the pool size count
874                 --this.poolSize;
875                 // And if shutting down and this was the last connection being used...
876                 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
877                     // then signal anybody that has called "awaitTermination(...)"
878                     LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
879                     this.runState = TERMINATED;
880                     this.termination.signalAll();
881                     LOGGER.trace("Terminated repository connection pool for {0}", getSourceName());
882 
883                     // fall through to call terminate() outside of lock.
884                 }
885             } finally {
886                 mainLock.unlock();
887             }
888         }
889     }
890 
891     @GuardedBy( "mainLock" )
892     protected int drainUnusedConnections( int count ) {
893         if (count <= 0) return 0;
894         LOGGER.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
895         // Drain the extra connections from those available ...
896         Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
897         this.availableConnections.drainTo(extraConnections, count);
898         for (ConnectionWrapper connection : extraConnections) {
899             LOGGER.trace("Closing repository connection to {0}", getSourceName());
900             connection.getOriginal().close();
901         }
902         int numClosed = extraConnections.size();
903         this.poolSize -= numClosed;
904         LOGGER.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
905         return numClosed;
906     }
907 
908     @GuardedBy( "mainLock" )
909     protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
910         // Add connection ...
911         if (this.poolSize < this.corePoolSize) {
912             this.availableConnections.offer(newWrappedConnection());
913             LOGGER.trace("Added connection to {0} in undersized pool", getSourceName());
914             return true;
915         }
916         return false;
917     }
918 
919     @GuardedBy( "mainLock" )
920     protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
921         // Add connections ...
922         int n = 0;
923         while (this.poolSize < this.corePoolSize) {
924             this.availableConnections.offer(newWrappedConnection());
925             ++n;
926         }
927         LOGGER.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
928         return n;
929     }
930 
931     protected class ConnectionWrapper implements RepositoryConnection {
932 
933         private final RepositoryConnection original;
934         private final long timeCreated;
935         private long lastUsed;
936         private boolean closed = false;
937 
938         protected ConnectionWrapper( RepositoryConnection connection ) {
939             assert connection != null;
940             this.original = connection;
941             this.timeCreated = System.currentTimeMillis();
942         }
943 
944         /**
945          * @return original
946          */
947         protected RepositoryConnection getOriginal() {
948             return this.original;
949         }
950 
951         /**
952          * @return lastUsed
953          */
954         public long getTimeLastUsed() {
955             return this.lastUsed;
956         }
957 
958         /**
959          * @return timeCreated
960          */
961         public long getTimeCreated() {
962             return this.timeCreated;
963         }
964 
965         /**
966          * {@inheritDoc}
967          */
968         public String getSourceName() {
969             return this.original.getSourceName();
970         }
971 
972         /**
973          * {@inheritDoc}
974          */
975         public XAResource getXAResource() {
976             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
977             return this.original.getXAResource();
978         }
979 
980         /**
981          * {@inheritDoc}
982          */
983         public CachePolicy getDefaultCachePolicy() {
984             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
985             return this.original.getDefaultCachePolicy();
986         }
987 
988         /**
989          * {@inheritDoc}
990          * 
991          * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
992          *      org.modeshape.graph.request.Request)
993          */
994         public void execute( ExecutionContext context,
995                              Request request ) throws RepositorySourceException {
996             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
997             this.original.execute(context, request);
998         }
999 
1000         /**
1001          * {@inheritDoc}
1002          */
1003         public boolean ping( long time,
1004                              TimeUnit unit ) throws InterruptedException {
1005             if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1006             return this.original.ping(time, unit);
1007         }
1008 
1009         /**
1010          * {@inheritDoc}
1011          */
1012         public void close() {
1013             if (!closed) {
1014                 this.lastUsed = System.currentTimeMillis();
1015                 this.closed = true;
1016                 returnConnection(this);
1017             }
1018         }
1019     }
1020 
1021 }