/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.cache.aop;

import org.jboss.aop.Advised;
import org.jboss.aop.ClassInstanceAdvisor;
import org.jboss.aop.InstanceAdvisor;
import org.jboss.aop.advice.Interceptor;
import org.jboss.aop.proxy.ClassProxy;
import org.jboss.cache.*;
import org.jboss.cache.transaction.BatchModeTransactionManager;
import org.jboss.cache.eviction.AopEvictionPolicy;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.JChannel;

import javax.transaction.*;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.*;

/**
 * Implementation of the cache using dyanmic aop interceptors. The basic idea is that the caller only
 * uses the {@link #putObject(String,Object)}, {@link #getObject(String)} and
 * {@link #removeObject(String)} methods.<br>
 * When putting an object into the cache, the cache essentially takes care of how the object
 * will be replicated. It 'breaks' the object apart to map it onto the tree, and adds an
 * interceptor recursively for each object reachable from the root object. Those interceptor
 * 'know' to which part of the tree they are mapped. Whenever there is state change
 * ("set*" interceptors), the tree is modified using the superclass. All "get*" operations
 * to the object are intercepted and redirected to the tree.<br>
 * Bottom line is that a user simply puts <em>any</em> object into the cache, and the object
 * will be replicated to all caches in the cluster. The only requirement on cachable objects
 * is that they access all state through getters and setters.
 *
 * @author Harald Gliebe
 * @author Ben Wang
 * @jmx.mbean extends="org.jboss.cache.TreeCacheMBean"
 */
public class TreeCacheAop extends TreeCache implements TreeCacheAopMBean
{
   // Class -> CachedType
   // use WeakHashMap to allow class reloading
   protected Map cachedTypes = new WeakHashMap();
   public static final String CLASS_INTERNAL = "jboss:internal:class";
   public static final Fqn JBOSS_INTERNAL = new Fqn("JBossInternal");
   public static final String DUMMY = "dummy";
   // Use batch mode tm to simulate the automicity.
   TransactionManager localTm_ = BatchModeTransactionManager.getInstance();


   public TreeCacheAop(String cluster_name,
                       String props,
                       long state_fetch_timeout)
         throws Exception
   {
      super(cluster_name, props, state_fetch_timeout);
   }

   public TreeCacheAop() throws Exception
   {
   }

   public TreeCacheAop(JChannel channel) throws Exception
   {
      super(channel);
   }

   /**
    * Over-ride to make sure we are using an eviction policy specific to aop.
    *
    * @jmx.managed-attribute
    */
   public void setEvictionPolicyClass(String eviction_policy_class) {
      this.eviction_policy_class=eviction_policy_class;
      if(eviction_policy_class == null || eviction_policy_class.length() ==0)
         return;

      try {
         Object obj = getClass().getClassLoader().loadClass(eviction_policy_class).newInstance();
         if( ! (obj instanceof AopEvictionPolicy))
            throw new RuntimeException("TreeCacheAop.setEvictionPolicyClass(). Eviction policy provider:" +
                  eviction_policy_class +" is not an instance of AopEvictionPolicy.");
         eviction_policy_provider =(TreeCacheListener) obj;
         this.addTreeCacheListener(eviction_policy_provider );
      }
      catch(RuntimeException ex) {
         log.error("setEvictionPolicyClass(): failed creating instance of  " + eviction_policy_class, ex);
         throw ex;
      }
      catch(Throwable t) {
         log.error("setEvictionPolicyClass(): failed creating instance of  " + eviction_policy_class, t);
      }
   }

