001    /*
002     * JBoss, Home of Professional Open Source.
003     * Copyright 2008, Red Hat Middleware LLC, and individual contributors
004     * as indicated by the @author tags. See the copyright.txt file in the
005     * distribution for a full listing of individual contributors. 
006     *
007     * This is free software; you can redistribute it and/or modify it
008     * under the terms of the GNU Lesser General Public License as
009     * published by the Free Software Foundation; either version 2.1 of
010     * the License, or (at your option) any later version.
011     *
012     * This software is distributed in the hope that it will be useful,
013     * but WITHOUT ANY WARRANTY; without even the implied warranty of
014     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015     * Lesser General Public License for more details.
016     *
017     * You should have received a copy of the GNU Lesser General Public
018     * License along with this software; if not, write to the Free
019     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021     */
022    package org.jboss.dna.graph.connectors;
023    
024    import java.util.Collection;
025    import java.util.HashSet;
026    import java.util.LinkedList;
027    import java.util.Set;
028    import java.util.concurrent.BlockingQueue;
029    import java.util.concurrent.LinkedBlockingQueue;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.atomic.AtomicBoolean;
032    import java.util.concurrent.atomic.AtomicInteger;
033    import java.util.concurrent.atomic.AtomicLong;
034    import java.util.concurrent.locks.Condition;
035    import java.util.concurrent.locks.ReentrantLock;
036    import javax.transaction.xa.XAResource;
037    import net.jcip.annotations.GuardedBy;
038    import net.jcip.annotations.ThreadSafe;
039    import org.jboss.dna.common.util.CheckArg;
040    import org.jboss.dna.common.util.Logger;
041    import org.jboss.dna.graph.ExecutionContext;
042    import org.jboss.dna.graph.GraphI18n;
043    import org.jboss.dna.graph.cache.CachePolicy;
044    import org.jboss.dna.graph.commands.GraphCommand;
045    
046    /**
047     * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
048     * 
049     * @author Randall Hauch
050     */
051    @ThreadSafe
052    public class RepositoryConnectionPool {
053    
    /**
     * The core pool size for default-constructed pools is {@value}.
     */
    public static final int DEFAULT_CORE_POOL_SIZE = 1;

    /**
     * The maximum pool size for default-constructed pools is {@value}.
     */
    public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;

    /**
     * The keep-alive time for connections in default-constructed pools is {@value} seconds.
     */
    public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;

    /**
     * Permission that {@link #shutdown()} and {@link #shutdownNow()} check when a security manager is installed.
     */
    private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");

    /**
     * The source that this pool uses to create new connections.
     */
    private final RepositorySource source;

    /**
     * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, runState, and the connection collections.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Wait condition to support {@link #awaitTermination(long, TimeUnit)}; signalled when the run state becomes
     * {@link #TERMINATED}. Must be declared after {@link #mainLock}, from which it is created.
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Queue containing all connections that are available for use (i.e., not currently checked out).
     */
    @GuardedBy( "mainLock" )
    private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();

    /**
     * The connections that are currently in use (checked out).
     */
    @GuardedBy( "mainLock" )
    private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();

    /**
     * Keep-alive time, stored in nanoseconds, for idle connections in excess of the core pool size. Volatile so that it can be
     * read and written without holding mainLock.
     */
    private volatile long keepAliveTime;

    /**
     * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
     * updates.
     */
    @GuardedBy( "mainLock" )
    private volatile int corePoolSize;

    /**
     * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
     */
    @GuardedBy( "mainLock" )
    private volatile int maximumPoolSize;

    /**
     * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
     */
    @GuardedBy( "mainLock" )
    private volatile int poolSize;

    /**
     * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
     * Never explicitly initialized, so a new pool starts with the default value 0, which is {@link #RUNNING}.
     */
    @GuardedBy( "mainLock" )
    private volatile int runState;

    // Special values for runState
    /** Normal, not-shutdown mode */
    static final int RUNNING = 0;
    /** Controlled shutdown mode */
    static final int SHUTDOWN = 1;
    /** Immediate shutdown mode */
    static final int STOP = 2;
    /** Final state */
    static final int TERMINATED = 3;

    /**
     * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
     * Defaults to <code>false</code>.
     */
    private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);

    /**
     * The time in nanoseconds that ping should wait before timing out and failing. The constructor replaces the initial value of
     * 0 with 100 milliseconds.
     */
    private final AtomicLong pingTimeout = new AtomicLong(0);

    /**
     * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
     * Defaults to 10.
     */
    private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);

    /**
     * Running count reported by {@link #getTotalConnectionsCreated()}.
     */
    private final AtomicLong totalConnectionsCreated = new AtomicLong(0);

    /**
     * Running count reported by {@link #getTotalConnectionsUsed()}.
     */
    private final AtomicLong totalConnectionsUsed = new AtomicLong(0);

    /**
     * Logger named after the runtime class of this instance, so subclasses log under their own name.
     */
    private final Logger logger = Logger.getLogger(this.getClass());
