package org.jboss.mq;
import java.io.Serializable;
import java.util.Map;
import java.util.ArrayList;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.jboss.logging.Logger;
/**
 * Client-side bookkeeping for transacted work performed through a
 * {@link Connection}.
 * <p>
 * For every open transaction id the manager remembers the messages that were
 * sent and the acknowledgement requests that were collected. Nothing is handed
 * to the connection until {@link #prepare(Object)},
 * {@link #commit(Object, boolean)} or {@link #rollback(Object)} is invoked.
 * Work submitted with a <code>null</code> transaction id bypasses the
 * bookkeeping and is performed immediately.
 * <p>
 * Transaction ids are either {@link Xid}s supplied by the caller or
 * <code>Long</code> values generated by {@link #getNewXid()}.
 */
public class SpyXAResourceManager implements Serializable
{
   static final long serialVersionUID = -6268132972627753772L;

   private static final Logger log = Logger.getLogger(SpyXAResourceManager.class);

   /** Whether trace logging is enabled; sampled once when the class is initialized. */
   private static boolean trace = log.isTraceEnabled();

   /** Initial state of a freshly created transaction. */
   private final static byte TX_OPEN = 0;
   /** State set by {@link #endTx(Object, boolean)}. */
   private final static byte TX_ENDED = 1;
   /** State set by a successful {@link #prepare(Object)}. */
   private final static byte TX_PREPARED = 3;
   /** State set by a successful {@link #commit(Object, boolean)}. */
   private final static byte TX_COMMITED = 4;
   /** State set by a successful {@link #rollback(Object)}. */
   private final static byte TX_ROLLEDBACK = 5;

   /** The connection all requests are sent through. */
   private Connection connection;

   /** Maps a transaction id (<code>Long</code> or <code>Xid</code>) to its {@link TXState}. */
   private Map transactions = new ConcurrentReaderHashMap();

   /** Next value handed out by {@link #getNewXid()}. */
   private long nextInternalXid = Long.MIN_VALUE;

   /**
    * Creates a resource manager that sends its requests through the given connection.
    *
    * @param conn the connection used to reach the server
    */
   public SpyXAResourceManager(Connection conn)
   {
      connection = conn;
   }

   /**
    * Acknowledges a message, either immediately or as part of a transaction.
    * <p>
    * With a <code>null</code> xid the message is acknowledged right away via
    * <code>msg.doAcknowledge()</code>. Otherwise the message's acknowledgement
    * request is recorded against the transaction.
    *
    * @param xid the transaction id, or <code>null</code> for non-transacted work
    * @param msg the message being acknowledged
    * @throws JMSException if the xid does not identify a known transaction, or
    *         if the immediate acknowledgement fails
    */
   public void ackMessage(Object xid, SpyMessage msg) throws JMSException
   {
      if (xid == null)
      {
         if (trace)
            log.trace("No Xid, acking message " + msg.header.jmsMessageID);
         msg.doAcknowledge();
         return;
      }

      if (trace)
         log.trace("Adding acked message xid=" + xid + " " + msg.header.jmsMessageID);

      TXState state = (TXState) transactions.get(xid);
      if (state == null)
         throw new JMSException("Invalid transaction id.");

      state.ackedMessages.add(msg.getAcknowledgementRequest(true));
   }

   /**
    * Sends a message, either immediately or as part of a transaction.
    * <p>
    * With a <code>null</code> xid the message is passed straight to
    * <code>connection.sendToServer</code>. Otherwise it is recorded against
    * the transaction and held back until prepare or one-phase commit.
    *
    * @param xid the transaction id, or <code>null</code> for non-transacted work
    * @param msg the message being sent
    * @throws JMSException if the xid does not identify a known transaction, or
    *         if the immediate send fails
    */
   public void addMessage(Object xid, SpyMessage msg) throws JMSException
   {
      if (xid == null)
      {
         if (trace)
            log.trace("No Xid, sending message to server " + msg.header.jmsMessageID);
         connection.sendToServer(msg);
         return;
      }

      if (trace)
         log.trace("Adding message xid=" + xid + ", message=" + msg.header.jmsMessageID);

      TXState state = (TXState) transactions.get(xid);
      if (trace)
         log.trace("TXState=" + state);
      if (state == null)
         throw new JMSException("Invalid transaction id.");

      state.sentMessages.add(msg);
   }

