/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.interceptors;

import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
import org.jboss.cache.*;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.blocks.MethodCall;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;

/**
 * Writes modifications back to the store on the way out: stores modifications back
 * through the CacheLoader, either after each method call (no TXs), or at TX commit.
 * @author Bela Ban
 * @version $Id: CacheStoreInterceptor.java,v 1.5.2.2 2005/04/06 21:06:41 starksm Exp $
 */
public class CacheStoreInterceptor extends Interceptor {
   private CacheLoader        loader=null;
   private TransactionManager tx_mgr=null;
   private TransactionTable   tx_table=null;

   /** List<Transaction> that we have registered for */
   private List        transactions=Collections.synchronizedList(new ArrayList());

   /** used to synchronize cache storing between threads, write modifications back to the CacheLoader only once */
   final ReentrantLock lock=new ReentrantLock();



   public void setCache(TreeCache cache) {
      super.setCache(cache);
      this.loader=cache.getCacheLoader();
      tx_mgr=cache.getTransactionManager();
      tx_table=cache.getTransactionTable();
   }


   /**
    * Pass the method on. When it returns, store the modification back to the store using the CacheLoader.
    * In case of a transaction, register for TX completion (2PC) and at TX commit, write modifications made
    * under the given TX to the CacheLoader
    * @param m
    * @return
    * @throws Throwable
    */
   public Object invoke(MethodCall m) throws Throwable {
      Fqn          fqn;
      Map          attributes;
      Object       key, value;
      Method       meth=m.getMethod();
      Object[]     args=m.getArgs();
      Transaction  tx;
      Object       retval=null, tmp_retval=null;
      boolean      use_tmp_retval=false;


      // could be potentially TRANSACTIONAL. If so, we register for TX completion (if we haven't done so yet)
      if(tx_mgr != null && (tx=tx_mgr.getTransaction()) != null && isValid(tx)) { // ACTIVE or PREPARING
         lock.acquire();
         try {
            if(!transactions.contains(tx)) {
               transactions.add(tx);
               // GlobalTransaction gtx=tx_table.get(tx);
               GlobalTransaction gtx=cache.getCurrentTransaction(tx);
               if(gtx == null)
                  throw new Exception("failed to get global transaction");
               tx.registerSynchronization(new SynchronizationHandler(gtx, tx, cache));
            }
            lock.release();
            return super.invoke(m);
         }
         finally {
            if(lock.holds() > 0)
               lock.release();
         }
      }

      // if we're here we don't run in a transaction

      // remove() methods need to be applied to the CacheLoader before passing up the call: a listener might
      // access an element just removed, causing the CacheLoader to *load* the element before *removing* it.
      synchronized(this) {
         if(meth.equals(TreeCache.removeNodeMethodLocal)) {
            fqn=(Fqn)args[1];
            loader.remove(fqn);
         }
         else if(meth.equals(TreeCache.removeKeyMethodLocal)) {
            fqn=(Fqn)args[1];
            key=args[2];
            tmp_retval=loader.remove(fqn, key);
            use_tmp_retval=true;
         }
         else if(meth.equals(TreeCache.removeDataMethodLocal)) {
            fqn=(Fqn)args[1];
            loader.removeData(fqn);
         }
      }

      retval=super.invoke(m);

      // put() methods need to be applied *after* the call
      synchronized(this) {
         if(meth.equals(TreeCache.putDataMethodLocal) || meth.equals(TreeCache.putDataEraseMethodLocal)) {
            fqn=(Fqn)args[1];
            attributes=(Map)args[2];
            loader.put(fqn, attributes);
         }
         else if(meth.equals(TreeCache.putKeyValMethodLocal)) {
            fqn=(Fqn)args[1];
            key=args[2];
            value=args[3];
            loader.put(fqn, key, value);
         }
      }

      if(use_tmp_retval)
         return tmp_retval;
      else
         return retval;
   }


   private CacheLoader getLoader() {
      return loader != null? loader : (loader=cache.getCacheLoader());
   }

