package org.jboss.resource.connectionmanager;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.ManagedConnection;
import javax.security.auth.Subject;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.jboss.logging.Logger;
import org.jboss.tm.LastResource;
import org.jboss.tm.TxUtils;
public class TxConnectionManager
extends BaseConnectionManager2
implements TxConnectionManagerMBean
{
private ObjectName transactionManagerService;
private String tmName;
private TransactionManager tm;
private boolean trackConnectionByTx = false;
private boolean localTransactions;
public TxConnectionManager()
{
}
public TxConnectionManager (final CachedConnectionManager ccm,
final ManagedConnectionPool poolingStrategy,
final TransactionManager tm)
{
super(ccm, poolingStrategy);
this.tm = tm;
}
public ObjectName getTransactionManagerService()
{
return transactionManagerService;
}
public void setTransactionManagerService(ObjectName transactionManagerService)
{
this.transactionManagerService = transactionManagerService;
}
public void setTransactionManager(final String tmName)
{
this.tmName = tmName;
}
public String getTransactionManager()
{
return this.tmName;
}
public TransactionManager getTransactionManagerInstance()
{
return tm;
}
public void setTransactionManagerInstance(TransactionManager tm)
{
this.tm = tm;
}
public boolean isTrackConnectionByTx()
{
return trackConnectionByTx;
}
public void setTrackConnectionByTx(boolean trackConnectionByTx)
{
this.trackConnectionByTx = trackConnectionByTx;
}
public boolean isLocalTransactions()
{
return localTransactions;
}
public void setLocalTransactions(boolean localTransactions)
{
this.localTransactions = localTransactions;
}
protected void startService() throws Exception
{
if (transactionManagerService != null)
{
tm = (TransactionManager)getServer().getAttribute(transactionManagerService, "TransactionManager");
} else
{
log.warn("----------------------------------------------------------");
log.warn("----------------------------------------------------------");
log.warn("Please change your datasource setup to use <depends optional-attribute-name\"TransactionManagerService\">jboss:service=TransactionManager</depends>");
log.warn("instead of <attribute name=\"TransactionManager\">java:/TransactionManager</attribute>");
log.warn("Better still, use a *-ds.xml file");
log.warn("----------------------------------------------------------");
log.warn("----------------------------------------------------------");
tm = (TransactionManager)new InitialContext().lookup(tmName);
}
super.startService();
}
protected void stopService() throws Exception
{
this.tm = null;
super.stopService();
}
public ConnectionListener getManagedConnection(Subject subject, ConnectionRequestInfo cri)
throws ResourceException
{
Transaction tx = null;
try
{
if (trackConnectionByTx && tm.getStatus() != Status.STATUS_NO_TRANSACTION)
tx = tm.getTransaction();
}
catch (Throwable t)
{
rethrowAsResourceException("Couldn't find current tx", t);
}
if (trace)
log.trace("getManagedConnection trackByTx=" + trackConnectionByTx + " tx=" + tx);
return super.getManagedConnection(tx, subject, cri);
}
public void transactionStarted(Collection crs) throws SystemException
{
Set cls = new HashSet();
for (Iterator i = crs.iterator(); i.hasNext(); )
{
ConnectionRecord cr = (ConnectionRecord)i.next();
ConnectionListener cl = cr.cl;
if (!cls.contains(cl))
{
cls.add(cl);
cl.enlist();
}
} }
protected void managedConnectionReconnected(ConnectionListener cl) throws ResourceException
{
try
{
cl.enlist();
}
catch (SystemException se)
{
log.info("Could not enlist in transaction on entering meta-aware object!", se);
throw new ResourceException("Could not enlist in transaction on entering meta-aware object!" + se);
}
}
protected void managedConnectionDisconnected(ConnectionListener cl) throws ResourceException
{
Throwable throwable = null;
try
{
cl.delist();
}
catch (Throwable t)
{
throwable = t;
}
boolean isFree = cl.isManagedConnectionFree();
if (trace)
log.trace("Disconnected isManagedConnectionFree=" + isFree + " cl=" + cl);
if (isFree)
returnManagedConnection(cl, false);
if (throwable != null)
rethrowAsResourceException("Could not delist resource, probably a transaction rollback?", throwable);
}
public ConnectionListener createConnectionListener(ManagedConnection mc, Object context)
throws ResourceException
{
XAResource xaResource = null;
if (localTransactions)
{
xaResource = new LocalXAResource(log);
} else
{
xaResource = mc.getXAResource();
}
ConnectionListener cli = new TxConnectionEventListener(mc, poolingStrategy, context, log, xaResource);
mc.addConnectionEventListener(cli);
return cli;
}
public boolean isTransactional()
{
return TxUtils.isActive(tm);
}
protected class TxConnectionEventListener
extends BaseConnectionManager2.BaseConnectionEventListener
{
protected Logger log;
protected Transaction currentTx;
private final XAResource xaResource;
public TxConnectionEventListener(final ManagedConnection mc, final ManagedConnectionPool mcp, final Object context, Logger log, final XAResource xaResource) throws ResourceException
{
super(mc, mcp, context, log);
this.log = log;
this.xaResource = xaResource;
if (xaResource instanceof LocalXAResource)
((LocalXAResource) xaResource).setConnectionListener(this);
}
public void enlist() throws SystemException
{
if (isTrackByTx() == false && currentTx != null)
{
log.warn("in Enlisting tx, illegal state: " + currentTx + " ManagedConnection=" + this.getManagedConnection());
throw new IllegalStateException("Can't enlist - already a tx!");
}
if (tm.getStatus() != Status.STATUS_NO_TRANSACTION)
{
Transaction newCurrentTx = tm.getTransaction();
if (currentTx != null && currentTx.equals(newCurrentTx) == false)
{
log.warn("in Enlisting tx, trying to change tx. illegal state: old: " + currentTx + ", new: " + newCurrentTx + ", cel: " + this);
throw new IllegalStateException("Trying to change Tx in enlist!");
}
if (currentTx != null)
{
if (trace)
log.trace("currenttx: " + currentTx + ", already enlisted for ManagedConnection: " + this.getManagedConnection());
return;
}
currentTx = newCurrentTx;
if (trace)
log.trace("enlisting currenttx: " + currentTx + ", ManagedConnection: " + this.getManagedConnection() + " trackByTx=" + isTrackByTx());
}
if (currentTx != null)
{
boolean succeeded = false;
try
{
succeeded = currentTx.enlistResource(getXAResource());
}
catch (Throwable t)
{
setTrackByTx(false);
throw new SystemException("Could not get XAResource from ManagedConnection!" + t);
}
if (succeeded == false)
{
setTrackByTx(false);
throw new SystemException("enlistResource failed");
}
try
{
TransactionSynchronizer.registerTxRemoverSynchronization(currentTx, new TxRemover(currentTx, isTrackByTx()));
}
catch (Throwable t)
{
setTrackByTx(false);
throw new SystemException("Could not register synchronization with tx: " + t);
}
}
}
public void delist() throws ResourceException
{
if (trace)
log.trace("delisting currenttx: " + currentTx + ", ManagedConnection: " + this.getManagedConnection() + " trackByTx=" + isTrackByTx());
try
{
if (isTrackByTx() == false && currentTx != null)
{
try
{
if (TxUtils.isActive(currentTx))
{
if (currentTx.delistResource(getXAResource(), XAResource.TMSUSPEND) == false)
throw new ResourceException("Failure to delist resource");
}
}
finally
{
currentTx = null;
}
}
}
catch (Throwable t)
{
rethrowAsResourceException("Error in delist!", t);
}
}
protected XAResource getXAResource()
{
return xaResource;
}
public void connectionClosed(ConnectionEvent ce)
{
if (trace)
log.trace("connectionClosed called mc=" + this.getManagedConnection());
if (this.getManagedConnection() != (ManagedConnection)ce.getSource())
throw new IllegalArgumentException("ConnectionClosed event received from wrong ManagedConnection! Expected: " + this.getManagedConnection() + ", actual: " + ce.getSource());
try
{
getCcm().unregisterConnection(TxConnectionManager.this, ce.getConnectionHandle());
}
catch (Throwable t)
{
log.info("throwable from unregister connection", t);
}
try
{
unregisterAssociation(this, ce.getConnectionHandle());
boolean isFree = isManagedConnectionFree();
if (trace)
log.trace("isManagedConnectionFree=" + isFree + " mc=" + this.getManagedConnection());
if (isFree)
{
delist();
returnManagedConnection(this, false);
}
}
catch (Throwable t)
{
log.error("Error while closing connection handle!", t);
returnManagedConnection(this, true);
}
}
public void localTransactionStarted(ConnectionEvent ce)
{
}
public void localTransactionCommitted(ConnectionEvent ce)
{
}
public void localTransactionRolledback(ConnectionEvent ce)
{
}
public void connectionErrorOccurred(ConnectionEvent ce)
{
currentTx = null;
super.connectionErrorOccurred(ce);
}
public boolean isManagedConnectionFree()
{
if (isTrackByTx() && currentTx != null)
return false;
return super.isManagedConnectionFree();
}
private class TxRemover implements Synchronization
{
private Transaction tx;
private boolean wasTrackByTx;
public TxRemover(final Transaction tx, final boolean isTrackByTx)
{
this.tx = tx;
wasTrackByTx = isTrackByTx;
}
public void beforeCompletion()
{
}
public void afterCompletion(int status)
{
if (getState() == DESTROYED)
return;
if (currentTx == null || currentTx.equals(tx) == false)
{
if (wasTrackByTx == false)
return;
else
{
String message = "afterCompletion called with wrong tx! Expected: " + currentTx + ", actual: " + tx;
IllegalStateException e = new IllegalStateException(message);
log.error("There is something wrong with the pooling?", e);
}
}
currentTx = null;
if (wasTrackByTx)
{
setTrackByTx(false);
if (isManagedConnectionFree())
returnManagedConnection(TxConnectionEventListener.this, false);
}
}
}
}
private class LocalXAResource implements XAResource, LastResource
{
protected Logger log;
private ConnectionListener cl;
private boolean warned = false;
private Xid currentXid;
public LocalXAResource(final Logger log)
{
this.log = log;
}
void setConnectionListener(ConnectionListener cl)
{
this.cl = cl;
}
public void start(Xid xid, int flags) throws XAException
{
if (trace)
log.trace("start, xid: " + xid + ", flags: " + flags);
if (currentXid != null && flags == XAResource.TMNOFLAGS)
throw new JBossLocalXAException("Trying to start a new tx when old is not complete! old: " + currentXid + ", new " + xid + ", flags " + flags);
if (currentXid == null && flags != XAResource.TMNOFLAGS)
throw new JBossLocalXAException("Trying to start a new tx with wrong flags! new " + xid + ", flags " + flags);
if (currentXid == null)
{
try
{
cl.getManagedConnection().getLocalTransaction().begin();
}
catch (ResourceException re)
{
throw new JBossLocalXAException("Error trying to start local tx: ", re);
} catch (Throwable t)
{
log.info("Throwable trying to start local transaction!", t);
throw new JBossLocalXAException("Throwable trying to start local transaction!", t);
}
currentXid = xid;
} }
public void end(Xid xid, int flags) throws XAException
{
if (trace)
log.trace("end on xid: " + xid + " called with flags " + flags);
}
public void commit(Xid xid, boolean onePhase) throws XAException
{
if (xid.equals(currentXid) == false)
{
throw new JBossLocalXAException("wrong xid in commit: expected: " + currentXid + ", got: " + xid);
} currentXid = null;
try
{
cl.getManagedConnection().getLocalTransaction().commit();
}
catch (ResourceException re)
{
returnManagedConnection(cl, true);
if (trace)
log.trace("commit problem: ", re);
throw new JBossLocalXAException("could not commit local tx", re);
} }
public void forget(Xid xid) throws XAException
{
throw new JBossLocalXAException("forget not supported in local tx");
}
public int getTransactionTimeout() throws XAException
{
return 0;
}
public boolean isSameRM(XAResource xaResource) throws XAException
{
return xaResource == this;
}
public int prepare(Xid xid) throws XAException
{
if (!warned)
{
log.warn("Prepare called on a local tx. Use of local transactions on a jta transaction with more than one branch may result in inconsistent data in some cases of failure.");
} warned = true;
return XAResource.XA_OK;
}
public Xid[] recover(int flag) throws XAException
{
throw new JBossLocalXAException("no recover with local-tx only resource managers");
}
public void rollback(Xid xid) throws XAException
{
if (xid.equals(currentXid) == false)
{
throw new JBossLocalXAException("wrong xid in rollback: expected: " + currentXid + ", got: " + xid);
} currentXid = null;
try
{
cl.getManagedConnection().getLocalTransaction().rollback();
}
catch (ResourceException re)
{
returnManagedConnection(cl, true);
if (trace)
log.trace("rollback problem: ", re);
throw new JBossLocalXAException("could not rollback local tx", re);
} }
public boolean setTransactionTimeout(int seconds) throws XAException {
return false;
}
}
}