   /**
    * Insert an aop-enabled object into the cache.
    * It will also recursively put the any sub-object that is
    * declared as aop-capable (i.e., in <code>jboss-aop.xml</code>).
    * Note that <code>List</code>, <code>Map</code>, <code>Set</code>
    * attributes are aop-enabled, by default, as well.
    *
    * @param fqn The fqn string name to associate with the object in the cache.
    * @param obj aop-enabled object to be inerted into the cache. If null,
    *            it will nullify the fqn node.
    * @param obj Return the previous content under fqn.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public Object putObject(String fqn, Object obj) throws CacheException
   {
      return putObject(Fqn.fromString(fqn), obj);
   }

   /**
    * Insert an aop-enabled object into the cache.
    * It will also recursively put the any sub-object that is
    * declared as aop-capable (i.e., in <code>jboss-aop.xml</code>).
    * Note that <code>List</code>, <code>Map</code>, <code>Set</code>
    * attributes are aop-enabled, by default, as well.
    *
    * @param fqn The fqn instance to associate with the object in the cache.
    * @param obj aop-enabled object to be inerted into the cache. If null,
    *            it will nullify the fqn node.
    * @param obj Return the previous content under fqn.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public Object putObject(Fqn fqn, Object obj) throws CacheException
   {
      if( hasCurrentTransaction() )  // We have a transaction context going on now.
      {
         return _putObject(fqn, obj);
      } else
      {
         // Start a new transaction, we need transaction so the operation is atomic.
         try
         {
            localTm_.begin();
            // TODO Need to find a btter way to lock the node.
            put(fqn, DUMMY, DUMMY);
            Object objOld = _putObject(fqn,obj);
            remove(fqn, DUMMY);
            return objOld;
         }
         catch (Exception e)
         {
            e.printStackTrace();
            try
            {
               localTm_.setRollbackOnly();
            }
            catch (Exception exn)
            {
               exn.printStackTrace();
            }
            throw new NestedRuntimeException("TreeCacheAop.putObject(): ", e);
         }
         finally
         {
            endTransaction();
         }
      }
   }

   protected void endTransaction()
   {
      try {
         if(localTm_.getTransaction().getStatus() != Status.STATUS_MARKED_ROLLBACK)
         {
            localTm_.commit();
         } else
         {
            localTm_.rollback();
         }
      } catch (Exception e) {
         e.printStackTrace();
         throw new NestedRuntimeException("TreeCacheAop.endTransaction(): ", e);
      }
   }

   /**
    *
    * @param fqn
    * @param obj
    * @return
    * @throws CacheException
    */
   public Object _putObject(Fqn fqn, Object obj) throws CacheException
   {
      checkObjectType(obj);
      AOPInstance aopInstance = (AOPInstance) peek(fqn, AOPInstance.KEY);
      // Skip some un-necessary update if obj is the same class as the old one
      boolean isSameClass = false;
      Object oldValue = null;
      // Remember not to print obj here since it will trigger the CacheInterceptor.
      if(log.isDebugEnabled()) {
         log.debug("putObject(): fqn: " + fqn);
      }

      if (aopInstance != null) {
         if (aopInstance.get() == obj) {
            // obj is already cached
            return obj;
         }

         if (obj == null) {
            return removeObject(fqn);
         }

         // Optimization that won't remove the underlying object if
         // they are the same object type.
         log.debug("putObject(): old class type: " + aopInstance.get());
         if (obj.getClass().isInstance(aopInstance.get())) {
            if (log.isDebugEnabled()) {
               log.debug("putObject(): obj is same class type as the old one");
            }
            oldValue = getObject(fqn);
            isSameClass = true;
         } else {
            oldValue = removeObject(fqn);
         }
      } else {
         // If obj is a primitive type or other un-delcared classes
         Class claz1 = (Class) peek(fqn, CLASS_INTERNAL);
         if (claz1 != null &&
               obj.getClass().getName().equals(claz1.getName())) {
            if (log.isDebugEnabled()) {
               log.debug("putObject(): obj is same class type as the old one");
            }
            isSameClass = true;
         }
      }


      if (obj == null) { // means aopInstance is null as well
         return null;
      }

      // store object in cache
      if (obj instanceof Advised) {
         CachedType type = getCachedType(obj.getClass());
         // Let's put it right away so the object can be locked by the cache.
//         put(fqn, CLASS_INTERNAL, type.getType());

         Fqn internalFqn = null;
         // add interceptor
         InstanceAdvisor advisor = ((Advised) obj)._getInstanceAdvisor();
         // Let's check for object graph, e.g., multiple and circular references first
         if ((internalFqn = checkCircularReference(fqn, advisor, obj)) != null) {
            // Remove existing one
            removeObject(fqn);
            aopInstance = new AOPInstance();
            aopInstance.setRefFqn(internalFqn.toString());
            put(fqn, AOPInstance.KEY, aopInstance);
            put(fqn, CLASS_INTERNAL, type.getType());
            return oldValue;
         } else if ((internalFqn = handleObjectGraph(fqn, advisor, type, obj)) != null) { // found cross references
            // Let's set the reference fqn for getObject later.
            if (log.isDebugEnabled()) {
               log.debug("putObject(): detected multiple references. Will use as a reference instead. Current fqn: "
                     + fqn + " Reference fqn: " + internalFqn);
            }
            // Remove existing one if any
            removeObject(fqn);
            aopInstance = new AOPInstance();
            setRefFqn(aopInstance, internalFqn.toString());
            put(fqn, AOPInstance.KEY, aopInstance);
            put(fqn, CLASS_INTERNAL, type.getType());
            return oldValue;
         }

         // workaround for deserialiased objects
         if (advisor == null) {
            advisor = new ClassInstanceAdvisor(obj);
            ((Advised) obj)._setInstanceAdvisor(advisor);
         }

         // Insert interceptor at runtime
         advisor.appendInterceptor(new CacheInterceptor(this, fqn, type));

         // Let's deflate the objet here. If the field is another "aspectized" object,
         // we will do it recursively.
         put(fqn, CLASS_INTERNAL, type.getType());
         for (Iterator i = type.getFields().iterator(); i.hasNext();) {
            Field field = (Field) i.next();
            Object value = null;
            try {
               value=field.get(obj);
            }
            catch(IllegalAccessException e) {
               throw new CacheException("field access failed", e);
            }
            CachedType fieldType = getCachedType(field.getType());
            if (fieldType.isImmediate()) {
               put(fqn, field.getName(), value);
            } else {
               Fqn tmpFqn = new Fqn(fqn, field.getName());
               _putObject(tmpFqn, value);
               // If value (field member) is of Collection type, e.g., composite class
               // that contains Collection member, we will swap out the old reference
               // with the proxy one.
               // This can probably be optimized with check for instanceof proxy
               if( value instanceof Map || value instanceof List || value instanceof Set ) {
                  Object newValue = getObject(tmpFqn);
                  try {
                     field.set(obj, newValue);
                  } catch (IllegalAccessException e) {
                     log.error("_putObject(): Can't swap out the Collection class of field " +field.getName() +
                           "Exception " +e);
                     throw new CacheException("_putObject(): Can't swap out the Collection class of field \" +field.getName(),"
                           +e);
                  }
               }
            }
         }
         // Put AOPInstance
         put(fqn, AOPInstance.KEY, new AOPInstance(obj));


         // the empty AOPInstance will be replicated and invalidate outdated references
//            put(fqn, AOPInstance.KEY, new AOPInstance(obj));

         // Need to make sure this is behind put such that obj.toString is done correctly.
         if (log.isDebugEnabled()) {
            log.debug("putObject(): inserting with fqn: " + fqn.toString());
         }

         /**
          * Handling collection classes here.
          * First check if obj has been aspectized? That is, if it is a ClassProxy or not.
          * If not, we will need to create a proxy first for the Collection classes
          */
      } else if (obj instanceof Map) {
         if (log.isDebugEnabled()) {
            log.debug("putObject(): aspectized obj is a Map type of size: " + ((Map) obj).size());
         }

         // Need to remove the existing ones first.
         removeObject(fqn);
         put(fqn, CLASS_INTERNAL, obj.getClass());

         // Let's replace it with a proxy if necessary
         Object oldObj = obj;
         if( !(obj instanceof ClassProxy)) {
            Class clazz = obj.getClass();
            try {
               obj=CollectionInterceptorUtil.createProxy(clazz, new CachedMapInterceptor(this, fqn, clazz));
            } catch (Exception e) {
               throw new CacheException("failure creating proxy", e);
            }
         }

         Map map = (Map) oldObj;

         for (Iterator i = map.entrySet().iterator(); i.hasNext();) {
            Map.Entry entry = (Map.Entry) i.next();
            _putObject(new Fqn(fqn, entry.getKey()), entry.getValue());
         }

         put(fqn, AOPInstance.KEY, new AOPInstance(obj));
      } else if (obj instanceof List) {
         if (log.isDebugEnabled()) {
            log.debug("putObject(): aspectized obj is a List type of size: "
                  + ((List) obj).size());
         }

         List list = (List) obj;
         // Need to remove the existing ones first.
         removeObject(fqn);
         put(fqn, CLASS_INTERNAL, obj.getClass());

         // Let's replace it with a proxy if necessary
         if( !(obj instanceof ClassProxy)) {
            Class clazz = obj.getClass();
            try {
               obj=CollectionInterceptorUtil.createProxy(clazz, new CachedListInterceptor(this, fqn, clazz));
            } catch (Exception e) {
               throw new CacheException("failure creating proxy", e);
            }
         }

         int idx = 0;
         for (Iterator i = list.iterator(); i.hasNext();) {
            _putObject(new Fqn(fqn, new Integer(idx++)), i.next());
         }
         //        } else if (CachedType.isImmediate(obj.getClass())) {

         put(fqn, AOPInstance.KEY, new AOPInstance(obj));
      } else if (obj instanceof Set) {
         if (log.isDebugEnabled()) {
            log.debug("putObject(): aspectized obj is a Set type of size: "
                  + ((Set) obj).size());
         }

         Set set = (Set) obj;
         // Need to remove the existing ones first.
         removeObject(fqn);
         put(fqn, CLASS_INTERNAL, obj.getClass());

         // Let's replace it with a proxy if necessary
         if( !(obj instanceof ClassProxy)) {
            Class clazz = obj.getClass();
            try {
               obj=CollectionInterceptorUtil.createProxy(clazz, new CachedSetInterceptor(this, fqn, clazz));
            } catch (Exception e) {
               throw new CacheException("failure creating proxy", e);
            }
         }

         int idx = 0;
         for (Iterator i = set.iterator(); i.hasNext();) {
            _putObject(new Fqn(fqn, new Integer(idx++)), i.next());
         }
         //        } else if (CachedType.isImmediate(obj.getClass())) {

         put(fqn, AOPInstance.KEY, new AOPInstance(obj));
      } else if (obj instanceof Serializable) { // must be Serializable, including pritimive types
         if (log.isDebugEnabled()) {
            log.debug("putObject(): obj (" + obj.getClass() + ") is non-advisable but is Serializable. ");
         }

         if (!isSameClass)
            put(fqn, CLASS_INTERNAL, obj.getClass());

         put(fqn, AOPInstance.KEY, new AOPInstance(obj));
         put(fqn, "value", obj);
      }

      return oldValue;
   }

