package org.jboss.cache.interceptors;
import org.jboss.cache.*;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.Address;
import org.jgroups.blocks.MethodCall;
import javax.transaction.*;
import java.util.*;
import java.lang.reflect.Method;
public class ReplicationInterceptor extends Interceptor implements Replicatable {
/** Transaction manager of the attached cache; assigned in {@link #setCache(TreeCache)}, null before that. */
TransactionManager tx_mgr;

/** Transaction table of the attached cache; assigned in {@link #setCache(TreeCache)}, null before that. */
TransactionTable tx_table;

/**
 * Local transactions recorded by {@link #invoke(MethodCall)}; an entry is removed again
 * by <code>SynchronizationHandler.afterCompletion()</code>.
 */
private List transactions=Collections.synchronizedList(new ArrayList());

/**
 * Global transactions that reached this node through a replicated prepare call.
 * {@link #invoke(MethodCall)} does not register a completion handler for these.
 */
private Set remote_transactions=Collections.synchronizedSet(new HashSet());

/** Creates an interceptor that is not yet attached to any cache. */
public ReplicationInterceptor() {
    super();
}
/**
 * Attaches this interceptor to a cache.
 * Besides delegating to the superclass, this caches the transaction manager and the
 * transaction table of the cache and installs this interceptor as the replication
 * handler of the cache.
 *
 * @param cache the cache this interceptor belongs to
 */
public void setCache(TreeCache cache) {
    super.setCache(cache);
    this.tx_mgr=cache.getTransactionManager();
    // Statement order is kept as-is: the handler is registered before tx_table is set.
    cache.setReplicationHandler(this);
    this.tx_table=cache.getTransactionTable();
}
/**
 * Executes the call by passing it to the superclass and then deals with replication.
 * <ul>
 * <li>If a transaction manager is set and the caller runs inside a transaction accepted by
 *     <code>isValid()</code> (inherited; its exact criteria are not visible here), a
 *     {@link SynchronizationHandler} is registered once per transaction so replication
 *     happens at transaction completion.</li>
 * <li>Otherwise, if the method is a CRUD method, it is replicated immediately via
 *     {@link #handleReplicatedMethod(MethodCall, int)}.</li>
 * </ul>
 * A transaction is only remembered in <code>transactions</code> when a handler was actually
 * registered for it. The handler's <code>afterCompletion()</code> is the only place that removes
 * the entry, so remembering a transaction without a handler (the remote-prepare case) would
 * leak it.
 *
 * @param m the method call to execute
 * @return whatever the superclass invocation returned
 * @throws Throwable if the invocation or the replication fails, or if no global transaction
 *                   can be obtained for the current transaction
 */
public Object invoke(MethodCall m) throws Throwable {
    // The call is always executed first; replication is handled afterwards.
    Object retval=super.invoke(m);

    Transaction tx=tx_mgr != null? tx_mgr.getTransaction() : null;
    if(tx == null || !isValid(tx)) {
        // Not running in a usable transaction: replicate CRUD methods right away.
        if(TreeCache.isCrudMethod(m.getMethod())) {
            handleReplicatedMethod(m, cache.getCacheModeInternal());
        }
        return retval;
    }

    // A handler is already registered for this transaction: nothing more to do.
    if(transactions.contains(tx))
        return retval;

    GlobalTransaction gtx=cache.getCurrentTransaction(tx);
    if(gtx == null)
        throw new Exception("failed to get global transaction");

    if(remote_transactions.contains(gtx)) {
        // No handler is registered here, so the transaction must not be recorded either:
        // nothing would ever remove it from 'transactions' again.
        if(log.isTraceEnabled())
            log.trace("ReplicationInterceptor: won't register for TX completion as " +
                    "GlobalTransaction is result of a PREPARE");
        return retval;
    }

    OrderedSynchronizationHandler handler=OrderedSynchronizationHandler.getInstance(tx);
    SynchronizationHandler myHandler=new SynchronizationHandler(gtx, tx, cache);
    if(log.isTraceEnabled())
        log.trace("registering for TX completion: SynchronizationHandler(" + myHandler + ")");
    handler.registerAtHead(myHandler);
    transactions.add(tx);
    return retval;
}
/**
 * Replicates a single (non-transactional) method call to the other members.
 * <p>
 * In <code>REPL_ASYNC</code> mode the call is put on the replication queue when one is in use,
 * otherwise it is sent with <code>callRemoteMethods()</code> without checking responses.
 * In <code>REPL_SYNC</code> mode the call is sent and the responses are passed to
 * {@link #checkResponses(List)}. Any other mode does nothing.
 * <p>
 * A <code>putFailFast()</code> call requested with <code>REPL_SYNC</code> is downgraded to
 * <code>REPL_ASYNC</code>. For this to take effect the invoked {@link Method} (not the
 * <code>MethodCall</code> wrapper) has to be compared, and the dispatch has to use the possibly
 * adjusted <code>mode</code> rather than re-reading the cache mode.
 *
 * @param m    the method call to replicate
 * @param mode the replication mode to use (<code>TreeCache.REPL_SYNC</code> or
 *             <code>TreeCache.REPL_ASYNC</code>)
 * @throws Throwable if the remote call fails or a member answered with an exception
 */
void handleReplicatedMethod(MethodCall m, int mode) throws Throwable {
    Method meth=m.getMethod();
    if(mode == TreeCache.REPL_SYNC && meth != null && meth.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
        if(log.isTraceEnabled())
            log.trace("forcing asynchronous replication for putFailFast()");
        mode=TreeCache.REPL_ASYNC;
    }

    if(log.isTraceEnabled()) {
        // Render the arguments as a list; appending the raw array would only print its identity.
        Object[] args=m.getArgs();
        Object printable_args=null;
        if(args != null)
            printable_args=Arrays.asList(args);
        log.trace(new StringBuffer().append("invoking method ").append(m.getName()).append("(").append(printable_args).
                append(")").append(", members=").append(cache.getMembers()).append(", mode=").
                append(cache.getCacheMode()).append(", exclude_self=").append(true).append(", timeout=").
                append(cache.getSyncReplTimeout()).toString());
    }

    // Dispatch on the effective mode, which may have been overridden above.
    switch(mode) {
        case TreeCache.REPL_ASYNC:
            if(cache.getUseReplQueue() && cache.getReplQueue() != null) {
                cache.getReplQueue().add(m);
            }
            else {
                cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod,
                        new Object[]{m},
                        false, true, cache.getSyncReplTimeout());
            }
            break;
        case TreeCache.REPL_SYNC:
            List rsps=cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod,
                    new Object[]{m}, true, true, cache.getSyncReplTimeout());
            if(log.isTraceEnabled())
                log.trace("responses=" + rsps);
            checkResponses(rsps);
            break;
    }
}
/**
 * Scans the responses of a remote call and raises the first failure found.
 * <p>
 * A response that is a <code>RuntimeException</code> or an <code>Exception</code> is rethrown
 * as is. Any other <code>Throwable</code> (for example an <code>Error</code>) is wrapped in a
 * <code>NestedRuntimeException</code> so it is not silently treated as a successful response.
 * Null responses and non-throwable values are ignored.
 *
 * @param rsps the responses to check; may be null, in which case nothing happens
 * @throws Exception the first throwable response found (wrapped if it is not an exception)
 */
