package org.jboss.jms.asf;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
import org.jboss.logging.Logger;
import org.jboss.tm.XidFactoryMBean;
public class StdServerSessionPool
implements ServerSessionPool
{
private static ThreadGroup threadGroup =
new ThreadGroup("ASF Session Pool Threads");
private final Logger log = Logger.getLogger(this.getClass());
private int minSize;
private int poolSize;
private long keepAlive;
private int ack;
private boolean useLocalTX;
private boolean transacted;
private Destination destination;
private Connection con;
private MessageListener listener;
private List sessionPool;
private PooledExecutor executor;
private boolean closing = false;
private int numServerSessions = 0;
private XidFactoryMBean xidFactory;
public StdServerSessionPool(final Destination destination,
final Connection con,
final boolean transacted,
final int ack,
final boolean useLocalTX,
final MessageListener listener,
final int minSession,
final int maxSession,
final long keepAlive,
final XidFactoryMBean xidFactory)
throws JMSException
{
this.destination = destination;
this.con = con;
this.ack = ack;
this.listener = listener;
this.transacted = transacted;
this.minSize = minSession;
this.poolSize = maxSession;
this.keepAlive = keepAlive;
this.sessionPool = new ArrayList(maxSession);
this.useLocalTX = useLocalTX;
this.xidFactory = xidFactory;
executor = new MyPooledExecutor(poolSize);
executor.setMinimumPoolSize(minSize);
executor.setKeepAliveTime(keepAlive);
executor.waitWhenBlocked();
executor.setThreadFactory(new DefaultThreadFactory());
create();
log.debug("Server Session pool set up");
}
public ServerSession getServerSession() throws JMSException
{
if( log.isTraceEnabled() )
log.trace("getting a server session");
ServerSession session = null;
try
{
while (true)
{
synchronized (sessionPool)
{
if (closing)
{
throw new JMSException("Cannot get session after pool has been closed down.");
}
else if (sessionPool.size() > 0)
{
session = (ServerSession)sessionPool.remove(0);
break;
}
else
{
try
{
sessionPool.wait();
}
catch (InterruptedException ignore)
{
}
}
}
}
}
catch (Exception e)
{
throw new JMSException("Failed to get a server session: " + e);
}
if( log.isTraceEnabled() )
log.trace("using server session: " + session);
return session;
}
public void clear()
{
synchronized (sessionPool)
{
closing = true;
if (log.isDebugEnabled())
{
log.debug("Clearing " + sessionPool.size() +
" from ServerSessionPool");
}
Iterator iter = sessionPool.iterator();
while (iter.hasNext())
{
StdServerSession ses = (StdServerSession)iter.next();
ses.close();
numServerSessions--;
}
sessionPool.clear();
sessionPool.notifyAll();
}
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
synchronized (sessionPool)
{
while (numServerSessions > 0)
{
try
{
sessionPool.wait();
}
catch (InterruptedException ignore)
{
}
}
}
}
Executor getExecutor()
{
return executor;
}
boolean isTransacted()
{
return transacted;
}
void recycle(StdServerSession session)
{
synchronized (sessionPool)
{
if (closing)
{
session.close();
numServerSessions--;
if (numServerSessions == 0)
{
sessionPool.notifyAll();
}
}
else
{
sessionPool.add(session);
sessionPool.notifyAll();
if( log.isTraceEnabled() )
log.trace("recycled server session: " + session);
}
}
}
private void create() throws JMSException
{
boolean debug = log.isDebugEnabled();
for (int index = 0; index < poolSize; index++)
{
Session ses = null;
XASession xaSes = null;
if (debug)
log.debug("initializing with connection: " + con);
if (destination instanceof Topic && con instanceof XATopicConnection)
{
xaSes = ((XATopicConnection)con).createXATopicSession();
ses = ((XATopicSession)xaSes).getTopicSession();
}
else if (destination instanceof Queue && con instanceof XAQueueConnection)
{
xaSes = ((XAQueueConnection)con).createXAQueueSession();
ses = ((XAQueueSession)xaSes).getQueueSession();
}
else if (destination instanceof Topic && con instanceof TopicConnection)
{
ses = ((TopicConnection)con).createTopicSession(transacted, ack);
log.warn("Using a non-XA TopicConnection. " +
"It will not be able to participate in a Global UOW");
}
else if (destination instanceof Queue && con instanceof QueueConnection)
{
ses = ((QueueConnection)con).createQueueSession(transacted, ack);
log.warn("Using a non-XA QueueConnection. " +
"It will not be able to participate in a Global UOW");
}
else
{
throw new JMSException("Connection was not reconizable: " + con + " for destination " + destination);
}
StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
listener, useLocalTX, xidFactory);
sessionPool.add(serverSession);
numServerSessions++;
if (debug)
log.debug("added server session to the pool: " + serverSession);
}
}
private static class MyPooledExecutor extends PooledExecutor
{
public MyPooledExecutor(int poolSize)
{
super(poolSize);
}
protected Runnable getTask() throws InterruptedException
{
Runnable task = null;
while ((task = super.getTask()) == null && keepRunning());
return task;
}
protected synchronized boolean keepRunning()
{
if (shutdown_)
return false;
return poolSize_ <= minimumPoolSize_;
}
}
private static class DefaultThreadFactory implements ThreadFactory
{
private static int count = 0;
private static synchronized int nextCount()
{
return count ++;
}
public Thread newThread(final Runnable command)
{
String name = "JMS SessionPool Worker-" + nextCount();
return new Thread(threadGroup, command, name);
}
}
}