package org.jboss.resource.adapter.jms.inflow;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import org.jboss.logging.Logger;
/**
 * A JMS {@link ServerSession} that forwards the messages of its JMS session to a
 * {@link MessageEndpoint}.
 *
 * <p>The instance acts in several roles at once: it is the {@link MessageListener}
 * registered on the JMS session, the {@link Work} unit handed to the activation's
 * {@link WorkManager}, and the {@link WorkListener} that hands the instance back to
 * its {@link JmsServerSessionPool} once the work has completed or was rejected.
 */
public class JmsServerSession implements ServerSession, MessageListener, Work, WorkListener
{
   private static final Logger log = Logger.getLogger(JmsServerSession.class);

   /** The pool that created this server session and receives it back after each run. */
   JmsServerSessionPool pool;

   /** Transacted flag used for a non-XA session; only assigned on the non-XA path of {@link #setup()}. */
   boolean transacted;

   /** Acknowledge mode used for a non-XA session; only assigned on the non-XA path of {@link #setup()}. */
   int acknowledge;

   /** The JMS session whose messages are delivered through {@link #onMessage(Message)}. */
   Session session;

   /** The XA session, or {@code null} when the connection is not an {@link XAConnection}. */
   XASession xaSession;

   /** The endpoint messages are delivered to; created in {@link #setup()}. */
   MessageEndpoint endpoint;

   /** The DLQ handler obtained from the activation; may be {@code null}. */
   DLQHandler dlqHandler;

   /**
    * Creates a server session bound to the given pool. No JMS resources are
    * allocated until {@link #setup()} is called.
    *
    * @param pool the owning server session pool
    */
   public JmsServerSession(JmsServerSessionPool pool)
   {
      this.pool = pool;
   }

   /**
    * Creates the JMS session and the message endpoint and registers this object as
    * the session's message listener.
    *
    * <p>When the activation's connection is an {@link XAConnection} an XA session is
    * created and its underlying session is used; otherwise a plain session is created
    * with the transacted flag and acknowledge mode taken from the activation spec.
    *
    * @throws Exception any error raised while creating the session or the endpoint
    */
   public void setup() throws Exception
   {
      JmsActivation activation = pool.getActivation();
      JmsActivationSpec spec = activation.getActivationSpec();
      dlqHandler = activation.getDLQHandler();

      Connection connection = activation.getConnection();
      if (connection instanceof XAConnection)
      {
         xaSession = ((XAConnection) connection).createXASession();
         session = xaSession.getSession();
      }
      else
      {
         transacted = spec.isSessionTransacted();
         acknowledge = spec.getAcknowledgeModeInt();
         session = connection.createSession(transacted, acknowledge);
      }

      MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();

      // The endpoint only receives an XAResource when delivery is transacted and
      // an XA session is actually available; otherwise null is passed.
      XAResource xaResource = null;
      if (activation.isDeliveryTransacted() && xaSession != null)
         xaResource = xaSession.getXAResource();

      endpoint = endpointFactory.createEndpoint(xaResource);
      session.setMessageListener(this);
   }

   /**
    * Releases the endpoint and closes the XA session and the session, in that order.
    *
    * <p>Each step is attempted independently: a failure is logged at debug level and
    * does not prevent the remaining steps from running. Nothing is thrown.
    */
   public void teardown()
   {
      try
      {
         if (endpoint != null)
            endpoint.release();
      }
      catch (Throwable t)
      {
         log.debug("Error releasing endpoint " + endpoint, t);
      }

      try
      {
         if (xaSession != null)
            xaSession.close();
      }
      catch (Throwable t)
      {
         log.debug("Error releasing xaSession " + xaSession, t);
      }

      try
      {
         if (session != null)
            session.close();
      }
      catch (Throwable t)
      {
         log.debug("Error releasing session " + session, t);
      }
   }

   /**
    * Delivers a message to the endpoint, bracketed by
    * {@code beforeDelivery}/{@code afterDelivery}.
    *
    * <p>If a DLQ handler is present and its {@code handleRedeliveredMessage} returns
    * {@code true}, the message is not passed to the endpoint. Any {@link Throwable}
    * raised during delivery is logged at error level and not propagated.
    *
    * @param message the message to deliver
    */
   public void onMessage(Message message)
   {
      try
      {
         endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
         try
         {
            if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
               ((MessageListener) endpoint).onMessage(message);
         }
         finally
         {
            // NOTE(review): if afterDelivery() throws, messageDelivered() below is
            // skipped. Confirm against the DLQHandler contract whether that is intended.
            endpoint.afterDelivery();
            if (dlqHandler != null)
               dlqHandler.messageDelivered(message);
         }
      }
      catch (Throwable t)
      {
         log.error("Unexpected error delivering message " + message, t);
      }
   }

   /**
    * Returns the JMS session created by {@link #setup()}.
    *
    * @return the session, or {@code null} if {@link #setup()} has not assigned one
    * @throws JMSException declared by {@link ServerSession}; not thrown by this implementation
    */
   public Session getSession() throws JMSException
   {
      return session;
   }

   /**
    * Schedules this object as work on the activation's {@link WorkManager}, with a
    * start timeout of {@code 0}, no execution context, and this object as the work
    * listener.
    *
    * @throws JMSException if the work manager refuses the work; the originating
    *         {@link WorkException} is attached as the linked exception
    */
   public void start() throws JMSException
   {
      JmsActivation activation = pool.getActivation();
      WorkManager workManager = activation.getWorkManager();
      try
      {
         workManager.scheduleWork(this, 0, null, this);
      }
      catch (WorkException e)
      {
         log.error("Unable to schedule work", e);
         JMSException jmsException = new JMSException("Unable to schedule work: " + e.toString());
         // Keep the original failure reachable for callers instead of flattening it to text.
         jmsException.setLinkedException(e);
         throw jmsException;
      }
   }

   /**
    * Work entry point: runs the JMS session.
    */
   public void run()
   {
      session.run();
   }

   /**
    * Work release callback; this implementation does nothing.
    */
   public void release()
   {
   }

   /**
    * Work listener callback; this implementation does nothing.
    *
    * @param e the work event
    */
   public void workAccepted(WorkEvent e)
   {
   }

   /**
    * Returns this server session to its pool once the scheduled work has completed.
    *
    * @param e the work event
    */
   public void workCompleted(WorkEvent e)
   {
      pool.returnServerSession(this);
   }

   /**
    * Returns this server session to its pool when the scheduled work was rejected.
    *
    * @param e the work event
    */
   public void workRejected(WorkEvent e)
   {
      pool.returnServerSession(this);
   }

   /**
    * Work listener callback; this implementation does nothing.
    *
    * @param e the work event
    */
   public void workStarted(WorkEvent e)
   {
   }
}