1 /*
2 * ModeShape (http://www.modeshape.org)
3 * See the COPYRIGHT.txt file distributed with this work for information
4 * regarding copyright ownership. Some portions may be licensed
5 * to Red Hat, Inc. under one or more contributor license agreements.
6 * See the AUTHORS.txt file in the distribution for a full listing of
7 * individual contributors.
8 *
9 * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10 * is licensed to you under the terms of the GNU Lesser General Public License as
11 * published by the Free Software Foundation; either version 2.1 of
12 * the License, or (at your option) any later version.
13 *
14 * ModeShape is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Lesser General Public License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public
20 * License along with this software; if not, write to the Free
21 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23 */
24 package org.modeshape.graph.connector;
25
26 import java.util.Collection;
27 import java.util.HashSet;
28 import java.util.LinkedList;
29 import java.util.Set;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicLong;
36 import java.util.concurrent.locks.Condition;
37 import java.util.concurrent.locks.ReentrantLock;
38 import javax.transaction.xa.XAResource;
39 import net.jcip.annotations.GuardedBy;
40 import net.jcip.annotations.ThreadSafe;
41 import org.modeshape.common.annotation.Category;
42 import org.modeshape.common.annotation.Description;
43 import org.modeshape.common.annotation.Label;
44 import org.modeshape.common.util.CheckArg;
45 import org.modeshape.common.util.Logger;
46 import org.modeshape.graph.ExecutionContext;
47 import org.modeshape.graph.GraphI18n;
48 import org.modeshape.graph.cache.CachePolicy;
49 import org.modeshape.graph.request.Request;
50
51 /**
52 * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
53 */
54 @ThreadSafe
55 public class RepositoryConnectionPool {
56
57 /**
58 * The default time to wait for ping when called from a client.
59 */
60 public static final long DEFAULT_TIME_TO_WAIT = 10;
61
62 /**
63 * The default TimeUnit to wait for ping when called from a client.
64 */
65 public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
66
67 /**
68 * The core pool size for default-constructed pools is {@value} .
69 */
70 public static final int DEFAULT_CORE_POOL_SIZE = 1;
71
72 /**
73 * The maximum pool size for default-constructed pools is {@value} .
74 */
75 public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
76
77 /**
78 * The keep-alive time for connections in default-constructed pools is * * * * {@value} seconds.
79 */
80 public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
81
82 /**
83 * Permission for checking shutdown
84 */
85 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
86
87 /**
88 * The source that this pool uses to create new connections.
89 */
90 private final RepositorySource source;
91
92 /**
93 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set.
94 */
95 private final ReentrantLock mainLock = new ReentrantLock();
96
97 /**
98 * Wait condition to support awaitTermination
99 */
100 private final Condition termination = mainLock.newCondition();
101
102 /**
103 * Set containing all connections that are available for use.
104 */
105 @GuardedBy( "mainLock" )
106 private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();
107
108 /**
109 * The connections that are currently in use.
110 */
111 @GuardedBy( "mainLock" )
112 private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
113
114 /**
115 * Timeout in nanoseconds for idle connections waiting to be used. Threads use this timeout only when there are more than
116 * corePoolSize present. Otherwise they wait forever to be used.
117 */
118 @Description( i18n = GraphI18n.class, value = "poolKeepAliveTimeDescription" )
119 @Label( i18n = GraphI18n.class, value = "poolKeepAliveTimeLabel" )
120 @Category( i18n = GraphI18n.class, value = "poolKeepAliveTimeCategory" )
121 private volatile long keepAliveTime;
122
123 /**
124 * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
125 * updates.
126 */
127 @Description( i18n = GraphI18n.class, value = "poolCorePoolSizeDescription" )
128 @Label( i18n = GraphI18n.class, value = "poolCorePoolSizeLabel" )
129 @Category( i18n = GraphI18n.class, value = "poolCorePoolSizeCategory" )
130 @GuardedBy( "mainLock" )
131 private volatile int corePoolSize;
132
133 /**
134 * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
135 */
136 @Description( i18n = GraphI18n.class, value = "poolMaxiumumPoolSizeDescription" )
137 @Label( i18n = GraphI18n.class, value = "poolMaxiumumPoolSizeLabel" )
138 @Category( i18n = GraphI18n.class, value = "poolMaxiumumPoolSizeCategory" )
139 @GuardedBy( "mainLock" )
140 private volatile int maximumPoolSize;
141
142 /**
143 * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
144 */
145 @GuardedBy( "mainLock" )
146 private volatile int poolSize;
147
148 /**
149 * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
150 */
151 @GuardedBy( "mainLock" )
152 private volatile int runState;
153
154 // Special values for runState
155 /** Normal, not-shutdown mode */
156 static final int RUNNING = 0;
157 /** Controlled shutdown mode */
158 static final int SHUTDOWN = 1;
159 /** Immediate shutdown mode */
160 static final int STOP = 2;
161 /** Final state */
162 static final int TERMINATED = 3;
163
164 /**
165 * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
166 */
167 @Description( i18n = GraphI18n.class, value = "poolValidateConnectionBeforeUseDescription" )
168 @Label( i18n = GraphI18n.class, value = "poolValidateConnectionBeforeUseLabel" )
169 @Category( i18n = GraphI18n.class, value = "poolValidateConnectionBeforeUseCategory" )
170 private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
171
172 /**
173 * The time in nanoseconds that ping should wait before timing out and failing.
174 */
175 @Description( i18n = GraphI18n.class, value = "poolPingTimeoutDescription" )
176 @Label( i18n = GraphI18n.class, value = "poolPingTimeoutLabel" )
177 @Category( i18n = GraphI18n.class, value = "poolPingTimeoutCategory" )
178 private final AtomicLong pingTimeout = new AtomicLong(0);
179
180 /**
181 * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
182 */
183 @Description( i18n = GraphI18n.class, value = "poolMaximumFailedAttemptsBeforeErrorDescription" )
184 @Label( i18n = GraphI18n.class, value = "poolMaximumFailedAttemptsBeforeErrorLabel" )
185 @Category( i18n = GraphI18n.class, value = "poolMaximumFailedAttemptsBeforeErrorCategory" )
186 private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
187
188 private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
189
190 private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
191
192 private static final Logger LOGGER = Logger.getLogger(RepositoryConnectionPool.class);
193
194 /**
195 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor
196 * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool
197 * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}.
198 *
199 * @param source the source for connections
200 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
201 */
202 public RepositoryConnectionPool( RepositorySource source ) {
203 this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS);
204 }
205
206 /**
207 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}.
208 *
209 * @param source the source for connections
210 * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
211 * @param maximumPoolSize the maximum number of connections to allow in the pool.
212 * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle
213 * connections will be kept before terminating.
214 * @param unit the time unit for the keepAliveTime argument.
215 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid
216 */
217 public RepositoryConnectionPool( RepositorySource source,
218 int corePoolSize,
219 int maximumPoolSize,
220 long keepAliveTime,
221 TimeUnit unit ) {
222 CheckArg.isNonNegative(corePoolSize, "corePoolSize");
223 CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
224 CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
225 CheckArg.isNotNull(source, "source");
226 if (maximumPoolSize < corePoolSize) {
227 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
228 }
229 this.source = source;
230 this.corePoolSize = corePoolSize;
231 this.maximumPoolSize = maximumPoolSize;
232 this.keepAliveTime = unit.toNanos(keepAliveTime);
233 this.setPingTimeout(100, TimeUnit.MILLISECONDS);
234 }
235
236 /**
237 * Used to get and test a connection in the pool
238 *
239 * @param time
240 * @param unit
241 * @return boolean
242 * @throws InterruptedException
243 */
244 public boolean ping( long time,
245 TimeUnit unit ) throws InterruptedException {
246
247 if (isRunning()) {
248 RepositoryConnection connection = null;
249 Throwable error = null;
250 try {
251 connection = getConnection();
252 return connection.ping(time, unit);
253 } catch (RuntimeException t) {
254 error = t;
255 throw t;
256 } catch (InterruptedException t) {
257 error = t;
258 throw t;
259 } finally {
260 if (connection != null) {
261 try {
262 connection.close();
263 } catch (RuntimeException e) {
264 if (error == null) throw e;
265 }
266 }
267 }
268 }
269 return false;
270 }
271
272 /**
273 * Used to get and test a connection in the pool with default wait and time unit values
274 *
275 * @return boolean
276 * @throws InterruptedException
277 */
278 public boolean ping() throws InterruptedException {
279 return ping(DEFAULT_TIME_TO_WAIT, DEFAULT_TIME_UNIT);
280 }
281
282 /**
283 * Get the {@link RepositorySource} that's used by this pool.
284 *
285 * @return the repository source; never null
286 */
287 public final RepositorySource getRepositorySource() {
288 return source;
289 }
290
291 /**
292 * Get the name of this pool, which delegates to the connection factory.
293 *
294 * @return the name of the source
295 */
296 protected String getSourceName() {
297 return source.getName();
298 }
299
300 // -------------------------------------------------
301 // Property settings ...
302 // -------------------------------------------------
303
304 /**
305 * @return validateConnectionBeforeUse
306 */
307 public boolean getValidateConnectionBeforeUse() {
308 return this.validateConnectionBeforeUse.get();
309 }
310
311 /**
312 * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
313 */
314 public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
315 this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
316 }
317
318 /**
319 * @return pingTimeout
320 */
321 public long getPingTimeoutInNanos() {
322 return this.pingTimeout.get();
323 }
324
325 /**
326 * @param pingTimeout the time to wait for a ping to complete
327 * @param unit the time unit of the time argument
328 */
329 public void setPingTimeout( long pingTimeout,
330 TimeUnit unit ) {
331 CheckArg.isNonNegative(pingTimeout, "time");
332 this.pingTimeout.set(unit.toNanos(pingTimeout));
333 }
334
335 /**
336 * The ping timeout, in seconds.
337 *
338 * @return pingTimeout
339 */
340 public long getPingTimeout() {
341 return TimeUnit.NANOSECONDS.toSeconds(this.pingTimeout.get());
342 }
343
344 /**
345 * Sets the ping timeout, in seconds.
346 *
347 * @param pingTimeoutInSeconds the time to wait for a ping to complete
348 */
349 public void setPingTimeout( long pingTimeoutInSeconds ) {
350 setPingTimeout(pingTimeoutInSeconds, TimeUnit.SECONDS);
351 }
352
353 /**
354 * @return maxFailedAttemptsBeforeError
355 */
356 public int getMaxFailedAttemptsBeforeError() {
357 return this.maxFailedAttemptsBeforeError.get();
358 }
359
360 /**
361 * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
362 */
363 public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
364 this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
365 }
366
367 /**
368 * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
369 * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
370 * This overrides any value set in the constructor.
371 *
372 * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
373 * returned.
374 * @param unit the time unit of the time argument
375 * @throws IllegalArgumentException if time less than zero
376 * @see #getKeepAliveTime
377 */
378 public void setKeepAliveTime( long time,
379 TimeUnit unit ) {
380 CheckArg.isNonNegative(time, "time");
381 this.keepAliveTime = unit.toNanos(time);
382 }
383
384 /**
385 * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
386 * remain idle before being closed.
387 *
388 * @param unit the desired time unit of the result
389 * @return the time limit
390 * @see #setKeepAliveTime
391 */
392 public long getKeepAliveTime( TimeUnit unit ) {
393 assert unit != null;
394 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
395 }
396
397 /**
398 * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
399 * remain idle before being closed.
400 *
401 * @return the time limit in seconds
402 * @see #setKeepAliveTime
403 */
404 public long getKeepAliveTime() {
405 return getKeepAliveTime(TimeUnit.SECONDS);
406 }
407
408 /**
409 * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
410 * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
411 * This overrides any value set in the constructor.
412 *
413 * @param time the time to wait, in seconds. A time value of zero will cause excess connections to terminate immediately after
414 * being returned.
415 * @throws IllegalArgumentException if time less than zero
416 * @see #getKeepAliveTime
417 */
418 public void setKeepAliveTime( long time ) {
419 setKeepAliveTime(time, TimeUnit.SECONDS);
420 }
421
422 /**
423 * @return maximumPoolSize
424 */
425 public int getMaximumPoolSize() {
426 return this.maximumPoolSize;
427 }
428
429 /**
430 * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
431 * smaller than the current value, excess existing but unused connections will be closed.
432 *
433 * @param maximumPoolSize the new maximum
434 * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
435 * @see #getMaximumPoolSize
436 */
437 public void setMaximumPoolSize( int maximumPoolSize ) {
438 CheckArg.isPositive(maximumPoolSize, "maximum pool size");
439 if (maximumPoolSize < corePoolSize) {
440 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
441 }
442 final ReentrantLock mainLock = this.mainLock;
443 try {
444 mainLock.lock();
445 int extra = this.maximumPoolSize - maximumPoolSize;
446 this.maximumPoolSize = maximumPoolSize;
447 if (extra > 0 && poolSize > maximumPoolSize) {
448 // Drain the extra connections from those available ...
449 drainUnusedConnections(extra);
450 }
451 } finally {
452 mainLock.unlock();
453 }
454 }
455
456 /**
457 * Returns the core number of connections.
458 *
459 * @return the core number of connections
460 * @see #setCorePoolSize(int)
461 */
462 public int getCorePoolSize() {
463 return this.corePoolSize;
464 }
465
466 /**
467 * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
468 * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
469 * created.
470 *
471 * @param corePoolSize the new core size
472 * @throws RepositorySourceException if there was an error obtaining the new connection
473 * @throws InterruptedException if the thread was interrupted during the operation
474 * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
475 * @see #getCorePoolSize()
476 */
477 public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
478 CheckArg.isNonNegative(corePoolSize, "core pool size");
479 if (maximumPoolSize < corePoolSize) {
480 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
481 }
482 final ReentrantLock mainLock = this.mainLock;
483 try {
484 mainLock.lock();
485 int extra = this.corePoolSize - corePoolSize;
486 this.corePoolSize = corePoolSize;
487 if (extra < 0) {
488 // Add connections ...
489 addConnectionsIfUnderCorePoolSize();
490 } else if (extra > 0 && poolSize > corePoolSize) {
491 // Drain the extra connections from those available ...
492 drainUnusedConnections(extra);
493 }
494 } finally {
495 mainLock.unlock();
496 }
497 }
498
499 // -------------------------------------------------
500 // Statistics ...
501 // -------------------------------------------------
502
503 /**
504 * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
505 * being used.
506 *
507 * @return the number of connections
508 */
509 public int getPoolSize() {
510 return poolSize;
511 }
512
513 /**
514 * Returns the approximate number of connections that are currently checked out from the pool.
515 *
516 * @return the number of checked-out connections
517 */
518 public int getInUseCount() {
519 final ReentrantLock mainLock = this.mainLock;
520 try {
521 mainLock.lock();
522 return this.inUseConnections.size();
523 } finally {
524 mainLock.unlock();
525 }
526 }
527
528 /**
529 * Get the total number of connections that have been created by this pool.
530 *
531 * @return the total number of connections created by this pool
532 */
533 public long getTotalConnectionsCreated() {
534 return this.totalConnectionsCreated.get();
535 }
536
537 /**
538 * Get the total number of times connections have been {@link #getConnection()} used.
539 *
540 * @return the total number
541 */
542 public long getTotalConnectionsUsed() {
543 return this.totalConnectionsUsed.get();
544 }
545
546 // -------------------------------------------------
547 // State management methods ...
548 // -------------------------------------------------
549
550 /**
551 * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
552 * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
553 * already been started.
554 *
555 * @return true if a connection was started
556 * @throws RepositorySourceException if there was an error obtaining the new connection
557 * @throws InterruptedException if the thread was interrupted during the operation
558 */
559 public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
560 final ReentrantLock mainLock = this.mainLock;
561 try {
562 mainLock.lock();
563 return addConnectionIfUnderCorePoolSize();
564 } finally {
565 mainLock.unlock();
566 }
567 }
568
569 /**
570 * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
571 * connections only when they are {@link #getConnection() needed}.
572 *
573 * @return the number of connections started.
574 * @throws RepositorySourceException if there was an error obtaining the new connection
575 * @throws InterruptedException if the thread was interrupted during the operation
576 */
577 public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
578 final ReentrantLock mainLock = this.mainLock;
579 try {
580 mainLock.lock();
581 return addConnectionsIfUnderCorePoolSize();
582 } finally {
583 mainLock.unlock();
584 }
585 }
586
587 /**
588 * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
589 * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
590 * <p>
591 * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
592 * </p>
593 *
594 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
595 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission} <tt>("modifyThread")</tt>,
596 * or the security manager's <tt>checkAccess</tt> method denies access.
597 * @see #shutdownNow()
598 */
599 public void shutdown() {
600 // Fail if caller doesn't have modifyThread permission. We
601 // explicitly check permissions directly because we can't trust
602 // implementations of SecurityManager to correctly override
603 // the "check access" methods such that our documented
604 // security policy is implemented.
605 SecurityManager security = System.getSecurityManager();
606 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
607
608 LOGGER.debug("Shutting down repository connection pool for {0}", getSourceName());
609 boolean fullyTerminated = false;
610 final ReentrantLock mainLock = this.mainLock;
611 try {
612 mainLock.lock();
613 int state = this.runState;
614 if (state == RUNNING) {
615 // don't override shutdownNow
616 this.runState = SHUTDOWN;
617 }
618
619 // Kill the maintenance thread ...
620
621 // Remove and close all available connections ...
622 if (!this.availableConnections.isEmpty()) {
623 // Drain the extra connections from those available ...
624 drainUnusedConnections(this.availableConnections.size());
625 }
626
627 // If there are no connections being used, trigger full termination
628 // now ...
629 if (this.inUseConnections.isEmpty()) {
630 fullyTerminated = true;
631 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
632 runState = TERMINATED;
633 termination.signalAll();
634 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
635 }
636 // Otherwise the last connection that is closed will transition the
637 // runState to TERMINATED ...
638 } finally {
639 mainLock.unlock();
640 }
641 if (fullyTerminated) terminated();
642 }
643
644 /**
645 * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
646 * connections.
647 *
648 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
649 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission} <tt>("modifyThread")</tt>,
650 * or the security manager's <tt>checkAccess</tt> method denies access.
651 * @see #shutdown()
652 */
653 public void shutdownNow() {
654 // Almost the same code as shutdown()
655 SecurityManager security = System.getSecurityManager();
656 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
657
658 LOGGER.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
659 boolean fullyTerminated = false;
660 final ReentrantLock mainLock = this.mainLock;
661 try {
662 mainLock.lock();
663 int state = this.runState;
664 if (state != TERMINATED) {
665 // don't override shutdownNow
666 this.runState = STOP;
667 }
668
669 // Kill the maintenance thread ...
670
671 // Remove and close all available connections ...
672 if (!this.availableConnections.isEmpty()) {
673 // Drain the extra connections from those available ...
674 drainUnusedConnections(this.availableConnections.size());
675 }
676
677 // If there are connections being used, close them now ...
678 if (!this.inUseConnections.isEmpty()) {
679 for (ConnectionWrapper connectionInUse : this.inUseConnections) {
680 LOGGER.trace("Closing repository connection to {0}", getSourceName());
681 connectionInUse.getOriginal().close();
682 }
683 this.poolSize -= this.inUseConnections.size();
684 // The last connection that is closed will transition the
685 // runState to TERMINATED ...
686 } else {
687 // There are no connections in use, so trigger full termination
688 // now ...
689 fullyTerminated = true;
690 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
691 runState = TERMINATED;
692 termination.signalAll();
693 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
694 }
695
696 } finally {
697 mainLock.unlock();
698 }
699 if (fullyTerminated) terminated();
700 }
701
702 /**
703 * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
704 * method is effectively <code>!isShutdown()</code>.
705 *
706 * @return true if this pool is running, or false otherwise
707 * @see #isShutdown()
708 * @see #isTerminated()
709 * @see #isTerminating()
710 */
711 public boolean isRunning() {
712 return runState == RUNNING;
713 }
714
715 /**
716 * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
717 * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
718 * <code>!isRunning()</code>.
719 *
720 * @return true if this pool has been shut down, or false otherwise
721 * @see #isShutdown()
722 * @see #isTerminated()
723 * @see #isTerminating()
724 */
725 public boolean isShutdown() {
726 return runState != RUNNING;
727 }
728
729 /**
730 * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
731 * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
732 * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
733 * executor not to properly terminate.
734 *
735 * @return true if terminating but not yet terminated, or false otherwise
736 * @see #isTerminated()
737 */
738 public boolean isTerminating() {
739 return runState == STOP;
740 }
741
742 /**
743 * Return true if this pool has completed its termination and no longer has any open connections.
744 *
745 * @return true if terminated, or false otherwise
746 * @see #isTerminating()
747 */
748 public boolean isTerminated() {
749 return runState == TERMINATED;
750 }
751
752 /**
753 * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
754 * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
755 * return even if all connections have not been closed.
756 *
757 * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
758 * @param unit the time unit for <code>timeout</code>
759 * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
760 * before all the connections were closed
761 * @throws InterruptedException if the thread was interrupted
762 */
763 public boolean awaitTermination( long timeout,
764 TimeUnit unit ) throws InterruptedException {
765 LOGGER.trace("Awaiting termination");
766 long nanos = unit.toNanos(timeout);
767 final ReentrantLock mainLock = this.mainLock;
768 try {
769 mainLock.lock();
770 for (;;) {
771 // LOGGER.trace("---> Run state = {0}; condition = {1}, {2} open",
772 // runState, termination, poolSize);
773 if (runState == TERMINATED) return true;
774 if (nanos <= 0) return false;
775 nanos = termination.awaitNanos(nanos);
776 // LOGGER.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
777 // ;
778 }
779 } finally {
780 mainLock.unlock();
781 LOGGER.trace("Finished awaiting termination");
782 }
783 }
784
785 /**
786 * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple
787 * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method.
788 */
789 protected void terminated() {
790 }
791
792 /**
793 * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
794 */
795 @Override
796 protected void finalize() {
797 shutdown();
798 }
799
800 // -------------------------------------------------
801 // Connection management methods ...
802 // -------------------------------------------------
803
804 /**
805 * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
806 * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
807 * number of connections and all connections are currently being used.
808 *
809 * @return a connection
810 * @throws RepositorySourceException if there is a problem obtaining a connection
811 * @throws IllegalStateException if the factory is not in a state to create or return connections
812 */
813 public RepositoryConnection getConnection() throws RepositorySourceException {
814 int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
815 ConnectionWrapper connection = null;
816 // Do this until we get a good connection ...
817 int attemptsRemaining = attemptsAllowed;
818 while (connection == null && attemptsRemaining > 0) {
819 --attemptsRemaining;
820 ReentrantLock mainLock = this.mainLock;
821 try {
822 mainLock.lock();
823 // If we're shutting down the pool, then just close the
824 // connection ...
825 if (this.runState != RUNNING) {
826 throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
827 }
828 // If there are fewer total connections than the core size ...
829 if (this.poolSize < this.corePoolSize) {
830 // Immediately create a wrapped connection and return it ...
831 connection = newWrappedConnection();
832 }
833 // Peek to see if there is a connection available ...
834 else if (this.availableConnections.peek() != null) {
835 // There is, so take it and return it ...
836 try {
837 connection = this.availableConnections.take();
838 } catch (InterruptedException e) {
839 LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
840 Thread.interrupted();
841 throw new RepositorySourceException(getSourceName(), e);
842 }
843 }
844 // There is no connection available. If there are fewer total
845 // connections than the maximum size ...
846 else if (this.poolSize < this.maximumPoolSize) {
847 // Immediately create a wrapped connection and return it ...
848 connection = newWrappedConnection();
849 }
850 if (connection != null) {
851 this.inUseConnections.add(connection);
852 }
853 } finally {
854 mainLock.unlock();
855 }
856 if (connection == null) {
857 // There are not enough connections, so wait in line for the
858 // next available connection ...
859 LOGGER.trace("Waiting for a repository connection from pool {0}", getSourceName());
860 try {
861 connection = this.availableConnections.take();
862 } catch (InterruptedException e) {
863 LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
864 Thread.interrupted();
865 throw new RepositorySourceException(getSourceName(), e);
866 }
867 mainLock = this.mainLock;
868 mainLock.lock();
869 try {
870 if (connection != null) {
871 this.inUseConnections.add(connection);
872 }
873 } finally {
874 mainLock.unlock();
875 }
876 LOGGER.trace("Recieved a repository connection from pool {0}", getSourceName());
877 }
878 if (connection != null && this.validateConnectionBeforeUse.get()) {
879 try {
880 connection = validateConnection(connection);
881 } catch (InterruptedException e) {
882 LOGGER.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
883 returnConnection(connection);
884 Thread.interrupted();
885 throw new RepositorySourceException(getSourceName(), e);
886 }
887 }
888 }
889 if (connection == null) {
890 // We were unable to obtain a usable connection, so fail ...
891 throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
892 }
893 this.totalConnectionsUsed.incrementAndGet();
894 return connection;
895 }
896
897 /**
898 * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
899 *
900 * @param wrapper the wrapper to the connection that is being returned to the pool
901 */
902 protected void returnConnection( ConnectionWrapper wrapper ) {
903 assert wrapper != null;
904 ConnectionWrapper wrapperToClose = null;
905 final ReentrantLock mainLock = this.mainLock;
906 try {
907 mainLock.lock();
908 // Remove the connection from the in-use set ...
909 boolean removed = this.inUseConnections.remove(wrapper);
910 assert removed;
911
912 // If we're shutting down the pool, then just close the connection
913 // ...
914 if (this.runState != RUNNING) {
915 wrapperToClose = wrapper;
916 }
917 // If there are more connections than the maximum size...
918 else if (this.poolSize > this.maximumPoolSize) {
919 // Immediately close this connection ...
920 wrapperToClose = wrapper;
921 }
922 // Attempt to make the connection available (this should generally
923 // work, unless there is an upper limit
924 // to the number of available connections) ...
925 else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
926 // The pool of available connection is full, so release it ...
927 wrapperToClose = wrapper;
928 }
929 } finally {
930 mainLock.unlock();
931 }
932 // Close the connection if we're supposed to (do it outside of the main
933 // lock)...
934 if (wrapperToClose != null) {
935 closeConnection(wrapperToClose);
936 }
937 }
938
939 /**
940 * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
941 *
942 * @param connection the connection to be validated; may not be null
943 * @return the validated connection, or null if the connection did not validate and was removed from the pool
944 * @throws InterruptedException if the thread is interrupted while validating the connection
945 */
946 protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
947 assert connection != null;
948 ConnectionWrapper invalidConnection = null;
949 try {
950 if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
951 invalidConnection = connection;
952 }
953 } finally {
954 if (invalidConnection != null) {
955 try {
956 mainLock.lock();
957 connection = null;
958 this.poolSize--;
959 this.inUseConnections.remove(connection);
960 // returnConnection(invalidConnection);
961 } finally {
962 mainLock.unlock();
963 }
964 }
965 }
966 return connection;
967 }
968
969 /**
970 * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
971 * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
972 * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
973 * {@link #poolSize pool size}.
974 *
975 * @return the connection wrapper with a new connection
976 * @throws RepositorySourceException if there was an error obtaining the new connection
977 */
978 @GuardedBy( "mainLock" )
979 protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
980 RepositoryConnection connection = this.source.getConnection();
981 ++this.poolSize;
982 this.totalConnectionsCreated.incrementAndGet();
983 return new ConnectionWrapper(connection);
984 }
985
986 /**
987 * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
988 * method does decrement the {@link #poolSize pool size}.
989 *
990 * @param wrapper the wrapper for the connection to be closed
991 */
992 protected void closeConnection( ConnectionWrapper wrapper ) {
993 assert wrapper != null;
994 RepositoryConnection original = wrapper.getOriginal();
995 assert original != null;
996 try {
997 LOGGER.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
998 original.close();
999 } finally {
1000 final ReentrantLock mainLock = this.mainLock;
1001 try {
1002 mainLock.lock();
1003 // No matter what reduce the pool size count
1004 --this.poolSize;
1005 // And if shutting down and this was the last connection being
1006 // used...
1007 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
1008 // then signal anybody that has called
1009 // "awaitTermination(...)"
1010 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
1011 this.runState = TERMINATED;
1012 this.termination.signalAll();
1013 LOGGER.trace("Terminated repository connection pool for {0}", getSourceName());
1014
1015 // fall through to call terminate() outside of lock.
1016 }
1017 } finally {
1018 mainLock.unlock();
1019 }
1020 }
1021 }
1022
1023 @GuardedBy( "mainLock" )
1024 protected int drainUnusedConnections( int count ) {
1025 if (count <= 0) return 0;
1026 LOGGER.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
1027 // Drain the extra connections from those available ...
1028 Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
1029 this.availableConnections.drainTo(extraConnections, count);
1030 for (ConnectionWrapper connection : extraConnections) {
1031 LOGGER.trace("Closing repository connection to {0}", getSourceName());
1032 connection.getOriginal().close();
1033 }
1034 int numClosed = extraConnections.size();
1035 this.poolSize -= numClosed;
1036 LOGGER.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
1037 return numClosed;
1038 }
1039
1040 @GuardedBy( "mainLock" )
1041 protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
1042 // Add connection ...
1043 if (this.poolSize < this.corePoolSize) {
1044 this.availableConnections.offer(newWrappedConnection());
1045 LOGGER.trace("Added connection to {0} in undersized pool", getSourceName());
1046 return true;
1047 }
1048 return false;
1049 }
1050
1051 @GuardedBy( "mainLock" )
1052 protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
1053 // Add connections ...
1054 int n = 0;
1055 while (this.poolSize < this.corePoolSize) {
1056 this.availableConnections.offer(newWrappedConnection());
1057 ++n;
1058 }
1059 LOGGER.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
1060 return n;
1061 }
1062
1063 protected class ConnectionWrapper implements RepositoryConnection {
1064
1065 private final RepositoryConnection original;
1066 private final long timeCreated;
1067 private long lastUsed;
1068 private boolean closed = false;
1069
1070 protected ConnectionWrapper( RepositoryConnection connection ) {
1071 assert connection != null;
1072 this.original = connection;
1073 this.timeCreated = System.currentTimeMillis();
1074 }
1075
1076 /**
1077 * @return original
1078 */
1079 protected RepositoryConnection getOriginal() {
1080 return this.original;
1081 }
1082
1083 /**
1084 * @return lastUsed
1085 */
1086 public long getTimeLastUsed() {
1087 return this.lastUsed;
1088 }
1089
1090 /**
1091 * @return timeCreated
1092 */
1093 public long getTimeCreated() {
1094 return this.timeCreated;
1095 }
1096
1097 /**
1098 * {@inheritDoc}
1099 */
1100 public String getSourceName() {
1101 return this.original.getSourceName();
1102 }
1103
1104 /**
1105 * {@inheritDoc}
1106 */
1107 public XAResource getXAResource() {
1108 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1109 return this.original.getXAResource();
1110 }
1111
1112 /**
1113 * {@inheritDoc}
1114 */
1115 public CachePolicy getDefaultCachePolicy() {
1116 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1117 return this.original.getDefaultCachePolicy();
1118 }
1119
1120 /**
1121 * {@inheritDoc}
1122 *
1123 * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
1124 * org.modeshape.graph.request.Request)
1125 */
1126 public void execute( ExecutionContext context,
1127 Request request ) throws RepositorySourceException {
1128 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1129 this.original.execute(context, request);
1130 }
1131
1132 /**
1133 * {@inheritDoc}
1134 */
1135 public boolean ping( long time,
1136 TimeUnit unit ) throws InterruptedException {
1137 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1138 return this.original.ping(time, unit);
1139 }
1140
1141 /**
1142 * {@inheritDoc}
1143 */
1144 public void close() {
1145 if (!closed) {
1146 this.lastUsed = System.currentTimeMillis();
1147 this.closed = true;
1148 returnConnection(this);
1149 }
1150 }
1151 }
1152
1153 }