1 /*
2 * ModeShape (http://www.modeshape.org)
3 * See the COPYRIGHT.txt file distributed with this work for information
4 * regarding copyright ownership. Some portions may be licensed
5 * to Red Hat, Inc. under one or more contributor license agreements.
6 * See the AUTHORS.txt file in the distribution for a full listing of
7 * individual contributors.
8 *
9 * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10 * is licensed to you under the terms of the GNU Lesser General Public License as
11 * published by the Free Software Foundation; either version 2.1 of
12 * the License, or (at your option) any later version.
13 *
14 * ModeShape is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Lesser General Public License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public
20 * License along with this software; if not, write to the Free
21 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23 */
24 package org.modeshape.graph.connector;
25
26 import net.jcip.annotations.GuardedBy;
27 import net.jcip.annotations.ThreadSafe;
28 import org.modeshape.common.util.CheckArg;
29 import org.modeshape.common.util.Logger;
30 import org.modeshape.graph.ExecutionContext;
31 import org.modeshape.graph.GraphI18n;
32 import org.modeshape.graph.cache.CachePolicy;
33 import org.modeshape.graph.request.Request;
34
35 import javax.transaction.xa.XAResource;
36 import java.util.Collection;
37 import java.util.HashSet;
38 import java.util.LinkedList;
39 import java.util.Set;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import java.util.concurrent.atomic.AtomicLong;
46 import java.util.concurrent.locks.Condition;
47 import java.util.concurrent.locks.ReentrantLock;
48
49 /**
50 * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
51 */
52 @ThreadSafe
53 public class RepositoryConnectionPool {
54
55 /**
56 * The core pool size for default-constructed pools is {@value} .
57 */
58 public static final int DEFAULT_CORE_POOL_SIZE = 1;
59
60 /**
61 * The maximum pool size for default-constructed pools is {@value} .
62 */
63 public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
64
65 /**
66 * The keep-alive time for connections in default-constructed pools is {@value} seconds.
67 */
68 public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
69
70 /**
71 * Permission for checking shutdown
72 */
73 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
74
75 /**
76 * The source that this pool uses to create new connections.
77 */
78 private final RepositorySource source;
79
80 /**
81 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
82 */
83 private final ReentrantLock mainLock = new ReentrantLock();
84
85 /**
86 * Wait condition to support awaitTermination
87 */
88 private final Condition termination = mainLock.newCondition();
89
90 /**
91 * Set containing all connections that are available for use.
92 */
93 @GuardedBy( "mainLock" )
94 private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();
95
96 /**
97 * The connections that are currently in use.
98 */
99 @GuardedBy( "mainLock" )
100 private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
101
102 /**
103 * Timeout in nanoseconds for idle connections waiting to be used. Threads use this timeout only when there are more than
104 * corePoolSize present. Otherwise they wait forever to be used.
105 */
106 private volatile long keepAliveTime;
107
108 /**
109 * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
110 * updates.
111 */
112 @GuardedBy( "mainLock" )
113 private volatile int corePoolSize;
114
115 /**
116 * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
117 */
118 @GuardedBy( "mainLock" )
119 private volatile int maximumPoolSize;
120
121 /**
122 * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
123 */
124 @GuardedBy( "mainLock" )
125 private volatile int poolSize;
126
127 /**
128 * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
129 */
130 @GuardedBy( "mainLock" )
131 private volatile int runState;
132
133 // Special values for runState
134 /** Normal, not-shutdown mode */
135 static final int RUNNING = 0;
136 /** Controlled shutdown mode */
137 static final int SHUTDOWN = 1;
138 /** Immediate shutdown mode */
139 static final int STOP = 2;
140 /** Final state */
141 static final int TERMINATED = 3;
142
143 /**
144 * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
145 */
146 private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
147
148 /**
149 * The time in nanoseconds that ping should wait before timing out and failing.
150 */
151 private final AtomicLong pingTimeout = new AtomicLong(0);
152
153 /**
154 * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
155 */
156 private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
157
158 private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
159
160 private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
161
162 private static final Logger LOGGER = Logger.getLogger(RepositoryConnectionPool.class);
163
164 /**
165 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor
166 * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
167 * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}.
168 *
169 * @param source the source for connections
170 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
171 */
172 public RepositoryConnectionPool( RepositorySource source ) {
173 this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS);
174 }
175
176 /**
177 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
178 *
179 * @param source the source for connections
180 * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
181 * @param maximumPoolSize the maximum number of connections to allow in the pool.
182 * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle
183 * connections will be kept before terminating.
184 * @param unit the time unit for the keepAliveTime argument.
185 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
186 */
187 public RepositoryConnectionPool( RepositorySource source,
188 int corePoolSize,
189 int maximumPoolSize,
190 long keepAliveTime,
191 TimeUnit unit ) {
192 CheckArg.isNonNegative(corePoolSize, "corePoolSize");
193 CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
194 CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
195 CheckArg.isNotNull(source, "source");
196 if (maximumPoolSize < corePoolSize) {
197 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
198 }
199 this.source = source;
200 this.corePoolSize = corePoolSize;
201 this.maximumPoolSize = maximumPoolSize;
202 this.keepAliveTime = unit.toNanos(keepAliveTime);
203 this.setPingTimeout(100, TimeUnit.MILLISECONDS);
204 }
205
206 /**
207 * Get the {@link RepositorySource} that's used by this pool.
208 *
209 * @return the repository source; never null
210 */
211 public final RepositorySource getRepositorySource() {
212 return source;
213 }
214
215 /**
216 * Get the name of this pool, which delegates to the connection factory.
217 *
218 * @return the name of the source
219 */
220 protected String getSourceName() {
221 return source.getName();
222 }
223
224 // -------------------------------------------------
225 // Property settings ...
226 // -------------------------------------------------
227
228 /**
229 * @return validateConnectionBeforeUse
230 */
231 public boolean getValidateConnectionBeforeUse() {
232 return this.validateConnectionBeforeUse.get();
233 }
234
235 /**
236 * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
237 */
238 public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
239 this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
240 }
241
242 /**
243 * @return pingTimeout
244 */
245 public long getPingTimeoutInNanos() {
246 return this.pingTimeout.get();
247 }
248
249 /**
250 * @param pingTimeout the time to wait for a ping to complete
251 * @param unit the time unit of the time argument
252 */
253 public void setPingTimeout( long pingTimeout,
254 TimeUnit unit ) {
255 CheckArg.isNonNegative(pingTimeout, "time");
256 this.pingTimeout.set(unit.toNanos(pingTimeout));
257 }
258
259 /**
260 * @return maxFailedAttemptsBeforeError
261 */
262 public int getMaxFailedAttemptsBeforeError() {
263 return this.maxFailedAttemptsBeforeError.get();
264 }
265
266 /**
267 * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
268 */
269 public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
270 this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
271 }
272
273 /**
274 * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
275 * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
276 * This overrides any value set in the constructor.
277 *
278 * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
279 * returned.
280 * @param unit the time unit of the time argument
281 * @throws IllegalArgumentException if time less than zero
282 * @see #getKeepAliveTime
283 */
284 public void setKeepAliveTime( long time,
285 TimeUnit unit ) {
286 CheckArg.isNonNegative(time, "time");
287 this.keepAliveTime = unit.toNanos(time);
288 }
289
290 /**
291 * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
292 * remain idle before being closed.
293 *
294 * @param unit the desired time unit of the result
295 * @return the time limit
296 * @see #setKeepAliveTime
297 */
298 public long getKeepAliveTime( TimeUnit unit ) {
299 assert unit != null;
300 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
301 }
302
303 /**
304 * @return maximumPoolSize
305 */
306 public int getMaximumPoolSize() {
307 return this.maximumPoolSize;
308 }
309
310 /**
311 * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
312 * smaller than the current value, excess existing but unused connections will be closed.
313 *
314 * @param maximumPoolSize the new maximum
315 * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
316 * @see #getMaximumPoolSize
317 */
318 public void setMaximumPoolSize( int maximumPoolSize ) {
319 CheckArg.isPositive(maximumPoolSize, "maximum pool size");
320 if (maximumPoolSize < corePoolSize) {
321 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
322 }
323 final ReentrantLock mainLock = this.mainLock;
324 try {
325 mainLock.lock();
326 int extra = this.maximumPoolSize - maximumPoolSize;
327 this.maximumPoolSize = maximumPoolSize;
328 if (extra > 0 && poolSize > maximumPoolSize) {
329 // Drain the extra connections from those available ...
330 drainUnusedConnections(extra);
331 }
332 } finally {
333 mainLock.unlock();
334 }
335 }
336
337 /**
338 * Returns the core number of connections.
339 *
340 * @return the core number of connections
341 * @see #setCorePoolSize(int)
342 */
343 public int getCorePoolSize() {
344 return this.corePoolSize;
345 }
346
347 /**
348 * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
349 * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
350 * created.
351 *
352 * @param corePoolSize the new core size
353 * @throws RepositorySourceException if there was an error obtaining the new connection
354 * @throws InterruptedException if the thread was interrupted during the operation
355 * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
356 * @see #getCorePoolSize()
357 */
358 public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
359 CheckArg.isNonNegative(corePoolSize, "core pool size");
360 if (maximumPoolSize < corePoolSize) {
361 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
362 }
363 final ReentrantLock mainLock = this.mainLock;
364 try {
365 mainLock.lock();
366 int extra = this.corePoolSize - corePoolSize;
367 this.corePoolSize = corePoolSize;
368 if (extra < 0) {
369 // Add connections ...
370 addConnectionsIfUnderCorePoolSize();
371 } else if (extra > 0 && poolSize > corePoolSize) {
372 // Drain the extra connections from those available ...
373 drainUnusedConnections(extra);
374 }
375 } finally {
376 mainLock.unlock();
377 }
378 }
379
380 // -------------------------------------------------
381 // Statistics ...
382 // -------------------------------------------------
383
384 /**
385 * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
386 * being used.
387 *
388 * @return the number of connections
389 */
390 public int getPoolSize() {
391 return poolSize;
392 }
393
394 /**
395 * Returns the approximate number of connections that are currently checked out from the pool.
396 *
397 * @return the number of checked-out connections
398 */
399 public int getInUseCount() {
400 final ReentrantLock mainLock = this.mainLock;
401 try {
402 mainLock.lock();
403 return this.inUseConnections.size();
404 } finally {
405 mainLock.unlock();
406 }
407 }
408
409 /**
410 * Get the total number of connections that have been created by this pool.
411 *
412 * @return the total number of connections created by this pool
413 */
414 public long getTotalConnectionsCreated() {
415 return this.totalConnectionsCreated.get();
416 }
417
418 /**
419 * Get the total number of times connections have been {@link #getConnection()} used.
420 *
421 * @return the total number
422 */
423 public long getTotalConnectionsUsed() {
424 return this.totalConnectionsUsed.get();
425 }
426
427 // -------------------------------------------------
428 // State management methods ...
429 // -------------------------------------------------
430
431 /**
432 * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
433 * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
434 * already been started.
435 *
436 * @return true if a connection was started
437 * @throws RepositorySourceException if there was an error obtaining the new connection
438 * @throws InterruptedException if the thread was interrupted during the operation
439 */
440 public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
441 final ReentrantLock mainLock = this.mainLock;
442 try {
443 mainLock.lock();
444 return addConnectionIfUnderCorePoolSize();
445 } finally {
446 mainLock.unlock();
447 }
448 }
449
450 /**
451 * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
452 * connections only when they are {@link #getConnection() needed}.
453 *
454 * @return the number of connections started.
455 * @throws RepositorySourceException if there was an error obtaining the new connection
456 * @throws InterruptedException if the thread was interrupted during the operation
457 */
458 public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
459 final ReentrantLock mainLock = this.mainLock;
460 try {
461 mainLock.lock();
462 return addConnectionsIfUnderCorePoolSize();
463 } finally {
464 mainLock.unlock();
465 }
466 }
467
468 /**
469 * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
470 * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
471 * <p>
472 * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
473 * </p>
474 *
475 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
476 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
477 * or the security manager's <tt>checkAccess</tt> method denies access.
478 * @see #shutdownNow()
479 */
480 public void shutdown() {
481 // Fail if caller doesn't have modifyThread permission. We
482 // explicitly check permissions directly because we can't trust
483 // implementations of SecurityManager to correctly override
484 // the "check access" methods such that our documented
485 // security policy is implemented.
486 SecurityManager security = System.getSecurityManager();
487 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
488
489 LOGGER.debug("Shutting down repository connection pool for {0}", getSourceName());
490 boolean fullyTerminated = false;
491 final ReentrantLock mainLock = this.mainLock;
492 try {
493 mainLock.lock();
494 int state = this.runState;
495 if (state == RUNNING) {
496 // don't override shutdownNow
497 this.runState = SHUTDOWN;
498 }
499
500 // Kill the maintenance thread ...
501
502 // Remove and close all available connections ...
503 if (!this.availableConnections.isEmpty()) {
504 // Drain the extra connections from those available ...
505 drainUnusedConnections(this.availableConnections.size());
506 }
507
508 // If there are no connections being used, trigger full termination now ...
509 if (this.inUseConnections.isEmpty()) {
510 fullyTerminated = true;
511 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
512 runState = TERMINATED;
513 termination.signalAll();
514 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
515 }
516 // Otherwise the last connection that is closed will transition the runState to TERMINATED ...
517 } finally {
518 mainLock.unlock();
519 }
520 if (fullyTerminated) terminated();
521 }
522
523 /**
524 * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
525 * connections.
526 *
527 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
528 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
529 * or the security manager's <tt>checkAccess</tt> method denies access.
530 * @see #shutdown()
531 */
532 public void shutdownNow() {
533 // Almost the same code as shutdown()
534 SecurityManager security = System.getSecurityManager();
535 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
536
537 LOGGER.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
538 boolean fullyTerminated = false;
539 final ReentrantLock mainLock = this.mainLock;
540 try {
541 mainLock.lock();
542 int state = this.runState;
543 if (state != TERMINATED) {
544 // don't override shutdownNow
545 this.runState = STOP;
546 }
547
548 // Kill the maintenance thread ...
549
550 // Remove and close all available connections ...
551 if (!this.availableConnections.isEmpty()) {
552 // Drain the extra connections from those available ...
553 drainUnusedConnections(this.availableConnections.size());
554 }
555
556 // If there are connections being used, close them now ...
557 if (!this.inUseConnections.isEmpty()) {
558 for (ConnectionWrapper connectionInUse : this.inUseConnections) {
559 LOGGER.trace("Closing repository connection to {0}", getSourceName());
560 connectionInUse.getOriginal().close();
561 }
562 this.poolSize -= this.inUseConnections.size();
563 // The last connection that is closed will transition the runState to TERMINATED ...
564 } else {
565 // There are no connections in use, so trigger full termination now ...
566 fullyTerminated = true;
567 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
568 runState = TERMINATED;
569 termination.signalAll();
570 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
571 }
572
573 } finally {
574 mainLock.unlock();
575 }
576 if (fullyTerminated) terminated();
577 }
578
579 /**
580 * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
581 * method is effectively <code>!isShutdown()</code>.
582 *
583 * @return true if this pool is running, or false otherwise
584 * @see #isShutdown()
585 * @see #isTerminated()
586 * @see #isTerminating()
587 */
588 public boolean isRunning() {
589 return runState == RUNNING;
590 }
591
592 /**
593 * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
594 * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
595 * <code>!isRunning()</code>.
596 *
597 * @return true if this pool has been shut down, or false otherwise
598 * @see #isShutdown()
599 * @see #isTerminated()
600 * @see #isTerminating()
601 */
602 public boolean isShutdown() {
603 return runState != RUNNING;
604 }
605
606 /**
607 * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
608 * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
609 * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
610 * executor not to properly terminate.
611 *
612 * @return true if terminating but not yet terminated, or false otherwise
613 * @see #isTerminated()
614 */
615 public boolean isTerminating() {
616 return runState == STOP;
617 }
618
619 /**
620 * Return true if this pool has completed its termination and no longer has any open connections.
621 *
622 * @return true if terminated, or false otherwise
623 * @see #isTerminating()
624 */
625 public boolean isTerminated() {
626 return runState == TERMINATED;
627 }
628
629 /**
630 * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
631 * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
632 * return even if all connections have not been closed.
633 *
634 * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
635 * @param unit the time unit for <code>timeout</code>
636 * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
637 * before all the connections were closed
638 * @throws InterruptedException if the thread was interrupted
639 */
640 public boolean awaitTermination( long timeout,
641 TimeUnit unit ) throws InterruptedException {
642 LOGGER.trace("Awaiting termination");
643 long nanos = unit.toNanos(timeout);
644 final ReentrantLock mainLock = this.mainLock;
645 try {
646 mainLock.lock();
647 for (;;) {
648 // LOGGER.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize);
649 if (runState == TERMINATED) return true;
650 if (nanos <= 0) return false;
651 nanos = termination.awaitNanos(nanos);
652 // LOGGER.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
653 // ;
654 }
655 } finally {
656 mainLock.unlock();
657 LOGGER.trace("Finished awaiting termination");
658 }
659 }
660
661 /**
662 * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
663 * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
664 */
665 protected void terminated() {
666 }
667
668 /**
669 * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
670 */
671 @Override
672 protected void finalize() {
673 shutdown();
674 }
675
676 // -------------------------------------------------
677 // Connection management methods ...
678 // -------------------------------------------------
679
680 /**
681 * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
682 * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
683 * number of connections and all connections are currently being used.
684 *
685 * @return a connection
686 * @throws RepositorySourceException if there is a problem obtaining a connection
687 * @throws IllegalStateException if the factory is not in a state to create or return connections
688 */
689 public RepositoryConnection getConnection() throws RepositorySourceException {
690 int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
691 ConnectionWrapper connection = null;
692 // Do this until we get a good connection ...
693 int attemptsRemaining = attemptsAllowed;
694 while (connection == null && attemptsRemaining > 0) {
695 --attemptsRemaining;
696 ReentrantLock mainLock = this.mainLock;
697 try {
698 mainLock.lock();
699 // If we're shutting down the pool, then just close the connection ...
700 if (this.runState != RUNNING) {
701 throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
702 }
703 // If there are fewer total connections than the core size ...
704 if (this.poolSize < this.corePoolSize) {
705 // Immediately create a wrapped connection and return it ...
706 connection = newWrappedConnection();
707 }
708 // Peek to see if there is a connection available ...
709 else if (this.availableConnections.peek() != null) {
710 // There is, so take it and return it ...
711 try {
712 connection = this.availableConnections.take();
713 } catch (InterruptedException e) {
714 LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
715 Thread.interrupted();
716 throw new RepositorySourceException(getSourceName(), e);
717 }
718 }
719 // There is no connection available. If there are fewer total connections than the maximum size ...
720 else if (this.poolSize < this.maximumPoolSize) {
721 // Immediately create a wrapped connection and return it ...
722 connection = newWrappedConnection();
723 }
724 if (connection != null) {
725 this.inUseConnections.add(connection);
726 }
727 } finally {
728 mainLock.unlock();
729 }
730 if (connection == null) {
731 // There are not enough connections, so wait in line for the next available connection ...
732 LOGGER.trace("Waiting for a repository connection from pool {0}", getSourceName());
733 try {
734 connection = this.availableConnections.take();
735 } catch (InterruptedException e) {
736 LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
737 Thread.interrupted();
738 throw new RepositorySourceException(getSourceName(), e);
739 }
740 mainLock = this.mainLock;
741 mainLock.lock();
742 try {
743 if (connection != null) {
744 this.inUseConnections.add(connection);
745 }
746 } finally {
747 mainLock.unlock();
748 }
749 LOGGER.trace("Recieved a repository connection from pool {0}", getSourceName());
750 }
751 if (connection != null && this.validateConnectionBeforeUse.get()) {
752 try {
753 connection = validateConnection(connection);
754 } catch (InterruptedException e) {
755 LOGGER.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
756 returnConnection(connection);
757 Thread.interrupted();
758 throw new RepositorySourceException(getSourceName(), e);
759 }
760 }
761 }
762 if (connection == null) {
763 // We were unable to obtain a usable connection, so fail ...
764 throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
765 }
766 this.totalConnectionsUsed.incrementAndGet();
767 return connection;
768 }
769
770 /**
771 * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
772 *
773 * @param wrapper the wrapper to the connection that is being returned to the pool
774 */
775 protected void returnConnection( ConnectionWrapper wrapper ) {
776 assert wrapper != null;
777 ConnectionWrapper wrapperToClose = null;
778 final ReentrantLock mainLock = this.mainLock;
779 try {
780 mainLock.lock();
781 // Remove the connection from the in-use set ...
782 boolean removed = this.inUseConnections.remove(wrapper);
783 assert removed;
784
785 // If we're shutting down the pool, then just close the connection ...
786 if (this.runState != RUNNING) {
787 wrapperToClose = wrapper;
788 }
789 // If there are more connections than the maximum size...
790 else if (this.poolSize > this.maximumPoolSize) {
791 // Immediately close this connection ...
792 wrapperToClose = wrapper;
793 }
794 // Attempt to make the connection available (this should generally work, unless there is an upper limit
795 // to the number of available connections) ...
796 else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
797 // The pool of available connection is full, so release it ...
798 wrapperToClose = wrapper;
799 }
800 } finally {
801 mainLock.unlock();
802 }
803 // Close the connection if we're supposed to (do it outside of the main lock)...
804 if (wrapperToClose != null) {
805 closeConnection(wrapperToClose);
806 }
807 }
808
809 /**
810 * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
811 *
812 * @param connection the connection to be validated; may not be null
813 * @return the validated connection, or null if the connection did not validate and was removed from the pool
814 * @throws InterruptedException if the thread is interrupted while validating the connection
815 */
816 protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
817 assert connection != null;
818 ConnectionWrapper invalidConnection = null;
819 try {
820 if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
821 invalidConnection = connection;
822 }
823 } finally {
824 if (invalidConnection != null) {
825 try {
826 mainLock.lock();
827 connection = null;
828 this.poolSize--;
829 this.inUseConnections.remove(connection);
830 // returnConnection(invalidConnection);
831 } finally {
832 mainLock.unlock();
833 }
834 }
835 }
836 return connection;
837 }
838
839 /**
840 * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
841 * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
842 * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
843 * {@link #poolSize pool size}.
844 *
845 * @return the connection wrapper with a new connection
846 * @throws RepositorySourceException if there was an error obtaining the new connection
847 */
848 @GuardedBy( "mainLock" )
849 protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
850 RepositoryConnection connection = this.source.getConnection();
851 ++this.poolSize;
852 this.totalConnectionsCreated.incrementAndGet();
853 return new ConnectionWrapper(connection);
854 }
855
856 /**
857 * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
858 * method does decrement the {@link #poolSize pool size}.
859 *
860 * @param wrapper the wrapper for the connection to be closed
861 */
862 protected void closeConnection( ConnectionWrapper wrapper ) {
863 assert wrapper != null;
864 RepositoryConnection original = wrapper.getOriginal();
865 assert original != null;
866 try {
867 LOGGER.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
868 original.close();
869 } finally {
870 final ReentrantLock mainLock = this.mainLock;
871 try {
872 mainLock.lock();
873 // No matter what reduce the pool size count
874 --this.poolSize;
875 // And if shutting down and this was the last connection being used...
876 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
877 // then signal anybody that has called "awaitTermination(...)"
878 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
879 this.runState = TERMINATED;
880 this.termination.signalAll();
881 LOGGER.trace("Terminated repository connection pool for {0}", getSourceName());
882
883 // fall through to call terminate() outside of lock.
884 }
885 } finally {
886 mainLock.unlock();
887 }
888 }
889 }
890
891 @GuardedBy( "mainLock" )
892 protected int drainUnusedConnections( int count ) {
893 if (count <= 0) return 0;
894 LOGGER.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
895 // Drain the extra connections from those available ...
896 Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
897 this.availableConnections.drainTo(extraConnections, count);
898 for (ConnectionWrapper connection : extraConnections) {
899 LOGGER.trace("Closing repository connection to {0}", getSourceName());
900 connection.getOriginal().close();
901 }
902 int numClosed = extraConnections.size();
903 this.poolSize -= numClosed;
904 LOGGER.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
905 return numClosed;
906 }
907
908 @GuardedBy( "mainLock" )
909 protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
910 // Add connection ...
911 if (this.poolSize < this.corePoolSize) {
912 this.availableConnections.offer(newWrappedConnection());
913 LOGGER.trace("Added connection to {0} in undersized pool", getSourceName());
914 return true;
915 }
916 return false;
917 }
918
919 @GuardedBy( "mainLock" )
920 protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
921 // Add connections ...
922 int n = 0;
923 while (this.poolSize < this.corePoolSize) {
924 this.availableConnections.offer(newWrappedConnection());
925 ++n;
926 }
927 LOGGER.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
928 return n;
929 }
930
931 protected class ConnectionWrapper implements RepositoryConnection {
932
933 private final RepositoryConnection original;
934 private final long timeCreated;
935 private long lastUsed;
936 private boolean closed = false;
937
938 protected ConnectionWrapper( RepositoryConnection connection ) {
939 assert connection != null;
940 this.original = connection;
941 this.timeCreated = System.currentTimeMillis();
942 }
943
944 /**
945 * @return original
946 */
947 protected RepositoryConnection getOriginal() {
948 return this.original;
949 }
950
951 /**
952 * @return lastUsed
953 */
954 public long getTimeLastUsed() {
955 return this.lastUsed;
956 }
957
958 /**
959 * @return timeCreated
960 */
961 public long getTimeCreated() {
962 return this.timeCreated;
963 }
964
965 /**
966 * {@inheritDoc}
967 */
968 public String getSourceName() {
969 return this.original.getSourceName();
970 }
971
972 /**
973 * {@inheritDoc}
974 */
975 public XAResource getXAResource() {
976 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
977 return this.original.getXAResource();
978 }
979
980 /**
981 * {@inheritDoc}
982 */
983 public CachePolicy getDefaultCachePolicy() {
984 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
985 return this.original.getDefaultCachePolicy();
986 }
987
988 /**
989 * {@inheritDoc}
990 *
991 * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
992 * org.modeshape.graph.request.Request)
993 */
994 public void execute( ExecutionContext context,
995 Request request ) throws RepositorySourceException {
996 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
997 this.original.execute(context, request);
998 }
999
1000 /**
1001 * {@inheritDoc}
1002 */
1003 public boolean ping( long time,
1004 TimeUnit unit ) throws InterruptedException {
1005 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1006 return this.original.ping(time, unit);
1007 }
1008
1009 /**
1010 * {@inheritDoc}
1011 */
1012 public void close() {
1013 if (!closed) {
1014 this.lastUsed = System.currentTimeMillis();
1015 this.closed = true;
1016 returnConnection(this);
1017 }
1018 }
1019 }
1020
1021 }