1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.modeshape.graph.connector;
25
26 import net.jcip.annotations.GuardedBy;
27 import net.jcip.annotations.ThreadSafe;
28 import org.modeshape.common.util.CheckArg;
29 import org.modeshape.common.util.Logger;
30 import org.modeshape.graph.ExecutionContext;
31 import org.modeshape.graph.GraphI18n;
32 import org.modeshape.graph.cache.CachePolicy;
33 import org.modeshape.graph.request.Request;
34
35 import javax.transaction.xa.XAResource;
36 import java.util.Collection;
37 import java.util.HashSet;
38 import java.util.LinkedList;
39 import java.util.Set;
40 import java.util.concurrent.BlockingQueue;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicBoolean;
44 import java.util.concurrent.atomic.AtomicInteger;
45 import java.util.concurrent.atomic.AtomicLong;
46 import java.util.concurrent.locks.Condition;
47 import java.util.concurrent.locks.ReentrantLock;
48
49
50
51
52 @ThreadSafe
53 public class RepositoryConnectionPool {
54
55
56
57
58 public static final int DEFAULT_CORE_POOL_SIZE = 1;
59
60
61
62
63 public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
64
65
66
67
68 public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
69
70
71
72
73 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
74
75
76
77
78 private final RepositorySource source;
79
80
81
82
83 private final ReentrantLock mainLock = new ReentrantLock();
84
85
86
87
88 private final Condition termination = mainLock.newCondition();
89
90
91
92
93 @GuardedBy( "mainLock" )
94 private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();
95
96
97
98
99 @GuardedBy( "mainLock" )
100 private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
101
102
103
104
105
106 private volatile long keepAliveTime;
107
108
109
110
111
112 @GuardedBy( "mainLock" )
113 private volatile int corePoolSize;
114
115
116
117
118 @GuardedBy( "mainLock" )
119 private volatile int maximumPoolSize;
120
121
122
123
124 @GuardedBy( "mainLock" )
125 private volatile int poolSize;
126
127
128
129
130 @GuardedBy( "mainLock" )
131 private volatile int runState;
132
133
134
135 static final int RUNNING = 0;
136
137 static final int SHUTDOWN = 1;
138
139 static final int STOP = 2;
140
141 static final int TERMINATED = 3;
142
143
144
145
146 private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
147
148
149
150
151 private final AtomicLong pingTimeout = new AtomicLong(0);
152
153
154
155
156 private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
157
158 private final AtomicLong totalConnectionsCreated = new AtomicLong(0);
159
160 private final AtomicLong totalConnectionsUsed = new AtomicLong(0);
161
162 private static final Logger LOGGER = Logger.getLogger(RepositoryConnectionPool.class);
163
164
165
166
167
168
169
170
171
/**
 * Create a pool for the supplied source using {@link #DEFAULT_CORE_POOL_SIZE}, {@link #DEFAULT_MAXIMUM_POOL_SIZE} and
 * {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS} (in seconds).
 *
 * @param source the source of connections; validated by the delegated constructor
 */
public RepositoryConnectionPool( RepositorySource source ) {
    this(source,
         DEFAULT_CORE_POOL_SIZE,
         DEFAULT_MAXIMUM_POOL_SIZE,
         DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS,
         TimeUnit.SECONDS);
}
175
176
177
178
179
180
181
182
183
184
185
186
/**
 * Create a pool for the supplied source with explicit sizing. The ping timeout is initialised to 100 milliseconds.
 *
 * @param source the source of connections; may not be null
 * @param corePoolSize the core pool size; may not be negative
 * @param maximumPoolSize the maximum pool size; must be positive and not smaller than {@code corePoolSize}
 * @param keepAliveTime the keep-alive time; may not be negative
 * @param unit the unit of {@code keepAliveTime}
 * @throws IllegalArgumentException if {@code maximumPoolSize} is smaller than {@code corePoolSize}
 */
public RepositoryConnectionPool( RepositorySource source,
                                 int corePoolSize,
                                 int maximumPoolSize,
                                 long keepAliveTime,
                                 TimeUnit unit ) {
    // Argument checks, in the same order as they have always been performed
    CheckArg.isNonNegative(corePoolSize, "corePoolSize");
    CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
    CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
    CheckArg.isNotNull(source, "source");
    if (corePoolSize > maximumPoolSize) {
        String message = GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text();
        throw new IllegalArgumentException(message);
    }

    this.source = source;
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    // The keep-alive time is kept internally in nanoseconds
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    setPingTimeout(100, TimeUnit.MILLISECONDS);
}
205
206
207
208
209
210
211 public final RepositorySource getRepositorySource() {
212 return source;
213 }
214
215
216
217
218
219
220 protected String getSourceName() {
221 return source.getName();
222 }
223
224
225
226
227
228
229
230
231 public boolean getValidateConnectionBeforeUse() {
232 return this.validateConnectionBeforeUse.get();
233 }
234
235
236
237
238 public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
239 this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
240 }
241
242
243
244
245 public long getPingTimeoutInNanos() {
246 return this.pingTimeout.get();
247 }
248
249
250
251
252
253 public void setPingTimeout( long pingTimeout,
254 TimeUnit unit ) {
255 CheckArg.isNonNegative(pingTimeout, "time");
256 this.pingTimeout.set(unit.toNanos(pingTimeout));
257 }
258
259
260
261
262 public int getMaxFailedAttemptsBeforeError() {
263 return this.maxFailedAttemptsBeforeError.get();
264 }
265
266
267
268
269 public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
270 this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
271 }
272
273
274
275
276
277
278
279
280
281
282
283
284 public void setKeepAliveTime( long time,
285 TimeUnit unit ) {
286 CheckArg.isNonNegative(time, "time");
287 this.keepAliveTime = unit.toNanos(time);
288 }
289
290
291
292
293
294
295
296
297
298 public long getKeepAliveTime( TimeUnit unit ) {
299 assert unit != null;
300 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
301 }
302
303
304
305
306 public int getMaximumPoolSize() {
307 return this.maximumPoolSize;
308 }
309
310
311
312
313
314
315
316
317
318 public void setMaximumPoolSize( int maximumPoolSize ) {
319 CheckArg.isPositive(maximumPoolSize, "maximum pool size");
320 if (maximumPoolSize < corePoolSize) {
321 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
322 }
323 final ReentrantLock mainLock = this.mainLock;
324 try {
325 mainLock.lock();
326 int extra = this.maximumPoolSize - maximumPoolSize;
327 this.maximumPoolSize = maximumPoolSize;
328 if (extra > 0 && poolSize > maximumPoolSize) {
329
330 drainUnusedConnections(extra);
331 }
332 } finally {
333 mainLock.unlock();
334 }
335 }
336
337
338
339
340
341
342
343 public int getCorePoolSize() {
344 return this.corePoolSize;
345 }
346
347
348
349
350
351
352
353
354
355
356
357
358 public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
359 CheckArg.isNonNegative(corePoolSize, "core pool size");
360 if (maximumPoolSize < corePoolSize) {
361 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
362 }
363 final ReentrantLock mainLock = this.mainLock;
364 try {
365 mainLock.lock();
366 int extra = this.corePoolSize - corePoolSize;
367 this.corePoolSize = corePoolSize;
368 if (extra < 0) {
369
370 addConnectionsIfUnderCorePoolSize();
371 } else if (extra > 0 && poolSize > corePoolSize) {
372
373 drainUnusedConnections(extra);
374 }
375 } finally {
376 mainLock.unlock();
377 }
378 }
379
380
381
382
383
384
385
386
387
388
389
390 public int getPoolSize() {
391 return poolSize;
392 }
393
394
395
396
397
398
399 public int getInUseCount() {
400 final ReentrantLock mainLock = this.mainLock;
401 try {
402 mainLock.lock();
403 return this.inUseConnections.size();
404 } finally {
405 mainLock.unlock();
406 }
407 }
408
409
410
411
412
413
414 public long getTotalConnectionsCreated() {
415 return this.totalConnectionsCreated.get();
416 }
417
418
419
420
421
422
423 public long getTotalConnectionsUsed() {
424 return this.totalConnectionsUsed.get();
425 }
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440 public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
441 final ReentrantLock mainLock = this.mainLock;
442 try {
443 mainLock.lock();
444 return addConnectionIfUnderCorePoolSize();
445 } finally {
446 mainLock.unlock();
447 }
448 }
449
450
451
452
453
454
455
456
457
458 public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
459 final ReentrantLock mainLock = this.mainLock;
460 try {
461 mainLock.lock();
462 return addConnectionsIfUnderCorePoolSize();
463 } finally {
464 mainLock.unlock();
465 }
466 }
467
468
469
470
471
472
473
474
475
476
477
478
479
480 public void shutdown() {
481
482
483
484
485
486 SecurityManager security = System.getSecurityManager();
487 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
488
489 LOGGER.debug("Shutting down repository connection pool for {0}", getSourceName());
490 boolean fullyTerminated = false;
491 final ReentrantLock mainLock = this.mainLock;
492 try {
493 mainLock.lock();
494 int state = this.runState;
495 if (state == RUNNING) {
496
497 this.runState = SHUTDOWN;
498 }
499
500
501
502
503 if (!this.availableConnections.isEmpty()) {
504
505 drainUnusedConnections(this.availableConnections.size());
506 }
507
508
509 if (this.inUseConnections.isEmpty()) {
510 fullyTerminated = true;
511 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
512 runState = TERMINATED;
513 termination.signalAll();
514 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
515 }
516
517 } finally {
518 mainLock.unlock();
519 }
520 if (fullyTerminated) terminated();
521 }
522
523
524
525
526
527
528
529
530
531
532 public void shutdownNow() {
533
534 SecurityManager security = System.getSecurityManager();
535 if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
536
537 LOGGER.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
538 boolean fullyTerminated = false;
539 final ReentrantLock mainLock = this.mainLock;
540 try {
541 mainLock.lock();
542 int state = this.runState;
543 if (state != TERMINATED) {
544
545 this.runState = STOP;
546 }
547
548
549
550
551 if (!this.availableConnections.isEmpty()) {
552
553 drainUnusedConnections(this.availableConnections.size());
554 }
555
556
557 if (!this.inUseConnections.isEmpty()) {
558 for (ConnectionWrapper connectionInUse : this.inUseConnections) {
559 LOGGER.trace("Closing repository connection to {0}", getSourceName());
560 connectionInUse.getOriginal().close();
561 }
562 this.poolSize -= this.inUseConnections.size();
563
564 } else {
565
566 fullyTerminated = true;
567 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
568 runState = TERMINATED;
569 termination.signalAll();
570 LOGGER.debug("Terminated repository connection pool for {0}", getSourceName());
571 }
572
573 } finally {
574 mainLock.unlock();
575 }
576 if (fullyTerminated) terminated();
577 }
578
579
580
581
582
583
584
585
586
587
588 public boolean isRunning() {
589 return runState == RUNNING;
590 }
591
592
593
594
595
596
597
598
599
600
601
602 public boolean isShutdown() {
603 return runState != RUNNING;
604 }
605
606
607
608
609
610
611
612
613
614
615 public boolean isTerminating() {
616 return runState == STOP;
617 }
618
619
620
621
622
623
624
625 public boolean isTerminated() {
626 return runState == TERMINATED;
627 }
628
629
630
631
632
633
634
635
636
637
638
639
640 public boolean awaitTermination( long timeout,
641 TimeUnit unit ) throws InterruptedException {
642 LOGGER.trace("Awaiting termination");
643 long nanos = unit.toNanos(timeout);
644 final ReentrantLock mainLock = this.mainLock;
645 try {
646 mainLock.lock();
647 for (;;) {
648
649 if (runState == TERMINATED) return true;
650 if (nanos <= 0) return false;
651 nanos = termination.awaitNanos(nanos);
652
653
654 }
655 } finally {
656 mainLock.unlock();
657 LOGGER.trace("Finished awaiting termination");
658 }
659 }
660
661
662
663
664
665 protected void terminated() {
666 }
667
668
669
670
671 @Override
672 protected void finalize() {
673 shutdown();
674 }
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689 public RepositoryConnection getConnection() throws RepositorySourceException {
690 int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
691 ConnectionWrapper connection = null;
692
693 int attemptsRemaining = attemptsAllowed;
694 while (connection == null && attemptsRemaining > 0) {
695 --attemptsRemaining;
696 ReentrantLock mainLock = this.mainLock;
697 try {
698 mainLock.lock();
699
700 if (this.runState != RUNNING) {
701 throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
702 }
703
704 if (this.poolSize < this.corePoolSize) {
705
706 connection = newWrappedConnection();
707 }
708
709 else if (this.availableConnections.peek() != null) {
710
711 try {
712 connection = this.availableConnections.take();
713 } catch (InterruptedException e) {
714 LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
715 Thread.interrupted();
716 throw new RepositorySourceException(getSourceName(), e);
717 }
718 }
719
720 else if (this.poolSize < this.maximumPoolSize) {
721
722 connection = newWrappedConnection();
723 }
724 if (connection != null) {
725 this.inUseConnections.add(connection);
726 }
727 } finally {
728 mainLock.unlock();
729 }
730 if (connection == null) {
731
732 LOGGER.trace("Waiting for a repository connection from pool {0}", getSourceName());
733 try {
734 connection = this.availableConnections.take();
735 } catch (InterruptedException e) {
736 LOGGER.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
737 Thread.interrupted();
738 throw new RepositorySourceException(getSourceName(), e);
739 }
740 mainLock = this.mainLock;
741 mainLock.lock();
742 try {
743 if (connection != null) {
744 this.inUseConnections.add(connection);
745 }
746 } finally {
747 mainLock.unlock();
748 }
749 LOGGER.trace("Recieved a repository connection from pool {0}", getSourceName());
750 }
751 if (connection != null && this.validateConnectionBeforeUse.get()) {
752 try {
753 connection = validateConnection(connection);
754 } catch (InterruptedException e) {
755 LOGGER.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
756 returnConnection(connection);
757 Thread.interrupted();
758 throw new RepositorySourceException(getSourceName(), e);
759 }
760 }
761 }
762 if (connection == null) {
763
764 throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
765 }
766 this.totalConnectionsUsed.incrementAndGet();
767 return connection;
768 }
769
770
771
772
773
774
775 protected void returnConnection( ConnectionWrapper wrapper ) {
776 assert wrapper != null;
777 ConnectionWrapper wrapperToClose = null;
778 final ReentrantLock mainLock = this.mainLock;
779 try {
780 mainLock.lock();
781
782 boolean removed = this.inUseConnections.remove(wrapper);
783 assert removed;
784
785
786 if (this.runState != RUNNING) {
787 wrapperToClose = wrapper;
788 }
789
790 else if (this.poolSize > this.maximumPoolSize) {
791
792 wrapperToClose = wrapper;
793 }
794
795
796 else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
797
798 wrapperToClose = wrapper;
799 }
800 } finally {
801 mainLock.unlock();
802 }
803
804 if (wrapperToClose != null) {
805 closeConnection(wrapperToClose);
806 }
807 }
808
809
810
811
812
813
814
815
816 protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
817 assert connection != null;
818 ConnectionWrapper invalidConnection = null;
819 try {
820 if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
821 invalidConnection = connection;
822 }
823 } finally {
824 if (invalidConnection != null) {
825 try {
826 mainLock.lock();
827 connection = null;
828 this.poolSize--;
829 this.inUseConnections.remove(connection);
830
831 } finally {
832 mainLock.unlock();
833 }
834 }
835 }
836 return connection;
837 }
838
839
840
841
842
843
844
845
846
847
848 @GuardedBy( "mainLock" )
849 protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
850 RepositoryConnection connection = this.source.getConnection();
851 ++this.poolSize;
852 this.totalConnectionsCreated.incrementAndGet();
853 return new ConnectionWrapper(connection);
854 }
855
856
857
858
859
860
861
862 protected void closeConnection( ConnectionWrapper wrapper ) {
863 assert wrapper != null;
864 RepositoryConnection original = wrapper.getOriginal();
865 assert original != null;
866 try {
867 LOGGER.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
868 original.close();
869 } finally {
870 final ReentrantLock mainLock = this.mainLock;
871 try {
872 mainLock.lock();
873
874 --this.poolSize;
875
876 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
877
878 LOGGER.trace("Signalling termination of repository connection pool for {0}", getSourceName());
879 this.runState = TERMINATED;
880 this.termination.signalAll();
881 LOGGER.trace("Terminated repository connection pool for {0}", getSourceName());
882
883
884 }
885 } finally {
886 mainLock.unlock();
887 }
888 }
889 }
890
891 @GuardedBy( "mainLock" )
892 protected int drainUnusedConnections( int count ) {
893 if (count <= 0) return 0;
894 LOGGER.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
895
896 Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
897 this.availableConnections.drainTo(extraConnections, count);
898 for (ConnectionWrapper connection : extraConnections) {
899 LOGGER.trace("Closing repository connection to {0}", getSourceName());
900 connection.getOriginal().close();
901 }
902 int numClosed = extraConnections.size();
903 this.poolSize -= numClosed;
904 LOGGER.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
905 return numClosed;
906 }
907
908 @GuardedBy( "mainLock" )
909 protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
910
911 if (this.poolSize < this.corePoolSize) {
912 this.availableConnections.offer(newWrappedConnection());
913 LOGGER.trace("Added connection to {0} in undersized pool", getSourceName());
914 return true;
915 }
916 return false;
917 }
918
919 @GuardedBy( "mainLock" )
920 protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
921
922 int n = 0;
923 while (this.poolSize < this.corePoolSize) {
924 this.availableConnections.offer(newWrappedConnection());
925 ++n;
926 }
927 LOGGER.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
928 return n;
929 }
930
931 protected class ConnectionWrapper implements RepositoryConnection {
932
933 private final RepositoryConnection original;
934 private final long timeCreated;
935 private long lastUsed;
936 private boolean closed = false;
937
938 protected ConnectionWrapper( RepositoryConnection connection ) {
939 assert connection != null;
940 this.original = connection;
941 this.timeCreated = System.currentTimeMillis();
942 }
943
944
945
946
947 protected RepositoryConnection getOriginal() {
948 return this.original;
949 }
950
951
952
953
954 public long getTimeLastUsed() {
955 return this.lastUsed;
956 }
957
958
959
960
961 public long getTimeCreated() {
962 return this.timeCreated;
963 }
964
965
966
967
968 public String getSourceName() {
969 return this.original.getSourceName();
970 }
971
972
973
974
975 public XAResource getXAResource() {
976 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
977 return this.original.getXAResource();
978 }
979
980
981
982
983 public CachePolicy getDefaultCachePolicy() {
984 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
985 return this.original.getDefaultCachePolicy();
986 }
987
988
989
990
991
992
993
994 public void execute( ExecutionContext context,
995 Request request ) throws RepositorySourceException {
996 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
997 this.original.execute(context, request);
998 }
999
1000
1001
1002
1003 public boolean ping( long time,
1004 TimeUnit unit ) throws InterruptedException {
1005 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1006 return this.original.ping(time, unit);
1007 }
1008
1009
1010
1011
1012 public void close() {
1013 if (!closed) {
1014 this.lastUsed = System.currentTimeMillis();
1015 this.closed = true;
1016 returnConnection(this);
1017 }
1018 }
1019 }
1020
1021 }