   /**
    * Check whether the object type is valid. An object type is valid if it is either: aspectized,
    * Serializable, or primitive type. Otherwise a runtime exception is thrown.
    *
    * @param obj
    */
   protected void checkObjectType(Object obj) {
      if(obj == null) return;
      if( ! (obj instanceof Advised) ) {
          if( !(obj instanceof Serializable ) ) {
               throw new IllegalArgumentException("TreeCacheAop.putObject(): Object type is neither " +
                  " aspectized nor Serializable. Object class name is " +obj.getClass().getName());
          }
      }
   }

   protected Fqn checkCircularReference(Fqn fqn, InstanceAdvisor advisor, Object obj)
   {
      if (advisor == null) return null; // this can be a case of non-advised POJO.

      Fqn originalFqn = null;
      // Step Check for cross references
      org.jboss.aop.advice.Interceptor interceptor = findCacheInterceptor(advisor);
      if (interceptor == null) {
         return null;
      }

      // ah, found something. So this will be multiple referenced.
      originalFqn = ((CacheInterceptor) interceptor).getFqn();

      // if it is a circular reference,
      // we will simply return the fqn associated that
      if (fqn.isChildOf(originalFqn)) {
         if (log.isDebugEnabled()) {
            log.debug("checkCircularReference(): is child for circular ref fqn " + originalFqn);
         }
         return originalFqn;
      }
      return null;
   }

