package org.jboss.cache.interceptors;
import org.jboss.cache.*;
import org.jboss.cache.lock.IdentityLock;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jgroups.blocks.MethodCall;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;
import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
public class LockInterceptor extends Interceptor {
// Transaction manager taken from the cache in setCache(); invoke() asks it for the
// current transaction. May be null, in which case locks are tracked per thread.
private TransactionManager tx_mgr=null;
// Transaction table taken from the cache in setCache(); commit()/rollback() look up
// the TransactionEntry of a global transaction here and remove it when done.
TransactionTable tx_table=null;
// Lock table taken from the cache in setCache(). invoke() keys it by Thread and
// stores a List of the locks acquired by that thread outside of a transaction.
HashMap lock_table;
// Default lock acquisition timeout taken from the cache in setCache(); invoke()
// overrides it per call for the fail-fast put method.
private long lock_acquisition_timeout;
// Held by lock() around the lookup/creation of a child node when nodes are to be
// created on demand.
ReentrantLock create_lock=new ReentrantLock();
// Transactions for which invoke() has already registered a SynchronizationHandler;
// entries are removed again in SynchronizationHandler.afterCompletion().
private List transactions=Collections.synchronizedList(new ArrayList());
// Lock types chosen by invoke() and interpreted by lock():
// NONE  - no lock is acquired on the nodes along the path
final static int NONE = 0;
// READ  - a read lock is acquired on every node along the path
final static int READ = 1;
// WRITE - a write lock on the last node of the path, read locks on its ancestors
final static int WRITE = 2;
/**
 * Attaches this interceptor to a cache and caches the collaborators it needs
 * (transaction manager, transaction table, lock table and the default lock
 * acquisition timeout) in local fields.
 *
 * @param cache the cache this interceptor works for
 */
public void setCache(TreeCache cache) {
    // Let the base class record the cache reference first.
    super.setCache(cache);

    this.tx_mgr = cache.getTransactionManager();
    this.tx_table = cache.getTransactionTable();
    this.lock_table = cache.getLockTable();
    this.lock_acquisition_timeout = cache.getLockAcquisitionTimeout();
}
/**
 * Acquires the locks required by the intercepted method and then delegates to
 * <code>super.invoke(m)</code>.
 * <p>
 * If a valid transaction is active, locks are owned by its GlobalTransaction and a
 * {@link SynchronizationHandler} is registered once per transaction so that
 * commit/rollback is triggered on completion. Otherwise the locks are collected in
 * the per-thread list kept in <code>lock_table</code>.
 * <p>
 * The target Fqn, the lock type, whether the subtree is locked and whether missing
 * nodes are created are all derived from which TreeCache method is being invoked.
 * Methods not listed below bypass locking altogether.
 *
 * @param m the intercepted method call
 * @return <code>null</code> for the explicit lock method, otherwise whatever
 *         <code>super.invoke(m)</code> returns
 * @throws Throwable if the global transaction cannot be obtained, if locking
 *         fails, or if the delegated invocation throws
 */
public Object invoke(MethodCall m) throws Throwable {
    long timeout = lock_acquisition_timeout;
    final Method method = m.getMethod();
    final Object[] params = m.getArgs();

    GlobalTransaction globalTx = null;
    List threadLocks = null;

    // ---- 1. determine the lock owner: global transaction or current thread ----
    Transaction currentTx = null;
    if (tx_mgr != null) {
        currentTx = tx_mgr.getTransaction();
    }

    if (currentTx != null && isValid(currentTx)) {
        if (transactions.contains(currentTx)) {
            // Handler already registered for this transaction: just look up its gtx.
            globalTx = cache.getTransactionTable().get(currentTx);
        }
        else {
            globalTx = cache.getCurrentTransaction(currentTx);
            if (globalTx == null) {
                throw new Exception("failed to get global transaction");
            }
            try {
                OrderedSynchronizationHandler.getInstance(currentTx)
                        .registerAtTail(new SynchronizationHandler(globalTx, currentTx, cache));
                transactions.add(currentTx);
            }
            catch (Exception e) {
                // NOTE(review): globalTx stays set here, so locks are still recorded
                // under it although no handler was registered to release them -
                // confirm that something else cleans them up.
                log.error("registration for tx=" + currentTx + " with transaction manager failed, running without TX", e);
            }
        }
    }
    else {
        // No usable transaction: track the locks per calling thread.
        // NOTE(review): lock_table is a plain HashMap accessed without visible
        // synchronization - verify how other users of the table guard it.
        final Thread self = Thread.currentThread();
        threadLocks = (List) lock_table.get(self);
        if (threadLocks == null) {
            threadLocks = new LinkedList();
            lock_table.put(self, threadLocks);
        }
    }

    // ---- 2. derive target node and lock characteristics from the method ----
    Fqn target = null;
    int mode = NONE;
    boolean lockSubtree = false;
    boolean createMissing = false;

    if (method.equals(TreeCache.putDataMethodLocal) || method.equals(TreeCache.putDataEraseMethodLocal) ||
            method.equals(TreeCache.putKeyValMethodLocal) || method.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
        createMissing = true;
        target = (Fqn) params[1];
        mode = WRITE;
        if (method.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
            // The fail-fast put carries its own timeout.
            timeout = ((Long) params[5]).longValue();
        }
    }
    else if (method.equals(TreeCache.removeNodeMethodLocal)) {
        target = (Fqn) params[1];
        mode = WRITE;
        lockSubtree = true;
    }
    else if (method.equals(TreeCache.removeKeyMethodLocal) || method.equals(TreeCache.removeDataMethodLocal)) {
        target = (Fqn) params[1];
        mode = WRITE;
    }
    else if (method.equals(TreeCache.evictNodeMethodLocal)) {
        target = (Fqn) params[0];
        mode = WRITE;
    }
    else if (method.equals(TreeCache.addChildMethodLocal)) {
        target = (Fqn) params[1];
        mode = WRITE;
    }
    else if (method.equals(TreeCache.getKeyValueMethodLocal) || method.equals(TreeCache.getNodeMethodLocal) ||
            method.equals(TreeCache.getKeysMethodLocal) || method.equals(TreeCache.getChildrenNamesMethodLocal) ||
            method.equals(TreeCache.releaseAllLocksMethodLocal) || method.equals(TreeCache.printMethodLocal)) {
        target = (Fqn) params[0];
        mode = READ;
    }
    else if (method.equals(TreeCache.lockMethodLocal)) {
        target = (Fqn) params[0];
        mode = ((Integer) params[1]).intValue();
        lockSubtree = ((Boolean) params[2]).booleanValue();
    }

    // ---- 3. acquire the locks ----
    if (target == null) {
        if (log.isTraceEnabled()) {
            log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking");
        }
    }
    else if (createMissing) {
        // Repeat until the node is actually present in the cache after locking.
        do {
            lock(target, globalTx, mode, threadLocks, lockSubtree, timeout, createMissing);
        }
        while (!cache.exists(target));
    }
    else {
        lock(target, globalTx, mode, threadLocks, lockSubtree, timeout, createMissing);
    }

    // ---- 4. the explicit lock call ends here; everything else continues ----
    if (method.equals(TreeCache.lockMethodLocal)) {
        return null;
    }
    return super.invoke(m);
}
/**
 * Walks from the root down to <code>fqn</code>, acquiring a lock on every node
 * along the way and recording each acquired lock either in the transaction table
 * (when <code>gtx</code> is given) or in <code>locks</code>.
 * <p>
 * With <code>lock_type == WRITE</code> only the last node is write-locked; all
 * ancestors are read-locked. With <code>lock_type == NONE</code> no per-node lock
 * is acquired. If <code>recursive</code> is set, <code>acquireAll</code> is also
 * called on the last node. The walk stops silently at the first missing child
 * unless <code>createIfNotExists</code> is set, in which case the child is created
 * while holding <code>create_lock</code>.
 *
 * @param fqn               path of the node to lock; null or empty means nothing is locked
 * @param gtx               owning global transaction, or null to use the current thread as owner
 * @param lock_type         one of NONE, READ, WRITE
 * @param locks             list receiving acquired locks when <code>gtx</code> is null
 * @param recursive         whether <code>acquireAll</code> is invoked on the last node
 * @param lock_timeout      timeout handed to every lock acquisition
 * @param createIfNotExists whether missing nodes along the path are created
 */
private void lock(Fqn fqn, GlobalTransaction gtx, int lock_type, List locks, boolean recursive,
                  long lock_timeout, boolean createIfNotExists)
        throws TimeoutException, LockingException, InterruptedException {
    if (fqn == null) {
        log.error("fqn is null - this should not be the case");
        return;
    }

    final int depth = fqn.size();
    if (depth == 0) {
        return;
    }

    final Object owner = (gtx == null) ? (Object) Thread.currentThread() : gtx;
    Fqn pathSoFar = new Fqn();
    Node parent = cache.getRoot();

    for (int level = 0; level < depth; level++) {
        final boolean isLast = (level == depth - 1);
        final Object name = fqn.get(level);
        pathSoFar = new Fqn(pathSoFar, name);

        // --- find (or create) the child for this path element ---
        Node current = null;
        if (createIfNotExists) {
            create_lock.acquire();
        }
        try {
            current = parent.getChild(name);
            if (current == null) {
                if (!createIfNotExists) {
                    if (log.isTraceEnabled()) {
                        log.trace("failed finding child " + name + " of node " + parent.getFqn());
                    }
                    return;
                }
                current = parent.createChild(name, pathSoFar, parent);
                if (log.isTraceEnabled()) {
                    log.trace("created child " + name);
                }
                if (gtx != null) {
                    cache.addNode(gtx, (Fqn) pathSoFar.clone());
                }
                // Release before notifying so the notification runs without create_lock held.
                create_lock.release();
                cache.notifyNodeCreated(pathSoFar);
            }
        }
        finally {
            // Covers every path on which create_lock is still held by this thread.
            if (create_lock.holds() > 0) {
                create_lock.release();
            }
        }

        // --- lock the node itself ---
        boolean acquired = false;
        if (lock_type != NONE) {
            acquired = current.acquire(owner, lock_timeout,
                    (lock_type == WRITE && isLast) ? Node.LOCK_TYPE_WRITE : Node.LOCK_TYPE_READ);
        }

        if (acquired) {
            if (gtx == null) {
                final IdentityLock nodeLock = current.getLock();
                if (!locks.contains(nodeLock)) {
                    locks.add(nodeLock);
                }
            }
            else {
                cache.getTransactionTable().addLock(gtx, current.getLock());
            }
        }

        // --- optionally lock the whole subtree below the target node ---
        if (recursive && isLast) {
            final Set subtreeLocks = current.acquireAll(owner, lock_timeout, lock_type);
            if (!subtreeLocks.isEmpty()) {
                if (gtx == null) {
                    locks.addAll(subtreeLocks);
                }
                else {
                    cache.getTransactionTable().addLocks(gtx, subtreeLocks);
                }
            }
        }

        parent = current;
    }
}
/**
 * Finishes a committed transaction: releases every lock recorded for
 * <code>gtx</code> in reverse order, clears the entry's lock list and removes both
 * the local and the global transaction from the transaction table.
 * <p>
 * Logs an error and does nothing else if the transaction table has no entry for
 * <code>gtx</code>.
 *
 * @param gtx the global transaction that committed
 */
private void commit(GlobalTransaction gtx) {
    if(log.isTraceEnabled())
        log.trace("committing cache with gtx " + gtx);

    TransactionEntry entry=tx_table.get(gtx);
    if(entry == null) {
        log.error("entry for transaction " + gtx + " not found (maybe already committed)");
        return;
    }

    // Work on a snapshot so releasing does not interfere with the entry's own list.
    // Walk it backwards with a ListIterator: indexed get(i) on a LinkedList is
    // O(n) per call, which made the original loop quadratic in the number of locks.
    List held=new LinkedList(entry.getLocks());
    for(ListIterator it=held.listIterator(held.size()); it.hasPrevious();) {
        IdentityLock lock=(IdentityLock)it.previous();
        if(log.isTraceEnabled())
            log.trace("releasing lock for " + lock.getFqn() + " (" + lock + ")");
        lock.release(gtx);
    }
    entry.getLocks().clear();

    Transaction ltx=entry.getTransaction();
    if(log.isTraceEnabled())
        log.trace("removing local transaction " + ltx + " and global transaction " + gtx);
    tx_table.remove(ltx);
    tx_table.remove(gtx);
}
/**
 * Rolls back the changes made under a global transaction:
 * <ol>
 * <li>invokes the recorded undo operations on the cache, newest first,</li>
 * <li>removes the nodes recorded for the transaction, newest first,</li>
 * <li>releases all locks recorded for the transaction, newest first,</li>
 * <li>removes the local and global transaction from the transaction table.</li>
 * </ol>
 * Failures of individual undo operations or node removals are logged and do not
 * stop the rollback. Logs an error and does nothing else if the transaction table
 * has no entry for <code>tx</code>.
 *
 * @param tx the global transaction to roll back
 */
private void rollback(GlobalTransaction tx) {
    TransactionEntry entry=tx_table.get(tx);

    if(log.isTraceEnabled())
        log.trace("called to rollback cache with GlobalTransaction=" + tx);

    if(entry == null) {
        log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)");
        return;
    }