private void checkResponses(List rsps) throws Exception {
    if(rsps == null)
        return;
    for(Iterator it=rsps.iterator(); it.hasNext();) {
        Object rsp=it.next();
        if(rsp instanceof RuntimeException)
            throw (RuntimeException)rsp;
        if(rsp instanceof Exception)
            throw (Exception)rsp;
        if(rsp instanceof Throwable)
            throw new NestedRuntimeException("remote invocation failed", (Throwable)rsp);
    }
}
/**
 * Handles a method call received from another member.
 * <ul>
 * <li><b>prepare</b>: arguments are (global TX, modification list, coordinator address,
 *     commit flag). A prepare whose coordinator equals the local address is discarded. Otherwise
 *     the global transaction is recorded in <code>remote_transactions</code> and
 *     {@link #handlePrepare} applies the modifications.</li>
 * <li><b>commit / rollback</b>: the local transaction mapped to the global transaction is
 *     resumed on the current thread, committed or rolled back, and the previously associated
 *     transaction (if any) is restored.</li>
 * <li>anything else is passed to the superclass.</li>
 * </ul>
 * The <code>remote_transactions</code> entry is removed in three cases: after a one-phase
 * prepare (commit flag true, no commit or rollback message follows), after a commit or
 * rollback whether or not it succeeded, and when no local transaction exists for the global
 * transaction. Without this the set would grow without bound.
 *
 * @param method_call the replicated call; must not be null
 * @return null for prepare, commit and rollback; otherwise the superclass invocation result
 * @throws NullPointerException  if <code>method_call</code> is null
 * @throws IllegalStateException if a commit or rollback refers to an unknown global transaction
 * @throws Throwable             if executing the call fails
 */
