| ReplicationInterceptor.java |
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.interceptors;
import org.jboss.cache.*;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.Address;
import org.jgroups.blocks.MethodCall;
import javax.transaction.*;
import java.util.*;
import java.lang.reflect.Method;
/**
* Takes care of replicating modifications to other nodes in a cluster. Also listens for prepare(),
* commit() and rollback() messages which are received 'side-ways' (see docs/design/Refactoring.txt).
* @author Bela Ban
* @version $Id: ReplicationInterceptor.java,v 1.6.4.4 2005/04/06 21:06:41 starksm Exp $
*/
public class ReplicationInterceptor extends Interceptor implements Replicatable {
TransactionManager tx_mgr=null;
TransactionTable tx_table=null;
/** List<Transaction> that we have registered for */
private List transactions=Collections.synchronizedList(new ArrayList());
/** Set<GlobalTransaction> of GlobalTransactions that originated somewhere else (we didn't create them).
* This is a result of a PREPARE phase. GlobalTransactions in this list should be ignored by this
* interceptor when registering for TX completion
*/
private Set remote_transactions=Collections.synchronizedSet(new HashSet());
/** Map<Transaction,List<Modification>>. Mapping between local transactions and their modifications,
* populated on reception of PREPARE. Elements are removed on TX commit or rollback */
// Map prepare_modifications=new IdentityHashMap();
public ReplicationInterceptor() {
}
public void setCache(TreeCache cache) {
super.setCache(cache);
tx_mgr=cache.getTransactionManager();
cache.setReplicationHandler(this);
tx_table=cache.getTransactionTable();
}
public Object invoke(MethodCall m) throws Throwable {
Transaction tx=null;
Object retval=super.invoke(m); // invoke first
// could be potentially TRANSACTIONAL. If so, we register for transaction completion callbacks (if we
// have not yet done so
if(tx_mgr != null && (tx=tx_mgr.getTransaction()) != null && isValid(tx)) { // ACTIVE or PREPARING
// synchronized(this) { // no sync needed as each thread has at most 1 (different) TX !
if(!transactions.contains(tx)) {
// GlobalTransaction gtx=tx_table.get(tx);
GlobalTransaction gtx=cache.getCurrentTransaction(tx);
if(gtx == null)
throw new Exception("failed to get global transaction");
if(remote_transactions.contains(gtx)) {
if(log.isTraceEnabled())
log.trace("ReplicationInterceptor: won't register for TX completion as " +
"GlobalTransaction is result of a PREPARE");
}
else {
OrderedSynchronizationHandler handler=OrderedSynchronizationHandler.getInstance(tx);
SynchronizationHandler myHandler=new SynchronizationHandler(gtx, tx, cache);
if(log.isTraceEnabled())
log.trace("registering for TX completion: SynchronizationHandler(" + myHandler + ")");
handler.registerAtHead(myHandler); // needs to be invoked first on TX commit
}
transactions.add(tx);
}
// }
return retval;
}
// NON-TRANSACTIONAL and CRUD method; we simply discard non-modifying methods
else if(TreeCache.isCrudMethod(m.getMethod())) {
handleReplicatedMethod(m, cache.getCacheModeInternal());
}
return retval;
}
void handleReplicatedMethod(MethodCall m, int mode) throws Throwable {
if(mode == TreeCache.REPL_SYNC && m.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
if(log.isTraceEnabled())
log.trace("forcing asynchronous replication for putFailFast()");
mode=TreeCache.REPL_ASYNC;
}
if(log.isTraceEnabled()) {
log.trace(new StringBuffer().append("invoking method ").append(m.getName()).append("(").append(m.getArgs()).
append(")").append(", members=").append(cache.getMembers()).append(", mode=").
append(cache.getCacheMode()).append(", exclude_self=").append(true).append(", timeout=").
append(cache.getSyncReplTimeout()).toString());
}
switch(cache.getCacheModeInternal()) {
case TreeCache.REPL_ASYNC:
// 2. Replicate change to all *other* members (exclude self !)
if(cache.getUseReplQueue() && cache.getReplQueue() != null) {
cache.getReplQueue().add(m);
}
else
cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod,
new Object[]{m},
false, // synchronous ?
true, // exclude self ?
cache.getSyncReplTimeout());
break;
case TreeCache.REPL_SYNC:
// REVISIT Needs to exclude itself and apply the local change manually.
// This is needed such that transient field is modified properly in-VM.
List rsps=cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod,
new Object[]{m}, true, true, cache.getSyncReplTimeout());
if(log.isTraceEnabled())
log.trace("responses=" + rsps);
checkResponses(rsps);
break;
}
}
/**
* Checks whether any of the responses are exceptions. If yes, re-throws
* them (as exceptions or runtime exceptions).
*
* @param rsps
* @throws Exception
*/
private void checkResponses(List rsps) throws Exception {
Object rsp;
if(rsps != null) {
for(Iterator it=rsps.iterator(); it.hasNext();) {
rsp=it.next();
if(rsp != null) {
if(rsp instanceof RuntimeException)
throw (RuntimeException)rsp;
if(rsp instanceof Exception)
throw (Exception)rsp;
}
}
}
}
/**
* Received as result of replication sent from another node to this node
* @param method_call
* @throws Throwable
*/
public Object replicate(MethodCall method_call) throws Throwable {
if(method_call == null)
throw new NullPointerException("method call is null");
if(log.isTraceEnabled())
log.trace("replicate(): received " + method_call);
Method meth=method_call.getMethod();
if(meth.equals(TreeCache.prepareMethod)) {
Object[] args=method_call.getArgs();
GlobalTransaction gtx=(GlobalTransaction)args[0]; // guaranteed to be non-null
List modifications=(List)args[1];
Address coordinator=(Address)args[2];
boolean commit=((Boolean)args[3]).booleanValue();
// discard own message. should never happen because runPreparePhase() calls every member except itself !
if(coordinator != null && coordinator.equals(cache.getLocalAddress())) {
if(log.isTraceEnabled())
log.trace("received my own message (discarding it)");
return null;
}
remote_transactions.add(gtx);
handlePrepare(gtx, modifications, commit);
return null;
}
else if(meth.equals(TreeCache.commitMethod) || meth.equals(TreeCache.rollbackMethod)) {
// Find the local transactions associated with gtx, commit or rollback TX
Object[] args=method_call.getArgs();
GlobalTransaction gtx=(GlobalTransaction)args[0]; // guaranteed to be non-null
Transaction ltx=tx_table.getLocalTransaction(gtx), curr_tx=null;
if(ltx != null) {
if(log.isTraceEnabled())
log.trace("received " + meth.getName() + ": local TX=" + ltx + ", global TX=" + gtx);
}
else {
throw new IllegalStateException("found no local TX for global TX=" + gtx);
}
// here we have a non-null ltx
try {
curr_tx=tx_mgr.suspend();
if(log.isTraceEnabled())
log.trace("executing " + meth.getName() + "() with local TX " + ltx);
tx_mgr.resume(ltx);
if(meth.equals(TreeCache.rollbackMethod))
ltx.rollback();
else
ltx.commit();
remote_transactions.remove(gtx);
}
finally {
tx_mgr.suspend(); // should be a no-op as ltx.commit()/rollback() should have disassociated thread already
if(curr_tx != null)
tx_mgr.resume(curr_tx);
}
return null;
}
return super.invoke(method_call);
}
/**
*
* @param method_calls List<MethodCall>
* @throws Throwable
*/
public void replicate(List method_calls) throws Throwable {
if(method_calls == null) return;
for(Iterator it=method_calls.iterator(); it.hasNext();) {
MethodCall c=(MethodCall)it.next();
try {
super.invoke(c);
}
catch(Throwable t) {
log.error("failed executing method call", t);
}
}
}
private Transaction createNewLocalTransaction(GlobalTransaction gtx) throws Exception {
Transaction local_tx;
if(tx_mgr == null)
throw new Exception("failed to create local transaction: TransactionManager is null");
tx_mgr.begin();
local_tx=tx_mgr.getTransaction();
tx_table.put(local_tx, gtx);
return local_tx;
}
/**
* @param gtx The global transaction under which all updates are coordinated across a cluster.
* @param modifications List<MethodCall>. A list of {@link MethodCall} objects
* @param commit Commit or rollback immediately ?
* @throws Exception
*/
private void handlePrepare(GlobalTransaction gtx, List modifications, boolean commit) throws Exception {
MethodCall method_call;
boolean success=true;
Transaction curr_tx, ltx;
TransactionEntry entry;
// Is there a local transaction associated with GTX ?
ltx=tx_table.getLocalTransaction(gtx);
curr_tx=tx_mgr.suspend(); // might be null if not LTX running
try {
if(ltx == null) {
ltx=createNewLocalTransaction(gtx); // creates new LTX and associates it with GTX
if(log.isTraceEnabled())
log.trace("(" + cache.getLocalAddress() + "): started new local TX as result of PREPARE: local TX=" +
ltx + ", global TX=" + gtx);
}
else {
tx_mgr.resume(ltx);
if(log.isTraceEnabled())
log.trace("resuming existing transaction " + ltx + ", global TX=" + gtx);
}
// at this point we have a non-null
// ltx
// 2. Asssociate the local TX with the global TX. Create new entry for TX in tx_table, the modifications
// below will need this entry to add their modifications under the GlobalTx key
if((entry=tx_table.get(gtx)) == null) {
entry=new TransactionEntry();
entry.setTransaction(ltx);
tx_table.put(gtx, entry);
}
// execute calls
// 3. Invoke all MethodCalls in modifications
try {
if(modifications != null) {
Object retval;
for(Iterator it=modifications.iterator(); it.hasNext();) {
method_call=(MethodCall)it.next();
try {
retval=super.invoke(method_call); // pass invocation up the interceptor stack
}
catch(Throwable t) {
log.error("method invocation failed", t);
retval=t;
}
if(retval != null && retval instanceof Exception) {
success=false;
throw (Exception)retval;
}
}
}
}
// 4. If commit == true (one-phase-commit): commit (or rollback) the TX; this will cause
// {before/after}Completion() to be called in all registered interceptors: the TransactionInterceptor
// will then commit/rollback against the cache
finally {
if(commit) {
if(success)
ltx.commit();
else
ltx.rollback();
}
}
}
finally {
tx_mgr.suspend(); // suspends ltx
if(curr_tx != null)
tx_mgr.resume(curr_tx);
}
}
class SynchronizationHandler implements Synchronization {
Transaction tx=null;
GlobalTransaction gtx=null;
TreeCache cache=null;
List modifications=null;
SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
this.gtx=gtx;
this.tx=tx;
this.cache=cache;
}
/**
* Multicast a PREPARE to all nodes in the cluster (except myself), and collect the responses (only if synchronous).
*/
public void beforeCompletion() {
// fetch the modifications before the transaction is committed (and thus removed from the tx_table)
TransactionEntry entry=tx_table.get(gtx);
if(entry == null)
throw new IllegalStateException("cannot find transaction entry for " + gtx);
modifications=new LinkedList(entry.getModifications());
if(modifications.size() == 0) // we don't replicate if there are no modifications in the current transaction
return;
// we only handle sync repl here; async repl is handled in afterCompletion()
if(cache.getCacheModeInternal() != TreeCache.REPL_SYNC)
return;
// REPL_SYNC only from now on
try {
int status=tx.getStatus();
switch(status) {
case Status.STATUS_ACTIVE: // added Feb 2 2004 (bela)
case Status.STATUS_COMMITTING:
case Status.STATUS_PREPARING: // added Nov 18 2004
try {
MethodCall prepare_method;
prepare_method=new MethodCall(TreeCache.prepareMethod,
new Object[]{gtx, modifications, (Address)cache.getLocalAddress(),
Boolean.FALSE}); // don't commit or rollback - wait for call
runPreparePhase(gtx,
prepare_method,
(Address)cache.getLocalAddress(),
modifications,
false); // synchronous
}
catch(Throwable t) {
log.warn("runPreparePhase() failed. Transaction is marked as rolled back", t);
tx.setRollbackOnly();
// change Bela Nov 5 2003: we need to rethrow the exception so the Transaction
// knows it has to set status of TX to rolled back
throw t;
}
break;
}
}
catch(Throwable t) {
throw new NestedRuntimeException("", t);
}
}
public void afterCompletion(int status) {
int cache_mode=cache.getCacheModeInternal();
transactions.remove(tx);
switch(status) {
case Status.STATUS_COMMITTED:
if(modifications != null && modifications.size() > 0) {
switch(cache_mode) {
case TreeCache.REPL_ASYNC:
replicateAsynchronously(gtx, modifications);
break;
case TreeCache.REPL_SYNC:
runCommitPhase(gtx); // commit locally, then send async commit() message to other mbrs
break;
}
}
break;
case Status.STATUS_MARKED_ROLLBACK: // this one is probably not needed
case Status.STATUS_ROLLEDBACK:
if(log.isDebugEnabled())
log.debug("afterCompletion(): rolling back transaction");
if(modifications != null && modifications.size() > 0) {
switch(cache_mode) {
case TreeCache.REPL_SYNC:
runRollbackPhase(gtx);
break;
}
}
break;
default:
throw new IllegalStateException("illegal status: " + status);
}
}
public String toString() {
return "ReplicationInterceptor(gtx=" + gtx + ", tx=" + tx + ")";
}
}
/**
* Calls {@link #prepare(GlobalTransaction,List,org.jgroups.Address,boolean))} in all members except self.
* Waits for all responses. If one of the members failed to prepare, its return value
* will be an exception. If there is one exception we rethrow it. This will mark the
* current transaction as rolled back, which will cause the
* {@link #afterCompletion(int)} callback to have a status
* of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll back the
* transaction.<br/>
* If everything runs okay, the {@link #afterCompletion(int)}
* callback will trigger the {@link #runCommitPhase(GlobalTransaction))}.
* <br/>
*
* @param tx
* @param coordinator
* @param prepare_method The MethodCall representing the prepare() method
* @param async Run this asynchronously
* @throws Exception
*/
protected void runPreparePhase(GlobalTransaction tx,
MethodCall prepare_method,
Address coordinator,
List modifications,
boolean async) throws Exception {
List rsps;
int num_mods=modifications != null? modifications.size() : 0;
// this method will return immediately if we're the only member (because exclude_self=true)
if(log.isTraceEnabled())
log.trace("(" + cache.getLocalAddress() + "): running remote prepare for " + tx + " with async mode=" + async + " and coord=" + coordinator +
" (" + num_mods + " modifications): " + modifications); // todo: remove printing entire mod list !
rsps=cache.callRemoteMethods(cache.getMembers(),
TreeCache.replicateMethod,
new Object[]{prepare_method},
!async, // sync or async call ?
true, // exclude self
cache.getSyncReplTimeout());
if(!async && rsps != null)
checkResponses(rsps); // throws an exception if one of the rsps is an exception
}
/**
* Asynchronously calls {@link #commit(GlobalTransaction)} in all members
*
* @param tx
*/
protected void runCommitPhase(GlobalTransaction gtx) {
boolean sync_commit_phase=cache.getSyncCommitPhase();
// 1. Multicast commit() to all members (exclude myself though)
try {
MethodCall commit_method=new MethodCall(TreeCache.commitMethod, new Object[]{gtx});
if(log.isTraceEnabled())
log.trace("running remote commit for " + gtx + " with async mode=" + !sync_commit_phase +
" and coord=" + cache.getLocalAddress());
cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod, new Object[]{commit_method},
sync_commit_phase, // this is async by default, can be changed in TreeCache
true, // exclude self
cache.getSyncReplTimeout());
}
catch(Throwable e) {
log.error("commit failed", e);
}
}
/**
* Asynchronously calls {@link #rollback(GlobalTransaction)} in all members
* @param tx
*/
protected void runRollbackPhase(GlobalTransaction gtx) {
boolean sync_rollback_phase=cache.getSyncRollbackPhase();
// 1. Multicast rollback() to all other members (excluding myself)
try {
MethodCall rollback_method=new MethodCall(TreeCache.rollbackMethod, new Object[]{gtx});
if(log.isTraceEnabled())
log.trace("running remote rollback for " + gtx + " with async mode=" + sync_rollback_phase +
" and coord=" + cache.getLocalAddress());
cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod, new Object[]{rollback_method},
sync_rollback_phase, // this is by default an async call; can be changed in TreeCache
true, // exclude self
cache.getSyncReplTimeout());
}
catch(Throwable e) {
log.error("rollback failed", e);
}
}
void replicateAsynchronously(GlobalTransaction gtx, List modifications) {
MethodCall mc;
try {
mc=new MethodCall(TreeCache.prepareMethod,
new Object[]{gtx, modifications, (Address)cache.getLocalAddress(),
Boolean.TRUE}); // do commit or rollback after call
if(cache.getUseReplQueue() && cache.getReplQueue() != null) {
cache.getReplQueue().add(new MethodCall(TreeCache.replicateMethod, new Object[]{mc}));
}
else {
// asynchronous PREPARE multicast, excluding self
runPreparePhase(gtx,
mc,
(Address)cache.getLocalAddress(),
modifications,
true); // async
}
}
catch(Throwable t) {
log.warn("failed to replicate asynchronously", t);
}
}
}
| ReplicationInterceptor.java |