    // 1. Undo the modifications in reverse order of execution.
    List undo_ops=new LinkedList(entry.getUndoOperations());
    for(ListIterator it=undo_ops.listIterator(undo_ops.size()); it.hasPrevious();) {
        MethodCall undo_op=(MethodCall)it.previous();
        try {
            Object retval=undo_op.invoke(cache);
            if(retval instanceof Throwable) {
                log.error("undo operation failed, error=" + retval);
            }
        }
        catch(Throwable t) {
            log.error("undo operation failed", t);
        }
    }

    // 2. Remove the recorded nodes, last recorded first. Snapshot once and take the
    // start index from the snapshot itself: reading entry.getNodes() a second time
    // for the size could yield an index beyond the copy if the list changed in between.
    List created=new LinkedList(entry.getNodes());
    for(ListIterator it=created.listIterator(created.size()); it.hasPrevious();) {
        Fqn node_name=(Fqn)it.previous();
        try {
            cache._remove(tx, node_name, false);
        }
        catch(Throwable t) {
            log.error("failed removing node \"" + node_name + "\"", t);
        }
    }

    // 3. Release the locks in reverse order. A ListIterator avoids the O(n) cost
    // of each indexed get(i) on a LinkedList.
    List held=new LinkedList(entry.getLocks());
    for(ListIterator it=held.listIterator(held.size()); it.hasPrevious();) {
        IdentityLock lock=(IdentityLock)it.previous();
        if(log.isTraceEnabled())
            log.trace("releasing lock for " + lock.getFqn() + " (" + lock + ")");
        lock.release(tx);
    }
    entry.getLocks().clear();