162    
163        /**
164         * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor
165         * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
166         * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}.
167         * 
168         * @param source the source for connections
169         * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
170         */
171        public RepositoryConnectionPool( RepositorySource source ) {
172            this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS);
173        }
174    
175        /**
176         * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
177         * 
178         * @param source the source for connections
179         * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
180         * @param maximumPoolSize the maximum number of connections to allow in the pool.
181         * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle
182         *        connections will be kept before terminating.
183         * @param unit the time unit for the keepAliveTime argument.
184         * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
185         */
186        public RepositoryConnectionPool( RepositorySource source,
187                                         int corePoolSize,
188                                         int maximumPoolSize,
189                                         long keepAliveTime,
190                                         TimeUnit unit ) {
191            CheckArg.isNonNegative(corePoolSize, "corePoolSize");
192            CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
193            CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
194            CheckArg.isNotNull(source, "source");
195            if (maximumPoolSize < corePoolSize) {
196                throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
197            }
198            this.source = source;
199            this.corePoolSize = corePoolSize;
200            this.maximumPoolSize = maximumPoolSize;
201            this.keepAliveTime = unit.toNanos(keepAliveTime);
202            this.setPingTimeout(100, TimeUnit.MILLISECONDS);
203        }
204    
205        /**
206         * Get the {@link RepositorySource} that's used by this pool.
207         * 
208         * @return the repository source; never null
209         */
210        public final RepositorySource getRepositorySource() {
211            return source;
212        }
213    
214        /**
215         * Get the name of this pool, which delegates to the connection factory.
216         * 
217         * @return the name of the source
218         */
219        protected String getSourceName() {
220            return source.getName();
221        }
222    
223        // -------------------------------------------------
224        // Property settings ...
225        // -------------------------------------------------
226    
227        /**
228         * @return validateConnectionBeforeUse
229         */
230        public boolean getValidateConnectionBeforeUse() {
231            return this.validateConnectionBeforeUse.get();
232        }
233    
234        /**
235         * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
236         */
237        public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
238            this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
239        }
240    
241        /**
242         * @return pingTimeout
243         */
244        public long getPingTimeoutInNanos() {
245            return this.pingTimeout.get();
246        }
247    
248        /**
249         * @param pingTimeout the time to wait for a ping to complete
250         * @param unit the time unit of the time argument
251         */
252        public void setPingTimeout( long pingTimeout,
253                                    TimeUnit unit ) {
254            CheckArg.isNonNegative(pingTimeout, "time");
255            this.pingTimeout.set(unit.toNanos(pingTimeout));
256        }
257    
258        /**
259         * @return maxFailedAttemptsBeforeError
260         */
261        public int getMaxFailedAttemptsBeforeError() {
262            return this.maxFailedAttemptsBeforeError.get();
263        }
264    
265        /**
266         * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
267         */
268        public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
269            this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
270        }
271    
272        /**
273         * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
274         * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
275         * This overrides any value set in the constructor.
276         * 
277         * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
278         *        returned.
279         * @param unit the time unit of the time argument
280         * @throws IllegalArgumentException if time less than zero
281         * @see #getKeepAliveTime
282         */
283        public void setKeepAliveTime( long time,
284                                      TimeUnit unit ) {
285            CheckArg.isNonNegative(time, "time");
286            this.keepAliveTime = unit.toNanos(time);
287        }
288    
289        /**
290         * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
291         * remain idle before being closed.
292         * 
293         * @param unit the desired time unit of the result
294         * @return the time limit
295         * @see #setKeepAliveTime
296         */
297        public long getKeepAliveTime( TimeUnit unit ) {
298            assert unit != null;
299            return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
300        }
301    
302        /**
303         * @return maximumPoolSize
304         */
305        public int getMaximumPoolSize() {
306            return this.maximumPoolSize;
307        }
308    
309        /**
310         * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
311         * smaller than the current value, excess existing but unused connections will be closed.
312         * 
313         * @param maximumPoolSize the new maximum
314         * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
315         * @see #getMaximumPoolSize
316         */
317        public void setMaximumPoolSize( int maximumPoolSize ) {
318            CheckArg.isPositive(maximumPoolSize, "maximum pool size");
319            if (maximumPoolSize < corePoolSize) {
320                throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
321            }
322            final ReentrantLock mainLock = this.mainLock;
323            try {
324                mainLock.lock();
325                int extra = this.maximumPoolSize - maximumPoolSize;
326                this.maximumPoolSize = maximumPoolSize;
327                if (extra > 0 && poolSize > maximumPoolSize) {
328                    // Drain the extra connections from those available ...
329                    drainUnusedConnections(extra);
330                }
331            } finally {
332                mainLock.unlock();
333            }
334        }
335    
336        /**
337         * Returns the core number of connections.
338         * 
339         * @return the core number of connections
340         * @see #setCorePoolSize(int)
341         */
342        public int getCorePoolSize() {
343            return this.corePoolSize;
344        }
345    
346        /**
347         * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
348         * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
349         * created.
350         * 
351         * @param corePoolSize the new core size
352         * @throws RepositorySourceException if there was an error obtaining the new connection
353         * @throws InterruptedException if the thread was interrupted during the operation
354         * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
355         * @see #getCorePoolSize()
356         */
357        public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
358            CheckArg.isNonNegative(corePoolSize, "core pool size");
359            if (maximumPoolSize < corePoolSize) {
360                throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
361            }
362            final ReentrantLock mainLock = this.mainLock;
363            try {
364                mainLock.lock();
365                int extra = this.corePoolSize - corePoolSize;
366                this.corePoolSize = corePoolSize;
367                if (extra < 0) {
368                    // Add connections ...
369                    addConnectionsIfUnderCorePoolSize();
370                } else if (extra > 0 && poolSize > corePoolSize) {
371                    // Drain the extra connections from those available ...
372                    drainUnusedConnections(extra);
373                }
374            } finally {
375                mainLock.unlock();
376            }
377        }
378    
379        // -------------------------------------------------
380        // Statistics ...
381        // -------------------------------------------------
382    
383        /**
384         * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
385         * being used.
386         * 
387         * @return the number of connections
388         */
389        public int getPoolSize() {
390            return poolSize;
391        }
392    
393        /**
394         * Returns the approximate number of connections that are currently checked out from the pool.
395         * 
396         * @return the number of checked-out connections
397         */
398        public int getInUseCount() {
399            final ReentrantLock mainLock = this.mainLock;
400            try {
401                mainLock.lock();
402                return this.inUseConnections.size();
403            } finally {
404                mainLock.unlock();
405            }
406        }
407    
408        /**
409         * Get the total number of connections that have been created by this pool.
410         * 
411         * @return the total number of connections created by this pool
412         */
413        public long getTotalConnectionsCreated() {
414            return this.totalConnectionsCreated.get();
415        }
416    
417        /**
418         * Get the total number of times connections have been {@link #getConnection()} used.
419         * 
420         * @return the total number
421         */
422        public long getTotalConnectionsUsed() {
423            return this.totalConnectionsUsed.get();
424        }
425    
426        // -------------------------------------------------
427        // State management methods ...
428        // -------------------------------------------------
429    
430        /**
431         * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
432         * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
433         * already been started.
434         * 
435         * @return true if a connection was started
436         * @throws RepositorySourceException if there was an error obtaining the new connection
437         * @throws InterruptedException if the thread was interrupted during the operation
438         */
439        public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
440            final ReentrantLock mainLock = this.mainLock;
441            try {
442                mainLock.lock();
443                return addConnectionIfUnderCorePoolSize();
444            } finally {
445                mainLock.unlock();
446            }
447        }
448    
449        /**
450         * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
451         * connections only when they are {@link #getConnection() needed}.
452         * 
453         * @return the number of connections started.
454         * @throws RepositorySourceException if there was an error obtaining the new connection
455         * @throws InterruptedException if the thread was interrupted during the operation
456         */
457        public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
458            final ReentrantLock mainLock = this.mainLock;
459            try {
460                mainLock.lock();
461                return addConnectionsIfUnderCorePoolSize();
462            } finally {
463                mainLock.unlock();
464            }
465        }
466    
467        /**
468         * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
469         * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
470         * <p>
471         * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
472         * </p>
473         * 
474         * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
475         *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
476         *         or the security manager's <tt>checkAccess</tt> method denies access.
477         * @see #shutdownNow()
478         */
479        public void shutdown() {
480            // Fail if caller doesn't have modifyThread permission. We
481            // explicitly check permissions directly because we can't trust
482            // implementations of SecurityManager to correctly override
483            // the "check access" methods such that our documented
484            // security policy is implemented.
485            SecurityManager security = System.getSecurityManager();
486            if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
487    
488            this.logger.debug("Shutting down repository connection pool for {0}", getSourceName());
489            boolean fullyTerminated = false;
490            final ReentrantLock mainLock = this.mainLock;
491            try {
492                mainLock.lock();
493                int state = this.runState;
494                if (state == RUNNING) {
495                    // don't override shutdownNow
496                    this.runState = SHUTDOWN;
497                }
498    
499                // Kill the maintenance thread ...
500    
501                // Remove and close all available connections ...
502                if (!this.availableConnections.isEmpty()) {
503                    // Drain the extra connections from those available ...
504                    drainUnusedConnections(this.availableConnections.size());
505                }
506    
507                // If there are no connections being used, trigger full termination now ...
508                if (this.inUseConnections.isEmpty()) {
509                    fullyTerminated = true;
510                    this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
511                    runState = TERMINATED;
512                    termination.signalAll();
513                    this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
514                }
515                // Otherwise the last connection that is closed will transition the runState to TERMINATED ...
516            } finally {
517                mainLock.unlock();
518            }
519            if (fullyTerminated) terminated();
520        }
521    
522        /**
523         * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
524         * connections.
525         * 
526         * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
527         *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
528         *         or the security manager's <tt>checkAccess</tt> method denies access.
529         * @see #shutdown()
530         */
531        public void shutdownNow() {
532            // Almost the same code as shutdown()
533            SecurityManager security = System.getSecurityManager();
534            if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
535    
536            this.logger.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
537            boolean fullyTerminated = false;
538            final ReentrantLock mainLock = this.mainLock;
539            try {
540                mainLock.lock();
541                int state = this.runState;
542                if (state != TERMINATED) {
543                    // don't override shutdownNow
544                    this.runState = STOP;
545                }
546    
547                // Kill the maintenance thread ...
548    
549                // Remove and close all available connections ...
550                if (!this.availableConnections.isEmpty()) {
551                    // Drain the extra connections from those available ...
552                    drainUnusedConnections(this.availableConnections.size());
553                }
554    
555                // If there are connections being used, close them now ...
556                if (!this.inUseConnections.isEmpty()) {
557                    for (ConnectionWrapper connectionInUse : this.inUseConnections) {
558                        this.logger.trace("Closing repository connection to {0}", getSourceName());
559                        connectionInUse.getOriginal().close();
560                    }
561                    this.poolSize -= this.inUseConnections.size();
562                    // The last connection that is closed will transition the runState to TERMINATED ...
563                } else {
564                    // There are no connections in use, so trigger full termination now ...
565                    fullyTerminated = true;
566                    this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
567                    runState = TERMINATED;
568                    termination.signalAll();
569                    this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
570                }
571    
572            } finally {
573                mainLock.unlock();
574            }
575            if (fullyTerminated) terminated();
576        }
577    
578        /**
579         * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
580         * method is effectively <code>!isShutdown()</code>.
581         * 
582         * @return true if this pool is running, or false otherwise
583         * @see #isShutdown()
584         * @see #isTerminated()
585         * @see #isTerminating()
586         */
587        public boolean isRunning() {
588            return runState == RUNNING;
589        }
590    
591        /**
592         * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
593         * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
594         * <code>!isRunning()</code>.
595         * 
596         * @return true if this pool has been shut down, or false otherwise
597         * @see #isShutdown()
598         * @see #isTerminated()
599         * @see #isTerminating()
600         */
601        public boolean isShutdown() {
602            return runState != RUNNING;
603        }
604    
605        /**
606         * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
607         * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
608         * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
609         * executor not to properly terminate.
610         * 
611         * @return true if terminating but not yet terminated, or false otherwise
612         * @see #isTerminated()
613         */
614        public boolean isTerminating() {
615            return runState == STOP;
616        }
617    
618        /**
619         * Return true if this pool has completed its termination and no longer has any open connections.
620         * 
621         * @return true if terminated, or false otherwise
622         * @see #isTerminating()
623         */
624        public boolean isTerminated() {
625            return runState == TERMINATED;
626        }
627    
628        /**
629         * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
630         * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
631         * return even if all connections have not been closed.
632         * 
633         * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
634         * @param unit the time unit for <code>timeout</code>
635         * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
636         *         before all the connections were closed
637         * @throws InterruptedException if the thread was interrupted
638         */
639        public boolean awaitTermination( long timeout,
640                                         TimeUnit unit ) throws InterruptedException {
641            this.logger.trace("Awaiting termination");
642            long nanos = unit.toNanos(timeout);
643            final ReentrantLock mainLock = this.mainLock;
644            try {
645                mainLock.lock();
646                for (;;) {
647                    // this.logger.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize);
648                    if (runState == TERMINATED) return true;
649                    if (nanos <= 0) return false;
650                    nanos = termination.awaitNanos(nanos);
651                    //this.logger.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
652                    // ;
653                }
654            } finally {
655                mainLock.unlock();
656                this.logger.trace("Finished awaiting termination");
657            }
658        }
659    
660        /**
661         * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
662         * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
663         */
664        protected void terminated() {
665        }
666    
667        /**
668         * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
669         */
670        @Override
671        protected void finalize() {
672            shutdown();
673        }
674    
675        // -------------------------------------------------
676        // Connection management methods ...
677        // -------------------------------------------------
678    
679        /**
680         * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
681         * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
682         * number of connections and all connections are currently being used.
683         * 
684         * @return a connection
685         * @throws RepositorySourceException if there is a problem obtaining a connection
686         * @throws IllegalStateException if the factory is not in a state to create or return connections
687         */
688        public RepositoryConnection getConnection() throws RepositorySourceException {
689            int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
690            ConnectionWrapper connection = null;
691            // Do this until we get a good connection ...
692            int attemptsRemaining = attemptsAllowed;
693            while (connection == null && attemptsRemaining > 0) {
694                --attemptsRemaining;
695                ReentrantLock mainLock = this.mainLock;
696                try {
697                    mainLock.lock();
698                    // If we're shutting down the pool, then just close the connection ...
699                    if (this.runState != RUNNING) {
700                        throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
701                    }
702                    // If there are fewer total connections than the core size ...
703                    if (this.poolSize < this.corePoolSize) {
704                        // Immediately create a wrapped connection and return it ...
705                        connection = newWrappedConnection();
706                    }
707                    // Peek to see if there is a connection available ...
708                    else if (this.availableConnections.peek() != null) {
709                        // There is, so take it and return it ...
710                        try {
711                            connection = this.availableConnections.take();
712                        } catch (InterruptedException e) {
713                            this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
714                            Thread.interrupted();
715                            throw new RepositorySourceException(getSourceName(), e);
716                        }
717                    }
718                    // There is no connection available. If there are fewer total connections than the maximum size ...
719                    else if (this.poolSize < this.maximumPoolSize) {
720                        // Immediately create a wrapped connection and return it ...
721                        connection = newWrappedConnection();
722                    }
723                    if (connection != null) {
724                        this.inUseConnections.add(connection);
725                    }
726                } finally {
727                    mainLock.unlock();
728                }
729                if (connection == null) {
730                    // There are not enough connections, so wait in line for the next available connection ...
731                    this.logger.trace("Waiting for a repository connection from pool {0}", getSourceName());
732                    try {
733                        connection = this.availableConnections.take();
734                    } catch (InterruptedException e) {
735                        this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
736                        Thread.interrupted();
737                        throw new RepositorySourceException(getSourceName(), e);
738                    }
739                    mainLock = this.mainLock;
740                    mainLock.lock();
741                    try {
742                        if (connection != null) {
743                            this.inUseConnections.add(connection);
744                        }
745                    } finally {
746                        mainLock.unlock();
747                    }
748                    this.logger.trace("Recieved a repository connection from pool {0}", getSourceName());
749                }
750                if (connection != null && this.validateConnectionBeforeUse.get()) {
751                    try {
752                        connection = validateConnection(connection);
753                    } catch (InterruptedException e) {
754                        this.logger.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
755                        returnConnection(connection);
756                        Thread.interrupted();
757                        throw new RepositorySourceException(getSourceName(), e);
758                    }
759                }
760            }
761            if (connection == null) {
762                // We were unable to obtain a usable connection, so fail ...
763                throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
764            }
765            this.totalConnectionsUsed.incrementAndGet();
766            return connection;
767        }
768    
769        /**
770         * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
771         * 
772         * @param wrapper the wrapper to the connection that is being returned to the pool
773         */
774        protected void returnConnection( ConnectionWrapper wrapper ) {
775            assert wrapper != null;
776            ConnectionWrapper wrapperToClose = null;
777            final ReentrantLock mainLock = this.mainLock;
778            try {
779                mainLock.lock();
780                // Remove the connection from the in-use set ...
781                boolean removed = this.inUseConnections.remove(wrapper);
782                assert removed;
783    
784                // If we're shutting down the pool, then just close the connection ...
785                if (this.runState != RUNNING) {
786                    wrapperToClose = wrapper;
787                }
788                // If there are more connections than the maximum size...
789                else if (this.poolSize > this.maximumPoolSize) {
790                    // Immediately close this connection ...
791                    wrapperToClose = wrapper;
792                }
793                // Attempt to make the connection available (this should generally work, unless there is an upper limit
794                // to the number of available connections) ...
795                else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
796                    // The pool of available connection is full, so release it ...
797                    wrapperToClose = wrapper;
798                }
799            } finally {
800                mainLock.unlock();
801            }
802            // Close the connection if we're supposed to (do it outside of the main lock)...
803            if (wrapperToClose != null) {
804                closeConnection(wrapperToClose);
805            }
806        }
807    
808        /**
809         * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
810         * 
811         * @param connection the connection to be validated; may not be null
812         * @return the validated connection, or null if the connection did not validate and was removed from the pool
813         * @throws InterruptedException if the thread is interrupted while validating the connection
814         */
815        protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
816            assert connection != null;
817            ConnectionWrapper invalidConnection = null;
818            try {
819                if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
820                    invalidConnection = connection;
821                }
822            } finally {
823                if (invalidConnection != null) {
824                    connection = null;
825                    returnConnection(invalidConnection);
826                }
827            }
828            return connection;
829        }
830    
831        /**
832         * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
833         * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
834         * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
835         * {@link #poolSize pool size}.
836         * 
837         * @return the connection wrapper with a new connection
838         * @throws RepositorySourceException if there was an error obtaining the new connection
839         */
840        @GuardedBy( "mainLock" )
841        protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
842            RepositoryConnection connection = this.source.getConnection();
843            ++this.poolSize;
844            this.totalConnectionsCreated.incrementAndGet();
845            return new ConnectionWrapper(connection);
846        }
847    
848        /**
849         * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
850         * method does decrement the {@link #poolSize pool size}.
851         * 
852         * @param wrapper the wrapper for the connection to be closed
853         */
854        protected void closeConnection( ConnectionWrapper wrapper ) {
855            assert wrapper != null;
856            RepositoryConnection original = wrapper.getOriginal();
857            assert original != null;
858            try {
859                this.logger.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
860                original.close();
861            } finally {
862                final ReentrantLock mainLock = this.mainLock;
863                try {
864                    mainLock.lock();
865                    // No matter what reduce the pool size count
866                    --this.poolSize;
867                    // And if shutting down and this was the last connection being used...
868                    if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
869                        // then signal anybody that has called "awaitTermination(...)"
870                        this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
871                        this.runState = TERMINATED;
872                        this.termination.signalAll();
873                        this.logger.trace("Terminated repository connection pool for {0}", getSourceName());
874    
875                        // fall through to call terminate() outside of lock.
876                    }
877                } finally {
878                    mainLock.unlock();
879                }
880            }
881        }
882    
883        @GuardedBy( "mainLock" )
884        protected int drainUnusedConnections( int count ) {
885            if (count <= 0) return 0;
886            this.logger.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
887            // Drain the extra connections from those available ...
888            Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
889            this.availableConnections.drainTo(extraConnections, count);
890            for (ConnectionWrapper connection : extraConnections) {
891                this.logger.trace("Closing repository connection to {0}", getSourceName());
892                connection.getOriginal().close();
893            }
894            int numClosed = extraConnections.size();
895            this.poolSize -= numClosed;
896            this.logger.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
897            return numClosed;
898        }
899    
900        @GuardedBy( "mainLock" )
901        protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
902            // Add connection ...
903            if (this.poolSize < this.corePoolSize) {
904                this.availableConnections.offer(newWrappedConnection());
905                this.logger.trace("Added connection to {0} in undersized pool", getSourceName());
906                return true;
907            }
908            return false;
909        }
910    
911        @GuardedBy( "mainLock" )
912        protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
913            // Add connections ...
914            int n = 0;
915            while (this.poolSize < this.corePoolSize) {
916                this.availableConnections.offer(newWrappedConnection());
917                ++n;
918            }
919            this.logger.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
920            return n;
921        }
922    
923        protected class ConnectionWrapper implements RepositoryConnection {
924    
925            private final RepositoryConnection original;
926            private final long timeCreated;
927            private long lastUsed;
928            private boolean closed = false;
929    
930            protected ConnectionWrapper( RepositoryConnection connection ) {
931                assert connection != null;
932                this.original = connection;
933                this.timeCreated = System.currentTimeMillis();
934            }
935    
936            /**
937             * @return original
938             */
939            protected RepositoryConnection getOriginal() {
940                return this.original;
941            }
942    
943            /**
944             * @return lastUsed
945             */
946            public long getTimeLastUsed() {
947                return this.lastUsed;
948            }
949    
950            /**
951             * @return timeCreated
952             */
953            public long getTimeCreated() {
954                return this.timeCreated;
955            }
956    
957            /**
958             * {@inheritDoc}
959             */
960            public String getSourceName() {
961                return this.original.getSourceName();
962            }
963    
964            /**
965             * {@inheritDoc}
966             */
967            public XAResource getXAResource() {
968                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
969                return this.original.getXAResource();
970            }
971    
972            /**
973             * {@inheritDoc}
974             */
975            public CachePolicy getDefaultCachePolicy() {
976                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
977                return this.original.getDefaultCachePolicy();
978            }
979    
980            /**
981             * {@inheritDoc}
982             */
983            public void execute( ExecutionContext context,
984                                 GraphCommand... commands ) throws RepositorySourceException {
985                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
986                this.original.execute(context, commands);
987            }
988    
989            /**
990             * {@inheritDoc}
991             */
992            public boolean ping( long time,
993                                 TimeUnit unit ) throws InterruptedException {
994                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
995                return this.original.ping(time, unit);
996            }
997    
998            /**
999             * {@inheritDoc}
1000             */
1001            public void close() {
1002                if (!closed) {
1003                    this.lastUsed = System.currentTimeMillis();
1004                    this.original.close();
1005                    this.closed = true;
1006                    returnConnection(this);
1007                }
1008            }
1009    
1010            /**
1011             * {@inheritDoc}
1012             */
1013            public void setListener( RepositorySourceListener listener ) {
1014                if (!closed) this.original.setListener(listener);
1015            }
1016    
1017        }
1018    
1019    }