/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.interceptors;

import org.jboss.cache.*;
import org.jboss.cache.lock.IdentityLock;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jgroups.blocks.MethodCall;

import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;

import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;

/**
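 * Handles locking. When a TX is associated, we register for TX completion and unlock the locks acquired within the
 * scope of the TX. When no TX is present, we keep track of the locks acquired during the current method (per calling
 * thread, in the cache's lock table) so that they can be released once the method returns; that release is not
 * performed here but later, by the UnlockInterceptor.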
 * @author Bela Ban
 * @version $Id: LockInterceptor.java,v 1.8.2.2 2005/04/06 21:06:41 starksm Exp $
 */
public class LockInterceptor extends Interceptor {
   // Transaction manager obtained from the cache in setCache(); invoke() asks it for the caller's current transaction.
   private TransactionManager tx_mgr=null;
   // Transaction table obtained from the cache in setCache(); commit() and rollback() look up and remove entries here.
   TransactionTable           tx_table=null;
   // Lock table obtained from the cache in setCache(). For calls without a transaction, invoke() stores a
   // List of acquired locks in it, keyed by Thread.currentThread().
   // NOTE(review): this is accessed without synchronization in invoke() - confirm that the map handed out by
   // TreeCache tolerates concurrent put()s from different threads.
   HashMap                    lock_table;
   // Default lock acquisition timeout taken from the cache in setCache(); invoke() passes it to lock() unless
   // the intercepted method carries its own timeout argument.
   private long               lock_acquisition_timeout;

   // Held by lock() while it looks up (and possibly creates) a child node when createIfNotExists is set
   ReentrantLock              create_lock=new ReentrantLock();

   /** List<Transaction>: transactions for which a SynchronizationHandler has already been registered by invoke().
    *  A transaction is removed again in SynchronizationHandler.afterCompletion(). */
   private List               transactions=Collections.synchronizedList(new ArrayList());

   // Lock types used by invoke() and lock(); the lock method passes one of these as an Integer argument
   final static int NONE  = 0;
   final static int READ  = 1;
   final static int WRITE = 2;


   /**
    * Attaches this interceptor to <tt>cache</tt> and picks up the collaborators needed for locking from it:
    * the transaction manager, the transaction table, the lock table and the lock acquisition timeout.
    * @param cache The cache this interceptor works for
    */
   public void setCache(TreeCache cache) {
      super.setCache(cache);
      this.tx_mgr=cache.getTransactionManager();
      this.tx_table=cache.getTransactionTable();
      this.lock_table=cache.getLockTable();
      this.lock_acquisition_timeout=cache.getLockAcquisitionTimeout();
   }



   /**
    * Acquires the lock(s) required by the intercepted method and then passes the call on to the next interceptor.
    * <ol>
    * <li>If the caller runs inside a valid transaction, a SynchronizationHandler is registered for it (once per
    *     transaction) so that the locks are released when the transaction completes, and the locks are owned by
    *     the associated GlobalTransaction</li>
    * <li>If no GlobalTransaction is available, the locks are owned by the calling thread and are collected in
    *     the List stored in the lock table under the calling thread</li>
    * <li>The Fqn, the lock type (READ, WRITE, NONE) and the timeout are derived from the method and its
    *     arguments. Methods not listed below are passed on without any locking</li>
    * </ol>
    *
    * @param m The intercepted method call
    * @return The result of the next interceptor, or null if <tt>m</tt> is the lock method itself (which is
    *         handled entirely here and not passed on)
    * @throws Throwable Exception("failed to get global transaction") if a new transaction has no global
    *         transaction; any exception thrown while locking (e.g. TimeoutException) or by the next interceptor
    */
   public Object invoke(MethodCall m) throws Throwable {
      Transaction       tx=null;
      GlobalTransaction gtx=null;
      Fqn               fqn=null;
      int               lock_type=NONE;
      long              lock_timeout=lock_acquisition_timeout;
      Method            meth=m.getMethod();
      Object[]          args=m.getArgs();

      /** List<IdentityLock> locks. Locks acquired during the current method; will be released later by UnlockInterceptor.
       *  This list is only populated when there is no global TX, otherwise the TransactionTable maintains the locks
       * (keyed by TX) */
      List locks=null;

      boolean recursive=false;
      boolean createIfNotExists=false;

      if(tx_mgr != null && (tx=tx_mgr.getTransaction()) != null && isValid(tx)) { // ACTIVE or PREPARING
         if(!transactions.contains(tx)) {
            gtx=cache.getCurrentTransaction(tx);
            if(gtx == null)
               throw new Exception("failed to get global transaction");
            try {
               OrderedSynchronizationHandler handler=OrderedSynchronizationHandler.getInstance(tx);
               SynchronizationHandler myHandler=new SynchronizationHandler(gtx, tx, cache);
               handler.registerAtTail(myHandler); // needs to be invoked last on TX commit
               transactions.add(tx);
            }
            catch(Exception e) {
               log.error("registration for tx=" + tx + " with transaction manager failed, running without TX", e);
            }
         }
         else {
            // may be null if the mapping has been removed from the transaction table in the meantime
            gtx=cache.getTransactionTable().get(tx);
         }
      }

      // No global TX (either no valid TX at all, or the transaction table has no mapping for it): lock() will use
      // the calling thread as lock owner and needs a list to record the acquired locks in. Setting the list up
      // for *every* gtx == null case avoids a NullPointerException in lock() when the table lookup above fails.
      if(gtx == null) {
         // Each thread only reads and writes the entry keyed by itself.
         // NOTE(review): lock_table is a HashMap accessed without synchronization; different threads may still
         // put() concurrently - confirm that this is safe for the map handed out by TreeCache.
         locks=(List)lock_table.get(Thread.currentThread());
         if(locks == null) {
            locks=new LinkedList();
            lock_table.put(Thread.currentThread(), locks);
         }
      }

      // 1. Determine the type of lock (read, write, or none) depending on the method. If no lock is required, invoke
      //    the method, then return immediately
      //    Set the Fqn
      if(meth.equals(TreeCache.putDataMethodLocal) || meth.equals(TreeCache.putDataEraseMethodLocal) ||
            meth.equals(TreeCache.putKeyValMethodLocal) || meth.equals(TreeCache.putFailFastKeyValueMethodLocal)) {
         createIfNotExists=true;
         fqn=(Fqn)args[1];
         lock_type=WRITE;
         if(meth.equals(TreeCache.putFailFastKeyValueMethodLocal))
            lock_timeout=((Long)args[5]).longValue(); // this method carries its own timeout
      }
      else if(meth.equals(TreeCache.removeNodeMethodLocal)) {
         fqn=(Fqn)args[1];
         lock_type=WRITE;
         recursive=true; // remove node and *all* child nodes
      }
      else if(meth.equals(TreeCache.removeKeyMethodLocal) || meth.equals(TreeCache.removeDataMethodLocal)) {
         fqn=(Fqn)args[1];
         lock_type=WRITE;
      }
      else if(meth.equals(TreeCache.evictNodeMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=WRITE;
      }
      else if(meth.equals(TreeCache.addChildMethodLocal)) {
         fqn=(Fqn)args[1];
         lock_type=WRITE;
      }
      else if(meth.equals(TreeCache.getKeyValueMethodLocal) || meth.equals(TreeCache.getNodeMethodLocal) ||
            meth.equals(TreeCache.getKeysMethodLocal) || meth.equals(TreeCache.getChildrenNamesMethodLocal) ||
            meth.equals(TreeCache.releaseAllLocksMethodLocal) || meth.equals(TreeCache.printMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=READ;
      }
      else if(meth.equals(TreeCache.lockMethodLocal)) {
         fqn=(Fqn)args[0];
         lock_type=((Integer)args[1]).intValue();
         recursive=((Boolean)args[2]).booleanValue();
      }


      // 2. Lock the node
      // If no global TX: each acquired lock is added to the list of locks for this thread (locks)
      // If global TX: each acquired lock is added to the transaction table; the SynchronizationHandler registered
      // above releases the locks on commit/rollback
      if(fqn != null) {
         if(createIfNotExists) {
            do {
               lock(fqn, gtx, lock_type, locks, recursive, lock_timeout, createIfNotExists);
            }
            while(!cache.exists(fqn)); // keep trying until we have the lock (fixes concurrent remove())
                                       // terminates successfully, or with (Timeout)Exception
         }
         else
            lock(fqn, gtx, lock_type, locks, recursive, lock_timeout, createIfNotExists);
      }
      else {
         if(log.isTraceEnabled())
            log.trace("bypassed locking as method " + m.getName() + "() doesn't require locking");
      }

      // the lock method only acquires locks, there is nothing to pass on
      if(meth.equals(TreeCache.lockMethodLocal))
         return null;
      return super.invoke(m);
   }



   /**
    * Walks down from the root along <tt>fqn</tt> and locks every node on the way. Nodes above the last one are
    * always locked for reading; the last node is locked for writing if <tt>lock_type</tt> is WRITE. With
    * <tt>lock_type</tt> NONE the path is only walked (and possibly created), no node on it is locked.
    * The owner of the locks is <tt>gtx</tt> if given, otherwise the calling thread.
    *
    * @param fqn The node to lock. If null (logged as error) or empty, nothing is done
    * @param gtx The global transaction, may be null. If set, acquired locks and created nodes are recorded for it
    * @param lock_type READ, WRITE or NONE
    * @param locks A List<Lock> of locks held, each new node locked is added. Only used when <tt>gtx</tt> is null
    * @param recursive Lock children of the last node recursively
    * @param lock_timeout The timeout passed on to every lock acquisition
    * @param createIfNotExists If true, missing nodes on the path are created (while holding create_lock);
    *                          if false, the method returns at the first missing node
    */
   private void lock(Fqn fqn, GlobalTransaction gtx, int lock_type, List locks, boolean recursive,
                     long lock_timeout, boolean createIfNotExists)
         throws TimeoutException, LockingException, InterruptedException {
      if(fqn == null) {
         log.error("fqn is null - this should not be the case");
         return;
      }

      final int depth=fqn.size();
      if(depth == 0)
         return;

      // locks belong to the global transaction if there is one, else to the calling thread
      final Object lockOwner;
      if(gtx == null)
         lockOwner=Thread.currentThread();
      else
         lockOwner=gtx;

      final int lastLevel=depth - 1;
      Fqn       path=new Fqn();           // Fqn of the node visited in the current iteration
      Node      parent=cache.getRoot();
      boolean   gotLock=false;

      for(int level=0; level <= lastLevel; level++) {
         final Object name=fqn.get(level);
         path=new Fqn(path, name);

         // look up the child, creating it if requested; creation is guarded by create_lock
         Node current=null;
         if(createIfNotExists)
            create_lock.acquire();
         try {
            current=parent.getChild(name);
            if(current == null) {
               if(!createIfNotExists) {
                  if(log.isTraceEnabled())
                     log.trace("failed finding child " + name + " of node " + parent.getFqn());
                  return;
               }
               current=parent.createChild(name, path, parent);
               if(log.isTraceEnabled())
                  log.trace("created child " + name);
               if(gtx != null) {
                  // remember the new node for the current tx (needed for abort/rollback of transaction)
                  cache.addNode(gtx, (Fqn)path.clone());
               }
               // the notification is sent after create_lock has been given up
               create_lock.release();
               cache.notifyNodeCreated(path);
            }
         }
         finally {
            // only release if we still hold the lock (it may have been released above already)
            if(create_lock.holds() > 0)
               create_lock.release();
         }

         if(lock_type != NONE) {
            if(level == lastLevel && lock_type == WRITE)
               gotLock=current.acquire(lockOwner, lock_timeout, Node.LOCK_TYPE_WRITE);
            else
               gotLock=current.acquire(lockOwner, lock_timeout, Node.LOCK_TYPE_READ);
         }

         if(gotLock) {
            if(gtx == null) {
               // no TX: remember the lock in the caller's list, once
               IdentityLock held=current.getLock();
               if(!locks.contains(held))
                  locks.add(held);
            }
            else {
               // TX: remember the lock in the transaction table
               // (needed for release of locks on commit or rollback)
               cache.getTransactionTable().addLock(gtx, current.getLock());
            }
         }

         if(recursive && level == lastLevel) {
            Set subtreeLocks=current.acquireAll(lockOwner, lock_timeout, lock_type);
            if(subtreeLocks.size() > 0) {
               if(gtx == null)
                  locks.addAll(subtreeLocks);
               else
                  cache.getTransactionTable().addLocks(gtx, subtreeLocks);
            }
         }

         parent=current;
      }
   }



//   private boolean acquire(Node node, Object owner, int lock_type, long lock_timeout)
//         throws TimeoutException, LockingException, InterruptedException {
//      switch(lock_type) {
//         case NONE:
//            return true;
//         case WRITE:
//            return node.acquire(owner, lock_timeout, Node.LOCK_TYPE_WRITE);
//         case READ:
//            return node.acquire(owner, lock_timeout, Node.LOCK_TYPE_READ);
//      }
//   }


   /**
    * Releases all locks held by <tt>gtx</tt> (most recently recorded lock first), then removes both the local and
    * the global transaction from the transaction table. If the transaction table has no entry for <tt>gtx</tt>,
    * an error is logged and nothing else is done.
    * <p/>
    * A failure to release one lock is logged and does not prevent the remaining locks from being released or the
    * transaction from being removed from the transaction table.
    * @param gtx The global transaction that has been committed
    */
   private void commit(GlobalTransaction gtx) {
      if(log.isTraceEnabled())
         log.trace("committing cache with gtx " + gtx);

      TransactionEntry entry=tx_table.get(gtx);
      if(entry == null) {
         log.error("entry for transaction " + gtx + " not found (maybe already committed)");
         return;
      }

      // Let's do it in stack style, LIFO. We work on a snapshot of the locks and walk it backwards with an
      // iterator (indexed access on a linked list would make this loop quadratic)
      List held_locks=new ArrayList(entry.getLocks());
      for(ListIterator it=held_locks.listIterator(held_locks.size()); it.hasPrevious();) {
         IdentityLock lock=(IdentityLock)it.previous();
         if(log.isTraceEnabled())
            log.trace("releasing lock for " + lock.getFqn() + " (" + lock + ")");
         try {
            lock.release(gtx);
         }
         catch(Throwable t) {
            // keep going: the other locks still have to be released and the TX removed from the table
            log.error("failed releasing lock for " + lock.getFqn(), t);
         }
      }
      entry.getLocks().clear();

      Transaction ltx=entry.getTransaction();
      if(log.isTraceEnabled())
         log.trace("removing local transaction " + ltx + " and global transaction " + gtx);
      tx_table.remove(ltx);
      tx_table.remove(gtx);
   }


   /**
    * Revert all changes made inside this TX: invoke all method calls of the undo-ops
    * list. Then release all locks and remove the TX from the transaction table.
    * <ol>
    * <li>Revert all modifications done in the current TX</li>
    * <li>Remove all temporary nodes created by the current TX</li>
    * <li>Release all locks held by the current TX</li>
    * </ol>
    * Every step works backwards (LIFO) through what has been recorded for the TX. Failures of individual undo
    * operations, node removals or lock releases are logged and do not stop the rollback. If the transaction
    * table has no entry for <tt>tx</tt>, an error is logged and nothing else is done.
    *
    * @param tx The global transaction to be rolled back
    */
   private void rollback(GlobalTransaction tx) {
      TransactionEntry entry=tx_table.get(tx);

      if(log.isTraceEnabled())
         log.trace("called to rollback cache with GlobalTransaction=" + tx);

      if(entry == null) {
         log.error("entry for transaction " + tx + " not found (transaction has possibly already been rolled back)");
         return;
      }

      // 1. Revert the modifications by running the undo-op list in reverse. This *cannot* throw any exceptions !
      List undo_ops=new LinkedList(entry.getUndoOperations());
      for(ListIterator it=undo_ops.listIterator(undo_ops.size()); it.hasPrevious();) {
         MethodCall undo_op=(MethodCall)it.previous();
         try {
            Object retval=undo_op.invoke(cache);
            if(retval instanceof Throwable) {
               log.error("undo operation failed, error=" + retval);
            }
         }
         catch(Throwable t) {
            log.error("undo operation failed", t);
         }
      }

      // 2. Remove all temporary nodes. Need to do it backwards since node is LIFO.
      // The start index is taken from the snapshot itself, so that it always matches the list we iterate over
      List created_nodes=new ArrayList(entry.getNodes());
      for(ListIterator it=created_nodes.listIterator(created_nodes.size()); it.hasPrevious();) {
         Fqn node_name=(Fqn)it.previous();
         try {
            cache._remove(tx, node_name, false);
         }
         catch(Throwable t) {
            log.error("failed removing node \"" + node_name + "\"", t);
         }
      }


      // 3. Finally, release all locks held by this TX
      // Let's do it in stack style, LIFO, walking a snapshot backwards with an iterator (indexed access on a
      // linked list would make this loop quadratic)
      // Note that the lock could have been released already so don't panic.
      List held_locks=new ArrayList(entry.getLocks());
      for(ListIterator it=held_locks.listIterator(held_locks.size()); it.hasPrevious();) {
         IdentityLock lock=(IdentityLock)it.previous();
         if(log.isTraceEnabled())
            log.trace("releasing lock for " + lock.getFqn() + " (" + lock + ")");
         try {
            lock.release(tx);
         }
         catch(Throwable t) {
            // keep going: the other locks still have to be released and the TX removed from the table
            log.error("failed releasing lock for " + lock.getFqn(), t);
         }
      }
      entry.getLocks().clear();

      Transaction ltx=entry.getTransaction();
      if(log.isTraceEnabled())
         log.trace("removing local transaction " + ltx + " and global transaction " + tx);
      tx_table.remove(ltx);
      tx_table.remove(tx);
   }



   /**
    * Transaction callback registered by invoke() for every new transaction. Once the transaction has completed,
    * it commits or rolls back the work recorded for the associated GlobalTransaction.
    */
   class SynchronizationHandler implements Synchronization {
      GlobalTransaction gtx;
      Transaction       tx;
      TreeCache         cache;


      /**
       * @param gtx   The global transaction handed to commit() or rollback() on completion
       * @param tx    The local transaction this handler is registered with
       * @param cache The cache (stored, but not used by this handler itself)
       */
      SynchronizationHandler(GlobalTransaction gtx, Transaction tx, TreeCache cache) {
         this.gtx=gtx;
         this.tx=tx;
         this.cache=cache;
      }


      /**
       * Invoked before the start of the commit or rollback process. Nothing is done here; all work happens
       * in {@link #afterCompletion(int)}.
       */
      public void beforeCompletion() {
      }


      /**
       * Removes the transaction from the list of registered transactions, then calls commit() if the
       * transaction has been committed and rollback() in every other case.
       *
       * @param status The completion status of the transaction (see {@link Status})
       * @throws IllegalStateException After the rollback has been done, if <tt>status</tt> is neither
       *                               committed, rolled back nor marked for rollback
       */
      public void afterCompletion(int status) {
         transactions.remove(tx);

         if(status == Status.STATUS_COMMITTED) {
            commit(gtx);
            return;
         }

         // everything else is rolled back locally; only the two rollback states are considered regular
         boolean regularRollback=status == Status.STATUS_ROLLEDBACK
               || status == Status.STATUS_MARKED_ROLLBACK; // this one is probably not needed
         if(regularRollback && log.isDebugEnabled())
            log.debug("rolling back transaction");

         rollback(gtx);

         if(!regularRollback)
            throw new IllegalStateException("failed rolling back transaction: " + status);
      }
   }


}