    // 4. Forget the transaction.
    Transaction ltx=entry.getTransaction();
    if(log.isTraceEnabled())
        log.trace("removing local transaction " + ltx + " and global transaction " + tx);
    tx_table.remove(ltx);
    tx_table.remove(tx);
}
/**
 * Transaction callback registered by invoke() once per transaction. When the
 * transaction completes it removes the transaction from the enclosing
 * interceptor's <code>transactions</code> list and triggers commit or rollback
 * for the associated global transaction.
 */
class SynchronizationHandler implements Synchronization {
    GlobalTransaction gtx=null;
    Transaction tx=null;
    TreeCache cache=null;

    SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
        this.gtx=gtx;
        this.tx=tx;
        this.cache=cache;
    }

    /** Nothing to do before completion. */
    public void beforeCompletion() {
    }

    /**
     * Commits on STATUS_COMMITTED; rolls back on STATUS_MARKED_ROLLBACK and
     * STATUS_ROLLEDBACK. Any other status is also rolled back, followed by an
     * IllegalStateException carrying the status value.
     *
     * @param status the completion status reported by the transaction manager
     */
    public void afterCompletion(int status) {
        // Allow a later transaction to register a fresh handler.
        transactions.remove(tx);

        if(status == Status.STATUS_COMMITTED) {
            commit(gtx);
            return;
        }

        final boolean expectedRollback=
                status == Status.STATUS_MARKED_ROLLBACK || status == Status.STATUS_ROLLEDBACK;

        if(expectedRollback && log.isDebugEnabled()) {
            log.debug("rolling back transaction");
        }
        rollback(gtx);

        if(!expectedRollback) {
            throw new IllegalStateException("failed rolling back transaction: " + status);
        }
    }
}
}