   /**
    * Commits a transaction and forgets it.
    * <p>
    * A one-phase commit sends the recorded messages and acknowledgements in a
    * single request that carries no xid. A two-phase commit requires a prior
    * {@link #prepare(Object)} and only sends the commit request for the xid.
    *
    * @param xid the transaction id
    * @param onePhase <code>true</code> for a one-phase commit
    * @throws XAException with <code>XAER_NOTA</code> if the xid is unknown, or
    *         with <code>XAER_PROTO</code> if a two-phase commit is requested
    *         for a transaction that has not been prepared (the transaction
    *         stays registered in that case)
    * @throws JMSException if sending the request fails
    */
   public void commit(Object xid, boolean onePhase) throws XAException, JMSException
   {
      if (trace)
         log.trace("Commiting xid=" + xid + ", onePhase=" + onePhase);

      // Atomically claim the transaction so that it can only be completed once.
      TXState state = (TXState) transactions.remove(xid);
      if (state == null)
         throw new XAException(XAException.XAER_NOTA);

      if (onePhase)
      {
         TransactionRequest transaction = new TransactionRequest();
         transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST;
         transaction.xid = null;
         copyWork(state, transaction);
         connection.send(transaction);
      }
      else
      {
         if (state.txState != TX_PREPARED)
         {
            // Nothing has been sent yet: give the transaction back so the
            // recorded work is not lost and the caller can still prepare it
            // or roll it back.
            transactions.put(xid, state);
            XAException notPrepared = new XAException("The transaction had not been prepared");
            notPrepared.errorCode = XAException.XAER_PROTO;
            throw notPrepared;
         }
         TransactionRequest transaction = new TransactionRequest();
         transaction.xid = xid;
         transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST;
         connection.send(transaction);
      }
      state.txState = TX_COMMITED;
   }

   /**
    * Marks a transaction as ended.
    *
    * @param xid the transaction id
    * @param success only logged; it does not influence the resulting state
    * @throws XAException with <code>XAER_NOTA</code> if the xid is unknown
    */
   public void endTx(Object xid, boolean success) throws XAException
   {
      if (trace)
         log.trace("Ending xid=" + xid + ", success=" + success);

      TXState state = (TXState) transactions.get(xid);
      if (state == null)
         throw new XAException(XAException.XAER_NOTA);
      state.txState = TX_ENDED;
   }

   /**
    * Joins an existing transaction; this only verifies that the xid is known.
    *
    * @param xid the transaction id
    * @return the given xid
    * @throws XAException with <code>XAER_NOTA</code> if the xid is unknown
    */
   public Object joinTx(Xid xid) throws XAException
   {
      if (trace)
         log.trace("Joining tx xid=" + xid);
      requireKnown(xid);
      return xid;
   }

   /**
    * Prepares a transaction by sending its recorded messages and
    * acknowledgements to the server in a two-phase prepare request.
    *
    * @param xid the transaction id
    * @return <code>XAResource.XA_OK</code>
    * @throws XAException with <code>XAER_NOTA</code> if the xid is unknown
    * @throws JMSException if sending the request fails
    */
   public int prepare(Object xid) throws XAException, JMSException
   {
      if (trace)
         log.trace("Preparing xid=" + xid);

      TXState state = (TXState) transactions.get(xid);
      if (state == null)
         throw new XAException(XAException.XAER_NOTA);

      TransactionRequest transaction = new TransactionRequest();
      transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST;
      transaction.xid = xid;
      copyWork(state, transaction);
      connection.send(transaction);

      state.txState = TX_PREPARED;
      return javax.transaction.xa.XAResource.XA_OK;
   }

   /**
    * Resumes a suspended transaction; this only verifies that the xid is known.
    *
    * @param xid the transaction id
    * @return the given xid
    * @throws XAException with <code>XAER_NOTA</code> if the xid is unknown
    */
   public Object resumeTx(Xid xid) throws XAException
   {
      if (trace)
         log.trace("Resuming tx xid=" + xid);
      requireKnown(xid);
      return xid;
   }

   /**
    * Rolls back a transaction and forgets it.
    * <p>
    * A prepared transaction is rolled back with a two-phase rollback request.
    * For a transaction that was never prepared the recorded sent messages are
    * simply dropped, and the recorded acknowledgement requests are sent with
    * their <code>isAck</code> flag cleared in a one-phase request that
    * carries no xid.
    *
    * @param xid the transaction id
    * @throws XAException with <code>XAER_NOTA</code> if the xid is unknown
    * @throws JMSException if sending the request fails
    */
   public void rollback(Object xid) throws XAException, JMSException
   {
      if (trace)
         log.trace("Rolling back xid=" + xid);

      TXState state = (TXState) transactions.remove(xid);
      if (state == null)
         throw new XAException(XAException.XAER_NOTA);

      TransactionRequest transaction = new TransactionRequest();
      if (state.txState == TX_PREPARED)
      {
         transaction.xid = xid;
         transaction.requestType = TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST;
      }
      else
      {
         transaction.requestType = TransactionRequest.ONE_PHASE_COMMIT_REQUEST;
         transaction.xid = null;
         int ackCount = state.ackedMessages.size();
         if (ackCount != 0)
         {
            AcknowledgementRequest[] nacks = (AcknowledgementRequest[]) state.ackedMessages
                  .toArray(new AcknowledgementRequest[ackCount]);
            // NOTE(review): clearing isAck presumably turns each request into a
            // negative acknowledgement so the server redelivers - confirm
            // against the server-side handling.
            for (int i = 0; i < nacks.length; i++)
            {
               nacks[i].isAck = false;
            }
            transaction.acks = nacks;
         }
      }
      connection.send(transaction);
      state.txState = TX_ROLLEDBACK;
   }