   /**
    * Handle cases where there is circular, e.g., parent refers to child and child to parent, or multiple references,
    * e.g., two objects reference the same
    * sub-object. In this case, we will remove the current sub-object and substitue with a ref. The
    * reference will point to an Jboss internal node where the sub-object is re-created and stored.
    * It will also have reference counting to keep track of garbage collection.
    *
    * @param fqn     Current fqn to store this sub-object
    * @param advisor The associated Advisor instance
    * @return The fqn that refers to the sub-object stored in JBoss internal node.
    */
   protected Fqn handleObjectGraph(Fqn fqn, InstanceAdvisor advisor, CachedType type, Object obj) throws CacheException {
      if (advisor == null) return null; // this can be a case of non-advised POJO.

      Fqn originalFqn = null;
      Fqn internalFqn = null;
      // Step Check for cross references
      org.jboss.aop.advice.Interceptor interceptor = findCacheInterceptor(advisor);
      if (interceptor == null) {
         if (log.isDebugEnabled()) {
            log.debug("handleMultipleReference(): No multiple refernce found for fqn: " + fqn);
         }
         // Nothing found. Means this is a fresh object. No need to handle.
         return null;
      }

      // ah, found something. So this will be multiple referenced.
      originalFqn = ((CacheInterceptor) interceptor).getFqn();
       // If we are the same fqn, this is not multiple referenced!
      // This can be the case when we will reconstruct the node behind the scence.
      if( fqn.equals(originalFqn) ) return null;

      if (log.isDebugEnabled()) {
         log.debug("handleObjectGraph(): Found multiple refernce at original fqn: " + originalFqn);
      }

      // Step Check if the reference node locates under JBoss internal already.
      // Or if it is a circular reference.
      // If it is, we will simply return the fqn associated that
      if (originalFqn.isChildOf(JBOSS_INTERNAL)) {
         if (log.isDebugEnabled()) {
            log.debug("handleObjectGraph(): is child for fqn " + originalFqn);
         }
         return originalFqn;
      } else {
         // If not, we will need to create the sub-object under JBoss internal
         internalFqn = createInternalNode(originalFqn);
         // Then we will remove the original sub-object node (leaves only AopInstance)
         Object oldValue = removeObject(originalFqn);
         // Put it under JBoss internal
         putObject(internalFqn, obj);
         // Need to call this after putObject so interceptor will be there.
         AOPInstance instance = new AOPInstance();
         setRefFqn(instance, internalFqn.toString());
         put(originalFqn, AOPInstance.KEY, instance);  // put aopInstance back to indicate a ref
         put(originalFqn, CLASS_INTERNAL, type.getType());
         if (log.isDebugEnabled()) {
            log.debug("handleObjectGraph(): relocate the original fqn: " + originalFqn +
                  " to JBossInternal: " + internalFqn + " with obj: " + oldValue);
//                log.debug("handleObjectGraph(): print cache nodes details: " +printDetails());
         }
         // Finally, we return with the refFqn.
         return internalFqn;
      }
   }

