/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.interceptors;

import org.jboss.cache.*;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.Address;
import org.jgroups.blocks.MethodCall;

import javax.transaction.*;
import java.util.*;
import java.lang.reflect.Method;

/**
 * Takes care of replicating modifications to other nodes in a cluster. Also listens for prepare(),
 * commit() and rollback() messages which are received 'side-ways' (see docs/design/Refactoring.txt).
 * @author Bela Ban
 * @version $Id: ReplicationInterceptor.java,v 1.6.4.4 2005/04/06 21:06:41 starksm Exp $
 */
public class ReplicationInterceptor extends Interceptor implements Replicatable {
   /** Transaction manager obtained from the cache in {@link #setCache(TreeCache)}; the code below
    * checks it against null before use in invoke() and createNewLocalTransaction() */
   TransactionManager tx_mgr=null;
   /** Transaction table obtained from the cache in {@link #setCache(TreeCache)}; used to map
    * GlobalTransactions to local transactions and to their TransactionEntry */
   TransactionTable tx_table=null;

   /** List<Transaction> that we have registered for. Entries are added in invoke() and removed in
    * SynchronizationHandler.afterCompletion(). Wrapped in a synchronized list. */
   private List transactions=Collections.synchronizedList(new ArrayList());

   /** Set<GlobalTransaction> of GlobalTransactions that originated somewhere else (we didn't create them).
    * This is a result of a PREPARE phase. GlobalTransactions in this set should be ignored by this
    * interceptor when registering for TX completion. Entries are added in replicate() when a PREPARE
    * is received and removed there once the matching commit or rollback has been executed.
    */
   private Set remote_transactions=Collections.synchronizedSet(new HashSet());


   /* Disabled field, kept for reference only:
    * Map<Transaction,List<Modification>>. Mapping between local transactions and their modifications,
    * populated on reception of PREPARE. Elements are removed on TX commit or rollback */
   // Map prepare_modifications=new IdentityHashMap();


   /** Creates the interceptor; all wiring happens later in {@link #setCache(TreeCache)}. */
   public ReplicationInterceptor() {

   }

   /**
    * Attaches this interceptor to the given cache: passes the cache to the superclass, fetches the
    * cache's transaction manager and transaction table, and registers this instance as the cache's
    * replication handler.
    *
    * @param cache the cache this interceptor works for
    */
   public void setCache(TreeCache cache) {
      super.setCache(cache);
      this.tx_mgr=cache.getTransactionManager();
      cache.setReplicationHandler(this);
      this.tx_table=cache.getTransactionTable();
   }


   /**
    * Passes the call on via <code>super.invoke()</code> first and then takes care of replication.
    * <ul>
    * <li>If a transaction manager is set and the calling thread has a transaction for which
    *     <code>isValid()</code> returns true, a {@link SynchronizationHandler} is registered for that
    *     transaction (once per transaction), unless its GlobalTransaction is known to be the result
    *     of a remote PREPARE. Replication then happens on transaction completion.</li>
    * <li>Otherwise, if the method is a CRUD method, it is replicated right away using the cache's
    *     current mode. Non-modifying methods are not replicated.</li>
    * </ul>
    *
    * @param m the method call travelling through the interceptor chain
    * @return whatever <code>super.invoke()</code> returned
    * @throws Throwable if the invocation or the replication fails, or if no GlobalTransaction can be
    *                   found for the current transaction
    */
   public Object invoke(MethodCall m) throws Throwable {
      Object result=super.invoke(m); // the call itself is always executed first

      Transaction current=tx_mgr != null? tx_mgr.getTransaction() : null;

      // NON-TRANSACTIONAL path: replicate modifying methods immediately, drop everything else
      if(current == null || !isValid(current)) {
         if(TreeCache.isCrudMethod(m.getMethod()))
            handleReplicatedMethod(m, cache.getCacheModeInternal());
         return result;
      }

      // TRANSACTIONAL path. No extra locking around the check-then-add below: the original author
      // notes that each thread has at most one (different) TX.
      if(transactions.contains(current))
         return result; // already seen this transaction

      GlobalTransaction global=cache.getCurrentTransaction(current);
      if(global == null)
         throw new Exception("failed to get global transaction");

      if(!remote_transactions.contains(global)) {
         OrderedSynchronizationHandler ordered=OrderedSynchronizationHandler.getInstance(current);
         SynchronizationHandler callback=new SynchronizationHandler(global, current, cache);
         if(log.isTraceEnabled())
            log.trace("registering for TX completion: SynchronizationHandler(" + callback + ")");
         ordered.registerAtHead(callback); // needs to be invoked first on TX commit
      }
      else if(log.isTraceEnabled()) {
         log.trace("ReplicationInterceptor: won't register for TX completion as " +
                   "GlobalTransaction is result of a PREPARE");
      }

      // remembered in both cases, so the lookup above is done only once per transaction
      transactions.add(current);
      return result;
   }


