package org.jboss.resource.connectionmanager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionFactory;
import javax.security.auth.Subject;
import org.jboss.logging.Logger;
import org.jboss.resource.JBossResourceException;
import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
public class InternalManagedConnectionPool
{
private final ManagedConnectionFactory mcf;
private final ConnectionListenerFactory clf;
private final Subject defaultSubject;
private final ConnectionRequestInfo defaultCri;
private final PoolParams poolParams;
private int maxSize;
private ArrayList cls;
private final FIFOSemaphore permits;
private final Logger log;
private final boolean trace;
private final Counter connectionCounter = new Counter();
private final HashSet checkedOut = new HashSet();
private boolean started = false;
private SynchronizedBoolean shutdown = new SynchronizedBoolean(false);
private volatile int maxUsedConnections = 0;
public InternalManagedConnectionPool(ManagedConnectionFactory mcf, ConnectionListenerFactory clf, Subject subject,
ConnectionRequestInfo cri, PoolParams poolParams, Logger log)
{
this.mcf = mcf;
this.clf = clf;
defaultSubject = subject;
defaultCri = cri;
this.poolParams = poolParams;
this.maxSize = poolParams.maxSize;
this.log = log;
this.trace = log.isTraceEnabled();
cls = new ArrayList(this.maxSize);
permits = new FIFOSemaphore(this.maxSize);
if (poolParams.idleTimeout != 0)
IdleRemover.registerPool(this, poolParams.idleTimeout);
}
public long getAvailableConnections()
{
return permits.permits();
}
public int getMaxConnectionsInUseCount()
{
return maxUsedConnections;
}
public int getConnectionInUseCount()
{
return checkedOut.size();
}
public ConnectionListener getConnection(Subject subject, ConnectionRequestInfo cri) throws ResourceException
{
subject = (subject == null) ? defaultSubject : subject;
cri = (cri == null) ? defaultCri : cri;
long startWait = System.currentTimeMillis();
try
{
if (permits.attempt(poolParams.blockingTimeout))
{
ConnectionListener cl = null;
do
{
synchronized (cls)
{
if (shutdown.get())
{
permits.release();
throw new ResourceException("The pool has been shutdown");
}
if (cls.size() > 0)
{
cl = (ConnectionListener) cls.remove(cls.size() - 1);
checkedOut.add(cl);
int size = (int) (maxSize - permits.permits());
if (size > maxUsedConnections)
maxUsedConnections = size;
}
}
if (cl != null)
{
try
{
Object matchedMC = mcf.matchManagedConnections(Collections.singleton(cl.getManagedConnection()),
subject, cri);
if (matchedMC != null)
{
if (trace)
log.trace("supplying ManagedConnection from pool: " + cl);
cl.grantPermit(true);
return cl;
}
log.warn("Destroying connection that could not be successfully matched: " + cl);
synchronized (cls)
{
checkedOut.remove(cl);
}
doDestroy(cl);
cl = null;
}
catch (Throwable t)
{
log.warn("Throwable while trying to match ManagedConnection, destroying connection: " + cl, t);
synchronized (cls)
{
checkedOut.remove(cl);
}
doDestroy(cl);
cl = null;
}
} }
while (cls.size() > 0);
try
{
cl = createConnectionEventListener(subject, cri);
synchronized (cls)
{
checkedOut.add(cl);
int size = (int) (maxSize - permits.permits());
if (size > maxUsedConnections)
maxUsedConnections = size;
}
if (started == false)
{
started = true;
if (poolParams.minSize > 0)
PoolFiller.fillPool(this);
}
if (trace)
log.trace("supplying new ManagedConnection: " + cl);
cl.grantPermit(true);
return cl;
}
catch (Throwable t)
{
log.warn("Throwable while attempting to get a new connection: " + cl, t);
synchronized (cls)
{
checkedOut.remove(cl);
}
permits.release();
if (t instanceof ResourceException)
throw (ResourceException) t;
throw new JBossResourceException("Unexpected throwable while trying to create a connection: " + cl, t);
}
}
else
{
throw new ResourceException("No ManagedConnections available within configured blocking timeout ( "
+ poolParams.blockingTimeout + " [ms] )");
}
}
catch (InterruptedException ie)
{
long end = System.currentTimeMillis() - startWait;
throw new ResourceException("Interrupted while requesting permit! Waited " + end + " ms");
}
}
public void returnConnection(ConnectionListener cl, boolean kill)
{
if (cl.getState() == ConnectionListener.DESTROYED)
{
log.trace("ManagedConnection is being returned after it was destroyed" + cl);
if (cl.hasPermit())
{
cl.grantPermit(false);
permits.release();
}
return;
}
if (trace)
log.trace("putting ManagedConnection back into pool kill=" + kill + " cl=" + cl);
try
{
cl.getManagedConnection().cleanup();
}
catch (ResourceException re)
{
log.warn("ResourceException cleaning up ManagedConnection: " + cl, re);
kill = true;
}
if (cl.getState() == ConnectionListener.DESTROY)
kill = true;
synchronized (cls)
{
checkedOut.remove(cl);
if (kill == false && cls.size() >= poolParams.maxSize)
{
log.warn("Destroying returned connection, maximum pool size exceeded " + cl);
kill = true;
}
if (kill)
{
cls.remove(cl);
}
else
{
cl.used();
cls.add(cl);
}
if (cl.hasPermit())
{
cl.grantPermit(false);
permits.release();
}
}
if (kill)
{
if (trace)
log.trace("Destroying returned connection " + cl);
doDestroy(cl);
}
}
public void flush()
{
ArrayList destroy = null;
synchronized (cls)
{
if (trace)
log.trace("Flushing pool checkedOut=" + checkedOut + " inPool=" + cls);
for (Iterator i = checkedOut.iterator(); i.hasNext();)
{
ConnectionListener cl = (ConnectionListener) i.next();
if (trace)
log.trace("Flush marking checked out connection for destruction " + cl);
cl.setState(ConnectionListener.DESTROY);
}
while (cls.size() > 0)
{
ConnectionListener cl = (ConnectionListener) cls.remove(0);
if (destroy == null)
destroy = new ArrayList();
destroy.add(cl);
}
}
if (destroy != null)
{
for (int i = 0; i < destroy.size(); ++i)
{
ConnectionListener cl = (ConnectionListener) destroy.get(i);
if (trace)
log.trace("Destroying flushed connection " + cl);
doDestroy(cl);
}
if (shutdown.get() == false && poolParams.minSize > 0)
PoolFiller.fillPool(this);
}
}
public void removeTimedOut()
{
ArrayList destroy = null;
long timeout = System.currentTimeMillis() - poolParams.idleTimeout;
while (true)
{
synchronized (cls)
{
if (cls.size() == 0)
break;
ConnectionListener cl = (ConnectionListener) cls.get(0);
if (cl.isTimedOut(timeout))
{
cls.remove(0);
if (destroy == null)
destroy = new ArrayList();
destroy.add(cl);
}
else
{
break;
}
}
}
if (destroy != null)
{
for (int i = 0; i < destroy.size(); ++i)
{
ConnectionListener cl = (ConnectionListener) destroy.get(i);
if (trace)
log.trace("Destroying timedout connection " + cl);
doDestroy(cl);
}
if (shutdown.get() == false && poolParams.minSize > 0)
PoolFiller.fillPool(this);
}
}
public void shutdown()
{
shutdown.set(true);
IdleRemover.unregisterPool(this);
flush();
}
public void fillToMin()
{
while (true)
{
try
{
if (permits.attempt(poolParams.blockingTimeout))
{
try
{
if (shutdown.get())
return;
if (getMinSize() - connectionCounter.getGuaranteedCount() <= 0)
return;
try
{
ConnectionListener cl = createConnectionEventListener(defaultSubject, defaultCri);
synchronized (cls)
{
if (trace)
log.trace("Filling pool cl=" + cl);
cls.add(cl);
}
}
catch (ResourceException re)
{
log.warn("Unable to fill pool ", re);
return;
}
}
finally
{
permits.release();
}
}
}
catch (InterruptedException ignored)
{
log.trace("Interrupted while requesting permit in fillToMin");
}
}
}
public int getConnectionCount()
{
return connectionCounter.getCount();
}
public int getConnectionCreatedCount()
{
return connectionCounter.getCreatedCount();
}
public int getConnectionDestroyedCount()
{
return connectionCounter.getDestroyedCount();
}
private ConnectionListener createConnectionEventListener(Subject subject, ConnectionRequestInfo cri)
throws ResourceException
{
ManagedConnection mc = mcf.createManagedConnection(subject, cri);
connectionCounter.inc();
try
{
return clf.createConnectionListener(mc, this);
}
catch (ResourceException re)
{
connectionCounter.dec();
mc.destroy();
throw re;
}
}
private void doDestroy(ConnectionListener cl)
{
if (cl.getState() == ConnectionListener.DESTROYED)
{
log.trace("ManagedConnection is already destroyed " + cl);
return;
}
connectionCounter.dec();
cl.setState(ConnectionListener.DESTROYED);
try
{
cl.getManagedConnection().destroy();
}
catch (Throwable t)
{
log.warn("Exception destroying ManagedConnection " + cl, t);
}
}
private int getMinSize()
{
if (poolParams.minSize > maxSize)
return maxSize;
return poolParams.minSize;
}
public static class PoolParams
{
public int minSize = 0;
public int maxSize = 10;
public int blockingTimeout = 30000;
public long idleTimeout = 1000 * 60 * 30; }
private static class Counter
{
private int created = 0;
private int destroyed = 0;
synchronized int getGuaranteedCount()
{
return created - destroyed;
}
int getCount()
{
return created - destroyed;
}
int getCreatedCount()
{
return created;
}
int getDestroyedCount()
{
return destroyed;
}
synchronized void inc()
{
++created;
}
synchronized void dec()
{
++destroyed;
}
}
}