   /**
    * Find cache interceptor with exact fqn.
    * @param advisor
    * @param fqn
    * @return
    */
   protected Interceptor findCacheInterceptor(InstanceAdvisor advisor, Fqn fqn)
   {
      org.jboss.aop.advice.Interceptor[] interceptors = advisor.getInterceptors();
      // Step Check for cross references
      for (int i = 0; i < interceptors.length; i++) {
         Interceptor interceptor = interceptors[i];
         if (interceptor instanceof CacheInterceptor) {
            CacheInterceptor inter = (CacheInterceptor)interceptor;
            if (inter != null && inter.getFqn().equals(fqn))
            {
               return interceptor;
            }
         }
      }
      return null;
   }

   /**
    * Find existing cache interceptor. Since there is supposedly only one cache interceptor per
    * pojo, this call should suffice. In addition, in cases of cross or circular reference,
    * fqn can be different anyway.
    * @param advisor
    * @return
    */
   protected Interceptor findCacheInterceptor(InstanceAdvisor advisor)
   {
      org.jboss.aop.advice.Interceptor[] interceptors = advisor.getInterceptors();
      // Step Check for cross references
      for (int i = 0; i < interceptors.length; i++) {
         Interceptor interceptor = interceptors[i];
         if (interceptor instanceof CacheInterceptor) {
               return interceptor;
         }
      }
      return null;
   }

   // TODO Need to mangle the name to obtain uniqueness
   protected Fqn createInternalNode(Fqn storedFqn)
   {
      Fqn fqn = new Fqn(JBOSS_INTERNAL, storedFqn);
      return fqn;
   }