   /**
    * Generates the next internal transaction id.
    *
    * @return a <code>Long</code> that this method has not returned before,
    *         counting up from <code>Long.MIN_VALUE</code>
    */
   public synchronized Long getNewXid()
   {
      return new Long(nextInternalXid++);
   }

   /**
    * Starts a transaction under a newly generated internal id.
    *
    * @return the generated id (a <code>Long</code>)
    */
   public Object startTx()
   {
      Long newXid = getNewXid();
      transactions.put(newXid, new TXState());
      if (trace)
         log.trace("Starting tx with new xid=" + newXid);
      return newXid;
   }

   /**
    * Starts a transaction under the given xid.
    *
    * @param xid the transaction id
    * @return the given xid
    * @throws XAException with <code>XAER_DUPID</code> if the xid is already in use
    */
   public Object startTx(Xid xid) throws XAException
   {
      if (trace)
         log.trace("Starting tx xid=" + xid);
      if (transactions.containsKey(xid))
         throw new XAException(XAException.XAER_DUPID);
      transactions.put(xid, new TXState());
      return xid;
   }

   /**
    * Suspends a transaction; this only verifies that the xid is known.
    *
    * @param xid the transaction id
    * @return the given xid
    * @throws XAException with <code>XAER_NOTA</code> if the xid is unknown
    */
   public Object suspendTx(Xid xid) throws XAException
   {
      if (trace)
         log.trace("Suppending tx xid=" + xid);
      requireKnown(xid);
      return xid;
   }

   /**
    * Re-registers the state of an internally identified transaction under a
    * caller-supplied xid.
    *
    * @param anonXid the internal id the transaction is currently known by
    * @param xid the xid the transaction should be known by from now on
    * @return the given xid
    * @throws XAException with <code>XAER_NOTA</code> if <code>anonXid</code>
    *         is unknown, or with <code>XAER_DUPID</code> if <code>xid</code>
    *         is already in use
    */
   public Object convertTx(Long anonXid, Xid xid) throws XAException
   {
      if (trace)
         log.trace("Converting tx anonXid=" + anonXid + ", xid=" + xid);

      requireKnown(anonXid);
      if (transactions.containsKey(xid))
         throw new XAException(XAException.XAER_DUPID);

      TXState moved = (TXState) transactions.remove(anonXid);
      transactions.put(xid, moved);
      return xid;
   }

   /** Throws <code>XAER_NOTA</code> unless the xid identifies a registered transaction. */
   private void requireKnown(Object xid) throws XAException
   {
      if (!transactions.containsKey(xid))
         throw new XAException(XAException.XAER_NOTA);
   }

   /**
    * Copies the recorded sent messages and acknowledgement requests into the
    * request; a field is left untouched when there is nothing recorded for it.
    */
   private static void copyWork(TXState state, TransactionRequest request)
   {
      int sentCount = state.sentMessages.size();
      if (sentCount != 0)
         request.messages = (SpyMessage[]) state.sentMessages.toArray(new SpyMessage[sentCount]);

      int ackCount = state.ackedMessages.size();
      if (ackCount != 0)
         request.acks = (AcknowledgementRequest[]) state.ackedMessages
               .toArray(new AcknowledgementRequest[ackCount]);
   }

   /**
    * The work recorded for one transaction together with its current state.
    */
   static class TXState
   {
      /** One of the <code>TX_*</code> constants of the enclosing class. */
      byte txState = TX_OPEN;

      /** The <code>SpyMessage</code>s sent within the transaction. */
      ArrayList sentMessages = new ArrayList();

      /** The <code>AcknowledgementRequest</code>s collected within the transaction. */
      ArrayList ackedMessages = new ArrayList();

      /**
       * Describes the state, the sent messages and the acknowledgements for logging.
       */
      public String toString()
      {
         StringBuffer buffer = new StringBuffer(100);
         buffer.append("TxState txState=").append(txState);
         buffer.append(" sent=").append(sentMessages);
         buffer.append(" acks=").append(ackedMessages);
         return buffer.toString();
      }
   }
}