public Object replicate(MethodCall method_call) throws Throwable {
    if(method_call == null)
        throw new NullPointerException("method call is null");
    if(log.isTraceEnabled())
        log.trace("replicate(): received " + method_call);

    Method meth=method_call.getMethod();

    if(meth.equals(TreeCache.prepareMethod)) {
        Object[] args=method_call.getArgs();
        GlobalTransaction gtx=(GlobalTransaction)args[0];
        List modifications=(List)args[1];
        Address coordinator=(Address)args[2];
        boolean commit=((Boolean)args[3]).booleanValue();

        if(coordinator != null && coordinator.equals(cache.getLocalAddress())) {
            if(log.isTraceEnabled())
                log.trace("received my own message (discarding it)");
            return null;
        }

        remote_transactions.add(gtx);
        try {
            handlePrepare(gtx, modifications, commit);
        }
        finally {
            // One-phase prepare: no commit/rollback will follow, so clean up here.
            if(commit)
                remote_transactions.remove(gtx);
        }
        return null;
    }

    if(meth.equals(TreeCache.commitMethod) || meth.equals(TreeCache.rollbackMethod)) {
        Object[] args=method_call.getArgs();
        GlobalTransaction gtx=(GlobalTransaction)args[0];
        Transaction ltx=tx_table.getLocalTransaction(gtx);
        Transaction curr_tx=null;

        if(ltx == null) {
            // Nothing to complete locally; drop a possibly stale entry before reporting.
            remote_transactions.remove(gtx);
            throw new IllegalStateException("found no local TX for global TX=" + gtx);
        }
        if(log.isTraceEnabled())
            log.trace("received " + meth.getName() + ": local TX=" + ltx + ", global TX=" + gtx);

        try {
            // Park whatever transaction this thread currently has and switch to ltx.
            curr_tx=tx_mgr.suspend();
            if(log.isTraceEnabled())
                log.trace("executing " + meth.getName() + "() with local TX " + ltx);
            tx_mgr.resume(ltx);
            if(meth.equals(TreeCache.rollbackMethod))
                ltx.rollback();
            else
                ltx.commit();
        }
        finally {
            // The global transaction is finished on this node even if completion failed.
            remote_transactions.remove(gtx);
            tx_mgr.suspend();
            if(curr_tx != null)
                tx_mgr.resume(curr_tx);
        }
        return null;
    }

    return super.invoke(method_call);
}
/**
 * Executes a batch of replicated method calls, one after the other, by passing each one to
 * the superclass. A failing call is logged and does not stop the remaining calls.
 *
 * @param method_calls list of <code>MethodCall</code> elements; null is treated as empty
 * @throws Throwable declared by the interface; failures of individual calls are only logged
 */
public void replicate(List method_calls) throws Throwable {
    if(method_calls != null) {
        Iterator calls=method_calls.iterator();
        while(calls.hasNext()) {
            // The cast stays outside the try block, so a wrong element type still propagates.
            MethodCall call=(MethodCall)calls.next();
            try {
                super.invoke(call);
            }
            catch(Throwable failure) {
                log.error("failed executing method call", failure);
            }
        }
    }
}
/**
 * Begins a new transaction on the current thread and maps it to the given global
 * transaction in the transaction table.
 *
 * @param gtx the global transaction the new local transaction belongs to
 * @return the newly started local transaction
 * @throws Exception if no transaction manager is available or the transaction cannot be started
 */