   /**
    * Retrieve the aop-enabled object from the cache.
    *
    * @param fqn String name that associates with this node.
    * @return Current content value. Null if does not exist.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public Object getObject(String fqn) throws CacheException
   {
      return getObject(Fqn.fromString(fqn));
   }

   protected boolean hasCurrentTransaction()
   {
      try {
         if(getCurrentTransaction() != null || localTm_.getTransaction() != null)
         {
            // We have transaction context. Return null to signify don't do anything
            return true;
         }
      } catch (SystemException e) {
         e.printStackTrace();
      }
      return false;
   }

   /**
    * Retrieve the aop-enabled object from the cache. Return null if object does not exist in the cache.
    *
    * @param fqn Instance that associates with this node.
    * @return Current content value. Null if does not exist.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public Object getObject(Fqn fqn) throws CacheException
   {
      if( hasCurrentTransaction() )  // We have a transaction context going on now.
      {
         Object obj = _getObject(fqn);
         return obj;
      } else
      {
         // Start a new transaction, we need transaction so the operation is atomic.
         // getObject may not need it in the future.
         try
         {
            localTm_.begin();
            // TODO Need to find a btter way to lock the node.
            put(fqn, DUMMY, DUMMY);
            Object obj = _getObject(fqn);
            remove(fqn, DUMMY);
            return obj;
         }
         catch (Exception e)
         {
            e.printStackTrace();
            try
            {
               localTm_.setRollbackOnly();
            }
            catch (Exception exn)
            {
               exn.printStackTrace();
            }
            // We will need to alert Tomcat of this exception.
            throw new NestedRuntimeException("TreeCacheAop.getObject(): ", e);
         }
         finally
         {
            endTransaction();
         }

      }
   }

   private Object _getObject(Fqn fqn) throws CacheException
   {
      AOPInstance aopInstance = (AOPInstance) peek(fqn, AOPInstance.KEY);

      if (aopInstance != null && aopInstance.get() != null) {
         // we already have an advised instance
         return aopInstance.get();
      }

      // the class attribute is implicitly stored as an immutable read-only attribute
      Class clazz = (Class) peek(fqn, CLASS_INTERNAL);
      //  clazz and aopInstance can be not null if this node is the replicated brother node.
      if (clazz == null || aopInstance == null)
         return null;

      CachedType type = getCachedType(clazz);
      Object obj = null;
      if (Advised.class.isAssignableFrom(clazz)) {
         String refFqn = aopInstance.getRefFqn();
         if (refFqn == null) { // Create a new instance and also add in the CacheInterceptor for this fqn.
            try {
               obj = clazz.newInstance();
            }
            catch(Exception e) {
               throw new CacheException("failed creating instance of " + clazz.getName(), e);
            }
            // Insert interceptor at runtime
            InstanceAdvisor advisor = ((Advised) obj)._getInstanceAdvisor();
            advisor.appendInterceptor(new CacheInterceptor(this, fqn, type));
         } else {
            // this is recursive. Need to obtain the object from parent fqn
            // No need to add CacheInterceptor as a result. Everything is re-directed.
            // In addition, this op will not be recursive.
            if (log.isDebugEnabled()) {
               log.debug("getObject(): obtain value from reference fqn: " + refFqn);
            }
            obj = getObject(refFqn);
         }

      } else { // Must be Collection classes. We will use aop.ClassProxy instance instead.
         try {
            if(Map.class.isAssignableFrom(clazz)) {
               obj=CollectionInterceptorUtil.createProxy(clazz, new CachedMapInterceptor(this, fqn, clazz));
            } else if(List.class.isAssignableFrom(clazz)) {
               obj=CollectionInterceptorUtil.createProxy(clazz, new CachedListInterceptor(this, fqn, clazz));
            } else if(Set.class.isAssignableFrom(clazz)) {
               obj=CollectionInterceptorUtil.createProxy(clazz, new CachedSetInterceptor(this, fqn, clazz));
            } else {
               // Maybe it is just a serialized object.
               obj=peek(fqn, "value");
            }
         }
         catch(Exception e) {
            throw new CacheException("failure creating proxy", e);
         }
      }

      if (aopInstance == null) {
         // shouldn't happen since we serialize AopInstance now.
         throw new RuntimeException("getObject(): aopInstance is null");
      }
      aopInstance.set(obj);
      return obj;
   }

   /**
    * Remove aop-enabled object from the cache.
    *
    * @param fqn String name that associates with this node.
    * @return Value object from this node.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public Object removeObject(String fqn) throws CacheException
   {
      return removeObject(Fqn.fromString(fqn));
   }

   /**
    * Remove aop-enabled object from the cache.
    *
    * @param fqn Instance that associates with this node.
    * @return Original value object from this node.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public Object removeObject(Fqn fqn) throws CacheException
   {
      if( hasCurrentTransaction() )  // We have a transaction context going on now.
      {
         return _removeObject(fqn, true);
      } else
      {
         // Start a new transaction, we need transaction so the operation is atomic.
         try
         {
            localTm_.begin();
            // TODO Need to find a btter way to lock the node.
            put(fqn, DUMMY, DUMMY);
            Object obj = _removeObject(fqn, true);
            remove(fqn, DUMMY);
            return obj;
         }
         catch (Exception e)
         {
            e.printStackTrace();
            try
            {
               localTm_.setRollbackOnly();
            }
            catch (Exception exn)
            {
               exn.printStackTrace();
            }
            // We will need to alert Tomcat of this exception.
            throw new NestedRuntimeException("TreeCacheAop.removeObject(): ", e);
         }
         finally
         {
            endTransaction();
         }
      }
   }

   protected Object _removeObject(Fqn fqn, boolean removeCacheInterceptor) throws CacheException
   {
      if (log.isDebugEnabled()) {
         log.debug("removeObject(): removing object from fqn: " + fqn);
      }

      Class clazz = (Class) peek(fqn, CLASS_INTERNAL);
      // Let's trigger the WL.
//      Class clazz = (Class) remove(fqn, CLASS_INTERNAL);
      if (clazz == null)
      {
         if (log.isTraceEnabled()) {
            log.trace("removeObject(): clasz is null. fqn: " + fqn);
         }
         return null;
      }

      Object result = getObject(fqn);
      AOPInstance aopInstance = (AOPInstance) peek(fqn, AOPInstance.KEY);
      if (Advised.class.isAssignableFrom(clazz)) {
         String refFqn = aopInstance.getRefFqn();
         InstanceAdvisor advisor = ((Advised) result)._getInstanceAdvisor();
         // check if this is a refernce
         if (refFqn != null) {
            if (log.isDebugEnabled()) {
               log.debug("removeObject(): removing object fqn: " + fqn + " but is actually from ref fqn: " + refFqn);
            }
            removeRefFqn(aopInstance, refFqn, removeCacheInterceptor);
         } else {
            CachedType type = getCachedType(clazz);
            for (Iterator i = type.getFields().iterator(); i.hasNext();) {
               Field field = (Field) i.next();
               CachedType fieldType = getCachedType(field.getType());
               if (!fieldType.isImmediate()) {
                  Object obj = _removeObject(new Fqn(fqn, field.getName()), removeCacheInterceptor);
                  if (obj == null) continue;
               }
            }
            // Determine if we want to keep the interceptor for later use.
            if(removeCacheInterceptor) {
               CacheInterceptor interceptor = (CacheInterceptor) findCacheInterceptor(advisor);
//               if (log.isDebugEnabled()) {
//                  log.debug("removeObject(): removing cache interceptor fqn: " + fqn);
//               }
               // Remember to remove the interceptor from in-memory object but make sure it belongs to me first.
               if (interceptor != null)
               {
                  if (log.isDebugEnabled()) {
                     log.debug("removeObject(): removed cache interceptor fqn: " + fqn + " interceptor: "+interceptor);
                  }
                  advisor.removeInterceptor(interceptor.getName());
               }
            }
         }
      } else if (Map.class.isAssignableFrom(clazz)) {
         Map values = get(fqn).getChildren();
         if (values != null) {
            ArrayList list = new ArrayList(values.keySet());   // need to clone it first.
            for (int i=0; i < list.size(); i++) {
               Object key = list.get(i);
               _removeObject(new Fqn(fqn, key), removeCacheInterceptor);
            }
         }
      } else if (Collection.class.isAssignableFrom(clazz)) {
         Map values = get(fqn).getChildren();
         int size = values == null ? 0 : values.size();
         for (int i = 0; i < size; i++) {
            _removeObject(new Fqn(fqn, new Integer(i)), removeCacheInterceptor);
         }
      }

      // kind of brute force now.
      remove(fqn);
      // remove the interceptor as well.
      return result;
   }


   /**
    * Obtain a cache aop type for user to traverse the defined "primitive" types in aop.
    *
    * @param clazz The original pojo class
    * @return CachedType
    * @jmx.managed-operation
    */
   public synchronized CachedType getCachedType(Class clazz)
   {
      CachedType type = (CachedType) cachedTypes.get(clazz);
      if (type == null) {
         type = new CachedType(clazz);
         cachedTypes.put(clazz, type);
      }
      return type;
   }

