package org.jboss.resource.adapter.jms;
import java.util.HashSet;
import java.util.Iterator;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicSession;
import javax.naming.Reference;
import javax.resource.Referenceable;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionManager;
import javax.resource.spi.ManagedConnectionFactory;
import org.jboss.logging.Logger;
/**
 * Application-facing connection handle that hands out {@link JmsSession}
 * objects obtained from a JCA {@link ConnectionManager}.
 *
 * <p>The factory keeps track of every session it has allocated and of the
 * temporary queues/topics registered with it, so that {@link #close()} can
 * close the sessions and delete the temporary destinations.
 *
 * <p>Operations that this adapter does not support throw
 * {@link IllegalStateException} with the message <code>ISE</code>.
 * NOTE(review): <code>ISE</code> is not declared in this class; presumably it
 * is a constant inherited from <code>JmsSessionFactory</code> - confirm.
 *
 * <p>Access to each tracking set is guarded by synchronizing on the set
 * itself. The <code>started</code> flag is read and written only while
 * holding the lock on <code>sessions</code>.
 */
public class JmsSessionFactoryImpl
   implements JmsSessionFactory, Referenceable
{
   private static final Logger log = Logger.getLogger(JmsSessionFactoryImpl.class);

   /** Set once by {@link #close()}; checked by {@link #checkClosed()}. */
   private boolean closed = false;

   /** Trace-enabled state of the logger, sampled when the instance is created. */
   private boolean trace = log.isTraceEnabled();

   private Reference reference;

   /** Credentials and client id copied into every connection request. */
   private String userName;
   private String password;
   private String clientID;

   /** Connection type, compared against the <code>JmsConnectionFactory</code> constants. */
   private final int type;

   /** Whether {@link #start()} has been called without a later {@link #stop()}. */
   private boolean started = false;

   private final JmsManagedConnectionFactory mcf;
   private final ConnectionManager cm;

   /** Sessions allocated through this factory; also the lock for <code>started</code>. */
   private final HashSet sessions = new HashSet();

   /** Temporary destinations to delete on {@link #close()}. */
   private final HashSet tempQueues = new HashSet();
   private final HashSet tempTopics = new HashSet();

   /**
    * Creates a new session factory.
    *
    * @param mcf  the managed connection factory; must be a
    *             <code>JmsManagedConnectionFactory</code>
    * @param cm   the connection manager to allocate through, or
    *             <code>null</code> to use a new <code>JmsConnectionManager</code>
    * @param type the connection type this factory represents
    */
   public JmsSessionFactoryImpl(final ManagedConnectionFactory mcf,
                                final ConnectionManager cm,
                                final int type)
   {
      this.mcf = (JmsManagedConnectionFactory) mcf;

      // Fall back to a locally created connection manager when none is supplied.
      if (cm == null)
         this.cm = new JmsConnectionManager();
      else
         this.cm = cm;

      this.type = type;

      if (trace)
         log.trace("mcf=" + mcf + ", cm=" + cm + ", type=" + type);
   }

   /**
    * Stores the naming reference for this object.
    *
    * @param reference the reference to remember
    */
   public void setReference(final Reference reference)
   {
      this.reference = reference;
   }

   /**
    * @return the reference previously set with {@link #setReference(Reference)},
    *         or <code>null</code> if none was set
    */
   public Reference getReference()
   {
      return reference;
   }

   /**
    * Sets the user name passed along with subsequent session allocations.
    *
    * @param name the user name
    */
   public void setUserName(final String name)
   {
      userName = name;
   }

   /**
    * Sets the password passed along with subsequent session allocations.
    *
    * @param password the password
    */
   public void setPassword(final String password)
   {
      this.password = password;
   }

   /**
    * Allocates a queue session.
    *
    * @param transacted      whether the session is transacted
    * @param acknowledgeMode the acknowledge mode (overridden when transacted)
    * @return the allocated session
    * @throws JMSException if this factory is closed, if it is a topic
    *                      connection, or if the allocation fails
    */
   public QueueSession createQueueSession(final boolean transacted,
                                          final int acknowledgeMode)
      throws JMSException
   {
      checkClosed();
      if (type == JmsConnectionFactory.TOPIC)
         throw new IllegalStateException("Can not get a queue session from a topic connection");
      return allocateConnection(transacted, acknowledgeMode, type);
   }

   /**
    * Not supported.
    *
    * @throws JMSException always (an {@link IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer
      (Queue queue,
       String messageSelector,
       ServerSessionPool sessionPool,
       int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Allocates a topic session.
    *
    * @param transacted      whether the session is transacted
    * @param acknowledgeMode the acknowledge mode (overridden when transacted)
    * @return the allocated session
    * @throws JMSException if this factory is closed, if it is a queue
    *                      connection, or if the allocation fails
    */
   public TopicSession createTopicSession(final boolean transacted,
                                          final int acknowledgeMode)
      throws JMSException
   {
      checkClosed();
      if (type == JmsConnectionFactory.QUEUE)
         throw new IllegalStateException("Can not get a topic session from a queue connection");
      return allocateConnection(transacted, acknowledgeMode, type);
   }

   /**
    * Not supported.
    *
    * @throws JMSException always (an {@link IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer
      (Topic topic,
       String messageSelector,
       ServerSessionPool sessionPool,
       int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Not supported.
    *
    * @throws JMSException always (an {@link IllegalStateException})
    */
   public ConnectionConsumer createDurableConnectionConsumer(
      Topic topic,
      String subscriptionName,
      String messageSelector,
      ServerSessionPool sessionPool,
      int maxMessages)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * @return the client id set with {@link #setClientID(String)}, or
    *         <code>null</code> if none was set
    * @throws JMSException if this factory is closed
    */
   public String getClientID() throws JMSException
   {
      checkClosed();
      return clientID;
   }

   /**
    * Sets the client id used for subsequent session allocations. The id can
    * be set only once.
    *
    * @param cID the client id
    * @throws JMSException if the managed connection factory is in strict
    *                      mode, if this factory is closed, or if a client id
    *                      has already been set
    */
   public void setClientID(String cID) throws JMSException
   {
      if (mcf.isStrict())
         throw new IllegalStateException(ISE);

      checkClosed();
      if (clientID != null)
         throw new IllegalStateException("Cannot change client id");
      clientID = cID;
   }

   /**
    * @return the metadata reported by the managed connection factory
    * @throws JMSException if this factory is closed
    */
   public ConnectionMetaData getMetaData() throws JMSException
   {
      checkClosed();
      return mcf.getMetaData();
   }

   /**
    * Not supported.
    *
    * @throws JMSException always (an {@link IllegalStateException})
    */
   public ExceptionListener getExceptionListener() throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Not supported.
    *
    * @throws JMSException always (an {@link IllegalStateException})
    */
   public void setExceptionListener(ExceptionListener listener)
      throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Starts every session tracked by this factory. Does nothing if the
    * factory is already started. Sessions allocated afterwards are started
    * as part of their allocation.
    *
    * @throws JMSException if this factory is closed or a session fails to start
    */
   public void start() throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("start() " + this);
      synchronized (sessions)
      {
         if (started)
            return;
         started = true;
         for (Iterator i = sessions.iterator(); i.hasNext();)
         {
            JmsSession session = (JmsSession) i.next();
            session.start();
         }
      }
   }

   /**
    * Stops every session tracked by this factory. Does nothing if the
    * factory is not started.
    *
    * @throws JMSException if the managed connection factory is in strict
    *                      mode, if this factory is closed, or if a session
    *                      fails to stop
    */
   public void stop() throws JMSException
   {
      if (mcf.isStrict())
         throw new IllegalStateException(ISE);

      checkClosed();
      if (trace)
         log.trace("stop() " + this);
      synchronized (sessions)
      {
         if (started == false)
            return;
         // Record the stopped state: otherwise a later start() would return
         // early without restarting the sessions, and allocateConnection()
         // would auto-start sessions created while the connection is stopped.
         started = false;
         for (Iterator i = sessions.iterator(); i.hasNext();)
         {
            JmsSession session = (JmsSession) i.next();
            session.stop();
         }
      }
   }

   /**
    * Closes this factory: closes all tracked sessions and deletes all
    * registered temporary queues and topics. Calling it again after the
    * first call does nothing.
    *
    * <p>Cleanup is best effort: a failure on one session or destination is
    * logged at trace level and does not prevent the remaining ones from
    * being processed.
    *
    * @throws JMSException declared by the interface; failures of individual
    *                      sessions or destinations are caught here
    */
   public void close() throws JMSException
   {
      if (closed)
         return;
      closed = true;

      if (trace)
         log.trace("close() " + this);

      synchronized (sessions)
      {
         for (Iterator i = sessions.iterator(); i.hasNext();)
         {
            JmsSession session = (JmsSession) i.next();
            try
            {
               session.closeSession();
            }
            catch (Throwable t)
            {
               log.trace("Error closing session", t);
            }
            i.remove();
         }
      }

      synchronized (tempQueues)
      {
         for (Iterator i = tempQueues.iterator(); i.hasNext();)
         {
            TemporaryQueue temp = (TemporaryQueue) i.next();
            try
            {
               if (trace)
                  log.trace("Closing temporary queue " + temp + " for " + this);
               temp.delete();
            }
            catch (Throwable t)
            {
               log.trace("Error deleting temporary queue", t);
            }
            i.remove();
         }
      }

      synchronized (tempTopics)
      {
         for (Iterator i = tempTopics.iterator(); i.hasNext();)
         {
            TemporaryTopic temp = (TemporaryTopic) i.next();
            try
            {
               if (trace)
                  log.trace("Closing temporary topic " + temp + " for " + this);
               temp.delete();
            }
            catch (Throwable t)
            {
               log.trace("Error deleting temporary topic", t);
            }
            i.remove();
         }
      }
   }

   /**
    * Stops tracking the given session. The session itself is not closed here.
    *
    * @param session the session to forget
    * @throws JMSException declared by the interface; not thrown by this
    *                      implementation
    */
   public void closeSession(JmsSession session) throws JMSException
   {
      synchronized (sessions)
      {
         sessions.remove(session);
      }
   }

   /**
    * Registers a temporary queue for deletion when this factory is closed.
    *
    * @param temp the temporary queue
    */
   public void addTemporaryQueue(TemporaryQueue temp)
   {
      synchronized (tempQueues)
      {
         tempQueues.add(temp);
      }
   }

   /**
    * Registers a temporary topic for deletion when this factory is closed.
    *
    * @param temp the temporary topic
    */
   public void addTemporaryTopic(TemporaryTopic temp)
   {
      synchronized (tempTopics)
      {
         tempTopics.add(temp);
      }
   }

   /**
    * Not supported.
    *
    * @throws JMSException always (an {@link IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer(Destination destination, ServerSessionPool pool, int maxMessages) throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Not supported.
    *
    * @throws JMSException always (an {@link IllegalStateException})
    */
   public ConnectionConsumer createConnectionConsumer(Destination destination, String name, ServerSessionPool pool, int maxMessages) throws JMSException
   {
      throw new IllegalStateException(ISE);
   }

   /**
    * Allocates a session of this factory's connection type.
    *
    * @param transacted      whether the session is transacted
    * @param acknowledgeMode the acknowledge mode (overridden when transacted)
    * @return the allocated session
    * @throws JMSException if this factory is closed or the allocation fails
    */
   public Session createSession(boolean transacted, int acknowledgeMode)
      throws JMSException
   {
      checkClosed();
      return allocateConnection(transacted, acknowledgeMode, type);
   }

   /**
    * Allocates a session through the connection manager, registers it with
    * this factory and starts it if the factory is currently started.
    *
    * @param transacted      whether the session is transacted; when
    *                        <code>true</code> the acknowledge mode is forced
    *                        to <code>Session.SESSION_TRANSACTED</code>
    * @param acknowledgeMode the requested acknowledge mode
    * @param sessionType     the session type placed in the request info
    * @return the allocated session
    * @throws JMSException if the managed connection factory is in strict
    *                      mode and a session is already open, or if the
    *                      connection manager fails to allocate (the
    *                      <code>ResourceException</code> is attached as the
    *                      linked exception)
    */
   protected JmsSession allocateConnection(boolean transacted, int acknowledgeMode, int sessionType) throws JMSException
   {
      try
      {
         // The whole allocation runs under the sessions lock so that the
         // strict-mode check, the started check and the registration are
         // consistent with start()/stop()/close().
         synchronized (sessions)
         {
            if (mcf.isStrict() && sessions.isEmpty() == false)
               throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6");
            if (transacted)
               acknowledgeMode = Session.SESSION_TRANSACTED;
            JmsConnectionRequestInfo info = new JmsConnectionRequestInfo(transacted, acknowledgeMode, sessionType);
            info.setUserName(userName);
            info.setPassword(password);
            info.setClientID(clientID);

            if (trace)
               log.trace("Allocating session for " + this + " with request info=" + info);
            JmsSession session = (JmsSession) cm.allocateConnection(mcf, info);
            if (trace)
               log.trace("Allocated " + this + " session=" + session);
            session.setJmsSessionFactory(this);
            if (started)
               session.start();
            sessions.add(session);
            return session;
         }
      }
      catch (ResourceException e)
      {
         log.error("could not create session", e);

         JMSException je = new JMSException
            ("Could not create a session: " + e);
         je.setLinkedException(e);
         throw je;
      }
   }

   /**
    * Verifies that this factory has not been closed.
    *
    * @throws IllegalStateException if {@link #close()} has been called
    */
   protected void checkClosed() throws IllegalStateException
   {
      if (closed)
         throw new IllegalStateException("The connection is closed");
   }
}