package org.jboss.ejb.plugins.lock;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.ArrayList;
import javax.transaction.Transaction;
import javax.transaction.Status;
import org.jboss.invocation.Invocation;
import org.jboss.ejb.Container;
import org.jboss.ejb.EntityContainer;
import org.jboss.ejb.EntityEnterpriseContext;
import org.jboss.monitor.LockMonitor;
import org.jboss.util.deadlock.DeadlockDetector;
public class QueuedPessimisticEJBLock extends BeanLockSupport
{
// Transactional waiters, stored with the same TxLock object as key and value (see getTxLock).
private HashMap txLocks = new HashMap();
// Queue of TxLock entries waiting for this bean lock; entries are appended at the tail
// and nextTransaction() removes and wakes the head.
private LinkedList txWaitQueue = new LinkedList();
// Starts true, is reset to true by nextTransaction(), and is AND-ed in schedule() with
// whether the transactional invocation being scheduled is read-only.
private boolean isReadOnlyTxLock = true;
// Source of ids for TxLock instances that are created without a transaction.
private int txIdGen = 0;
// Contention statistics sink assigned in setContainer(); every use in this class is null-checked.
protected LockMonitor lockMonitor = null;
// When true, the DeadlockDetector singleton is consulted before each wait and told via
// removeWaiting when a waiter stops waiting.
protected boolean deadlockDetection = true;
/**
 * Attaches this lock to its container and takes the lock monitor from the
 * container's lock manager.
 *
 * @param container the container this lock belongs to
 */
public void setContainer(Container container)
{
   this.container = container;
   this.lockMonitor = container.getLockManager().getLockMonitor();
}
/**
 * @return the current value of the deadlock-detection flag
 */
public boolean getDeadlockDetection()
{
   return this.deadlockDetection;
}
/**
 * Turns the use of the DeadlockDetector by this lock on or off.
 *
 * @param flag the new value of the deadlock-detection flag
 */
public void setDeadlockDetection(boolean flag)
{
   deadlockDetection = flag;
}
/**
 * Entry in the wait queue: the monitor object a waiting caller blocks on.
 * <p>
 * A lock created for a transaction is identified by that transaction; a lock
 * created without one is identified by an id drawn from the enclosing lock's
 * txIdGen counter.
 */
private class TxLock
{
   /** Transaction waiting for the bean lock, or null for a non-transactional waiter. */
   public Transaction waitingTx = null;
   /** Id of a non-transactional waiter; remains 0 when a transaction is present. */
   public int id = 0;
   /** String form of the thread that created this entry, kept for toString(). */
   public String threadName;
   /** True from construction until nextTransaction() dequeues this entry. */
   public boolean isQueued;
   /** Identity handed to the DeadlockDetector: the transaction, or the creating thread. */
   public Object deadlocker;

   /**
    * @param trans the waiting transaction, or null for a non-transactional waiter
    */
   public TxLock(Transaction trans)
   {
      this.threadName = Thread.currentThread().toString();
      this.waitingTx = trans;
      if (trans == null)
      {
         // Wrap the generator back to 0 once it has overflowed into negatives.
         if (txIdGen < 0) txIdGen = 0;
         this.id = txIdGen++;
         deadlocker = Thread.currentThread();
      }
      else
      {
         deadlocker = trans;
      }
      this.isQueued = true;
   }

   /**
    * Two locks are equal when both carry equal transactions, or when neither
    * carries a transaction and their ids match. Null and objects of any other
    * type compare unequal instead of raising an exception.
    */
   public boolean equals(Object obj)
   {
      if (obj == this) return true;
      if (!(obj instanceof TxLock)) return false;
      TxLock lock = (TxLock) obj;
      if (lock.waitingTx == null && this.waitingTx == null)
      {
         return lock.id == this.id;
      }
      else if (lock.waitingTx != null && this.waitingTx != null)
      {
         return lock.waitingTx.equals(this.waitingTx);
      }
      return false;
   }