   void setRefFqn(AOPInstance instance, String refFqn) throws CacheException {
      AOPInstance refInstance = (AOPInstance) peek(Fqn.fromString(refFqn), AOPInstance.KEY);
      synchronized (this) {
         refInstance.incrementRefCount();
         // Will need to propagate the change. Now why can't we advise this as well. :-)
         put(refFqn, AOPInstance.KEY, refInstance);
      }
      instance.setRefFqn(refFqn);
   }

   void removeRefFqn(AOPInstance instance, String refFqn, boolean removeCacheInterceptor) throws CacheException {
      AOPInstance refInstance = (AOPInstance) peek(Fqn.fromString(refFqn), AOPInstance.KEY);
      // take care of reference counting
      // Idea is to track reference counting only for object stored in JBossInternal.
      synchronized (this) {
         if (refInstance.decrementRefCount() == 0) {
            _removeObject(Fqn.fromString(refFqn), removeCacheInterceptor);
         } else {
            // Will need to propagate the change
            put(refFqn, AOPInstance.KEY, refInstance);
         }
      }

      instance.removeRefFqn();
   }

   /** Override the base class to provide some aop specific behaviors.
    *
    */
   public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value,
                      boolean create_undo_ops) throws CacheException
   {
      Object result = super._put(tx, fqn, key, value, create_undo_ops);
      // Special case
      if (key == AOPInstance.KEY)
         return result;

      AOPInstance aopInstance = (AOPInstance) _get(fqn, AOPInstance.KEY, false);
      Object instance = null;
      if (aopInstance == null || (instance = aopInstance.get()) == null) {
         return result;
      }

      // Besides putting in the cache, also update the in-memory version as well.
      CachedType type = getCachedType((Class) _get(fqn, CLASS_INTERNAL, false));

      if(type.isImmediate()) return result;

      Field f = type.getField((String) key);
      if(f == null) return result;  // we know this is Serializable then.

      try {
         f.set(instance, value);
      } catch (Exception e) {
         throw new NestedRuntimeException(e);
      }

      return result;
   }

   /** Override to provide aop specific eviction.
    *
    * <p>
    * Called by eviction policy provider. Note that eviction is done only in local mode,
    * that is, it doesn't replicate the node removal. This is will cause the replcation nodes
    * not synchronizing, but it is ok since user is supposed to add the node again when get is
    * null. After that, the contents will be in sync.
    * @param fqn Will remove everythign assoicated with this fqn.
    * @throws CacheException
    */
   public void evict(Fqn fqn) throws CacheException {
      // We will remove all children nodes as well since we assume all children nodes are part
      // of this "object" node.
      if(isAopNode(fqn)) {
         if(log.isDebugEnabled()) {
            log.debug("evict(): evicting whole aop node " +fqn);
         }
//         _remove(null, fqn, create_undo_ops, false);
         // TODO Why do we not want to remove the interceptor??? I think we should.
         // Because if we remove it, the caller has no idea that it has been taken off.
         // She will still think that caching is in effect. But this returns null value from cache.
         // What's is a good policy?
         boolean removeCacheInterceptor = false;
         _removeObject(fqn, removeCacheInterceptor);
      } else {
         super.evict(fqn);
      }
   }

   private boolean isAopNode(Fqn fqn)
   {
      try {
         return (peek(fqn, AOPInstance.KEY) != null) ? true: false;
      } catch (Exception e) {
         log.warn("isAopNode(): cache get operation generated exception " +e);
         return false;
      }
   }
}