package org.jboss.tm;
import java.rmi.UnexpectedException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkCompletedException;
import javax.resource.spi.work.WorkException;
import javax.transaction.Status;
import javax.transaction.TransactionManager;
import javax.transaction.Transaction;
import javax.transaction.NotSupportedException;
import javax.transaction.SystemException;
import javax.transaction.RollbackException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.Xid;
import org.jboss.logging.Logger;
import org.jboss.util.UnexpectedThrowable;
import org.jboss.util.UnreachableStatementException;
public class TxManager
implements TransactionManager,
TransactionPropagationContextImporter,
TransactionPropagationContextFactory,
TransactionLocalDelegate,
TransactionTimeoutConfiguration,
JBossXATerminator
{
/** Whether transactions are also indexed by global id; set together with the XidImpl flag in setGlobalIdsEnabled. */
private boolean globalIdsEnabled = false;
/** Flag stored by setInterruptThreads and returned by isInterruptThreads; not otherwise read in the code visible here. */
private boolean interruptThreads = false;
/** Logger obtained for the runtime class of this instance. */
private Logger log = Logger.getLogger(this.getClass());
/** Cached trace-enabled flag; initialized from log (so it must stay declared after it) and refreshed in begin(). */
private boolean trace = log.isTraceEnabled();
/** Default transaction timeout in milliseconds (5 minutes); the seconds-based accessors convert with a factor of 1000. */
private long timeOut = 5 * 60 * 1000;
/** Counter incremented by incCommitCount() and reported by getCommitCount(). */
private volatile int commitCount;
/** Counter incremented by incRollbackCount() and reported by getRollbackCount(). */
private volatile int rollbackCount;
/** The single instance, created when this class is initialized and handed out by getInstance(). */
private static TxManager singleton = new TxManager();
/**
 * Gives access to the shared transaction manager.
 *
 * @return the instance held in the static singleton field
 */
public static TxManager getInstance()
{
   return TxManager.singleton;
}
/**
 * Private constructor: instances are only created for the static
 * singleton field. Calls TransactionImpl.defaultXidFactory() once during
 * construction (presumably to install the default Xid factory; verify
 * against TransactionImpl).
 */
private TxManager()
{
   super();
   TransactionImpl.defaultXidFactory();
}
/**
 * Switches indexing of transactions by global id on or off.
 *
 * The value is first pushed to XidImpl.setTrulyGlobalIdsEnabled and then
 * remembered locally, so the local flag is only changed when that call
 * returns normally.
 *
 * @param newValue true to enable global ids
 */
public void setGlobalIdsEnabled(boolean newValue)
{
   XidImpl.setTrulyGlobalIdsEnabled(newValue);
   this.globalIdsEnabled = newValue;
}
/**
 * Reports whether global ids are enabled.
 *
 * @return the value last stored by setGlobalIdsEnabled (false initially)
 */
public boolean getGlobalIdsEnabled()
{
   return this.globalIdsEnabled;
}
/**
 * Stores the interrupt-threads configuration flag.
 *
 * @param interruptThreads the new flag value
 */
public void setInterruptThreads(boolean interruptThreads)
{
   final boolean requested = interruptThreads;
   this.interruptThreads = requested;
}
/**
 * Returns the interrupt-threads configuration flag.
 *
 * @return the value last stored by setInterruptThreads (false initially)
 */
public boolean isInterruptThreads()
{
   return this.interruptThreads;
}
/**
 * Starts a new transaction and associates it with the calling thread.
 *
 * A transaction that is still attached to the thread but already done is
 * detached first. The timeout is the thread's own timeout, or the manager
 * default when the thread's timeout is 0. The new transaction is
 * registered by local id, and by global id when global ids are enabled.
 *
 * @throws NotSupportedException if the thread already has a transaction
 *         that is not done
 * @throws SystemException declared by the TransactionManager contract
 */
public void begin()
   throws NotSupportedException, SystemException
{
   // Re-read the log level so a configuration change is picked up here.
   trace = log.isTraceEnabled();

   final ThreadInfo info = getThreadInfo();
   final TransactionImpl existing = info.tx;
   if (existing != null)
   {
      if (!existing.isDone())
      {
         throw new NotSupportedException
            ("Transaction already active, cannot nest transactions.");
      }
      disassociateThread(info);
   }

   final long millis;
   if (info.timeout == 0)
      millis = timeOut;
   else
      millis = info.timeout;

   final TransactionImpl started = new TransactionImpl(millis);
   associateThread(info, started);
   localIdTx.put(started.getLocalId(), started);
   if (globalIdsEnabled)
   {
      globalIdTx.put(started.getGlobalId(), started);
   }
   if (trace)
   {
      log.trace("began tx: " + started);
   }
}
/**
 * Commits the transaction associated with the calling thread and then
 * detaches it from the thread.
 *
 * The thread is only detached when the underlying commit returns
 * normally; if it throws, the association is left as it was.
 *
 * @throws IllegalStateException if the thread has no transaction
 * @throws RollbackException propagated from the transaction's commit
 * @throws HeuristicMixedException propagated from the transaction's commit
 * @throws HeuristicRollbackException propagated from the transaction's commit
 * @throws SecurityException propagated from the transaction's commit
 * @throws SystemException propagated from the transaction's commit
 */
public void commit()
   throws RollbackException,
          HeuristicMixedException,
          HeuristicRollbackException,
          SecurityException,
          IllegalStateException,
          SystemException
{
   final ThreadInfo info = getThreadInfo();
   final TransactionImpl active = info.tx;
   if (active == null)
   {
      throw new IllegalStateException("No transaction.");
   }

   active.commit();
   disassociateThread(info);
   if (trace)
   {
      log.trace("commited tx: " + active);
   }
}
/**
 * Reports the status of the calling thread's transaction.
 *
 * A transaction that is already done is detached from the thread and
 * reported as absent.
 *
 * @return the transaction's status, or Status.STATUS_NO_TRANSACTION when
 *         the thread has no transaction or only a done one
 * @throws SystemException declared by the TransactionManager contract
 */
public int getStatus() throws SystemException
{
   final ThreadInfo info = getThreadInfo();
   final TransactionImpl active = info.tx;
   if (active == null)
   {
      return Status.STATUS_NO_TRANSACTION;
   }
   if (!active.isDone())
   {
      return active.getStatus();
   }
   // Lazily drop the finished transaction from the thread.
   disassociateThread(info);
   return Status.STATUS_NO_TRANSACTION;
}
/**
 * Returns the transaction associated with the calling thread.
 *
 * A transaction that is already done is detached from the thread and
 * null is returned instead.
 *
 * @return the thread's transaction, or null if there is none or it is done
 * @throws SystemException declared by the TransactionManager contract
 */
public Transaction getTransaction() throws SystemException
{
   final ThreadInfo info = getThreadInfo();
   final TransactionImpl active = info.tx;
   if (active == null || !active.isDone())
   {
      return active;
   }
   disassociateThread(info);
   return null;
}
/**
 * Associates the given transaction with the calling thread.
 *
 * A done transaction still recorded for the thread is silently dropped
 * (only the thread-info reference is cleared). Passing null leaves the
 * thread without a transaction.
 *
 * @param transaction the transaction to resume; must be a TransactionImpl
 *        or null
 * @throws RuntimeException if the argument is non-null and not a
 *         TransactionImpl
 * @throws IllegalStateException if the thread already has a transaction
 *         that is not done
 * @throws InvalidTransactionException declared by the TransactionManager contract
 * @throws SystemException declared by the TransactionManager contract
 */
public void resume(Transaction transaction)
   throws InvalidTransactionException,
          IllegalStateException,
          SystemException
{
   final boolean foreign =
      transaction != null && !(transaction instanceof TransactionImpl);
   if (foreign)
   {
      throw new RuntimeException("Not a TransactionImpl, but a " +
                                 transaction.getClass().getName());
   }

   final ThreadInfo info = getThreadInfo();
   final TransactionImpl previous = info.tx;
   if (previous != null)
   {
      if (!previous.isDone())
      {
         throw new IllegalStateException("Already associated with a tx");
      }
      info.tx = null;
   }

   // From here on the thread has no transaction, so there is only
   // something to associate when a non-null transaction was passed in.
   if (transaction != null)
   {
      associateThread(info, (TransactionImpl) transaction);
   }
   if (trace)
   {
      log.trace("resumed tx: " + info.tx);
   }
}
/**
 * Suspends the transaction associated with the calling thread.
 *
 * The association is removed through disassociateThread(ThreadInfo), so
 * besides clearing the thread-info reference the transaction is told that
 * the current thread left it. This mirrors resume(), which registers the
 * thread on the transaction via associateThread(ThreadInfo, TransactionImpl);
 * clearing only the reference would leave the thread registered on a
 * transaction it is no longer working for.
 *
 * @return the suspended transaction, or null when the thread had no
 *         transaction or its transaction was already done
 * @throws SystemException declared by the TransactionManager contract
 */
public Transaction suspend() throws SystemException
{
   ThreadInfo ti = getThreadInfo();
   TransactionImpl current = ti.tx;
   if (current == null)
      return null;

   // Clears ti.tx and calls current.disassociateCurrentThread().
   disassociateThread(ti);
   if (trace)
      log.trace("suspended tx: " + current);

   // A finished transaction is not handed back to the caller.
   if (current.isDone())
      return null;
   return current;
}
/**
 * Rolls back the transaction associated with the calling thread.
 *
 * The thread is not detached here after a successful rollback. A
 * transaction that is already done is detached and treated as absent.
 *
 * @throws IllegalStateException if the thread has no transaction or only
 *         a done one
 * @throws SecurityException propagated from the transaction's rollback
 * @throws SystemException propagated from the transaction's rollback
 */
public void rollback()
   throws IllegalStateException, SecurityException, SystemException
{
   final ThreadInfo info = getThreadInfo();
   final TransactionImpl active = info.tx;
   if (active == null)
   {
      throw new IllegalStateException("No transaction.");
   }
   if (active.isDone())
   {
      disassociateThread(info);
      throw new IllegalStateException("No transaction.");
   }

   active.rollback();
   if (trace)
   {
      log.trace("rolled back tx: " + active);
   }
}
/**
 * Marks the calling thread's transaction so that it can only roll back.
 *
 * A transaction that is already done has its thread-info reference
 * cleared and is treated as absent.
 *
 * @throws IllegalStateException if the thread has no transaction or only
 *         a done one
 * @throws SystemException propagated from the transaction
 */
public void setRollbackOnly()
   throws IllegalStateException, SystemException
{
   final ThreadInfo info = getThreadInfo();
   final TransactionImpl active = info.tx;
   if (active == null)
   {
      throw new IllegalStateException("No transaction.");
   }
   if (active.isDone())
   {
      info.tx = null;
      throw new IllegalStateException("No transaction.");
   }

   active.setRollbackOnly();
   if (trace)
   {
      log.trace("tx marked for rollback only: " + active);
   }
}
/**
 * Returns the calling thread's transaction timeout.
 *
 * @return the thread's timeout converted from milliseconds to whole
 *         seconds (integer division by 1000)
 */
public int getTransactionTimeout()
{
   final long millis = getThreadInfo().timeout;
   return (int) (millis / 1000);
}
/**
 * Sets the transaction timeout used for transactions begun by the
 * calling thread.
 *
 * The value is stored in milliseconds. The multiplication is done in
 * long arithmetic so that large second counts (above Integer.MAX_VALUE /
 * 1000) do not overflow int before being widened. A value of 0 makes
 * begin() fall back to the manager-wide default timeout.
 *
 * @param seconds the timeout in seconds
 * @throws SystemException declared by the TransactionManager contract
 */
public void setTransactionTimeout(int seconds)
   throws SystemException
{
   getThreadInfo().timeout = 1000L * seconds;
   if (trace)
      log.trace("tx timeout is now: " + seconds + "s");
}
/**
 * Sets the manager-wide default transaction timeout.
 *
 * @param seconds the default timeout in seconds; stored internally in
 *        milliseconds
 */
public void setDefaultTransactionTimeout(int seconds)
{
   final long millis = 1000L * seconds;
   this.timeOut = millis;
   if (trace)
   {
      log.trace("default tx timeout is now: " + seconds + "s");
   }
}
/**
 * Returns the manager-wide default transaction timeout.
 *
 * @return the default timeout converted from milliseconds to whole
 *         seconds (integer division by 1000)
 */
public int getDefaultTransactionTimeout()
{
   final long seconds = timeOut / 1000;
   return (int) seconds;
}
/**
 * Detaches the calling thread from its transaction.
 *
 * @return whatever disassociateThread(ThreadInfo) returns for the calling
 *         thread's info, i.e. the transaction that was recorded for it
 */
public Transaction disassociateThread()
{
   final ThreadInfo info = getThreadInfo();
   return disassociateThread(info);
}
/**
 * Clears the transaction recorded in the given thread info and tells that
 * transaction the current thread is no longer working for it.
 *
 * When no transaction is recorded there is nothing to notify; the method
 * then simply returns null instead of failing with a
 * NullPointerException (reachable through the public no-argument
 * overload on a thread without a transaction).
 *
 * @param ti the calling thread's info
 * @return the transaction that was recorded, or null if there was none
 */
private Transaction disassociateThread(ThreadInfo ti) {
   TransactionImpl current = ti.tx;
   ti.tx = null;
   if (current != null)
      current.disassociateCurrentThread();
   return current;
}
/**
 * Records the given transaction as the calling thread's transaction and
 * registers the thread on it.
 *
 * The type check deliberately lets null through; in that case the
 * thread's transaction reference is cleared and there is no transaction
 * to notify, so the notification is skipped rather than dereferencing
 * null.
 *
 * @param transaction the transaction to associate; must be a
 *        TransactionImpl or null
 * @throws RuntimeException if the argument is non-null and not a
 *         TransactionImpl
 */
public void associateThread(Transaction transaction)
{
   if (transaction != null && !(transaction instanceof TransactionImpl))
      throw new RuntimeException("Not a TransactionImpl, but a " +
                                 transaction.getClass().getName());
   TransactionImpl transactionImpl = (TransactionImpl) transaction;
   ThreadInfo ti = getThreadInfo();
   ti.tx = transactionImpl;
   if (transactionImpl != null)
      transactionImpl.associateCurrentThread();
}
/**
 * Records the transaction in the given thread info and registers the
 * current thread on the transaction.
 *
 * @param ti the calling thread's info
 * @param transaction the transaction to associate; dereferenced
 *        unconditionally, so callers must not pass null
 */
private void associateThread(ThreadInfo ti, TransactionImpl transaction)
{
   final TransactionImpl joined = transaction;
   ti.tx = joined;
   joined.associateCurrentThread();
}
/**
 * Returns the number of transactions currently registered by local id.
 *
 * @return the size of the local-id map
 */
public int getTransactionCount()
{
   final int registered = localIdTx.size();
   return registered;
}
/**
 * Returns the commit counter.
 *
 * @return the current value of the int counter, widened to long
 */
public long getCommitCount()
{
   final long total = commitCount;
   return total;
}
/**
 * Returns the rollback counter.
 *
 * @return the current value of the int counter, widened to long
 */
public long getRollbackCount()
{
   final long total = rollbackCount;
   return total;
}
/**
 * Resolves a transaction propagation context to a registered transaction.
 *
 * A LocalId is looked up in the local-id map. A GlobalId is looked up in
 * the global-id map, but only when global ids are enabled. Anything else
 * is logged as a warning.
 *
 * @param tpc the propagation context
 * @return the registered transaction, or null if the lookup finds nothing
 *         or the context type is not supported
 */
public Transaction importTransactionPropagationContext(Object tpc)
{
   if (tpc instanceof LocalId)
   {
      return (Transaction) localIdTx.get((LocalId) tpc);
   }

   if (!globalIdsEnabled || !(tpc instanceof GlobalId))
   {
      log.warn("Cannot import transaction propagation context: " + tpc);
      return null;
   }

   final Transaction imported = (Transaction) globalIdTx.get((GlobalId) tpc);
   if (trace)
   {
      final String outcome = (imported != null)
         ? "Successfully imported transaction context "
         : "Could not import transaction context ";
      log.trace(outcome + tpc);
   }
   return imported;
}
/**
 * Returns the propagation context for the transaction recorded for the
 * calling thread.
 *
 * @return the result of getTransactionPropagationContext(Transaction) for
 *         the thread's recorded transaction (null when there is none)
 */
public Object getTransactionPropagationContext()
{
   final Transaction recorded = getThreadInfo().tx;
   return getTransactionPropagationContext(recorded);
}
/**
 * Returns the propagation context for the given transaction.
 *
 * @param tx the transaction to export, may be null
 * @return the transaction's local id when it is a TransactionImpl; null
 *         for a null argument, and null (after a warning is logged) for
 *         any other implementation
 */
public Object getTransactionPropagationContext(Transaction tx)
{
   if (tx instanceof TransactionImpl)
   {
      return ((TransactionImpl) tx).getLocalId();
   }
   // instanceof is false for null as well; only warn about real objects.
   if (tx != null)
   {
      log.warn("Cannot export transaction propagation context: " + tx);
   }
   return null;
}
/**
 * Imports the external transaction identified by the Xid and attaches the
 * work to it.
 *
 * @param work the work to register
 * @param xid the external transaction id
 * @param timeout the timeout passed on to importExternalTransaction
 * @throws WorkCompletedException rethrown unchanged if raised during the
 *         import; any other Throwable is wrapped in a new
 *         WorkCompletedException carrying error code
 *         WorkException.TX_RECREATE_FAILED
 */
public void registerWork(Work work, Xid xid, long timeout) throws WorkCompletedException
{
   if (trace)
   {
      log.trace("registering work=" + work + " xid=" + xid + " timeout=" + timeout);
   }

   try
   {
      importExternalTransaction(xid, timeout).setWork(work);
   }
   catch (WorkCompletedException alreadyTyped)
   {
      throw alreadyTyped;
   }
   catch (Throwable failure)
   {
      final WorkCompletedException wrapped =
         new WorkCompletedException("Error registering work", failure);
      wrapped.setErrorCode(WorkException.TX_RECREATE_FAILED);
      throw wrapped;
   }

   if (trace)
   {
      log.trace("registered work= " + work + " xid=" + xid + " timeout=" + timeout);
   }
}
/**
 * Associates the calling thread with the external transaction identified
 * by the Xid.
 *
 * @param work the work being started (only used for trace output here)
 * @param xid the external transaction id
 * @throws WorkCompletedException if no transaction is registered for the Xid
 */
public void startWork(Work work, Xid xid) throws WorkCompletedException
{
   if (trace)
   {
      log.trace("starting work="+ work +" xid=" + xid);
   }

   final TransactionImpl imported = getExternalTransaction(xid);
   associateThread(imported);

   if (trace)
   {
      log.trace("started work= " + work + " xid=" + xid);
   }
}
/**
 * Detaches the work from the external transaction identified by the Xid
 * and detaches the calling thread from its transaction.
 *
 * @param work the work being ended (only used for trace output here)
 * @param xid the external transaction id
 * @throws UnexpectedThrowable if no transaction is registered for the
 *         Xid; the original WorkCompletedException is logged and kept as
 *         the nested cause so the stack trace is not lost to callers
 */
public void endWork(Work work, Xid xid)
{
   if (trace)
      log.trace("ending work="+ work +" xid=" + xid);
   try
   {
      TransactionImpl tx = getExternalTransaction(xid);
      tx.setWork(null);
      disassociateThread();
   }
   catch (WorkCompletedException e)
   {
      log.error("Unexpected error from endWork ", e);
      // Keep the same message text as before, but chain the cause.
      throw new UnexpectedThrowable(e.toString(), e);
   }
   if (trace)
      log.trace("ended work="+ work +" xid=" + xid);
}
/**
 * Detaches the work from the external transaction identified by the Xid.
 * Unlike endWork, the calling thread's association is not touched.
 *
 * @param work the work being cancelled (only used for trace output here)
 * @param xid the external transaction id
 * @throws UnexpectedThrowable if no transaction is registered for the
 *         Xid; the original WorkCompletedException is logged and kept as
 *         the nested cause so the stack trace is not lost to callers
 */
public void cancelWork(Work work, Xid xid)
{
   if (trace)
      log.trace("cancling work="+ work +" xid=" + xid);
   try
   {
      TransactionImpl tx = getExternalTransaction(xid);
      tx.setWork(null);
   }
   catch (WorkCompletedException e)
   {
      log.error("Unexpected error from cancelWork ", e);
      // Keep the same message text as before, but chain the cause.
      throw new UnexpectedThrowable(e.toString(), e);
   }
   if (trace)
      log.trace("cancled work="+ work +" xid=" + xid);
}
/**
 * Prepares the external transaction identified by the Xid.
 *
 * @param xid the external transaction id
 * @return the value returned by the transaction's prepare()
 * @throws XAException any Throwable raised here is handed to
 *         JBossXAException.rethrowAsXAException, which is expected to
 *         throw (the statement after it is marked unreachable)
 */
public int prepare(Xid xid) throws XAException
{
   if (trace)
   {
      log.trace("preparing xid=" + xid);
   }
   try
   {
      final int vote = getExternalTransaction(xid).prepare();
      if (trace)
      {
         log.trace("prepared xid=" + xid + " result=" + vote);
      }
      return vote;
   }
   catch (Throwable failure)
   {
      JBossXAException.rethrowAsXAException("Error during prepare", failure);
      throw new UnreachableStatementException();
   }
}
/**
 * Rolls back the external transaction identified by the Xid.
 *
 * @param xid the external transaction id
 * @throws XAException any Throwable raised here is handed to
 *         JBossXAException.rethrowAsXAException
 */
public void rollback(Xid xid) throws XAException
{
   if (trace)
   {
      log.trace("rolling back xid=" + xid);
   }
   try
   {
      getExternalTransaction(xid).rollback();
   }
   catch (Throwable failure)
   {
      JBossXAException.rethrowAsXAException("Error during rollback", failure);
   }
   if (trace)
   {
      log.trace("rolled back xid=" + xid);
   }
}
/**
 * Commits the external transaction identified by the Xid.
 *
 * @param xid the external transaction id
 * @param onePhase passed through to the transaction's commit(boolean)
 * @throws XAException any Throwable raised here is handed to
 *         JBossXAException.rethrowAsXAException
 */
public void commit(Xid xid, boolean onePhase) throws XAException
{
   if (trace)
   {
      log.trace("committing xid=" + xid + " onePhase=" + onePhase);
   }
   try
   {
      getExternalTransaction(xid).commit(onePhase);
   }
   catch (Throwable failure)
   {
      JBossXAException.rethrowAsXAException("Error during commit", failure);
   }
   if (trace)
   {
      log.trace("committed xid=" + xid);
   }
}
/**
 * Handles a forget request for the external transaction identified by
 * the Xid.
 *
 * NOTE(review): this delegates to the transaction's rollback(), the same
 * call rollback(Xid) makes - confirm that this is the intended behavior
 * for forget.
 *
 * @param xid the external transaction id
 * @throws XAException any Throwable raised here is handed to
 *         JBossXAException.rethrowAsXAException
 */
public void forget(Xid xid) throws XAException
{
   if (trace)
   {
      log.trace("forgetting xid=" + xid);
   }
   try
   {
      getExternalTransaction(xid).rollback();
   }
   catch (Throwable failure)
   {
      JBossXAException.rethrowAsXAException("Error during forget", failure);
   }
   if (trace)
   {
      log.trace("forgot xid=" + xid);
   }
}
/**
 * Returns the Xids available for recovery; this implementation never
 * reports any.
 *
 * @param flag ignored
 * @return a new empty array on every call
 * @throws XAException declared by the interface; not thrown here
 */
public Xid[] recover(int flag) throws XAException
{
   final Xid[] none = new Xid[0];
   return none;
}
/**
 * Returns the transaction registered for the Xid's global id, creating
 * and registering a new one when none is found.
 *
 * For a new transaction the timeout is the calling thread's timeout,
 * or the timeOut argument when the thread's timeout is 0. (The parameter
 * shadows the field of the same name.)
 *
 * NOTE(review): a newly created transaction is only stored in the
 * global-id map when global ids are enabled, while lookups here and in
 * getExternalTransaction always go through that map - confirm callers
 * always run with global ids enabled.
 *
 * @param xid the external transaction id
 * @param timeOut fallback timeout for a newly created transaction
 * @return the existing or newly created transaction
 */
TransactionImpl importExternalTransaction(Xid xid, long timeOut)
{
   final GlobalId globalId = new GlobalId(xid);
   final TransactionImpl known = (TransactionImpl) globalIdTx.get(globalId);
   if (known != null)
   {
      if (trace)
      {
         log.trace("imported existing transaction xid: " + xid + " tx=" + known);
      }
      return known;
   }

   final ThreadInfo info = getThreadInfo();
   final long timeout;
   if (info.timeout == 0)
      timeout = timeOut;
   else
      timeout = info.timeout;

   final TransactionImpl created = new TransactionImpl(globalId, timeout);
   localIdTx.put(created.getLocalId(), created);
   if (globalIdsEnabled)
   {
      globalIdTx.put(globalId, created);
   }
   if (trace)
   {
      log.trace("imported new transaction xid: " + xid + " tx=" + created + " timeout=" + timeout);
   }
   return created;
}
/**
 * Looks up the transaction registered for the Xid's global id.
 *
 * @param xid the external transaction id
 * @return the registered transaction, never null
 * @throws WorkCompletedException with error code
 *         WorkException.TX_RECREATE_FAILED if nothing is registered
 */
TransactionImpl getExternalTransaction(Xid xid) throws WorkCompletedException
{
   final Object found = globalIdTx.get(new GlobalId(xid));
   final TransactionImpl tx = (TransactionImpl) found;
   if (tx != null)
   {
      return tx;
   }
   throw new WorkCompletedException("Xid not found " + xid, WorkException.TX_RECREATE_FAILED);
}
/**
 * Reads a transaction-local value from the given transaction.
 *
 * @param local the transaction-local key
 * @param tx the transaction; cast to TransactionImpl without a type check
 * @return whatever the transaction's getTransactionLocalValue returns
 */
public Object getValue(TransactionLocal local, Transaction tx)
{
   return ((TransactionImpl) tx).getTransactionLocalValue(local);
}
/**
 * Stores a transaction-local value in the given transaction.
 *
 * @param local the transaction-local key
 * @param tx the transaction; cast to TransactionImpl without a type check
 * @param value the value to store
 */
public void storeValue(TransactionLocal local, Transaction tx, Object value)
{
   ((TransactionImpl) tx).putTransactionLocalValue(local, value);
}
/**
 * Asks the given transaction whether it holds the transaction-local.
 *
 * @param local the transaction-local key
 * @param tx the transaction; cast to TransactionImpl without a type check
 * @return whatever the transaction's containsTransactionLocal returns
 */
public boolean containsValue(TransactionLocal local, Transaction tx)
{
   return ((TransactionImpl) tx).containsTransactionLocal(local);
}
/**
 * Removes the transaction from the id maps.
 *
 * The local-id entry is always removed; the global-id entry only when
 * global ids are enabled.
 *
 * @param tx the transaction to unregister
 */
void releaseTransactionImpl(TransactionImpl tx)
{
   localIdTx.remove(tx.getLocalId());
   if (!globalIdsEnabled)
   {
      return;
   }
   globalIdTx.remove(tx.getGlobalId());
}
/**
 * Increments the commit counter.
 *
 * The counter is volatile, which makes writes visible but does not make
 * the read-modify-write of ++ atomic; the method is synchronized so that
 * concurrent callers cannot lose increments. Readers still read the
 * volatile field without locking.
 */
synchronized void incCommitCount()
{
   ++commitCount;
}
/**
 * Increments the rollback counter.
 *
 * The counter is volatile, which makes writes visible but does not make
 * the read-modify-write of ++ atomic; the method is synchronized so that
 * concurrent callers cannot lose increments. Readers still read the
 * volatile field without locking.
 */
synchronized void incRollbackCount()
{
   ++rollbackCount;
}
/** Holds the ThreadInfo of each thread; populated lazily by getThreadInfo(). */
private ThreadLocal threadTx = new ThreadLocal();
/** Registered transactions keyed by tx.getLocalId(); wrapped in a synchronized map. */
private Map localIdTx = Collections.synchronizedMap(new HashMap());
/** Registered transactions keyed by GlobalId; wrapped in a synchronized map. */
private Map globalIdTx = Collections.synchronizedMap(new HashMap());
/**
 * Returns the calling thread's ThreadInfo, creating it on first use.
 *
 * A newly created info starts with the manager default timeout that is
 * current at creation time.
 *
 * @return the thread's info, never null
 */
private ThreadInfo getThreadInfo()
{
   final Object cached = threadTx.get();
   if (cached != null)
   {
      return (ThreadInfo) cached;
   }

   final ThreadInfo created = new ThreadInfo();
   created.timeout = timeOut;
   threadTx.set(created);
   return created;
}
/**
 * Per-thread state stored in the threadTx thread-local.
 */
static class ThreadInfo
{
   /** Transaction currently recorded for the thread, or null. */
   TransactionImpl tx;

   /** Timeout in milliseconds for transactions begun by the thread; 0 makes begin() use the manager default. */
   long timeout;
}
}