package org.jboss.resource.adapter.jms;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.XAConnection;
import javax.jms.XAQueueConnection;
import javax.jms.XAQueueSession;
import javax.jms.XASession;
import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.CommException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.IllegalStateException;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.resource.spi.SecurityException;
import javax.security.auth.Subject;
import javax.transaction.xa.XAResource;
import org.jboss.jms.ConnectionFactoryHelper;
import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.logging.Logger;
import org.jboss.resource.JBossResourceException;
/**
 * JCA {@link ManagedConnection} backed by one JMS connection and one JMS
 * session (plus the matching XA session when the provider hands back an XA
 * connection).
 *
 * <p>The instance registers itself as the JMS {@link ExceptionListener} of the
 * underlying connection and translates provider failures into
 * {@link ConnectionEvent#CONNECTION_ERROR_OCCURRED} events for the registered
 * {@link ConnectionEventListener}s.
 */
public class JmsManagedConnection
   implements ManagedConnection, ExceptionListener
{
   private static final Logger log = Logger.getLogger(JmsManagedConnection.class);

   /** The factory that created this managed connection. */
   private JmsManagedConnectionFactory mcf;

   /** The request info this managed connection was created for. */
   private JmsConnectionRequestInfo info;

   /** User name used to create the JMS connection (may be null). */
   private String user;

   /** Password used to create the JMS connection (may be null). */
   private String pwd;

   /** Set once {@link #destroy()} has been entered. */
   private boolean isDestroyed;

   /** The underlying JMS connection, created by {@link #setup()}. */
   private Connection con;

   /** Session used when the request type is neither TOPIC nor QUEUE. */
   private Session session;

   /** Session used when the request type is TOPIC. */
   private TopicSession topicSession;

   /** Session used when the request type is QUEUE. */
   private QueueSession queueSession;

   /** XA session for the generic case; only set for XA connections. */
   private XASession xaSession;

   /** XA session for the TOPIC case; only set for XA connections. */
   private XATopicSession xaTopicSession;

   /** XA session for the QUEUE case; only set for XA connections. */
   private XAQueueSession xaQueueSession;

   /** Lazily resolved XA resource, see {@link #getXAResource()}. */
   private XAResource xaResource;

   /** True when {@link #setup()} obtained an XA connection. */
   private boolean xaTransacted;

   /** The {@link JmsSession} handles currently associated with this connection. */
   private Set handles = Collections.synchronizedSet(new HashSet());

   /** The registered {@link ConnectionEventListener}s. */
   private Vector listeners = new Vector();

   /**
    * Creates the managed connection and immediately opens the underlying JMS
    * connection and session.
    *
    * @param mcf  the owning managed connection factory
    * @param info the request info; must be a {@link JmsConnectionRequestInfo}
    * @param user user name for the JMS connection, may be null
    * @param pwd  password for the JMS connection, may be null
    * @throws ResourceException if the JMS connection or session cannot be set up
    */
   public JmsManagedConnection(final JmsManagedConnectionFactory mcf,
                               final ConnectionRequestInfo info,
                               final String user,
                               final String pwd)
      throws ResourceException
   {
      this.mcf = mcf;
      this.info = (JmsConnectionRequestInfo)info;
      this.user = user;
      this.pwd = pwd;
      setup();
   }

   /**
    * Hands out a new {@link JmsSession} handle bound to this managed connection.
    *
    * @param subject the caller's security subject
    * @param info    the request info; must be a {@link JmsConnectionRequestInfo}
    * @return the new handle
    * @throws SecurityException     if the resolved user name differs from the one
    *                               this connection was created with
    * @throws IllegalStateException if this managed connection has been destroyed
    * @throws ResourceException     for any other failure resolving credentials
    */
   public Object getConnection(final Subject subject,
                               final ConnectionRequestInfo info)
      throws ResourceException
   {
      JmsCred cred = JmsCred.getJmsCred(mcf,subject,info);

      // Reauthentication is not supported: the requested user must match the
      // user the physical connection was opened with (including "no user").
      if (user != null && !user.equals(cred.name))
         throw new SecurityException
            ("Password credentials not the same, reauthentication not allowed");
      if (cred.name != null && user == null) {
         throw new SecurityException
            ("Password credentials not the same, reauthentication not allowed");
      }

      user = cred.name;

      if (isDestroyed)
         throw new IllegalStateException("ManagedConnection already destroyd");

      JmsSession handle = new JmsSession(this, (JmsConnectionRequestInfo) info);
      handles.add(handle);
      return handle;
   }

   /**
    * Stops the JMS connection (best effort) and destroys every handle that is
    * currently associated with this managed connection.
    *
    * @throws ResourceException if destroying a handle fails
    */
   private void destroyHandles() throws ResourceException
   {
      try
      {
         if (con != null)
            con.stop();
      }
      catch (Throwable t)
      {
         log.trace("Ignored error stopping connection", t);
      }

      // Iterate over a snapshot: toArray() on the synchronized set is atomic,
      // whereas iterating the set itself is neither thread-safe nor tolerant of
      // a handle removing itself through removeHandle() while being destroyed.
      Object[] snapshot = handles.toArray();
      for (int i = 0; i < snapshot.length; i++)
         ((JmsSession)snapshot[i]).destroy();

      handles.clear();
   }

   /**
    * Destroys the handles, closes the session(s) and closes the JMS connection.
    * Calling it again after the first invocation is a no-op.
    *
    * @throws ResourceException if destroying a handle fails or the connection
    *                           cannot be closed
    */
   public void destroy() throws ResourceException
   {
      if (isDestroyed) return;

      isDestroyed = true;

      try
      {
         con.setExceptionListener(null);
      }
      catch (JMSException e)
      {
         log.debug("Error unsetting the exception listener " + this, e);
      }

      try
      {
         destroyHandles();
      }
      finally
      {
         // Always release the physical resources, even when a handle failed to
         // destroy; otherwise the JMS connection would be leaked.
         closeSessionsAndConnection();
      }
   }

   /**
    * Closes the session(s) matching the request type and then the connection.
    * A failure closing a session is only logged; a failure closing the
    * connection is reported to the caller.
    *
    * @throws ResourceException if the connection cannot be closed
    */
   private void closeSessionsAndConnection() throws ResourceException
   {
      try
      {
         try
         {
            if (info.getType() == JmsConnectionFactory.TOPIC)
            {
               topicSession.close();
               if (xaTransacted) {
                  xaTopicSession.close();
               }
            }
            else if (info.getType() == JmsConnectionFactory.QUEUE)
            {
               queueSession.close();
               if (xaTransacted)
                  xaQueueSession.close();
            }
            else
            {
               session.close();
               if (xaTransacted)
                  xaSession.close();
            }
         }
         catch (JMSException e)
         {
            log.debug("Error closing session " +this, e);
         }
         con.close();
      }
      catch (JMSException e)
      {
         throw new JBossResourceException
            ("Could not properly close the session and connection", e);
      }
   }

   /**
    * Destroys all outstanding handles so the managed connection can be reused.
    *
    * @throws IllegalStateException if this managed connection has been destroyed
    * @throws ResourceException     if destroying a handle fails
    */
   public void cleanup() throws ResourceException
   {
      if (isDestroyed)
         throw new IllegalStateException("ManagedConnection already destroyed");

      destroyHandles();
   }

   /**
    * Re-associates an existing {@link JmsSession} handle with this managed
    * connection.
    *
    * @param obj the handle to associate; must be a {@link JmsSession}
    * @throws IllegalStateException if this managed connection has been destroyed
    *                               or {@code obj} is not a {@link JmsSession}
    */
   public void associateConnection(final Object obj)
      throws ResourceException
   {
      if (!isDestroyed && obj instanceof JmsSession)
      {
         JmsSession h = (JmsSession)obj;
         h.setManagedConnection(this);
         handles.add(h);
      }
      else
         throw new IllegalStateException
            ("ManagedConnection in an illegal state");
   }

   /**
    * Registers a connection event listener.
    *
    * @param l the listener to add
    */
   public void addConnectionEventListener(final ConnectionEventListener l)
   {
      listeners.addElement(l);

      if (log.isDebugEnabled())
         log.debug("ConnectionEvent listener added: " + l);
   }

   /**
    * Unregisters a connection event listener.
    *
    * @param l the listener to remove
    */
   public void removeConnectionEventListener(final ConnectionEventListener l)
   {
      listeners.removeElement(l);
   }

   /**
    * Returns the XA resource of the XA session matching the request type,
    * resolving and caching it on first use.
    *
    * @return the XA resource
    * @throws NotSupportedException if the underlying connection is not an XA
    *                               connection
    */
   public XAResource getXAResource() throws ResourceException
   {
      if (!xaTransacted)
         throw new NotSupportedException("Non XA transaction not supported");

      if (xaResource == null)
      {
         if (info.getType() == JmsConnectionFactory.TOPIC)
            xaResource = xaTopicSession.getXAResource();
         else if (info.getType() == JmsConnectionFactory.QUEUE)
            xaResource = xaQueueSession.getXAResource();
         else
            xaResource = xaSession.getXAResource();
      }

      if (log.isTraceEnabled())
         log.trace("XAResource=" + xaResource);

      return xaResource;
   }

   /**
    * Creates a new local transaction wrapper for this managed connection.
    *
    * @return a new {@link JmsLocalTransaction}
    */
   public LocalTransaction getLocalTransaction() throws ResourceException
   {
      LocalTransaction tx = new JmsLocalTransaction(this);
      if (log.isTraceEnabled())
         log.trace("LocalTransaction=" + tx);
      return tx;
   }

   /**
    * Returns meta data describing this managed connection.
    *
    * @return a new {@link JmsMetaData}
    * @throws IllegalStateException if this managed connection has been destroyed
    */
   public ManagedConnectionMetaData getMetaData() throws ResourceException
   {
      if (isDestroyed)
         throw new IllegalStateException("ManagedConnection already destroyd");

      return new JmsMetaData(this);
   }

   /**
    * Ignored; this implementation does not keep a log writer.
    *
    * @param out the log writer (unused)
    */
   public void setLogWriter(final PrintWriter out) throws ResourceException
   {
   }

   /**
    * Always returns null; this implementation does not keep a log writer.
    *
    * @return null
    */
   public PrintWriter getLogWriter() throws ResourceException
   {
      return null;
   }

   /**
    * JMS exception callback: unregisters this listener from the connection and
    * notifies the connection event listeners that a connection error occurred.
    *
    * @param exception the failure reported by the JMS provider
    */
   public void onException(JMSException exception)
   {
      log.warn("Handling jms exception failure: " + this, exception);

      try
      {
         con.setExceptionListener(null);
      }
      catch (JMSException e)
      {
         log.debug("Unable to unset exception listener", e);
      }

      ConnectionEvent event = new ConnectionEvent(this, ConnectionEvent.CONNECTION_ERROR_OCCURRED, exception);
      sendEvent(event);
   }

   /**
    * Returns the JMS session matching the request type.
    *
    * @return the topic session, queue session or generic session
    */
   protected Session getSession()
   {
      if (info.getType() == JmsConnectionFactory.TOPIC)
         return topicSession;
      else if (info.getType() == JmsConnectionFactory.QUEUE)
         return queueSession;
      else
         return session;
   }

   /**
    * Dispatches a connection event to every registered listener.
    *
    * @param event the event to send
    * @throws IllegalArgumentException if the event id is not one of the
    *                                  {@link ConnectionEvent} constants handled here
    */
   protected void sendEvent(final ConnectionEvent event)
   {
      int type = event.getId();

      if (log.isTraceEnabled())
         log.trace("Sending connection event: " + type);

      // Pass a zero-length array so the snapshot is sized atomically inside
      // toArray(). Pre-sizing with listeners.size() can leave trailing nulls
      // when a listener is removed between the two calls.
      ConnectionEventListener[] list =
         (ConnectionEventListener[])listeners.toArray(new ConnectionEventListener[0]);

      for (int i=0; i<list.length; i++)
      {
         switch (type) {
            case ConnectionEvent.CONNECTION_CLOSED:
               list[i].connectionClosed(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_STARTED:
               list[i].localTransactionStarted(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_COMMITTED:
               list[i].localTransactionCommitted(event);
               break;

            case ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK:
               list[i].localTransactionRolledback(event);
               break;

            case ConnectionEvent.CONNECTION_ERROR_OCCURRED:
               list[i].connectionErrorOccurred(event);
               break;

            default:
               throw new IllegalArgumentException("Illegal eventType: " + type);
         }
      }
   }

   /**
    * Removes a handle from the set of associated handles.
    *
    * @param handle the handle to forget
    */
   protected void removeHandle(final JmsSession handle)
   {
      handles.remove(handle);
   }

   /**
    * @return the request info this managed connection was created with
    */
   protected ConnectionRequestInfo getInfo()
   {
      return info;
   }

   /**
    * @return the managed connection factory that created this connection
    */
   protected JmsManagedConnectionFactory getManagedConnectionFactory()
   {
      return mcf;
   }

   /**
    * Starts the underlying JMS connection.
    *
    * @throws JMSException if the provider fails to start the connection
    */
   void start() throws JMSException
   {
      con.start();
   }

   /**
    * Stops the underlying JMS connection.
    *
    * @throws JMSException if the provider fails to stop the connection
    */
   void stop() throws JMSException
   {
      con.stop();
   }

   /**
    * @return the user name associated with this managed connection, may be null
    */
   protected String getUserName()
   {
      return user;
   }

   /**
    * Resolves the JMS provider adapter: looked up in JNDI when the factory has
    * an adapter JNDI name configured, otherwise taken from the factory directly.
    *
    * @return the provider adapter
    * @throws NamingException if the JNDI lookup fails
    */
   private JMSProviderAdapter getProviderAdapter() throws NamingException
   {
      JMSProviderAdapter adapter;

      if (mcf.getJmsProviderAdapterJNDI() != null)
      {
         Context ctx = new InitialContext();
         try
         {
            adapter = (JMSProviderAdapter)
               ctx.lookup(mcf.getJmsProviderAdapterJNDI());
         }
         finally
         {
            ctx.close();
         }
      }
      else
         adapter = mcf.getJmsProviderAdapter();

      return adapter;
   }

   /**
    * Opens the JMS connection and session for the configured request type.
    * When the provider returns an XA connection an XA session is created and
    * the plain session is derived from it; otherwise a plain session is created
    * with the requested transacted flag and AUTO_ACKNOWLEDGE.
    *
    * <p>If anything fails after the connection has been created, the connection
    * is closed again before the exception propagates.
    *
    * @throws CommException     wrapping any {@link NamingException} or
    *                           {@link JMSException}
    * @throws ResourceException if the created connection is of an unexpected type
    */
   private void setup() throws ResourceException
   {
      boolean debug = log.isDebugEnabled();
      boolean success = false;

      try
      {
         JMSProviderAdapter adapter = getProviderAdapter();
         Context context = adapter.getInitialContext();
         Object factory;
         boolean transacted = info.isTransacted();
         int ack = Session.AUTO_ACKNOWLEDGE;

         if (info.getType() == JmsConnectionFactory.TOPIC)
         {
            factory = context.lookup(adapter.getTopicFactoryRef());
            con = ConnectionFactoryHelper.createTopicConnection(factory, user, pwd);
            if (info.getClientID() != null)
               con.setClientID(info.getClientID());
            con.setExceptionListener(this);
            if (debug) log.debug("created connection: " + con);

            if (con instanceof XATopicConnection)
            {
               xaTopicSession = ((XATopicConnection)con).createXATopicSession();
               topicSession = xaTopicSession.getTopicSession();
               xaTransacted = true;
            }
            else if (con instanceof TopicConnection)
            {
               topicSession =
                  ((TopicConnection)con).createTopicSession(transacted, ack);
               if (debug)
                  log.debug("Using a non-XA TopicConnection.  " +
                            "It will not be able to participate in a Global UOW");
            }
            else
               throw new JBossResourceException("Connection was not recognizable: " + con);

            if (debug)
               log.debug("xaTopicSession=" + xaTopicSession + ", topicSession=" + topicSession);
         }
         else if (info.getType() == JmsConnectionFactory.QUEUE)
         {
            factory = context.lookup(adapter.getQueueFactoryRef());
            con = ConnectionFactoryHelper.createQueueConnection(factory, user, pwd);
            if (info.getClientID() != null)
               con.setClientID(info.getClientID());
            con.setExceptionListener(this);
            if (debug) log.debug("created connection: " + con);

            if (con instanceof XAQueueConnection)
            {
               xaQueueSession =
                  ((XAQueueConnection)con).createXAQueueSession();
               queueSession = xaQueueSession.getQueueSession();
               xaTransacted = true;
            }
            else if (con instanceof QueueConnection)
            {
               queueSession =
                  ((QueueConnection)con).createQueueSession(transacted, ack);
               if (debug)
                  log.debug("Using a non-XA QueueConnection.  " +
                            "It will not be able to participate in a Global UOW");
            }
            else
               throw new JBossResourceException("Connection was not reconizable: " + con);

            if (debug)
               log.debug("xaQueueSession=" + xaQueueSession + ", queueSession=" + queueSession);
         }
         else
         {
            factory = context.lookup(adapter.getFactoryRef());
            con = ConnectionFactoryHelper.createConnection(factory, user, pwd);
            if (info.getClientID() != null)
               con.setClientID(info.getClientID());
            con.setExceptionListener(this);
            if (debug) log.debug("created connection: " + con);

            if (con instanceof XAConnection)
            {
               xaSession =
                  ((XAConnection)con).createXASession();
               session = xaSession.getSession();
               xaTransacted = true;
            }
            else
            {
               session = con.createSession(transacted, ack);
               if (debug)
                  log.debug("Using a non-XA Connection.  " +
                            "It will not be able to participate in a Global UOW");
            }

            // Report the generic XA session here, not the queue one.
            if (debug)
               log.debug("xaSession=" + xaSession + ", Session=" + session);
         }

         if (debug)
            log.debug("transacted=" + transacted + ", ack=" + ack);

         success = true;
      }
      catch (NamingException e)
      {
         CommException ce = new CommException(e.toString());
         ce.setLinkedException(e);
         throw ce;
      }
      catch (JMSException e)
      {
         CommException ce = new CommException(e.toString());
         ce.setLinkedException(e);
         throw ce;
      }
      finally
      {
         // The constructor is about to fail, so nobody will ever call
         // destroy(); release a half-initialised connection here.
         if (!success)
            closeConnectionQuietly();
      }
   }

   /**
    * Closes the JMS connection, if one was created, logging instead of
    * propagating any failure. Used to release the connection when
    * {@link #setup()} fails part-way through.
    */
   private void closeConnectionQuietly()
   {
      if (con == null)
         return;

      try
      {
         con.close();
      }
      catch (Throwable t)
      {
         log.debug("Error closing connection after failed setup " + this, t);
      }
   }
}