private Transaction createNewLocalTransaction(GlobalTransaction gtx) throws Exception {
    if(tx_mgr != null) {
        tx_mgr.begin();
        Transaction started=tx_mgr.getTransaction();
        tx_table.put(started, gtx);
        return started;
    }
    throw new Exception("failed to create local transaction: TransactionManager is null");
}
/**
 * Applies the modifications of a remote prepare under the local transaction that belongs to
 * the given global transaction.
 * <p>
 * The transaction currently associated with the thread (if any) is suspended first and
 * restored at the end. If no local transaction is mapped to <code>gtx</code>, a new one is
 * started; otherwise the existing one is resumed. A <code>TransactionEntry</code> is created
 * for <code>gtx</code> if none exists. Each modification is then passed to the superclass.
 * <p>
 * With <code>commit == true</code> (one-phase) the local transaction is committed only if
 * <em>every</em> modification was applied. Any failure leads to a rollback; this covers a
 * thrown <code>Error</code>, an element that is not a <code>MethodCall</code>, and a
 * <code>Throwable</code> returned as a result. With <code>commit == false</code> the
 * transaction is left open for a later commit or rollback message.
 *
 * @param gtx           the global transaction of the prepare
 * @param modifications list of <code>MethodCall</code> elements to apply; may be null
 * @param commit        whether to complete the local transaction right away
 * @throws Exception the failure of the first modification that could not be applied; a
 *                   throwable that is not an <code>Exception</code> is wrapped in a
 *                   <code>NestedRuntimeException</code>
 */
private void handlePrepare(GlobalTransaction gtx, List modifications, boolean commit) throws Exception {
    // Becomes true only after all modifications were applied without any failure.
    boolean success=false;
    Transaction ltx=tx_table.getLocalTransaction(gtx);
    Transaction curr_tx=tx_mgr.suspend();

    try {
        if(ltx == null) {
            ltx=createNewLocalTransaction(gtx);
            if(log.isTraceEnabled())
                log.trace("(" + cache.getLocalAddress() + "): started new local TX as result of PREPARE: local TX=" +
                        ltx + ", global TX=" + gtx);
        }
        else {
            tx_mgr.resume(ltx);
            if(log.isTraceEnabled())
                log.trace("resuming existing transaction " + ltx + ", global TX=" + gtx);
        }

        TransactionEntry entry=tx_table.get(gtx);
        if(entry == null) {
            entry=new TransactionEntry();
            entry.setTransaction(ltx);
            tx_table.put(gtx, entry);
        }

        try {
            if(modifications != null) {
                for(Iterator it=modifications.iterator(); it.hasNext();) {
                    MethodCall method_call=(MethodCall)it.next();
                    Object retval;
                    try {
                        retval=super.invoke(method_call);
                    }
                    catch(Throwable t) {
                        log.error("method invocation failed", t);
                        retval=t;
                    }
                    if(retval instanceof Exception)
                        throw (Exception)retval;
                    if(retval instanceof Throwable) {
                        // Errors and other throwables must fail the prepare as well.
                        throw new NestedRuntimeException("method invocation failed", (Throwable)retval);
                    }
                }
            }
            success=true;
        }
        finally {
            if(commit) {
                if(success)
                    ltx.commit();
                else
                    ltx.rollback();
            }
        }
    }
    finally {
        // Detach the local transaction from this thread and restore the caller's one.
        tx_mgr.suspend();
        if(curr_tx != null)
            tx_mgr.resume(curr_tx);
    }
}
/**
 * Transaction synchronization that replicates the modifications of one local transaction
 * when that transaction completes.
 */
class SynchronizationHandler implements Synchronization {
    /** The local transaction this handler is registered with. */
    Transaction tx;
    /** The global transaction that corresponds to {@link #tx}. */
    GlobalTransaction gtx;
    /** The cache used for mode lookup and addressing. */
    TreeCache cache;
    /** Copy of the transaction's modifications, taken in {@link #beforeCompletion()}; null until then. */
    List modifications;

    SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
        this.cache=cache;
        this.tx=tx;
        this.gtx=gtx;
    }

    /**
     * Copies the modifications of the transaction and, in <code>REPL_SYNC</code> mode with at
     * least one modification, runs the remote prepare phase if the transaction status is
     * active, committing or preparing. If the prepare phase fails, the transaction is marked
     * rollback-only and the failure is rethrown wrapped in a <code>NestedRuntimeException</code>.
     *
     * @throws IllegalStateException if no transaction entry exists for the global transaction
     */
    public void beforeCompletion() {
        TransactionEntry entry=tx_table.get(gtx);
        if(entry == null)
            throw new IllegalStateException("cannot find transaction entry for " + gtx);

        modifications=new LinkedList(entry.getModifications());
        if(modifications.isEmpty() || cache.getCacheModeInternal() != TreeCache.REPL_SYNC)
            return;

        try {
            int status=tx.getStatus();
            boolean prepare_needed=status == Status.STATUS_ACTIVE
                    || status == Status.STATUS_COMMITTING
                    || status == Status.STATUS_PREPARING;
            if(prepare_needed) {
                try {
                    Object[] prepare_args={gtx, modifications, (Address)cache.getLocalAddress(), Boolean.FALSE};
                    MethodCall prepare_call=new MethodCall(TreeCache.prepareMethod, prepare_args);
                    runPreparePhase(gtx, prepare_call, (Address)cache.getLocalAddress(), modifications, false);
                }
                catch(Throwable failure) {
                    log.warn("runPreparePhase() failed. Transaction is marked as rolled back", failure);
                    tx.setRollbackOnly();
                    throw failure;
                }
            }
        }
        catch(Throwable t) {
            throw new NestedRuntimeException("", t);
        }
    }

    /**
     * Forgets the transaction and finishes replication according to the outcome.
     * On commit, the modifications are replicated asynchronously (<code>REPL_ASYNC</code>) or
     * the remote commit phase is run (<code>REPL_SYNC</code>). On rollback, the remote
     * rollback phase is run in <code>REPL_SYNC</code> mode. Nothing is sent when there are
     * no modifications.
     *
     * @param status the completion status of the transaction
     * @throws IllegalStateException for any status other than committed, marked-rollback or
     *                               rolled back
     */
    public void afterCompletion(int status) {
        int cache_mode=cache.getCacheModeInternal();
        transactions.remove(tx);

        boolean has_modifications=modifications != null && modifications.size() > 0;

        if(status == Status.STATUS_COMMITTED) {
            if(has_modifications) {
                if(cache_mode == TreeCache.REPL_ASYNC)
                    replicateAsynchronously(gtx, modifications);
                else if(cache_mode == TreeCache.REPL_SYNC)
                    runCommitPhase(gtx);
            }
        }
        else if(status == Status.STATUS_MARKED_ROLLBACK || status == Status.STATUS_ROLLEDBACK) {
            if(log.isDebugEnabled())
                log.debug("afterCompletion(): rolling back transaction");
            if(has_modifications && cache_mode == TreeCache.REPL_SYNC)
                runRollbackPhase(gtx);
        }
        else {
            throw new IllegalStateException("illegal status: " + status);
        }
    }

    /** Returns a short description containing the global and the local transaction. */
    public String toString() {
        StringBuffer sb=new StringBuffer("ReplicationInterceptor(gtx=");
        sb.append(gtx).append(", tx=").append(tx).append(")");
        return sb.toString();
    }
}
/**
 * Sends a prepare call to the other members by wrapping it in a replicate call.
 * When <code>async</code> is false, the responses are passed to
 * {@link #checkResponses(List)}, which throws if a member answered with an exception.
 *
 * @param tx             the global transaction being prepared
 * @param prepare_method the prepare call to send
 * @param coordinator    address of the coordinator; only used for tracing here
 * @param modifications  the modifications of the transaction; only used for tracing here
 * @param async          if true, responses are not checked
 * @throws Exception if the remote call fails or a member reported an exception
 */