   /**
    * Hashes on the same identity equals() compares: the transaction when there
    * is one, otherwise the id. Hashing transactional locks on the id would put
    * all of them in a single bucket because their id is always 0.
    */
   public int hashCode()
   {
      if (this.waitingTx != null)
      {
         return this.waitingTx.hashCode();
      }
      return this.id;
   }

   public String toString()
   {
      StringBuffer buffer = new StringBuffer(100);
      buffer.append("TXLOCK waitingTx=").append(waitingTx);
      buffer.append(" id=").append(id);
      buffer.append(" thread=").append(threadName);
      buffer.append(" queued=").append(isQueued);
      return buffer.toString();
   }
}
/**
 * Returns the wait-queue entry for the given transaction, creating and
 * enqueueing one if needed.
 * <p>
 * Without a transaction a fresh entry is always created and appended to the
 * queue. With a transaction the entry already registered in txLocks is
 * reused; otherwise a new one is registered and appended.
 *
 * @param miTx the caller's transaction, may be null
 * @return the entry the caller should wait on
 */
protected TxLock getTxLock(Transaction miTx)
{
   if (miTx == null)
   {
      TxLock anonymous = new TxLock(null);
      txWaitQueue.addLast(anonymous);
      return anonymous;
   }

   TxLock candidate = new TxLock(miTx);
   TxLock registered = (TxLock) txLocks.get(candidate);
   if (registered != null)
   {
      return registered;
   }

   txLocks.put(candidate, candidate);
   txWaitQueue.addLast(candidate);
   return candidate;
}
/**
 * Reports whether the given transaction has been marked for rollback.
 *
 * @param miTx the transaction to inspect, may be null
 * @return true only for a non-null transaction whose status is
 *         Status.STATUS_MARKED_ROLLBACK
 * @throws Exception if the status query fails
 */
protected boolean isTxExpired(Transaction miTx) throws Exception
{
   return miTx != null && miTx.getStatus() == Status.STATUS_MARKED_ROLLBACK;
}
/**
 * Schedules the invocation on this lock, repeating doSchedule until it
 * reports success, and then updates the read-only flag for transactional
 * invocations.
 *
 * @param mi the invocation to schedule
 * @throws Exception whatever doSchedule propagates
 */
public void schedule(Invocation mi) throws Exception
{
   boolean scheduled;
   do
   {
      scheduled = doSchedule(mi);
   }
   while (!scheduled);

   if (mi.getTransaction() == null)
   {
      return;
   }

   Method invoked = mi.getMethod();
   if (isReadOnlyTxLock)
   {
      // Stays read-only only if the container is read-only or the invoked
      // method is declared read-only in the bean metadata.
      boolean readOnly = ((EntityContainer) container).isReadOnly();
      if (!readOnly && invoked != null)
      {
         readOnly = container.getBeanMetaData().isMethodReadOnly(invoked.getName());
      }
      isReadOnlyTxLock = readOnly;
   }
}
/**
 * Performs one scheduling attempt while holding this lock's sync.
 * <p>
 * Rejects a transaction already marked for rollback, waits in waitForTx
 * until the lock is available to the caller, and reports the wait to the
 * lock monitor. A non-transactional caller that had to wait hands the lock
 * on via nextTransaction() before the sync is released.
 *
 * @param mi the invocation being scheduled
 * @return always true when it returns normally
 * @throws Exception a RuntimeException when the transaction is marked for
 *         rollback, or anything propagated from waitForTx
 */
protected boolean doSchedule(Invocation mi)
   throws Exception
{
   boolean hadToWait = false;
   final Transaction callerTx = mi.getTransaction();
   final boolean tracing = log.isTraceEnabled();

   this.sync();
   try
   {
      if (tracing) log.trace("Begin schedule, key=" + mi.getId());

      if (isTxExpired(callerTx))
      {
         log.error("Saw rolled back tx=" + callerTx);
         throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
      }

      final long waitStarted = System.currentTimeMillis();
      try
      {
         hadToWait = waitForTx(callerTx, tracing);
         if (hadToWait && lockMonitor != null)
         {
            lockMonitor.finishedContending(System.currentTimeMillis() - waitStarted);
         }
      }
      catch (Exception failure)
      {
         if (lockMonitor != null)
         {
            // Count a timeout when the failure coincides with a rollback-marked tx.
            if (isTxExpired(callerTx))
            {
               lockMonitor.increaseTimeouts();
            }
            lockMonitor.finishedContending(System.currentTimeMillis() - waitStarted);
         }
         throw failure;
      }
   }
   finally
   {
      // A non-transactional caller that waited must pass the lock on to the
      // next queued waiter itself.
      if (hadToWait && callerTx == null)
      {
         nextTransaction();
      }
      this.releaseSync();
   }
   return true;
}
/**
 * Waits while this lock is held by a transaction other than miTx.
 * <p>
 * If no waiting is needed the lock is claimed with setTransaction(miTx) and
 * false is returned. Otherwise the caller is queued through getTxLock and
 * blocks on its TxLock, releasing the sync for the duration of each wait.
 *
 * @param miTx  the caller's transaction, may be null
 * @param trace whether trace logging is enabled
 * @return true if the caller entered the wait loop at least once
 * @throws Exception whatever the deadlock detector or isTxExpired throws, or a
 *         RuntimeException when miTx is found marked for rollback after a wait
 */
protected boolean waitForTx(Transaction miTx, boolean trace) throws Exception
{
boolean wasScheduled = false;
TxLock txLock = null;
// Identity given to the deadlock detector: the transaction, or the current
// thread when there is no transaction.
Object deadlocker = miTx;
if (deadlocker == null) deadlocker = Thread.currentThread();
// Loop for as long as some other transaction holds the lock.
while (getTransaction() != null &&
!getTransaction().equals(miTx))
{
try
{
if( deadlockDetection == true )
DeadlockDetector.singleton.deadlockDetection(deadlocker, this);
}
catch (Exception e)
{
// The detector call failed: drop our queue entry, if we still have one,
// before propagating.
if (txLock != null && txLock.isQueued)
{
txLocks.remove(txLock);
txWaitQueue.remove(txLock);
}
throw e;
}
wasScheduled = true;
if (lockMonitor != null) lockMonitor.contending();
if (trace) log.trace("Transactional contention on context" + id);
// Enqueue once; later iterations reuse the same entry.
if (txLock == null)
txLock = getTxLock(miTx);
if (trace) log.trace("Begin wait on Tx=" + getTransaction());
// The sync is released only after the TxLock monitor is held. nextTransaction()
// notifies while synchronized on the same object; presumably this ordering is
// what keeps a notification from being missed between release and wait.
synchronized (txLock)
{
releaseSync();
try
{
txLock.wait(txTimeout);
}
catch (InterruptedException ignored)
{
// Interruption is deliberately ignored; the loop condition is re-evaluated.
}
}
// Take the sync back before looking at shared state again.
this.sync();
if (trace) log.trace("End wait on TxLock=" + getTransaction());
if (isTxExpired(miTx))
{
log.error(Thread.currentThread() + "Saw rolled back tx=" + miTx + " waiting for txLock"
);
if (txLock.isQueued)
{
// Still queued: simply remove our entry.
txLocks.remove(txLock);
txWaitQueue.remove(txLock);
}
else if (getTransaction() != null && getTransaction().equals(miTx))
{
// Already dequeued and made the owner: pass the lock on to the next waiter.
nextTransaction();
}
if (miTx != null)
{
if( deadlockDetection == true )
DeadlockDetector.singleton.removeWaiting(deadlocker);
}
throw new RuntimeException("Transaction marked for rollback, possibly a timeout");
}
// NOTE(review): a waiter that wakes by timeout while getTransaction() is null
// leaves the loop with its entry still queued and without calling
// setTransaction - confirm this is intended.
}
// Only a caller that never waited claims the lock here.
if (!wasScheduled)
{
setTransaction(miTx);
}
return wasScheduled;
}
/**
 * Releases the lock from its current owner and hands it to the head of the
 * wait queue, if any.
 * <p>
 * The owning transaction is cleared and the read-only flag reset. When a
 * waiter is queued it is dequeued, its transaction (possibly null) becomes
 * the owner, the deadlock detector is told it no longer waits, and every
 * thread blocked on its TxLock is notified.
 *
 * @throws IllegalStateException if called while synched is null
 */
protected void nextTransaction()
{
   if (synched == null)
   {
      throw new IllegalStateException("do not call nextTransaction while not synched!");
   }

   setTransaction(null);
   this.isReadOnlyTxLock = true;

   if (txWaitQueue.isEmpty())
   {
      return;
   }

   TxLock head = (TxLock) txWaitQueue.removeFirst();
   txLocks.remove(head);
   head.isQueued = false;
   setTransaction(head.waitingTx);
   if (deadlockDetection)
   {
      DeadlockDetector.singleton.removeWaiting(head.deadlocker);
   }
   synchronized (head)
   {
      head.notifyAll();
   }
}
/**
 * Passes the lock on to the next waiter by delegating to nextTransaction().
 *
 * @param transaction not used by this implementation
 */
public void endTransaction(Transaction transaction)
{
   this.nextTransaction();
}
/**
 * Passes the lock on to the next waiter by delegating to nextTransaction().
 *
 * @param trasaction not used by this implementation
 */
public void wontSynchronize(Transaction trasaction)
{
   this.nextTransaction();
}
/**
 * Ends the lock's transaction at the end of an invocation when the
 * invocation's transaction is the current owner and the invocation's
 * context is absent or reports no transaction synchronization.
 *
 * @param mi the invocation that has finished
 */
public void endInvocation(Invocation mi)
{
   Transaction tx = mi.getTransaction();
   if (tx == null || !tx.equals(getTransaction()))
   {
      return;
   }

   EntityEnterpriseContext ctx = (EntityEnterpriseContext) mi.getEnterpriseContext();
   boolean synchronizationRegistered = ctx != null && ctx.hasTxSynchronization();
   if (!synchronizationRegistered)
   {
      endTransaction(tx);
   }
}
/**
 * Drops one reference to this lock and verifies the lock is in a sane state.
 *
 * @throws IllegalStateException if the count becomes negative, or if it
 *         reaches zero while waiters are still queued or a transaction is
 *         still set
 */
public void removeRef()
{
   refs--;

   if (refs < 0)
   {
      log.error("negative lock reference count should never happen !");
      throw new IllegalStateException("negative lock reference count !");
   }
   if (refs != 0)
   {
      return;
   }

   // Last reference gone: nothing may still depend on this lock.
   if (txWaitQueue.size() > 0)
   {
      log.error("removing bean lock and it has tx's in QUEUE! " + toString());
      throw new IllegalStateException("removing bean lock and it has tx's in QUEUE!");
   }
   if (getTransaction() != null)
   {
      log.error("removing bean lock and it has tx set! " + toString());
      throw new IllegalStateException("removing bean lock and it has tx set!");
   }
}
/**
 * @return the superclass description followed by the bean name, id,
 *         reference count, owning transaction, sync holder, timeout and a
 *         snapshot of the wait queue
 */
public String toString()
{
   StringBuffer text = new StringBuffer(100);
   text.append(super.toString())
      .append(", bean=").append(container.getBeanMetaData().getEjbName())
      .append(", id=").append(id)
      .append(", refs=").append(refs)
      .append(", tx=").append(getTransaction())
      .append(", synched=").append(synched)
      .append(", timeout=").append(txTimeout)
      .append(", queue=").append(new ArrayList(txWaitQueue));
   return text.toString();
}
}