   /**
    * Replicates a single (non-transactional) method call to all other members of the cluster.
    * <p>
    * A <code>putFailFast()</code> call is always replicated asynchronously, even when the requested
    * mode is <code>REPL_SYNC</code>. The effective mode (after this override) selects the replication
    * path; modes other than <code>REPL_ASYNC</code> and <code>REPL_SYNC</code> do not replicate.
    *
    * @param m    the method call to replicate
    * @param mode the replication mode requested by the caller (<code>TreeCache.REPL_ASYNC</code> or
    *             <code>TreeCache.REPL_SYNC</code>)
    * @throws Throwable if the remote call fails or, in synchronous mode, if one of the responses is
    *                   an exception
    */
   void handleReplicatedMethod(MethodCall m, int mode) throws Throwable {
      // Compare the reflected Method, not the MethodCall wrapper: a MethodCall is never equal to a Method
      Method meth=m.getMethod();
      if(mode == TreeCache.REPL_SYNC && meth != null && meth.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
         if(log.isTraceEnabled())
            log.trace("forcing asynchronous replication for putFailFast()");
         mode=TreeCache.REPL_ASYNC;
      }
      if(log.isTraceEnabled()) {
         log.trace(new StringBuffer().append("invoking method ").append(m.getName()).append("(").append(m.getArgs()).
                   append(")").append(", members=").append(cache.getMembers()).append(", mode=").
                   append(cache.getCacheMode()).append(", exclude_self=").append(true).append(", timeout=").
                   append(cache.getSyncReplTimeout()).toString());
      }
      // Switch on the effective mode (which may have been overridden above), not on the cache's
      // configured mode; otherwise the putFailFast() override would have no effect
      switch(mode) {
         case TreeCache.REPL_ASYNC:

            // Replicate change to all *other* members (exclude self !), via the replication queue
            // if one is enabled and present
            if(cache.getUseReplQueue() && cache.getReplQueue() != null) {
               cache.getReplQueue().add(m);
            }
            else
               cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod,
                                       new Object[]{m},
                                       false, // synchronous ?
                                       true,  // exclude self ?
                                       cache.getSyncReplTimeout());
            break;
         case TreeCache.REPL_SYNC:
            // REVISIT Needs to exclude itself and apply the local change manually.
            // This is needed such that transient field is modified properly in-VM.
            List rsps=cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod,
                                              new Object[]{m}, true, true, cache.getSyncReplTimeout());
            if(log.isTraceEnabled())
               log.trace("responses=" + rsps);
            checkResponses(rsps); // rethrows the first exception found among the responses
            break;
      }
   }


   /**
    * Scans a list of responses and rethrows the first one that is an exception. Null entries and
    * entries that are not exceptions are skipped; a null list is accepted and ignored.
    *
    * @param rsps the responses collected from a remote call, may be null
    * @throws Exception the first response found that is an {@link Exception} (runtime exceptions
    *                   included), thrown as-is
    */
   private void checkResponses(List rsps) throws Exception {
      if(rsps == null)
         return;

      Iterator responses=rsps.iterator();
      while(responses.hasNext()) {
         Object response=responses.next();
         // instanceof is false for null, and RuntimeException is an Exception, so one test suffices
         if(response instanceof Exception)
            throw (Exception)response;
      }
   }




   /**
    * Received as result of replication sent from another node to this node.
    * <ul>
    * <li><b>prepare</b>: the GlobalTransaction is recorded as remote and the modifications are applied
    *     via {@link #handlePrepare(GlobalTransaction, List, boolean)}. A prepare whose coordinator is
    *     this node is discarded. For a one-phase prepare (commit flag set) the GlobalTransaction is
    *     forgotten again right away, because no commit or rollback message will follow.</li>
    * <li><b>commit / rollback</b>: the local transaction associated with the GlobalTransaction is
    *     resumed on this thread and committed or rolled back; the GlobalTransaction is then forgotten,
    *     whether or not the commit/rollback succeeded. The transaction previously associated with the
    *     thread (if any) is restored afterwards.</li>
    * <li>anything else is passed on via <code>super.invoke()</code>.</li>
    * </ul>
    *
    * @param method_call the replicated method call, must not be null
    * @return null for prepare, commit and rollback; otherwise the result of <code>super.invoke()</code>
    * @throws NullPointerException  if <code>method_call</code> is null
    * @throws IllegalStateException if a commit or rollback arrives for a GlobalTransaction without a
    *                               local transaction
    * @throws Throwable             if executing the call fails
    */
   public Object replicate(MethodCall method_call) throws Throwable {
      if(method_call == null)
         throw new NullPointerException("method call is null");

      if(log.isTraceEnabled())
         log.trace("replicate(): received " + method_call);

      Method meth=method_call.getMethod();
      if(meth.equals(TreeCache.prepareMethod)) {
         Object[] args=method_call.getArgs();
         GlobalTransaction gtx=(GlobalTransaction)args[0]; // guaranteed to be non-null
         List modifications=(List)args[1];
         Address coordinator=(Address)args[2];
         boolean commit=((Boolean)args[3]).booleanValue();

         // discard own message. should never happen because runPreparePhase() calls every member except itself !
         if(coordinator != null && coordinator.equals(cache.getLocalAddress())) {
            if(log.isTraceEnabled())
               log.trace("received my own message (discarding it)");
            return null;
         }

         remote_transactions.add(gtx);
         try {
            handlePrepare(gtx, modifications, commit);
         }
         finally {
            // One-phase commit: handlePrepare() has already committed or rolled back and no
            // commit()/rollback() message will ever arrive to clean up, so forget the gtx here.
            // In the two-phase case the entry is removed when commit()/rollback() is received.
            if(commit)
               remote_transactions.remove(gtx);
         }
         return null;
      }
      else if(meth.equals(TreeCache.commitMethod) || meth.equals(TreeCache.rollbackMethod)) {
         // Find the local transactions associated with gtx, commit or rollback TX
         Object[] args=method_call.getArgs();
         GlobalTransaction gtx=(GlobalTransaction)args[0]; // guaranteed to be non-null
         Transaction ltx=tx_table.getLocalTransaction(gtx), curr_tx=null;
         if(ltx != null) {
            if(log.isTraceEnabled())
               log.trace("received " + meth.getName() + ": local TX=" + ltx + ", global TX=" + gtx);
         }
         else {
            throw new IllegalStateException("found no local TX for global TX=" + gtx);
         }

         // here we have a non-null ltx
         try {
            curr_tx=tx_mgr.suspend();
            if(log.isTraceEnabled())
               log.trace("executing " + meth.getName() + "() with local TX " + ltx);
            tx_mgr.resume(ltx);

            try {
               if(meth.equals(TreeCache.rollbackMethod))
                  ltx.rollback();
               else
                  ltx.commit();
            }
            finally {
               // Once completion has been attempted the gtx must be forgotten even if
               // commit()/rollback() threw; otherwise the entry would stay in the set forever
               remote_transactions.remove(gtx);
            }
         }
         finally {
            tx_mgr.suspend(); // should be a no-op as ltx.commit()/rollback() should have disassociated thread already
            if(curr_tx != null)
               tx_mgr.resume(curr_tx);
         }

         return null;
      }
      return super.invoke(method_call);
   }



   /**
    * Executes a batch of replicated method calls, one after the other, by passing each of them on
    * via <code>super.invoke()</code>. A call that fails is logged and skipped; the remaining calls
    * are still executed and nothing is propagated to the caller.
    *
    * @param method_calls List<MethodCall>; a null list is ignored
    * @throws Throwable declared by the interface; failures of individual calls are only logged
    */
   public void replicate(List method_calls) throws Throwable {
      if(method_calls == null)
         return;

      Iterator calls=method_calls.iterator();
      while(calls.hasNext()) {
         MethodCall call=(MethodCall)calls.next();
         try {
            super.invoke(call);
         }
         catch(Throwable failure) {
            // best effort: one bad call must not stop the rest of the batch
            log.error("failed executing method call", failure);
         }
      }
   }




   /**
    * Begins a new transaction on the calling thread and records it in the transaction table as the
    * local transaction of the given GlobalTransaction.
    *
    * @param gtx the global transaction the new local transaction belongs to
    * @return the newly started local transaction
    * @throws Exception if no TransactionManager is available or the transaction cannot be started
    */
   private Transaction createNewLocalTransaction(GlobalTransaction gtx) throws Exception {
      if(tx_mgr == null)
         throw new Exception("failed to create local transaction: TransactionManager is null");

      tx_mgr.begin();
      Transaction started=tx_mgr.getTransaction();
      tx_table.put(started, gtx); // local TX -> global TX
      return started;
   }

   /**
    * Applies the modifications of a remotely prepared transaction under a local transaction.
    * <p>
    * The transaction currently associated with the thread (if any) is suspended for the duration of
    * the call and restored at the end. The local transaction belonging to <code>gtx</code> is resumed,
    * or created if there is none yet. The modifications are then passed up the interceptor stack one
    * by one; the first failure stops processing and is rethrown. A modification fails if it throws
    * <em>any</em> Throwable or returns an Exception.
    *
    * @param gtx     The global transaction under which all updates are coordinated across a cluster.
    * @param modifications List<MethodCall>. A list of {@link MethodCall} objects, may be null
    * @param commit        If true (one-phase commit), the local transaction is committed when all
    *                      modifications succeeded, and rolled back otherwise. If false, the
    *                      transaction is left open.
    * @throws Exception if a modification fails or the transaction handling fails
    */
   private void handlePrepare(GlobalTransaction gtx, List modifications, boolean commit) throws Exception {
      boolean success=true;
      Transaction curr_tx, ltx;
      TransactionEntry entry;

      // Is there a local transaction associated with GTX ?
      ltx=tx_table.getLocalTransaction(gtx);
      curr_tx=tx_mgr.suspend(); // might be null if no TX is running on this thread

      try {
         // 1. Make sure a local TX for the GTX is associated with this thread
         if(ltx == null) {
            ltx=createNewLocalTransaction(gtx); // creates new LTX and associates it with GTX
            if(log.isTraceEnabled())
               log.trace("(" + cache.getLocalAddress() + "): started new local TX as result of PREPARE: local TX=" +
                         ltx + ", global TX=" + gtx);
         }
         else {
            tx_mgr.resume(ltx);
            if(log.isTraceEnabled())
               log.trace("resuming existing transaction " + ltx + ", global TX=" + gtx);
         }
         // at this point we have a non-null ltx

         // 2. Associate the local TX with the global TX. Create new entry for TX in tx_table, the modifications
         //    below will need this entry to add their modifications under the GlobalTx key
         if((entry=tx_table.get(gtx)) == null) {
            entry=new TransactionEntry();
            entry.setTransaction(ltx);
            tx_table.put(gtx, entry);
         }

         // 3. Invoke all MethodCalls in modifications
         try {
            if(modifications != null) {
               for(Iterator it=modifications.iterator(); it.hasNext();) {
                  MethodCall method_call=(MethodCall)it.next();
                  Object retval;
                  try {
                     retval=super.invoke(method_call); // pass invocation up the interceptor stack
                  }
                  catch(Throwable t) {
                     log.error("method invocation failed", t);
                     // Every Throwable counts as a failure, not only Exceptions: an Error must not
                     // let the remaining modifications run nor the TX be committed below
                     success=false;
                     if(t instanceof Exception)
                        throw (Exception)t;
                     if(t instanceof Error)
                        throw (Error)t;
                     throw new NestedRuntimeException("method invocation failed", t);
                  }
                  // an Exception handed back as return value is treated like a thrown one
                  if(retval instanceof Exception) {
                     success=false;
                     throw (Exception)retval;
                  }
               }
            }
         }

         // 4. If commit == true (one-phase-commit): commit (or rollback) the TX; this will cause
         //    {before/after}Completion() to be called in all registered interceptors: the TransactionInterceptor
         //    will then commit/rollback against the cache
         finally {
            if(commit) {
               if(success)
                  ltx.commit();
               else
                  ltx.rollback();
            }
         }
      }
      finally {
         tx_mgr.suspend(); // suspends ltx
         if(curr_tx != null)
            tx_mgr.resume(curr_tx); // restore whatever the thread was associated with before
      }
   }







   /**
    * Transaction completion callback registered by invoke() for every locally originated
    * transaction. It snapshots the transaction's modifications before completion and triggers the
    * replication that matches the cache mode: PREPARE followed by COMMIT/ROLLBACK for
    * <code>REPL_SYNC</code>, a single one-phase PREPARE after commit for <code>REPL_ASYNC</code>.
    */
   class SynchronizationHandler implements Synchronization {
      /** The local transaction this handler is registered with */
      Transaction       tx=null;
      /** The global transaction associated with {@link #tx} */
      GlobalTransaction gtx=null;
      /** The cache whose mode and membership drive replication */
      TreeCache         cache=null;
      /** Copy of the transaction's modifications, taken in beforeCompletion(); null until then */
      List              modifications=null;


      SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
         this.cache=cache;
         this.tx=tx;
         this.gtx=gtx;
      }


      /**
       * Copies the modifications of the transaction and, for <code>REPL_SYNC</code> only, multicasts a
       * PREPARE to all nodes in the cluster (except myself) and collects the responses. If the
       * PREPARE fails, the transaction is marked rollback-only and the failure is rethrown wrapped
       * in a NestedRuntimeException.
       *
       * @throws IllegalStateException if the transaction table has no entry for the global transaction
       */
      public void beforeCompletion() {

         // the modifications have to be fetched before the transaction is committed (and thus
         // removed from the tx_table)
         TransactionEntry txEntry=tx_table.get(gtx);
         if(txEntry == null)
            throw new IllegalStateException("cannot find transaction entry for " + gtx);

         modifications=new LinkedList(txEntry.getModifications());

         // nothing to replicate for a transaction without modifications
         if(modifications.isEmpty())
            return;

         // only sync repl is handled here; async repl is handled in afterCompletion()
         if(cache.getCacheModeInternal() != TreeCache.REPL_SYNC)
            return;

         // REPL_SYNC only from now on
         try {
            final int status=tx.getStatus();
            final boolean prepareAllowed=status == Status.STATUS_ACTIVE
                  || status == Status.STATUS_COMMITTING
                  || status == Status.STATUS_PREPARING;
            if(prepareAllowed) {
               try {
                  Address self=(Address)cache.getLocalAddress();
                  // Boolean.FALSE: receivers must not commit or roll back, they wait for the next call
                  MethodCall prepareCall=new MethodCall(TreeCache.prepareMethod,
                                                        new Object[]{gtx, modifications, self, Boolean.FALSE});
                  runPreparePhase(gtx, prepareCall, self, modifications, false); // synchronous
               }
               catch(Throwable failure) {
                  log.warn("runPreparePhase() failed. Transaction is marked as rolled back", failure);
                  tx.setRollbackOnly();
                  // rethrown so that the Transaction knows it has to set the status of the TX to
                  // rolled back
                  throw failure;
               }
            }
         }
         catch(Throwable wrapped) {
            throw new NestedRuntimeException("", wrapped);
         }
      }



      /**
       * Forgets the transaction and finishes replication according to the outcome:
       * <ul>
       * <li>committed: async mode replicates the modifications now, sync mode sends the COMMIT</li>
       * <li>rolled back (or marked for rollback): sync mode sends the ROLLBACK</li>
       * </ul>
       * Nothing is sent when the transaction had no modifications.
       *
       * @param status the completion status passed in by the transaction
       * @throws IllegalStateException for any status other than committed, rolled back or marked
       *                               for rollback
       */
      public void afterCompletion(int status) {
         final int mode=cache.getCacheModeInternal();

         transactions.remove(tx);

         final boolean hasModifications=modifications != null && modifications.size() > 0;

         if(status == Status.STATUS_COMMITTED) {
            if(hasModifications) {
               if(mode == TreeCache.REPL_ASYNC)
                  replicateAsynchronously(gtx, modifications);
               else if(mode == TreeCache.REPL_SYNC)
                  runCommitPhase(gtx); // send commit() message to other mbrs
            }
         }
         else if(status == Status.STATUS_MARKED_ROLLBACK // this one is probably not needed
               || status == Status.STATUS_ROLLEDBACK) {
            if(log.isDebugEnabled())
               log.debug("afterCompletion(): rolling back transaction");
            if(hasModifications && mode == TreeCache.REPL_SYNC)
               runRollbackPhase(gtx);
         }
         else {
            throw new IllegalStateException("illegal status: " + status);
         }
      }

      public String toString() {
         StringBuffer sb=new StringBuffer("ReplicationInterceptor(gtx=");
         sb.append(gtx).append(", tx=").append(tx).append(")");
         return sb.toString();
      }
   }



   /**
    * Sends the given prepare call to all members except self, wrapped in a call to the replicate
    * method.
    * <p>
    * In synchronous mode the method waits for the responses and rethrows the first one that is an
    * exception; the caller (see {@link SynchronizationHandler#beforeCompletion()}) then marks the
    * transaction as rollback-only. In asynchronous mode the responses are not inspected.
    *
    * @param tx             the global transaction being prepared (used for tracing only)
    * @param prepare_method The MethodCall representing the prepare() method
    * @param coordinator    the address of the node driving the prepare (used for tracing only)
    * @param modifications  List of modifications carried by the prepare (used for tracing only),
    *                       may be null
    * @param async          Run this asynchronously
    * @throws Exception if the remote call fails or, in synchronous mode, if a member answered with
    *                   an exception
    */
   protected void runPreparePhase(GlobalTransaction tx,
                                  MethodCall prepare_method,
                                  Address coordinator,
                                  List modifications,
                                  boolean async) throws Exception {
      // this method will return immediately if we're the only member (because exclude_self=true)
      if(log.isTraceEnabled()) {
         int count=0;
         if(modifications != null)
            count=modifications.size();
         log.trace("(" + cache.getLocalAddress() + "): running remote prepare for " + tx + " with async mode=" + async + " and coord=" + coordinator +
                   " (" + count + " modifications): " + modifications); // todo: remove printing entire mod list !
      }

      final boolean waitForResponses=!async;
      List responses=cache.callRemoteMethods(cache.getMembers(),
                                             TreeCache.replicateMethod,
                                             new Object[]{prepare_method},
                                             waitForResponses, // sync or async call ?
                                             true,             // exclude self
                                             cache.getSyncReplTimeout());

      if(waitForResponses && responses != null)
         checkResponses(responses); // throws an exception if one of the responses is an exception
   }



   /**
    * Multicasts a commit() for the given global transaction to all members except self. The call is
    * synchronous only if <code>cache.getSyncCommitPhase()</code> returns true. Failures are logged
    * and never propagated to the caller.
    *
    * @param gtx the global transaction to commit on the other members
    */
   protected void runCommitPhase(GlobalTransaction gtx) {
      final boolean waitForResponses=cache.getSyncCommitPhase();

      try {
         MethodCall commitCall=new MethodCall(TreeCache.commitMethod, new Object[]{gtx});
         if(log.isTraceEnabled()) {
            log.trace("running remote commit for " + gtx + " with async mode=" + !waitForResponses +
                      " and coord=" + cache.getLocalAddress());
         }
         cache.callRemoteMethods(cache.getMembers(),
                                 TreeCache.replicateMethod,
                                 new Object[]{commitCall},
                                 waitForResponses, // this is async by default, can be changed in TreeCache
                                 true,             // exclude self
                                 cache.getSyncReplTimeout());
      }
      catch(Throwable failure) {
         log.error("commit failed", failure);
      }
   }


   /**
    * Multicasts a rollback() for the given global transaction to all members except self. The call
    * is synchronous only if <code>cache.getSyncRollbackPhase()</code> returns true. Failures are
    * logged and never propagated to the caller.
    *
    * @param gtx the global transaction to roll back on the other members
    */
   protected void runRollbackPhase(GlobalTransaction gtx) {
      boolean sync_rollback_phase=cache.getSyncRollbackPhase();

      // 1. Multicast rollback() to all other members (excluding myself)
      try {
         MethodCall rollback_method=new MethodCall(TreeCache.rollbackMethod, new Object[]{gtx});
         if(log.isTraceEnabled())
            // "async mode" is the negation of the sync flag, same as in runCommitPhase()
            log.trace("running remote rollback for " + gtx + " with async mode=" + !sync_rollback_phase +
                      " and coord=" + cache.getLocalAddress());
         cache.callRemoteMethods(cache.getMembers(), TreeCache.replicateMethod, new Object[]{rollback_method},
                                 sync_rollback_phase, // this is by default an async call; can be changed in TreeCache
                                 true, // exclude self
                                 cache.getSyncReplTimeout());
      }
      catch(Throwable e) {
         log.error("rollback failed", e);
      }
   }



   /**
    * Replicates the modifications of a committed transaction without waiting for the other members.
    * The modifications travel in a single prepare call whose commit flag is set, so receivers commit
    * or roll back right after applying them. The call is handed to the replication queue if one is
    * enabled and present; otherwise it is multicast asynchronously. Failures are logged and never
    * propagated to the caller.
    *
    * @param gtx           the global transaction that was committed locally
    * @param modifications List of modifications made under that transaction
    */
   void replicateAsynchronously(GlobalTransaction gtx, List modifications) {
      try {
         Address self=(Address)cache.getLocalAddress();
         // Boolean.TRUE: do commit or rollback after call
         MethodCall onePhasePrepare=new MethodCall(TreeCache.prepareMethod,
                                                   new Object[]{gtx, modifications, self, Boolean.TRUE});

         boolean useQueue=cache.getUseReplQueue() && cache.getReplQueue() != null;
         if(!useQueue) {
            // asynchronous PREPARE multicast, excluding self
            runPreparePhase(gtx, onePhasePrepare, self, modifications, true);
            return;
         }

         cache.getReplQueue().add(new MethodCall(TreeCache.replicateMethod, new Object[]{onePhasePrepare}));
      }
      catch(Throwable failure) {
         log.warn("failed to replicate asynchronously", failure);
      }
   }

}