protected void runPreparePhase(GlobalTransaction tx,
                               MethodCall prepare_method,
                               Address coordinator,
                               List modifications,
                               boolean async) throws Exception {
    if(log.isTraceEnabled()) {
        int num_mods=modifications == null? 0 : modifications.size();
        log.trace("(" + cache.getLocalAddress() + "): running remote prepare for " + tx + " with async mode=" + async + " and coord=" + coordinator +
                " (" + num_mods + " modifications): " + modifications);
    }

    boolean synchronous=!async;
    Object[] replicate_args=new Object[]{prepare_method};
    List rsps=cache.callRemoteMethods(cache.getMembers(),
            TreeCache.replicateMethod,
            replicate_args,
            synchronous, true, cache.getSyncReplTimeout());

    if(synchronous && rsps != null) {
        checkResponses(rsps);
    }
}
/**
 * Sends a commit call for the given global transaction to the other members.
 * The flag passed to <code>callRemoteMethods()</code> comes from
 * <code>cache.getSyncCommitPhase()</code>. Failures are logged and not propagated.
 *
 * @param gtx the global transaction to commit remotely
 */
protected void runCommitPhase(GlobalTransaction gtx) {
    // Read outside the try block, exactly like before: a failure here is not swallowed.
    boolean sync_commit_phase=cache.getSyncCommitPhase();
    try {
        Object[] commit_args={gtx};
        MethodCall commit_call=new MethodCall(TreeCache.commitMethod, commit_args);
        if(log.isTraceEnabled()) {
            log.trace("running remote commit for " + gtx + " with async mode=" + !sync_commit_phase +
                    " and coord=" + cache.getLocalAddress());
        }
        Object[] replicate_args={commit_call};
        cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod, replicate_args,
                sync_commit_phase, true, cache.getSyncReplTimeout());
    }
    catch(Throwable failure) {
        log.error("commit failed", failure);
    }
}
/**
 * Sends a rollback call for the given global transaction to the other members.
 * The flag passed to <code>callRemoteMethods()</code> comes from
 * <code>cache.getSyncRollbackPhase()</code>. Failures are logged and not propagated.
 * <p>
 * The trace message reports the <em>async</em> mode, i.e. the negation of the sync flag,
 * in the same way as {@link #runCommitPhase(GlobalTransaction)}.
 *
 * @param gtx the global transaction to roll back remotely
 */
protected void runRollbackPhase(GlobalTransaction gtx) {
    boolean sync_rollback_phase=cache.getSyncRollbackPhase();
    try {
        MethodCall rollback_method=new MethodCall(TreeCache.rollbackMethod, new Object[]{gtx});
        if(log.isTraceEnabled())
            log.trace("running remote rollback for " + gtx + " with async mode=" + !sync_rollback_phase +
                    " and coord=" + cache.getLocalAddress());
        cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod, new Object[]{rollback_method},
                sync_rollback_phase, true, cache.getSyncReplTimeout());
    }
    catch(Throwable e) {
        log.error("rollback failed", e);
    }
}
/**
 * Replicates the modifications of a committed transaction without waiting for the result.
 * A prepare call with the commit flag set to true is built. It is put on the replication
 * queue (wrapped in a replicate call) when a queue is in use, otherwise it is sent through
 * {@link #runPreparePhase} in async mode. Failures are logged and not propagated.
 *
 * @param gtx           the global transaction that was committed
 * @param modifications the modifications to replicate
 */
void replicateAsynchronously(GlobalTransaction gtx, List modifications) {
    try {
        Object[] prepare_args={gtx, modifications, (Address)cache.getLocalAddress(), Boolean.TRUE};
        MethodCall one_phase_prepare=new MethodCall(TreeCache.prepareMethod, prepare_args);

        boolean use_queue=cache.getUseReplQueue() && cache.getReplQueue() != null;
        if(!use_queue) {
            runPreparePhase(gtx,
                    one_phase_prepare,
                    (Address)cache.getLocalAddress(),
                    modifications,
                    true);
            return;
        }
        cache.getReplQueue().add(new MethodCall(TreeCache.replicateMethod, new Object[]{one_phase_prepare}));
    }
    catch(Throwable failure) {
        log.warn("failed to replicate asynchronously", failure);
    }
}
}