package org.jboss.cache.interceptors;
import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
import org.jboss.cache.*;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.blocks.MethodCall;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;
public class CacheStoreInterceptor extends Interceptor {
private CacheLoader loader=null;
private TransactionManager tx_mgr=null;
private TransactionTable tx_table=null;
private List transactions=Collections.synchronizedList(new ArrayList());
final ReentrantLock lock=new ReentrantLock();
public void setCache(TreeCache cache) {
super.setCache(cache);
this.loader=cache.getCacheLoader();
tx_mgr=cache.getTransactionManager();
tx_table=cache.getTransactionTable();
}
public Object invoke(MethodCall m) throws Throwable {
Fqn fqn;
Map attributes;
Object key, value;
Method meth=m.getMethod();
Object[] args=m.getArgs();
Transaction tx;
Object retval=null, tmp_retval=null;
boolean use_tmp_retval=false;
if(tx_mgr != null && (tx=tx_mgr.getTransaction()) != null && isValid(tx)) { lock.acquire();
try {
if(!transactions.contains(tx)) {
transactions.add(tx);
GlobalTransaction gtx=cache.getCurrentTransaction(tx);
if(gtx == null)
throw new Exception("failed to get global transaction");
tx.registerSynchronization(new SynchronizationHandler(gtx, tx, cache));
}
lock.release();
return super.invoke(m);
}
finally {
if(lock.holds() > 0)
lock.release();
}
}
synchronized(this) {
if(meth.equals(TreeCache.removeNodeMethodLocal)) {
fqn=(Fqn)args[1];
loader.remove(fqn);
}
else if(meth.equals(TreeCache.removeKeyMethodLocal)) {
fqn=(Fqn)args[1];
key=args[2];
tmp_retval=loader.remove(fqn, key);
use_tmp_retval=true;
}
else if(meth.equals(TreeCache.removeDataMethodLocal)) {
fqn=(Fqn)args[1];
loader.removeData(fqn);
}
}
retval=super.invoke(m);
synchronized(this) {
if(meth.equals(TreeCache.putDataMethodLocal) || meth.equals(TreeCache.putDataEraseMethodLocal)) {
fqn=(Fqn)args[1];
attributes=(Map)args[2];
loader.put(fqn, attributes);
}
else if(meth.equals(TreeCache.putKeyValMethodLocal)) {
fqn=(Fqn)args[1];
key=args[2];
value=args[3];
loader.put(fqn, key, value);
}
}
if(use_tmp_retval)
return tmp_retval;
else
return retval;
}
private CacheLoader getLoader() {
return loader != null? loader : (loader=cache.getCacheLoader());
}
private void prepareCacheLoader(GlobalTransaction gtx) throws Exception {
List modifications;
TransactionEntry entry;
entry=tx_table.get(gtx);
if(entry == null)
throw new Exception("entry for transaction " + gtx + " not found in transaction table");
modifications=entry.getModifications();
if(modifications.size() == 0)
return;
List cache_loader_modifications=new ArrayList();
for(Iterator it=modifications.iterator(); it.hasNext();) {
MethodCall methodCall=(MethodCall)it.next();
Modification mod=convertMethodCallToModification(methodCall);
cache_loader_modifications.add(mod);
}
if(cache_loader_modifications.size() > 0) {
loader.prepare(gtx, cache_loader_modifications, false);
}
}
private Modification convertMethodCallToModification(MethodCall methodCall) throws Exception {
Method method=methodCall.getMethod();
Object[] args;
if(method == null)
throw new Exception("method call has no method: " + methodCall);
args=methodCall.getArgs();
if(TreeCache.putDataMethodLocal.equals(method)) {
return new Modification(Modification.PUT_DATA,
(Fqn)args[1], (Map)args[2]); }
else if(TreeCache.putDataEraseMethodLocal.equals(method)) {
return new Modification(Modification.PUT_DATA_ERASE,
(Fqn)args[1], (Map)args[2]); }
else if(TreeCache.putKeyValMethodLocal.equals(method)) {
return new Modification(Modification.PUT_KEY_VALUE,
(Fqn)args[1], args[2], args[3]); }
else if(TreeCache.removeNodeMethodLocal.equals(method)) {
return new Modification(Modification.REMOVE_NODE,
(Fqn)args[1]); }
else if(TreeCache.removeKeyMethodLocal.equals(method)) {
return new Modification(Modification.REMOVE_KEY_VALUE,
(Fqn)args[1], args[2]);
}
else if(TreeCache.removeDataMethodLocal.equals(method)) {
return new Modification(Modification.REMOVE_DATA,
(Fqn)args[1]); }
else
throw new Exception("method call " + method.getName() + " cannot be converted to a modification");
}
class SynchronizationHandler implements Synchronization {
GlobalTransaction gtx=null;
Transaction tx=null;
TreeCache cache=null;
List modifications;
SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
this.gtx=gtx;
this.tx=tx;
this.cache=cache;
TransactionEntry entry=tx_table.get(gtx);
if(entry != null)
modifications=entry.getModifications();
}
public void beforeCompletion() {
try {
int status=tx.getStatus();
switch(status) {
case Status.STATUS_ACTIVE: case Status.STATUS_COMMITTING:
case Status.STATUS_PREPARING:
prepareCacheLoader(gtx);
break;
case Status.STATUS_MARKED_ROLLBACK:
case Status.STATUS_ROLLING_BACK:
break;
default:
log.error("beforeCompletion(). Illegal tx status: " + status);
throw new IllegalStateException("Illegal status: " + status);
}
}
catch(Throwable t) {
throw new NestedRuntimeException("", t);
}
}
public void afterCompletion(int status) {
transactions.remove(tx);
switch(status) {
case Status.STATUS_COMMITTED:
try {
if(modifications == null || modifications.size() == 0)
return;
getLoader().commit(gtx);
}
catch(Exception e) {
log.error("failed committing transaction to cache loader", e);
}
break;
case Status.STATUS_MARKED_ROLLBACK: case Status.STATUS_ROLLEDBACK:
getLoader().rollback(gtx);
break;
}
}
}
}