package org.jboss.jms.asf;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.jms.XASession;
import javax.naming.InitialContext;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.jboss.logging.Logger;
import org.jboss.tm.TransactionManagerService;
import org.jboss.tm.XidFactoryMBean;
/**
 * A {@link ServerSession} that wraps a JMS session (and optionally an
 * {@link XASession}) and delivers each received message to a delegate
 * {@link MessageListener} inside a transaction.
 *
 * <p>Two transaction strategies are supported:
 * <ul>
 * <li><b>local TX</b> ({@code useLocalTX}, only when an XA session exists):
 *     the session's {@link XAResource} is driven directly with a fresh
 *     {@link Xid} and completed with a one-phase commit or a rollback;</li>
 * <li><b>managed TX</b>: a transaction is begun on the
 *     {@link TransactionManager} looked up from JNDI, and the XA resource
 *     (if any) is enlisted in it.</li>
 * </ul>
 *
 * <p>After each {@link #run()} the instance hands itself back to its
 * {@link StdServerSessionPool}.
 */
public class StdServerSession
   implements Runnable, ServerSession, MessageListener
{
   /** Class logger. */
   static final Logger log = Logger.getLogger(StdServerSession.class);

   /** The pool this session belongs to and is recycled into. */
   private final StdServerSessionPool serverSessionPool;

   /** The plain JMS session; set to null by {@link #close()}. */
   private Session session;

   /** The XA session, or null when none was supplied; set to null by {@link #close()}. */
   private XASession xaSession;

   /** The transaction manager used for managed transactions. */
   private final TransactionManager tm;

   /** True when the XA resource is driven directly instead of through the TM. */
   private final boolean useLocalTX;

   /** The listener that actually processes the messages. */
   private final MessageListener delegateListener;

   /** Source of Xids for local transactions. */
   private final XidFactoryMBean xidFactory;

   /**
    * Creates the server session, registers it as the message listener of the
    * underlying session and looks up the transaction manager.
    *
    * @param pool             the pool this session is recycled into
    * @param session          the plain JMS session
    * @param xaSession        the XA session, or null if there is none
    * @param delegateListener the listener messages are forwarded to
    * @param useLocalTX       whether to drive the XA resource directly;
    *                         forced to false when {@code xaSession} is null
    * @param xidFactory       factory for the Xids of local transactions
    * @throws JMSException if the listener cannot be registered or the
    *                      transaction manager cannot be found
    */
   StdServerSession(final StdServerSessionPool pool,
                    final Session session,
                    final XASession xaSession,
                    final MessageListener delegateListener,
                    boolean useLocalTX,
                    final XidFactoryMBean xidFactory)
      throws JMSException
   {
      this.serverSessionPool = pool;
      this.session = session;
      this.xaSession = xaSession;
      this.delegateListener = delegateListener;

      // A local TX needs the XAResource of an XA session; without one the
      // option cannot be honoured.
      if (xaSession == null)
      {
         useLocalTX = false;
      }
      this.useLocalTX = useLocalTX;
      this.xidFactory = xidFactory;

      if (log.isDebugEnabled())
      {
         log.debug("initializing (pool, session, xaSession, useLocalTX): " +
            pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
      }

      // Messages are intercepted by this object so that they can be wrapped
      // in a transaction before reaching the delegate.
      if (xaSession != null)
      {
         xaSession.setMessageListener(this);
      }
      else
      {
         session.setMessageListener(this);
      }

      this.tm = lookupTransactionManager();
   }

   /**
    * Looks up the transaction manager under
    * {@link TransactionManagerService#JNDI_NAME}.
    *
    * @return the transaction manager
    * @throws JMSException if the lookup fails; the original failure is kept
    *                      as linked exception and cause
    */
   private static TransactionManager lookupTransactionManager() throws JMSException
   {
      InitialContext ctx = null;
      try
      {
         ctx = new InitialContext();
         return (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
      }
      catch (Exception e)
      {
         JMSException failure = new JMSException("Transaction manager was not found");
         failure.setLinkedException(e);
         failure.initCause(e);
         throw failure;
      }
      finally
      {
         if (ctx != null)
         {
            try
            {
               ctx.close();
            }
            catch (Exception ignored)
            {
               // Best effort: nothing useful can be done if closing the
               // naming context fails.
            }
         }
      }
   }

   /**
    * Returns the underlying session.
    *
    * @return the XA session if there is one, otherwise the plain session
    * @throws JMSException declared by {@link ServerSession#getSession()}
    */
   public Session getSession() throws JMSException
   {
      if (xaSession != null)
      {
         return xaSession;
      }
      return session;
   }

   /**
    * Runs the underlying session (XA session if present) and afterwards
    * always recycles this server session into its pool.
    */
   public void run()
   {
      boolean trace = log.isTraceEnabled();
      if (trace)
      {
         log.trace("running...");
      }
      try
      {
         if (xaSession != null)
         {
            xaSession.run();
         }
         else
         {
            session.run();
         }
      }
      finally
      {
         if (trace)
         {
            log.trace("recycling...");
         }
         recycle();
         if (trace)
         {
            log.trace("finished run");
         }
      }
   }

   /**
    * Delivers the message to the delegate listener inside a transaction.
    *
    * <p>Any exception raised while starting the transaction or by the
    * delegate causes the transaction to be rolled back instead of committed.
    * Failures while completing the transaction are logged, not propagated.
    *
    * @param msg the message to process
    */
   public void onMessage(Message msg)
   {
      boolean trace = log.isTraceEnabled();
      if (trace)
      {
         log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
            serverSessionPool + ", " + session + ", " + xaSession + ", " + useLocalTX);
      }

      Xid localXid = null;
      // Only a successfully started local TX may be ended/committed/rolled back.
      boolean localStarted = false;
      boolean localRollbackFlag = false;
      Transaction trans = null;
      try
      {
         if (useLocalTX)
         {
            localXid = xidFactory.newXid();
            XAResource res = xaSession.getXAResource();
            res.start(localXid, XAResource.TMNOFLAGS);
            localStarted = true;
            if (trace)
            {
               log.trace("Using optimized 1p commit to control TX.");
            }
         }
         else
         {
            tm.begin();
            trans = tm.getTransaction();
            if (xaSession != null)
            {
               XAResource res = xaSession.getXAResource();
               if (!trans.enlistResource(res))
               {
                  throw new JMSException("could not enlist resource");
               }
               if (trace)
               {
                  log.trace("XAResource '" + res + "' enlisted.");
               }
            }
         }
         delegateListener.onMessage(msg);
      }
      catch (Exception e)
      {
         log.error("session failed to run; setting rollback only", e);
         if (useLocalTX)
         {
            localRollbackFlag = true;
         }
         else if (trans != null)
         {
            // trans is null when the transaction could not even be begun;
            // there is nothing to mark in that case.
            try
            {
               if (trace)
               {
                  log.trace("Using TM to mark TX for rollback.");
               }
               trans.setRollbackOnly();
            }
            catch (Exception x)
            {
               log.error("failed to set rollback only", x);
            }
         }
      }
      finally
      {
         try
         {
            if (useLocalTX)
            {
               if (localStarted)
               {
                  completeLocalTx(localXid, localRollbackFlag, trace);
               }
            }
            else if (trans != null)
            {
               completeManagedTx(trans, trace);
            }
         }
         catch (Exception e)
         {
            log.error("failed to commit/rollback", e);
         }
      }
      if (trace)
      {
         log.trace("onMessage done");
      }
   }

   /**
    * Ends a started local transaction branch and either rolls it back or
    * commits it in one phase.
    *
    * @param localXid the Xid the branch was started with
    * @param rollback true to roll back, false to commit
    * @param trace    whether trace logging is enabled
    * @throws Exception if the XA resource reports a failure
    */
   private void completeLocalTx(Xid localXid, boolean rollback, boolean trace)
      throws Exception
   {
      if (rollback)
      {
         if (trace)
         {
            log.trace("Using optimized 1p commit to rollback TX.");
         }
         XAResource res = xaSession.getXAResource();
         res.end(localXid, XAResource.TMSUCCESS);
         res.rollback(localXid);
      }
      else
      {
         if (trace)
         {
            log.trace("Using optimized 1p commit to commit TX.");
         }
         XAResource res = xaSession.getXAResource();
         res.end(localXid, XAResource.TMSUCCESS);
         res.commit(localXid, true);
      }
   }

   /**
    * Completes the managed transaction begun for the current message.
    *
    * <p>When there is no XA session but the pool is transacted, the plain
    * session is committed or rolled back by hand to follow the outcome of
    * the managed transaction.
    *
    * @param trans the transaction begun in {@link #onMessage(Message)}
    * @param trace whether trace logging is enabled
    * @throws IllegalStateException if the thread is no longer associated
    *                               with {@code trans}
    * @throws Exception if the transaction manager or the session fails
    */
   private void completeManagedTx(Transaction trans, boolean trace)
      throws Exception
   {
      Transaction currentTx = tm.getTransaction();
      if (!trans.equals(currentTx))
      {
         throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
      }

      boolean manualSessionTx = xaSession == null && serverSessionPool.isTransacted();
      int status = trans.getStatus();
      if (status == Status.STATUS_MARKED_ROLLBACK)
      {
         if (trace)
         {
            log.trace("Rolling back JMS transaction");
         }
         tm.rollback();
         if (manualSessionTx)
         {
            session.rollback();
         }
      }
      else if (status == Status.STATUS_ACTIVE)
      {
         if (trace)
         {
            log.trace("Commiting the JMS transaction");
         }
         tm.commit();
         if (manualSessionTx)
         {
            session.commit();
         }
      }
      else
      {
         // The transaction can neither be committed nor rolled back through
         // the branches above (e.g. it was already rolled back). Detach it
         // from this thread so the next message can begin a new one, and do
         // not commit the session's work.
         if (trace)
         {
            log.trace("Disassociating JMS transaction in status " + status);
         }
         tm.suspend();
         if (manualSessionTx)
         {
            session.rollback();
         }
      }
   }

   /**
    * Schedules this server session for execution on the pool's executor.
    *
    * @throws JMSException if the session has already been closed
    */
   public void start() throws JMSException
   {
      log.trace("starting invokes on server session");
      if (session == null)
      {
         throw new JMSException("No listener has been specified");
      }
      try
      {
         serverSessionPool.getExecutor().execute(this);
      }
      catch (InterruptedException e)
      {
         // Preserve the interrupt for the calling thread instead of
         // silently discarding it.
         Thread.currentThread().interrupt();
      }
   }

   /**
    * Closes the plain session and the XA session, if present, and drops the
    * references to them. Close failures are logged and otherwise ignored.
    */
   void close()
   {
      if (session != null)
      {
         try
         {
            session.close();
         }
         catch (Exception e)
         {
            log.debug("failed to close session", e);
         }
         session = null;
      }
      if (xaSession != null)
      {
         try
         {
            xaSession.close();
         }
         catch (Exception e)
         {
            log.debug("failed to close XA session", e);
         }
         xaSession = null;
      }
      log.debug("closed");
   }

   /**
    * Hands this server session back to its pool.
    */
   void recycle()
   {
      serverSessionPool.recycle(this);
   }
}