   private void prepareCacheLoader(GlobalTransaction gtx) throws Exception {
      List modifications;
      TransactionEntry entry;

      entry=tx_table.get(gtx);
      if(entry == null)
         throw new Exception("entry for transaction " + gtx + " not found in transaction table");
      modifications=entry.getModifications();
      if(modifications.size() == 0)
         return;
      List cache_loader_modifications=new ArrayList();
      for(Iterator it=modifications.iterator(); it.hasNext();) {
         MethodCall methodCall=(MethodCall)it.next();
         Modification mod=convertMethodCallToModification(methodCall);
         cache_loader_modifications.add(mod);
      }
      if(cache_loader_modifications.size() > 0) {
         loader.prepare(gtx, cache_loader_modifications, false);
      }
   }



   private Modification convertMethodCallToModification(MethodCall methodCall) throws Exception {
      Method method=methodCall.getMethod();
      Object[] args;
      if(method == null)
         throw new Exception("method call has no method: " + methodCall);

      args=methodCall.getArgs();
      if(TreeCache.putDataMethodLocal.equals(method)) {
         return new Modification(Modification.PUT_DATA,
                                 (Fqn)args[1],      // fqn
                                 (Map)args[2]);     // data
      }
      else if(TreeCache.putDataEraseMethodLocal.equals(method)) {
         return new Modification(Modification.PUT_DATA_ERASE,
                                 (Fqn)args[1],      // fqn
                                 (Map)args[2]);     // data
      }
      else if(TreeCache.putKeyValMethodLocal.equals(method)) {
         return new Modification(Modification.PUT_KEY_VALUE,
                                 (Fqn)args[1],      // fqn
                                 args[2],           // key
                                 args[3]);          // value
      }
      else if(TreeCache.removeNodeMethodLocal.equals(method)) {
         return new Modification(Modification.REMOVE_NODE,
                                 (Fqn)args[1]);     // fqn
      }
      else if(TreeCache.removeKeyMethodLocal.equals(method)) {
         return new Modification(Modification.REMOVE_KEY_VALUE,
                                 (Fqn)args[1],      // fqn
                                 args[2]);          // key

      }
      else if(TreeCache.removeDataMethodLocal.equals(method)) {
         return new Modification(Modification.REMOVE_DATA,
                                 (Fqn)args[1]);     // fqn
      }
      else
         throw new Exception("method call " + method.getName() + " cannot be converted to a modification");
   }





   class SynchronizationHandler implements Synchronization {
      GlobalTransaction gtx=null;
      Transaction       tx=null;
      TreeCache         cache=null;
      List              modifications;


      SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
         this.gtx=gtx;
         this.tx=tx;
         this.cache=cache;
         TransactionEntry entry=tx_table.get(gtx);
         if(entry != null)
            modifications=entry.getModifications();
      }


      /**
       * Needs to call prepare() in the CacheLoader
       */
      public void beforeCompletion() {
         try {
            int status=tx.getStatus();
            switch(status) {
               case Status.STATUS_ACTIVE: // added Feb 2 2004 (bela)
               case Status.STATUS_COMMITTING:
               case Status.STATUS_PREPARING:
                  prepareCacheLoader(gtx);
                  break;
               case Status.STATUS_MARKED_ROLLBACK:
               case Status.STATUS_ROLLING_BACK:
                  // rollback(gtx); // changed by Bela Feb 5 2004: we can do this in the afterCompletion()
                  // new note: beforeCompletion() will not be called when a TX is rolled back, but we are called
                  // afterCompletion(false) directly
                  break;
               default:
                  log.error("beforeCompletion(). Illegal tx status: " + status);
                  throw new IllegalStateException("Illegal status: " + status);
            }
         }
         catch(Throwable t) {
            throw new NestedRuntimeException("", t);
         }
      }


      /**
       * Depending on the status (OK or FAIL), call commit() or rollback() on the CacheLoader
       *
       * @param status
       */
      public void afterCompletion(int status) {
         transactions.remove(tx);
         switch(status) {
            case Status.STATUS_COMMITTED:
               try {
                  if(modifications == null || modifications.size() == 0)
                     return;
                  getLoader().commit(gtx);
               }
               catch(Exception e) {
                  log.error("failed committing transaction to cache loader", e);
               }
               break;

            case Status.STATUS_MARKED_ROLLBACK: // this one is probably not needed
            case Status.STATUS_ROLLEDBACK:
               getLoader().rollback(gtx);
               break;
         }
      }
   }

}