/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.cache;

import org.jboss.cache.eviction.LRUPolicy;
import org.jboss.cache.interceptors.Interceptor;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockStrategyFactory;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.w3c.dom.Attr;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.NodeList;

import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;

/**
 * A tree-like structure that is replicated across several members. Updates will
 * be multicast to all group members reliably and in the same order. User has the
 * option to set transaction isolation level now (e.g., <code>SERIALIZABLE</code>, or
 * <code>REPEATABLE_READ</code>.
 *
 * @author Bela Ban
 * @author Ben Wang
 * @version $Id: TreeCache.java,v 1.158.2.6 2005/04/04 05:44:13 bwang00 Exp $
 * @jmx.mbean extends="org.jboss.system.ServiceMBean"
 * <p/>
 */
public class TreeCache extends ServiceMBeanSupport implements TreeCacheMBean, Cloneable, MembershipListener {
   protected Node root=new Node(SEPARATOR, Fqn.fromString(SEPARATOR), null, null, this);
   protected final Vector listeners=new Vector();
   protected JChannel channel=null;

   /** Am I the coordinator ? */
   protected boolean coordinator=false;

   protected String cluster_name="TreeCache-Group";
   protected String cluster_props=null;
   protected final Vector members=new Vector();
   protected RpcDispatcher disp=null;
   protected MessageListener ml=new MessageListenerAdaptor(this, log);
   protected long state_fetch_timeout=5000;
   protected long sync_repl_timeout=15000;
   protected boolean use_repl_queue=false;
   protected int repl_queue_max_elements=1000;
   protected long repl_queue_interval=5000;

   /** Maintains mapping of transactions (keys) and Modifications/Undo-Operations */
   private final TransactionTable tx_table=new TransactionTable();

   /** HashMap<Thread, List<Lock>, maintains locks acquired by threads (used when no TXs are used) */
   private final HashMap lock_table=new HashMap();

   protected boolean fetch_state_on_startup=true;
   protected long lock_acquisition_timeout=10000;
   protected String eviction_policy_class=null;
   protected TreeCacheListener eviction_policy_provider = null;
   protected int cache_mode=LOCAL;


   public static Method putDataMethodLocal=null;
   public static Method putDataEraseMethodLocal=null;
   public static Method putKeyValMethodLocal=null;
   public static Method putFailFastKeyValueMethodLocal=null;
   public static Method removeNodeMethodLocal=null;
   public static Method removeKeyMethodLocal=null;
   public static Method removeDataMethodLocal=null;
   public static Method evictNodeMethodLocal=null;
   // public static Method evictKeyValueMethodLocal=null;
   public static Method prepareMethod=null;
   public static Method commitMethod=null;
   public static Method rollbackMethod=null;
   public static Method replicateMethod=null;
   public static Method replicateAllMethod=null;
   // public static Method addChildMethod=null;
   public static Method addChildMethodLocal=null;
   public static Method getKeyValueMethodLocal=null;
   public static Method getNodeMethodLocal=null;
   public static Method getKeysMethodLocal=null;
   public static Method getChildrenNamesMethodLocal=null;
   public static Method releaseAllLocksMethodLocal=null;
   public static Method printMethodLocal=null;
   public static Method lockMethodLocal=null;

   static LinkedList    crud_methods=new LinkedList();
   protected boolean    isStateSet=false;
   private final Object stateLock=new Object();
   protected IsolationLevel isolationLevel=IsolationLevel.REPEATABLE_READ;

   /** Eviction policy configuration in xml Element */
   protected Element evictConfig_ = null;



   public MessageListener getMessageListener() {
      return ml;
   }

   /** {@link #invokeMethod(MethodCall)} will dispatch to this chain of interceptors.
    * In the future, this will be replaced with JBossAop. This is a first step towards refactoring JBossCache.
    */
   protected Interceptor interceptor_chain=null;


   /**
    * Interceptor which handles invocations of {@link #_replicate(MethodCall)}. Any such method
    * invocation is forwarded to the invoke_handler.<br/>
    * This will go away in the future, as we're moving replication functionality into the
    * ReplicationInterceptor itself
    */
   protected Replicatable replication_handler=null;


   /** Method to acquire a TransactionManager. By default we use JBossTransactionManagerLookup. Has
    * to be set before calling {@link #start()} */
   protected TransactionManagerLookup tm_lookup=null;

   /** Class of the implementation of TransactionManagerLookup */
   protected String tm_lookup_class=null;

   /** Used to get the Transaction associated with the current thread */
   protected TransactionManager tm=null;

   /** The fully qualified name of the CacheLoader (has to implement the CacheLoader interface) */
   protected String cache_loader_class=null;

   /** A reference to the CacheLoader. If null, we don't have a CachedLoader */
   protected CacheLoader cache_loader=null;

   /** The properties from which to configure the CacheLoader */
   protected Properties cache_loader_config=null;

   /** Are the CacheLoaders sharing the same resource or not ? */
   protected boolean cache_loader_shared=true;

   /** List<Fqn> of nodes to preload (if cache loader is enabled) */
   protected List cache_loader_preload=null;

   /** Fetches the transient state. Attribute fetch_cache_on_startup has to be true */
   protected boolean cache_loader_fetch_transient_state=true;

   /** Fetches the entire persistent state from the underlying CacheLoader. Only used if cache_loader_shared=false.
    * Attribute fetch_cache_on_startup has to be true */
   protected boolean cache_loader_fetch_persistent_state=true;

   /** synchronous or asynchrous commit phase ? */
   protected boolean sync_commit_phase=false;

   /** synchronous or asynchrous rollback phase ? */
   protected boolean sync_rollback_phase=false;

   protected boolean deadlockDetection=false;

   /** Queue used to replicate updates when mode is repl-async */
   protected ReplicationQueue repl_queue=null;

   public static final String SEPARATOR="/";

   /** Entries in the cache are by default local; ie. not replicated */
   public static final int LOCAL=1;

   /** Entries in the cache are by default replicated (asynchronously) */
   public static final int REPL_ASYNC=2;

   /** Entries in the cache are by default replicated (synchronously) */
   public static final int REPL_SYNC=3;

   static public final String UNINITIALIZED="jboss:internal:uninitialized"; // todo: move to CacheLoaderInterceptor
   static final String JNDI_LOCATOR_URI="socket://localhost:6789";


   static {
      try {
         putDataMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                   new Class[]{GlobalTransaction.class,
                                                               Fqn.class,
                                                               Map.class,
                                                               boolean.class});
         putDataEraseMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                         new Class[]{GlobalTransaction.class,
                                                                     Fqn.class,
                                                                     Map.class,
                                                                     boolean.class,
                                                                     boolean.class});
         putKeyValMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                      new Class[]{GlobalTransaction.class,
                                                                  Fqn.class,
                                                                  Object.class,
                                                                  Object.class,
                                                                  boolean.class});
         putFailFastKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                                          new Class[]{GlobalTransaction.class,
                                                                          Fqn.class,
                                                                          Object.class,
                                                                          Object.class,
                                                                          boolean.class,
                                                                          long.class});
         removeNodeMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
                                                      new Class[]{GlobalTransaction.class,
                                                                  Fqn.class,
                                                                  boolean.class});
         removeKeyMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
                                                     new Class[]{GlobalTransaction.class,
                                                                 Fqn.class,
                                                                 Object.class,
                                                                 boolean.class});
         removeDataMethodLocal=TreeCache.class.getDeclaredMethod("_removeData",
                                                      new Class[]{GlobalTransaction.class,
                                                                  Fqn.class,
                                                                  boolean.class});
         evictNodeMethodLocal=TreeCache.class.getDeclaredMethod("_evict", new Class[] {Fqn.class});
         // evictKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_evict", new Class[]{Fqn.class, Object.class});
         prepareMethod=TreeCache.class.getDeclaredMethod("prepare",
                                                  new Class[]{GlobalTransaction.class,
                                                              List.class,
                                                              Address.class,
                                                              boolean.class});
         commitMethod=TreeCache.class.getDeclaredMethod("commit",
                                                 new Class[]{GlobalTransaction.class});
         rollbackMethod=TreeCache.class.getDeclaredMethod("rollback",
                                                   new Class[]{GlobalTransaction.class});
         addChildMethodLocal=TreeCache.class.getDeclaredMethod("_addChild",
                                                   new Class[]{GlobalTransaction.class,
                                                               Fqn.class, Object.class, Node.class});
         getKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_get",
                                                                  new Class[]{Fqn.class, Object.class, boolean.class});
         getNodeMethodLocal=TreeCache.class.getDeclaredMethod("_get", new Class[]{Fqn.class});
         getKeysMethodLocal=TreeCache.class.getDeclaredMethod("_getKeys", new Class[]{Fqn.class});
         getChildrenNamesMethodLocal=TreeCache.class.getDeclaredMethod("_getChildrenNames", new Class[]{Fqn.class});
         replicateMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{MethodCall.class});
         replicateAllMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{List.class});
         releaseAllLocksMethodLocal=TreeCache.class.getDeclaredMethod("_releaseAllLocks", new Class[]{Fqn.class});
         printMethodLocal=TreeCache.class.getDeclaredMethod("_print", new Class[]{Fqn.class});
         lockMethodLocal=TreeCache.class.getDeclaredMethod("_lock", new Class[]{Fqn.class,
                                                                                int.class,
                                                                                boolean.class});
      }
      catch(NoSuchMethodException ex) {
         ex.printStackTrace();
         throw new ExceptionInInitializerError(ex.toString());
      }

      crud_methods.add(putDataMethodLocal);
      crud_methods.add(putDataEraseMethodLocal);
      crud_methods.add(putKeyValMethodLocal);
      crud_methods.add(putFailFastKeyValueMethodLocal);
      crud_methods.add(removeNodeMethodLocal);
      crud_methods.add(removeKeyMethodLocal);
      crud_methods.add(removeDataMethodLocal);
   }




   public static boolean isCrudMethod(Method m) {
      return m == null? false : crud_methods.contains(m);
   }



   /**
    * Creates a channel with the given properties. Connects to the channel, then creates a PullPushAdapter
    * and starts it
    */
   public TreeCache(String cluster_name, String props, long state_fetch_timeout) throws Exception {
      super();
      if(cluster_name != null)
         this.cluster_name=cluster_name;
      if(props != null)
         this.cluster_props=props;
      this.state_fetch_timeout=state_fetch_timeout;
   }

   public TreeCache() throws Exception {
      super();
//      try {
//         Naming.rebind("//localhost:1098/" + this.getClusterName(), new RemoteTreeCacheImpl(this));
//      }
//      catch(Throwable t) {
//         log.error("Unable to bind remote tree cache implementation to '" + this.getClusterName() + "'.", t);
//      }
   }

   /**
    * Expects an already connected channel. Creates a PullPushAdapter and starts it
    */
   public TreeCache(JChannel channel) throws Exception {
      super();
      this.channel=channel;
   }

   /**
    * Used by interceptors. Don't use as client, will go away: interceptors will use TreeCacheImpl and clients
    * will only be able to use TreeCache (which will become an interface)
    * @return
    */
   public Node getRoot() {
      return root;
   }

   /**
    * @return
    * @jmx.managed-attribute access="read-only"
    */
   public Object getLocalAddress() {
      return channel != null ? channel.getLocalAddress() : null;
   }

   /**
    * @return
    * @jmx.managed-attribute access="read-only"
    */
   public Vector getMembers() {
      return members;
   }

   /**
    *
    * @return
    * @jmx.managed-attribute
    */
   public boolean isCoordinator() {
      return coordinator;
   }

   /**
    * Get the name of the replication group
    * @jmx.managed-attribute
    */
   public String getClusterName() {
      return cluster_name;
   }

   /**
    * Set the name of the replication group
    * @jmx.managed-attribute
    */
   public void setClusterName(String name) {
      cluster_name=name;
   }

   /**
    * Get the cluster properties (e.g. the protocol stack specification in case of JGroups)
    * @jmx.managed-attribute
    */
   public String getClusterProperties() {
      return cluster_props;
   }

   /**
    * Set the cluster properties. If the cache is to use the new properties, it has to be redeployed
    * @param cluster_props The properties for the cluster (JGroups)
    * @jmx.managed-attribute
    */
   public void setClusterProperties(String cluster_props) {
      this.cluster_props=cluster_props;
   }


   public TransactionTable getTransactionTable() {
      return tx_table;
   }


   public HashMap getLockTable() {
      return lock_table;
   }

   /**
    * Dumps the contents of the TransactionTable
    * @return
    * @jmx.managed-attribute
    */
   public String dumpTransactionTable() {
      return tx_table.toString(true);
   }

   /**
    *
    * @return
    * @jmx.managed-attribute
    */
   public boolean getDeadlockDetection() {
      return deadlockDetection;
   }

   /**
    *
    * @param dt
    * @jmx.managed-attribute
    */
   public void setDeadlockDetection(boolean dt) {
      deadlockDetection=dt;
      if(disp != null)
         disp.setDeadlockDetection(dt);
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getInterceptorChain() {
      String retval=printInterceptorChain(interceptor_chain);
      if(retval == null || retval.length() == 0)
         return "<empty>";
      else
         return retval;
   }


   /**
    * @return List<Interceptor>
    * @jmx.managed-attribute
    */
   public List getInterceptors() {
      if(interceptor_chain == null)
         return null;
      int num=1;
      Interceptor tmp=interceptor_chain;
      while((tmp=tmp.getNext()) != null) {
         num++;
      }
      List retval=new ArrayList(num);
      tmp=interceptor_chain;
      num=0;
      do {
         retval.add(tmp);
         tmp=tmp.getNext();
      }
      while(tmp != null);
      return retval;
   }




   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getCacheLoaderClass() {
      return cache_loader_class;
   }

   /**
    *
    * @param cache_loader_class
    * @jmx.managed-attribute
    */
   public void setCacheLoaderClass(String cache_loader_class) {
      this.cache_loader_class=cache_loader_class;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public Properties getCacheLoaderConfig() {
      return cache_loader_config;
   }

   /**
    * @param cache_loader_config
    * @jmx.managed-attribute
    */
   public void setCacheLoaderConfig(Properties cache_loader_config) {
      this.cache_loader_config=cache_loader_config;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public CacheLoader getCacheLoader() {
      return cache_loader;
   }

   /**
    * @param cache_loader
    * @jmx.managed-attribute
    */
   public void setCacheLoader(CacheLoader cache_loader) {
      this.cache_loader=cache_loader;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getCacheLoaderShared() {
      return cache_loader_shared;
   }

   /**
    * @param shared
    * @jmx.managed-attribute
    */
   public void setCacheLoaderShared(boolean shared) {
      this.cache_loader_shared=shared;
   }


   /**
    * @param list
    * @jmx.managed-attribute
    */
   public void setCacheLoaderPreload(String list) {
      if(list == null) return;
      ArrayList l;
      StringTokenizer st=new StringTokenizer(list, ",");
      String tok;
      Fqn    fqn;
      l=new ArrayList();
      while(st.hasMoreTokens()) {
         tok=st.nextToken();
         fqn=Fqn.fromString(tok.trim());
         l.add(fqn);
      }
      if(l.size() > 0)
         this.cache_loader_preload=l;
   }



   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getCacheLoaderPreload() {
      return cache_loader_preload != null? cache_loader_preload.toString() : null;
   }

   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setCacheLoaderFetchPersistentState(boolean flag) {
      cache_loader_fetch_persistent_state=flag;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getCacheLoaderFetchPersistentState() {
      return cache_loader_fetch_persistent_state;
   }

   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setCacheLoaderFetchTransientState(boolean flag) {
      cache_loader_fetch_transient_state=flag;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getCacheLoaderFetchTransientState() {
      return cache_loader_fetch_transient_state;
   }


   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getSyncCommitPhase() {
      return sync_commit_phase;
   }

   /**
    * @param sync_commit_phase
    * @jmx.managed-attribute
    */
   public void setSyncCommitPhase(boolean sync_commit_phase) {
      this.sync_commit_phase=sync_commit_phase;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getSyncRollbackPhase() {
      return sync_rollback_phase;
   }

   /**
    * @param sync_rollback_phase
    * @jmx.managed-attribute
    */
   public void setSyncRollbackPhase(boolean sync_rollback_phase) {
      this.sync_rollback_phase=sync_rollback_phase;
   }


   /**
    * Setup eviction policy configuration
    * @jmx.managed-attribute access="write-only"
    */
   public void setEvictionPolicyConfig(Element config) {
      evictConfig_ = config;
      log.info("setEvictionPolicyConfig(): " +config);
   }

   public Element getEvictionPolicyConfig() {
      return evictConfig_;
   }

   /**
    * Convert a list of elements to the JG property string
    * @jmx.managed-attribute access="write-only"
    */
   public void setClusterConfig(Element config) {
      StringBuffer buffer=new StringBuffer();
      NodeList stack=config.getChildNodes();
      int length=stack.getLength();

      for(int s=0; s < length; s++) {
         org.w3c.dom.Node node=stack.item(s);
         if(node.getNodeType() != org.w3c.dom.Node.ELEMENT_NODE)
            continue;

         Element tag=(Element)node;
         String protocol=tag.getTagName();
         buffer.append(protocol);
         NamedNodeMap attrs=tag.getAttributes();
         int attrLength=attrs.getLength();
         if(attrLength > 0)
            buffer.append('(');
         for(int a=0; a < attrLength; a++) {
            Attr attr=(Attr)attrs.item(a);
            String name=attr.getName();
            String value=attr.getValue();
            buffer.append(name);
            buffer.append('=');
            buffer.append(value);
            if(a < attrLength - 1)
               buffer.append(';');
         }
         if(attrLength > 0)
            buffer.append(')');
         buffer.append(':');
      }
      // Remove the trailing ':'
      buffer.setLength(buffer.length() - 1);
      setClusterProperties(buffer.toString());
      log.info("setting cluster properties from xml to: " + cluster_props);
   }




   /**
    * Get the max time to wait until the initial state is retrieved.
    * This is used in a replicating cache: when a new cache joins the cluster,
    * it needs to acquire the (replicated) state of the other members to
    * initialize itself. If no state has been received within <tt>timeout</tt>
    * milliseconds, the map will be empty.
    *
    * @return long Number of milliseconds to wait for the state. 0 means to wait forever.
    * @jmx.managed-attribute
    */
   public long getInitialStateRetrievalTimeout() {
      return state_fetch_timeout;
   }

   /**
    * Set the initial state transfer timeout
    * (see {@link #getInitialStateRetrievalTimeout()})
    * @jmx.managed-attribute
    */
   public void setInitialStateRetrievalTimeout(long timeout) {
      state_fetch_timeout=timeout;
   }

   /**
    * Returns the current caching mode. Valid values are
    * <ul>
    * <li>LOCAL
    * <li>REPL_ASYNC
    * <li>REPL_SYNC
    * <ul>
    * @return String The caching mode
    * @jmx.managed-attribute
    */
   public String getCacheMode() {
      return mode2String(cache_mode);
   }

   public int getCacheModeInternal() {
      return cache_mode;
   }

   private String mode2String(int mode) {
      switch(mode) {
         case LOCAL:
            return "LOCAL";
         case REPL_ASYNC:
            return "REPL_ASYNC";
         case REPL_SYNC:
            return "REPL_SYNC";
         default:
            throw new RuntimeException("setCacheMode(): caching mode " + mode + " is invalid");
      }
   }

   /**
    * Sets the default caching mode)
    * @jmx.managed-attribute
    */
   public void setCacheMode(String mode) throws Exception {
      int m=string2Mode(mode);
      setCacheMode(m);
   }


   /**
    * Sets the default cache mode. Valid arguments are
    * <ol>
    * <li>TreeCache.LOCAL
    * <li>TreeCache.REPL_ASYNC
    * <li>TreeCache.REPL_SYNC
    * </ol>
    * @param mode
    */
   public void setCacheMode(int mode) {
      if(mode == LOCAL || mode == REPL_ASYNC || mode == REPL_SYNC)
         this.cache_mode=mode;
      else
         throw new IllegalArgumentException("setCacheMode(): caching mode " + mode + " is invalid");
   }


   /**
    * Returns the default max timeout after which synchronous replication calls return.
    * @return long Number of milliseconds after which a sync repl call must return. 0 means to wait forever
    * @jmx.managed-attribute
    */
   public long getSyncReplTimeout() {
      return sync_repl_timeout;
   }

   /**
    * Sets the default maximum wait time for synchronous replication to receive all results
    * @jmx.managed-attribute
    */
   public void setSyncReplTimeout(long timeout) {
      sync_repl_timeout=timeout;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getUseReplQueue() {
      return use_repl_queue;
   }

   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setUseReplQueue(boolean flag) {
      use_repl_queue=flag;
      if(flag) {
         if(repl_queue == null) {
            repl_queue=new ReplicationQueue(this, repl_queue_interval, repl_queue_max_elements);
            if(repl_queue_interval >= 0)
               repl_queue.start();
         }
      }
      else {
         if(repl_queue != null) {
            repl_queue.stop();
            repl_queue=null;
         }
      }
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public long getReplQueueInterval() {
      return repl_queue_interval;
   }

   /**
    * @param interval
    * @jmx.managed-attribute
    */
   public void setReplQueueInterval(long interval) {
      this.repl_queue_interval=interval;
      if(repl_queue != null)
         repl_queue.setInterval(interval);
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public int getReplQueueMaxElements() {
      return repl_queue_max_elements;
   }

   /**
    * @param max_elements
    * @jmx.managed-attribute
    */
   public void setReplQueueMaxElements(int max_elements) {
      this.repl_queue_max_elements=max_elements;
      if(repl_queue != null)
         repl_queue.setMax_elements(max_elements);
   }


   public ReplicationQueue getReplQueue() {
      return repl_queue;
   }

   /**
    * Returns the transaction isolation level.
    * @jmx.managed-attribute
    */
   public String getIsolationLevel() {
      return isolationLevel.toString();
   }

   /**
    * Set the transaction isolation level. This determines the locking strategy to be used
    * @jmx.managed-attribute
    */
   public void setIsolationLevel(String level) {
      IsolationLevel tmp_level=IsolationLevel.stringToIsolationLevel(level);

      if(tmp_level == null) {
         throw new IllegalArgumentException("TreeCache.setIsolationLevel(): level \"" + level + "\" is invalid");
      }
      setIsolationLevel(tmp_level);
   }

   /**
    * @param level
    */
   public void setIsolationLevel(IsolationLevel level) {
      isolationLevel=level;
      LockStrategyFactory.setIsolationLevel(level);
   }

   public IsolationLevel getIsolationLevelClass() {
      return isolationLevel;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getFetchStateOnStartup() {
      return fetch_state_on_startup;
   }


   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setFetchStateOnStartup(boolean flag) {
      fetch_state_on_startup=flag;
   }


   /**
    * Default max time to wait for a lock. If the lock cannot be acquired within this time, a LockingException will be thrown.
    * @return long Max number of milliseconds to wait for a lock to be acquired
    * @jmx.managed-attribute
    */
   public long getLockAcquisitionTimeout() {
      return lock_acquisition_timeout;
   }

   /**
    * Set the max time for lock acquisition. A value of 0 means to wait forever (not recomended).
    * Note that lock acquisition timeouts may be removed in the future when we have deadlock detection.
    * @param timeout
    * @jmx.managed-attribute
    */
   public void setLockAcquisitionTimeout(long timeout) {
      this.lock_acquisition_timeout=timeout;
   }



   /**
    * Returns the name of the cache eviction policy (must be an implementation of EvictionPolicy)
    * @return Fully qualified name of a class implementing the EvictionPolicy interface
    * @jmx.managed-attribute
    */
   public String getEvictionPolicyClass() {
      return eviction_policy_class;
   }

   /**
    * Sets the classname of the eviction policy
    * @jmx.managed-attribute
    */
   public void setEvictionPolicyClass(String eviction_policy_class) {
      if(eviction_policy_class == null || eviction_policy_class.length() ==0)
         return;
      try {
         this.eviction_policy_class=eviction_policy_class;
         eviction_policy_provider =(TreeCacheListener)
               getClass().getClassLoader().loadClass(eviction_policy_class).newInstance();
         this.addTreeCacheListener(eviction_policy_provider );
      }
      catch(Throwable t) {
         log.error("setEvictionPolicyClass(): failed creating instance of  " + eviction_policy_class, t);
      }
   }

   /**
    * Obtain eviction thread (if any) wake up interval in seconds
    * @jmx.managed-attribute
    */
   public int getEvictionThreadWakeupIntervalSeconds() {
      if( eviction_policy_provider == null ) return -1;
      else
         return ((LRUPolicy)eviction_policy_provider).getWakeupIntervalSeconds();
   }


   /**
    * Sets the TransactionManagerLookup object
    * @param l
    * @jmx.managed-attribute
    */
   public void setTransactionManagerLookup(TransactionManagerLookup l) {
      this.tm_lookup=l;
   }


   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getTransactionManagerLookupClass() {
      return tm_lookup_class;
   }

   /**
    * Sets the class of the TransactionManagerLookup impl. This will attempt to create an
    * instance, and will throw an exception if this fails.
    * @param cl
    * @throws Exception
    * @jmx.managed-attribute
    */
   public void setTransactionManagerLookupClass(String cl) throws Exception {
      this.tm_lookup_class=cl;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public TransactionManager getTransactionManager() {
      return tm;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public TreeCache getInstance() {
      return this;
   }



   public void setReplicationHandler(Replicatable handler) {
      replication_handler=handler;
   }

   public Replicatable getReplicationHandler() {
      return replication_handler;
   }

   /**
    * Fetch the group state from the current coordinator. If successful, this will trigger setState().
    * @jmx.managed-operation
    */
   public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException {
      if(channel == null)
         throw new ChannelNotConnectedException();
      boolean rc=channel.getState(null, timeout);
      if(rc)
         log.info("fetchState(): state was retrieved successfully");
      else
         log.info("fetchState(): state could not be retrieved (first member)");
   }

   /**
    * @param listener
    * @jmx.managed-operation
    */
   public void addTreeCacheListener(TreeCacheListener listener) {
      if(!listeners.contains(listener))
         listeners.addElement(listener);
   }

   /**
    * @param listener
    * @jmx.managed-operation
    */
   public void removeTreeCacheListener(TreeCacheListener listener) {
      listeners.removeElement(listener);
   }

   /* --------------------------- MBeanSupport ------------------------- */

   /**
    *
    * @throws Exception
    * @jmx.managed-operation
    */
   public void createService() throws Exception {
   }

   /**
    * @jmx.managed-operation
    */
   public void destroyService() {
   }


   /**
    *
    * @throws Exception
    * @jmx.managed-operation
    */
   public void startService() throws Exception {
      if(this.tm_lookup == null && this.tm_lookup_class != null) {
         Class clazz=Thread.currentThread().getContextClassLoader().loadClass(this.tm_lookup_class);
         this.tm_lookup=(TransactionManagerLookup)clazz.newInstance();
      }

      try {
         if(tm_lookup != null)
            tm=tm_lookup.getTransactionManager();
         else
            log.warn("No transaction manager lookup class has been defined. Transactions cannot be used");
      }
      catch(Exception e) {
         log.debug("failed looking up TransactionManager, will not use transactions", e);
      }

      // Create the cache loader (needs to be created regardless of mode (local/replicated). This method will
      // be a no-op if no cache loader has been configured
      createCacheLoader();

      // Create the interceptors in the correct order (later to be defined in XML file)
      createInterceptorChain();

      createEvictionPolicy();

      switch(cache_mode) {
         case LOCAL:
            log.info("cache mode is local, will not create the channel");
            break;
         case REPL_SYNC:
         case REPL_ASYNC:
            log.info("cache mode is " + mode2String(cache_mode));
            if(channel != null) { // already started
               log.info("channel is already running");
               return;
            }
            if(cluster_props == null) {
               cluster_props=getDefaultProperties();
               log.debug("setting cluster properties to default value");
            }

            channel=new JChannel(cluster_props);
            channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
            channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
            if(log.isTraceEnabled())
               log.trace("cache properties: " + cluster_props);
            channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
            disp=new RpcDispatcher(channel, ml, this, this);
            disp.setDeadlockDetection(deadlockDetection);
            channel.connect(cluster_name);
            if(fetch_state_on_startup) {
               fetchStateOnStartup();
            }
            break;
         default:
            throw new IllegalArgumentException("cache mode " + cache_mode + " is invalid");
      }

      // does the preloading
      cacheLoaderPreload();

      coordinator=determineCoordinator();
      notifyCacheStarted();
   }

   private void createEvictionPolicy() {
      // Configure if eviction policy is set
      if(eviction_policy_provider != null)
         ((LRUPolicy)eviction_policy_provider).configure(this);
   }

   /**
    * Assembles the interceptor stack. Presence and order of interceptors is determined by looking at
    * the cache configuration. In the future, this will be accessible through XML. See refactoring.txt for
    * details. An alternative might be to use a simple rools engine to assemble the stack.
    * Creates either:
    * <pre>
    *
    * CallInterceptor
    * LockInterceptor
    * [CacheLoaderInterceptor]
    * [ReplicationInterceptor]
    * [CacheStoreInterceptor]
    *
    * or
    *
    * CallInterceptor
    * LockInterceptor
    * [CacheStoreInterceptor]
    * [CacheLoaderInterceptor]
    * [ReplicationInterceptor]
    *
    * </pre>
    * CallInterceptor is always present at the top, the others may or may not be present
    */
   protected void createInterceptorChain() throws IllegalAccessException, InstantiationException, ClassNotFoundException {
      Interceptor call_interceptor=null;
      Interceptor lock_interceptor=null;
      // Interceptor create_if_not_exists_interceptor=null;
      Interceptor repl_interceptor=null;
      Interceptor cache_loader_interceptor=null;
      Interceptor cache_store_interceptor=null;
      Interceptor unlock_interceptor=null;
      Interceptor first=null;

      call_interceptor=createInterceptor("org.jboss.cache.interceptors.CallInterceptor");
      call_interceptor.setCache(this);

      lock_interceptor=createInterceptor("org.jboss.cache.interceptors.LockInterceptor");
      lock_interceptor.setCache(this);

      //create_if_not_exists_interceptor=createInterceptor("org.jboss.cache.interceptors.CreateIfNotExistsInterceptor");
      //create_if_not_exists_interceptor.setCache(this);

      unlock_interceptor=createInterceptor("org.jboss.cache.interceptors.UnlockInterceptor");
      unlock_interceptor.setCache(this);

      if(cache_mode != LOCAL) {
         repl_interceptor=createInterceptor("org.jboss.cache.interceptors.ReplicationInterceptor");
         repl_interceptor.setCache(this);
      }

      if(cache_loader_class != null || cache_loader != null) {
         cache_loader_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheLoaderInterceptor");
         cache_loader_interceptor.setCache(this);
         cache_store_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheStoreInterceptor");
         cache_store_interceptor.setCache(this);
      }


      // create the stack from the bottom up
      if(cache_loader_interceptor != null) {
         if(cache_loader_shared == true) {
            if(first == null)
               first=cache_store_interceptor;
            else
               addInterceptor(first, cache_store_interceptor);
         }
      }

      if(repl_interceptor != null) {
         if(first == null)
            first=repl_interceptor;
         else
            addInterceptor(first, repl_interceptor);
      }

      if(unlock_interceptor != null) {
         if(first == null)
            first=unlock_interceptor;
         else
            addInterceptor(first, unlock_interceptor);
      }

      if(cache_loader_interceptor != null) {
         if(cache_loader_shared == true) {
            if(first == null)
               first=cache_loader_interceptor;
            else
               addInterceptor(first, cache_loader_interceptor);
         }
         else {
            if(first == null)
               first=cache_loader_interceptor;
            else
               addInterceptor(first, cache_loader_interceptor);
            if(first == null)
               first=cache_store_interceptor;
            else
               addInterceptor(first, cache_store_interceptor);
         }
      }

      //if(first == null)
        // first=create_if_not_exists_interceptor;
      //else
        // addInterceptor(first, create_if_not_exists_interceptor);

      if(first == null)
         first=lock_interceptor;
      else
         addInterceptor(first, lock_interceptor);

      if(first == null)
         first=call_interceptor;
      else
         addInterceptor(first, call_interceptor);

      interceptor_chain=first;
      if(log.isInfoEnabled())
         log.info("interceptor chain is:\n" + printInterceptorChain(first));
   }




   private String printInterceptorChain(Interceptor i) {
      StringBuffer sb=new StringBuffer();
      if(i != null) {
         if(i.getNext() != null) {
            sb.append(printInterceptorChain(i.getNext())).append("\n");
         }
         sb.append(i.getClass());
      }
      return sb.toString();
   }


   /** Adds an interceptor at the end of the chain */
   private void addInterceptor(Interceptor first, Interceptor i) {
      if(first == null) {
         return;
      }
      do {
         if(first.getNext() != null)
            first=first.getNext();
         else
            break;
      }
      while(first != null); // findbugs has a false positive for an NPE here...
      first.setNext(i);
   }


   private Interceptor createInterceptor(String classname) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
      Class clazz=getClass().getClassLoader().loadClass(classname);
      return (Interceptor)clazz.newInstance();
   }


//

   /** Creates an instance of a CacheLoader if and only if
    * <ul>
    * <li>The CacheLoader has not yet been created
    * <li>cache_loader_class is set
    * <li>The CacheLoader is shared and we are <em>not</em> the coordinator (only the coordinator
    *     is supposed to have a CacheLoader)
    * </ul>
    * @throws Exception
    */
   protected void createCacheLoader() throws Exception {
      if(cache_loader == null && cache_loader_class != null) {
         Class cl=Thread.currentThread().getContextClassLoader().loadClass(cache_loader_class);
         cache_loader=(CacheLoader)cl.newInstance();
         cache_loader.setConfig(cache_loader_config);
         cache_loader.setCache(this);
         cache_loader.create();
         cache_loader.start();
      }
   }

   protected void cacheLoaderPreload() throws Exception {
      if(cache_loader != null) {
         if(log.isTraceEnabled())
            log.trace("preloading " + cache_loader_preload);
         if(cache_loader_preload != null) {
            for(Iterator it=cache_loader_preload.iterator(); it.hasNext();) {
               Fqn fqn=(Fqn)it.next();
               preload(fqn, true, true);
            }
         }
      }
   }


   /**
    * Loads the indicated Fqn, plus all parents recursively from the CacheLoader. If no CacheLoader is present,
    * this is a no-op
    * @param fqn
    * @throws Exception
    * @jmx.managed-operation
    */
   public void load(String fqn) throws Exception {
      if(cache_loader != null)
         preload(Fqn.fromString(fqn), true, true);
   }

   void preload(Fqn fqn, boolean preload_parents, boolean preload_children) throws Exception {

      // 1. Load the attributes first
      this.get(fqn, "bla");

      // 2. Then load the parents
      if(preload_parents) {
         Fqn tmp_fqn=new Fqn();
         for(int i=0; i < fqn.size()-1; i++) {
            tmp_fqn=new Fqn(tmp_fqn, fqn.get(i));
            this.get(tmp_fqn, "bla");
         }
      }

      if(preload_children == false)
         return;

      // 3. Then recursively for all child nodes, preload them as well
      Set children=cache_loader.getChildrenNames(fqn);
      if(children == null)
         return;
      for(Iterator it=children.iterator(); it.hasNext();) {
         String child_name=(String)it.next();
         Fqn child_fqn=new Fqn(fqn, child_name);
         preload(child_fqn, false, true);
      }
   }

   void destroyCacheLoader() {
      if(cache_loader != null) {
         cache_loader.stop();
         cache_loader.destroy();
         cache_loader=null;
      }
   }

   protected boolean determineCoordinator() {
      if(channel == null)
         return false;
      Object local_addr=getLocalAddress();
      if(local_addr == null)
         return false;
      View view=channel.getView();
      if(view == null) return false;
      ViewId vid=view.getVid();
      if(vid == null) return false;
      Object coord=vid.getCoordAddress();
      if(coord == null) return false;
      return local_addr.equals(coord);
   }

   public Address getCoordinator() {
      if(channel == null) return null;
      View view=channel.getView();
      if(view == null) return null;
      ViewId vid=view.getVid();
      if(vid == null) return null;
      Address coord=vid.getCoordAddress();
      return coord;
   }

   public byte[] getStateBytes() {
      return this.getMessageListener().getState();
   }

   public void setStateBytes(byte[] state) {
      this.getMessageListener().setState(state);
   }

   protected void fetchStateOnStartup() throws Exception {
      long start, stop;
      synchronized(stateLock) {
         isStateSet=false;
         start=System.currentTimeMillis();
         boolean rc=channel.getState(null, state_fetch_timeout);
         if(rc) {
            while(!isStateSet) {
               try {
                  stateLock.wait();
               }
               catch(InterruptedException iex) {
               }
            }
            stop=System.currentTimeMillis();
            log.info("state was retrieved successfully (in " + (stop-start) + " milliseconds)");
         }
         else
            log.info("state could not be retrieved (must be first member in group)");
      }
   }

   /**
    * @jmx.managed-operation
    */
   public void stopService() {
      if(channel != null) {
         log.info("stopService(): closing the channel");
         channel.close();
         channel=null;
      }
      if(disp != null) {
         log.info("stopService(): stopping the dispatcher");
         disp.stop();
         disp=null;
      }
      if(members != null && members.size() > 0)
         members.clear();

      if(repl_queue != null)
         repl_queue.stop();

      destroyCacheLoader();

      notifyCacheStopped();

      // Need to clean up listeners as well
      listeners.clear();
   }

   /* ----------------------- End of MBeanSupport ----------------------- */

   /**
    * @param fqn fqn String name to retrieve from cache
    * @return Node corresponding to the fqn. Null if does not exist. No guarantees wrt replication,
    * cache loading are given if the underlying node is modified
    */
   public Node get(String fqn) throws CacheException {
      return get(Fqn.fromString(fqn));
   }

   /**
    * @param fqn fqn instance to retrieve from cache
    * @return Node corresponding to the fqn. Null if does not exist. No guarantees wrt replication,
    * cache loading are given if the underlying node is modified
    */
   public Node get(Fqn fqn) throws CacheException {
      MethodCall m=new MethodCall(getNodeMethodLocal, new Object[]{fqn});
      return (Node)invokeMethod(m);
   }

   public Node _get(Fqn fqn) throws CacheException {
      return findNode(fqn);
   }


   /**
    * @param fqn
    * @return
    * @jmx.managed-operation
    */
   public Set getKeys(String fqn) throws CacheException {
      return getKeys(Fqn.fromString(fqn));
   }

   /**
    * @param fqn
    * @return A Set<String> of keys. This is a copy of the key set, modifications will not be written to the original.
    * Returns null if the node is not found, or the node has no attributes
    * @jmx.managed-operation
    */
   public Set getKeys(Fqn fqn) throws CacheException {
      MethodCall m=new MethodCall(getKeysMethodLocal, new Object[]{fqn});
      return (Set)invokeMethod(m);
   }


   public Set _getKeys(Fqn fqn) throws CacheException {
      Set retval=null;
      Node n=findNode(fqn);
      if(n == null)
         return null;
      retval=n.getDataKeys();
      return retval != null? new LinkedHashSet(retval) : null;
   }

   /**
    * Finds a node given its name and returns the value associated with a given key in its <code>data</code>
    * map. Returns null if the node was not found in the tree or the key was not found in the hashmap.
    *
    * @param fqn The fully qualified name of the node.
    * @param key The key.
    * @jmx.managed-operation
    */
   public Object get(String fqn, Object key) throws CacheException {
      return get(Fqn.fromString(fqn), key);
   }


   /**
    * Finds a node given its name and returns the value associated with a given key in its <code>data</code>
    * map. Returns null if the node was not found in the tree or the key was not found in the hashmap.
    *
    * @param fqn The fully qualified name of the node.
    * @param key The key.
    * @jmx.managed-operation
    */
   public Object get(Fqn fqn, Object key) throws CacheException {
      return get(fqn, key, true);
   }

   public Object _get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
      if(log.isTraceEnabled())
         log.trace("_get(" + ", \"" + fqn + "\", " + key + ", \"" +sendNodeEvent +"\")");
      Node n=findNode(fqn);
      if(n == null) return null;
      if(sendNodeEvent)
         notifyNodeVisisted(fqn);
      return n.get(key);
   }


   protected Object get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
      MethodCall m=new MethodCall(getKeyValueMethodLocal, new Object[]{fqn, key, new Boolean(sendNodeEvent)});
      return invokeMethod(m);
   }

   /**
    * Like <code>get()</code> method but without triggering a node visit event. This is used
    * to prevent refresh of the cache data in the eviction policy.
    * @param fqn
    * @param key
    * @return
    */
   public Object peek(Fqn fqn, Object key) throws CacheException {
      return get(fqn, key, false);
   }



   /**
    * Checks whether a given node exists in the tree
    *
    * @param fqn The fully qualified name of the node
    * @return boolean Whether or not the node exists
    * @jmx.managed-operation
    */
   public boolean exists(String fqn) {
      return exists(Fqn.fromString(fqn));
   }


   /**
    * Checks whether a given node exists in the tree. Does not acquire any locks in doing so (result may be dirty read)
    * @param fqn The fully qualified name of the node
    * @return boolean Whether or not the node exists
    * @jmx.managed-operation
    */
   public boolean exists(Fqn fqn) {
      Node n=findInternal(fqn);
      return n != null;
   }

   /**
    * Gets node without attempt to load it from CacheLoader if not present
    * @param fqn
    * @return
    */
   private Node findInternal(Fqn fqn) {
      if(fqn == null || fqn.size() == 0) return root;
      Node n=root, retval=null;
      Object obj;
      for(int i=0; i < fqn.size(); i++) {
         obj=fqn.get(i);
         n=n.getChild(obj);
         if(n == null)
            return null;
         else
            retval=n;
      }
      return retval;
   }


   /**
    *
    * @param fqn
    * @param key
    * @return
    * @jmx.managed-operation
    */
   public boolean exists(String fqn, Object key) {
      return exists(Fqn.fromString(fqn), key);
   }


   /**
    * Checks whether a given key exists in the given node. Does not interact with CacheLoader, so the behavior is
    * different from {@link #get(Fqn,Object)}
    * @param fqn The fully qualified name of the node
    * @param key
    * @return boolean Whether or not the node exists
    * @jmx.managed-operation
    */
   public boolean exists(Fqn fqn, Object key) {
      Node n=findInternal(fqn);
      if(n == null)
         return false;
      else
         return n.containsKey(key);
    }


   /**
    * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created.
    * Also, parent nodes will be created if not existent. If the node already has data, then the new data
    * will override the old one. If the node already existed, a nodeModified() notification will be generated.
    * Otherwise a nodeCreated() motification will be emitted.
    *
    * @param fqn  The fully qualified name of the new node
    * @param data The new data. May be null if no data should be set in the node.
    * @jmx.managed-operation
    */
   public void put(String fqn, Map data) throws CacheException {
      put(Fqn.fromString(fqn), data);
   }

   /**
    * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created.
    * Also, parent nodes will be created if not existent. If the node already has data, then the new data
    * will override the old one. If the node already existed, a nodeModified() notification will be generated.
    * Otherwise a nodeCreated() motification will be emitted.
    *
    * @param fqn  The fully qualified name of the new node
    * @param data The new data. May be null if no data should be set in the node.
    * @jmx.managed-operation
    */
   public void put(Fqn fqn, Map data) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(putDataMethodLocal, new Object[]{tx, fqn, data, Boolean.TRUE});
      invokeMethod(m);
   }

   /**
    * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node
    * already existed, a nodeModified() notification will be generated. Otherwise a
    * nodeCreated() motification will be emitted.
    *
    * @param fqn   The fully qualified name of the node
    * @param key   The key
    * @param value The value
    * @return Object The previous value (if any), if node was present
    * @jmx.managed-operation
    */
   public Object put(String fqn, Object key, Object value) throws CacheException {
      return put(Fqn.fromString(fqn), key, value);
   }


   /**
    * Put with the following properties:
    * <ol>
    * <li>Fails fast (after timeout milliseconds)
    * <li>If replication is used: replicates <em>asynchronously</em>, overriding a potential synchronous mode
    * </ol>
    * This method should be used without running in a transaction (suspend()/resume() before calling it)
    * @param fqn The fully qualified name of the node
    * @param key
    * @param value
    * @param timeout Number of milliseconds to wait until a lock has been acquired. A TimeoutException will
    * be thrown if not successful. 0 means to wait forever
    * @return
    * @throws CacheException
    * @deprecated This is a kludge created specifically form the Hibernate 3.0 release. This method should
    * <em>not</em> be used by any application. The methodV will likely be removed in a future release
    */
   public Object putFailFast(Fqn fqn, Object key, Object value, long timeout) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(putFailFastKeyValueMethodLocal,
                                  new Object[]{tx, fqn, key, value, Boolean.TRUE, new Long(timeout)});
      return invokeMethod(m);
   }

   /**
    *
    * @param fqn
    * @param key
    * @param value
    * @param timeout
    * @return
    * @throws CacheException
    * @deprecated
    */
   public Object putFailFast(String fqn, Object key, Object value, long timeout) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      Fqn fqntmp=Fqn.fromString(fqn);
      MethodCall m=new MethodCall(putFailFastKeyValueMethodLocal,
                                  new Object[]{tx, fqntmp, key, value, Boolean.TRUE, new Long(timeout)});
      return invokeMethod(m);
   }

   /**
    * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node
    * already existed, a nodeModified() notification will be generated. Otherwise a
    * nodeCreated() motification will be emitted.
    *
    * @param fqn   The fully qualified name of the node
    * @param key   The key
    * @param value The value
    * @return Object The previous value (if any), if node was present
    * @jmx.managed-operation
    */
   public Object put(Fqn fqn, Object key, Object value) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(putKeyValMethodLocal, new Object[]{tx, fqn, key, value, Boolean.TRUE});
      return invokeMethod(m);
   }

   /**
    * Removes the node from the tree.
    *
    * @param fqn The fully qualified name of the node.
    * @jmx.managed-operation
    */
   public void remove(String fqn) throws CacheException {
      remove(Fqn.fromString(fqn));
   }

   /**
    * Removes the node from the tree.
    *
    * @param fqn The fully qualified name of the node.
    * @jmx.managed-operation
    */
   public void remove(Fqn fqn) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(removeNodeMethodLocal, new Object[]{tx, fqn, Boolean.TRUE});
      invokeMethod(m);
   }

   /**
    * Called by eviction policy provider. Note that eviction is done only in local mode,
    * that is, it doesn't replicate the node removal. This is will cause the replcation nodes
    * not synchronizing, but it is ok since user is supposed to add the node again when get is
    * null. After that, the contents will be in sync.
    * @param fqn Will remove everythign assoicated with this fqn.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public void evict(Fqn fqn) throws CacheException {
      MethodCall m=new MethodCall(evictNodeMethodLocal, new Object[]{fqn});
      invokeMethod(m);
   }


   /**
    * Evicts a key/value pair from a node's attributes. Note that this is <em>local</em>, will not be replicated.
    * @param fqn
    * @param key
    * @throws CacheException
    * @jmx.managed-operation
    */
//   public void evict(Fqn fqn, Object key) throws CacheException {
//      MethodCall m=new MethodCall(evictKeyValueMethodLocal, new Object[]{fqn, key});
//      invokeMethod(m);
//   }




   /**
    * Removes <code>key</code> from the node's hashmap
    *
    * @param fqn The fullly qualified name of the node
    * @param key The key to be removed
    * @return The previous value, or null if none was associated with the given key
    * @jmx.managed-operation
    */
   public Object remove(String fqn, Object key) throws CacheException {
      return remove(Fqn.fromString(fqn), key);
   }

   /**
    * Removes <code>key</code> from the node's hashmap
    *
    * @param fqn The fullly qualified name of the node
    * @param key The key to be removed
    * @return The previous value, or null if none was associated with the given key
    * @jmx.managed-operation
    */
   public Object remove(Fqn fqn, Object key) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(removeKeyMethodLocal, new Object[]{tx, fqn, key, Boolean.TRUE});
      return invokeMethod(m);
   }

   /**
    *
    * @param fqn
    * @throws CacheException
    * @jmx.managed-operation
    */
   public void removeData(String fqn) throws CacheException {
      removeData(Fqn.fromString(fqn));
   }

   /**
    *
    * @param fqn
    * @throws CacheException
    * @jmx.managed-operation
    */
   public void removeData(Fqn fqn) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(removeDataMethodLocal, new Object[]{tx, fqn, Boolean.TRUE});
      invokeMethod(m);
   }


   /**
    * Lock a given node (or the entire subtree starting at this node)
    * @param fqn The FQN of the node
    * @param owner The owner. This is simply a key into a hashtable, and can be anything, e.g.
    * a GlobalTransaction, the current thread, or a special object. If null, it is set to Thread.currentThread()
    * @param lock_type The type of lock (RO, RW). Needs to be of type Node.LOCK_TYPE_READ or Node.LOCK_TYPE_WRITE
    * @param lock_recursive If true, the entire subtree is locked, else only the given node
    * @throws CacheException If node doesn't exist, a NodeNotExistsException is throw. Other exceptions are
    * LockingException, TimeoutException and UpgradeException
    */
//   public void lock(Fqn fqn, Object owner, int lock_type, boolean lock_recursive) throws CacheException {
//
//   }

   /**
    * Unlock a given node (or the entire subtree starting at this node)
    * @param fqn The FQN of the node
    * @param owner The owner. This is simply a key into a hashtable, and can be anything, e.g.
    * a GlobalTransaction, the current thread, or a special object. If null, it is set to Thread.currentThread()
    * @param unlock_recursive If true, the entire subtree is unlocked, else only the given node
    * @param force Release the lock even if we're not the owner
    */
//   public void unlock(Fqn fqn, Object owner, boolean unlock_recursive, boolean force) {
//
//   }

   /**
    * Force-releases all locks in this node and the entire subtree
    *
    * @param fqn
    * @jmx.managed-operation
    */
   public void releaseAllLocks(String fqn) {
      releaseAllLocks(Fqn.fromString(fqn));
   }

   /**
    * Force-releases all locks in this node and the entire subtree
    *
    * @param fqn
    * @jmx.managed-operation
    */
   public void releaseAllLocks(Fqn fqn) {
      MethodCall m=new MethodCall(releaseAllLocksMethodLocal, new Object[]{fqn});
      try {
         invokeMethod(m);
      }
      catch(CacheException e) {
         log.error("failed releasing all locks for " + fqn, e);
      }
   }

   /**
    * Prints a representation of the node defined by <code>fqn</code>.
    * Output includes name, fqn and data.
    *
    * @jmx.managed-operation
    */
   public String print(String fqn) {
      return print(Fqn.fromString(fqn));
   }

   /**
    * Prints a representation of the node defined by <code>fqn</code>.
    * Output includes name, fqn and data.
    *
    * @jmx.managed-operation
    */
   public String print(Fqn fqn) {
      MethodCall m=new MethodCall(printMethodLocal, new Object[]{fqn});
      Object retval=null;
      try {
         retval=invokeMethod(m);
      }
      catch(Throwable e) {
         retval=e;
      }
      if(retval != null)
         return retval.toString();
      else return "";
   }


   /**
    * Returns all children of a given node
    *
    * @param fqn The fully qualified name of the node
    * @return Set A list of child names (as Strings)
    * @jmx.managed-operation
    */
   public Set getChildrenNames(String fqn) throws CacheException {
      return getChildrenNames(Fqn.fromString(fqn));
   }

   /**
    * Returns all children of a given node
    *
    * @param fqn The fully qualified name of the node
    * @return Set A list of child names (as Objects). Must <em>not</em> be modified because this would
    * modify the underlying node directly (will throw an exception if modification is attempted). Returns null
    * of the parent node was not found, or if there are no children
    * @jmx.managed-operation
    */
   public Set getChildrenNames(Fqn fqn) throws CacheException {
      MethodCall m=new MethodCall(getChildrenNamesMethodLocal, new Object[]{fqn});
      return (Set)invokeMethod(m);
   }

   public Set _getChildrenNames(Fqn fqn) throws CacheException {
      Node n=findNode(fqn);
      if(n == null) return null;
      Map m=n.getChildren();
      if(m != null)
         return new LinkedHashSet(m.keySet());
      else
         return null;
   }



   public boolean hasChild(Fqn fqn) {
      if(fqn == null) return false;

      Node n=root;
      Object obj;
      for(int i=0; i < fqn.size(); i++) {
         obj=fqn.get(i);
         n=n.getChild(obj);
         if(n == null)
            return false;
      }
      return n.hasChildren();
   }

   /**
    * @return
    * @jmx.managed-operation
    */
   public String toString() {
      return toString(false);
   }


   /**
    * @return
    * @jmx.managed-operation
    */
   public String toString(boolean details) {
      StringBuffer sb=new StringBuffer();
      int indent=0;
      Map children;

      if(!details) {
         sb.append(getClass().getName()).append(" [").append(getNumberOfNodes()).append(" nodes, ");
         sb.append(getNumberOfLocksHeld()).append(" locks]");
      }
      else {
         children=root.getChildren();
         if(children != null && children.size() > 0) {
            Collection nodes=children.values();
            for(Iterator it=nodes.iterator(); it.hasNext();) {
               ((Node)it.next()).print(sb, indent);
               sb.append("\n");
            }
         }
         else
            sb.append(SEPARATOR);
      }
      return sb.toString();
   }




   /**
    * @return
    * @jmx.managed-operation
    */
   public String printDetails() {
      StringBuffer sb=new StringBuffer();
      int indent=0;
      Map children;

      children=root.getChildren();
      if(children != null && children.size() > 0) {
         Collection nodes=children.values();
         for(Iterator it=nodes.iterator(); it.hasNext();) {
            ((Node)it.next()).printDetails(sb, indent);
            sb.append("\n");
         }
      }
      else
         sb.append(SEPARATOR);
      return sb.toString();
   }

   /**
    * @return
    * @jmx.managed-operation
    */
   public String printLockInfo() {
      StringBuffer sb=new StringBuffer("\n");
      int indent=0;
      Map children;

      children=root.getChildren();
      if(children != null && children.size() > 0) {
         Collection nodes=children.values();
         for(Iterator it=nodes.iterator(); it.hasNext();) {
            ((Node)it.next()).printLockInfo(sb, indent);
            sb.append("\n");
         }
      }
      else
         sb.append(SEPARATOR);
      return sb.toString();
   }

   /**
    * Gets the number of read or write locks held across the entire tree
    * @return
    * @jmx.managed-attribute
    */
   public int getNumberOfLocksHeld() {
      return numLocks(root);
   }


   int numLocks(Node n) {
      int num=0;
      Map children;
      if(n.isLocked())
         num++;
      if((children=n.getChildren()) != null) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            num+=numLocks((Node)it.next());
         }
      }
      return num;
   }

   /**
    * Returns an <em>approximation</em> of the total number of nodes in the tree. Since this method doesn't acquire
    * any locks, the number might be incorrect, or the method might even throw a ConcurrentModificationException
    * @return
    * @jmx.managed-attribute
    */
   public int getNumberOfNodes() {
      return numNodes(root)-1;
   }


   int numNodes(Node n) {
      if(n == null)
         return 0;
      int count=1; // for n
      if(n.hasChildren()) {
         Map children=n.getChildren();
         if(children != null && children.size() > 0) {
            Collection child_nodes=children.values();
            Node child;
            for(Iterator it=child_nodes.iterator(); it.hasNext();) {
               child=(Node)it.next();
               count+=numNodes(child);
            }
         }
      }
      return count;
   }

   /**
    * Returns an <em>approximation</em> of the total number of attributes in the tree. Since this method doesn't acquire
    * any locks, the number might be incorrect, or the method might even throw a ConcurrentModificationException
    * @return
    * @jmx.managed-attribute
    */
   public int getNumberOfAttributes() {
      return numAttributes(root);
   }

   int numAttributes(Node n) {
      if(n == null)
         return 0;
      int count=n.numAttributes();
      if(n.hasChildren()) {
         Map children=n.getChildren();
         if(children != null && children.size() > 0) {
            Collection child_nodes=children.values();
            Node child;
            for(Iterator it=child_nodes.iterator(); it.hasNext();) {
               child=(Node)it.next();
               count+=child.numAttributes();
            }
         }
      }
      return count;
   }


   /* ---------------------- Remote method calls -------------------- */

   public List callRemoteMethods(Vector mbrs, MethodCall method_call,
                                 boolean synchronous, boolean exclude_self, long timeout)
           throws Exception {
      RspList rsps;
      Rsp rsp;
      List retval;
      Vector validMembers;
      int mode=synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE;

      if(disp == null)
         return null;

      validMembers=mbrs != null ? new Vector(mbrs) : new Vector(this.members);
      if(exclude_self && validMembers.size() > 0) {
         Object local_addr=getLocalAddress();
         if(local_addr != null)
            validMembers.remove(local_addr);
      }
      if(validMembers.size() == 0) {
         if(log.isTraceEnabled())
            log.trace("destination list is empty, discarding call");
         return null;
      }

      if(log.isTraceEnabled())
         log.trace("callRemoteMethods(): valid members are " + validMembers);

      rsps=disp.callRemoteMethods(validMembers, method_call, mode, timeout);
      if(log.isTraceEnabled())
         log.trace("(" + getLocalAddress() + "): responses for method " + method_call.getName() + ":\n" + rsps);
      if(rsps == null)
         return null;
      retval=new ArrayList(rsps.size());
      for(int i=0; i < rsps.size(); i++) {
         rsp=(Rsp)rsps.elementAt(i);
         if(rsp.wasSuspected() || !rsp.wasReceived())
            retval.add(new TimeoutException("rsp=" + rsp));
         else
            retval.add(rsp.getValue());
      }
      return retval;
   }


   /**
    * @param members
    * @param method
    * @param args
    * @param synchronous
    * @param exclude_self
    * @param timeout
    * @return
    * @throws Exception
    * @jmx.managed-operation
    */
   public List callRemoteMethods(Vector members, Method method, Object[] args,
                                 boolean synchronous, boolean exclude_self, long timeout)
           throws Exception {
      return callRemoteMethods(members, new MethodCall(method, args), synchronous, exclude_self, timeout);
   }

   /**
    * @param members
    * @param method_name
    * @param types
    * @param args
    * @param synchronous
    * @param exclude_self
    * @param timeout
    * @return
    * @throws Exception
    * @jmx.managed-operation
    */
   public List callRemoteMethods(Vector members, String method_name,
                                 Class[] types, Object[] args,
                                 boolean synchronous, boolean exclude_self, long timeout)
           throws Exception {
      Method method=getClass().getDeclaredMethod(method_name, types);
      return callRemoteMethods(members, method, args, synchronous, exclude_self, timeout);
   }
   /* -------------------- End Remote method calls ------------------ */


   /* --------------------- Callbacks -------------------------- */

   /**
    * Does the real work. Needs to acquire locks if accessing nodes, depending on
    * the value of <tt>locking</tt>. If run inside a transaction, needs to (a) add
    * newly acquired locks to {@link TransactionEntry}'s lock list, (b) add nodes
    * that were created to {@link TransactionEntry}'s node list and (c) create
    * {@link Modification}s and add them to {@link TransactionEntry}'s modification
    * list and (d) create compensating modifications to undo the changes in case
    * of a rollback
    *
    * @param fqn
    * @param data
    * @param create_undo_ops If true, undo operations will be created (default is true).
    *                        Otherwise they will not be created (used by rollback()).
    * @jmx.managed-operation
    */
   public void _put(GlobalTransaction tx, String fqn, Map data, boolean create_undo_ops)
           throws CacheException {
      _put(tx, Fqn.fromString(fqn), data, create_undo_ops);
   }


   /**
    * Does the real work. Needs to acquire locks if accessing nodes, depending on
    * the value of <tt>locking</tt>. If run inside a transaction, needs to (a) add
    * newly acquired locks to {@link TransactionEntry}'s lock list, (b) add nodes
    * that were created to {@link TransactionEntry}'s node list and (c) create
    * {@link Modification}s and add them to {@link TransactionEntry}'s modification
    * list and (d) create compensating modifications to undo the changes in case
    * of a rollback
    *
    * @param fqn
    * @param data
    * @param create_undo_ops If true, undo operations will be created (default is true).
    *                        Otherwise they will not be created (used by rollback()).
    * @jmx.managed-operation
    */
   public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops)
           throws CacheException {
      _put(tx, fqn, data, create_undo_ops, false);
   }


   /**
    * Does the real work. Needs to acquire locks if accessing nodes, depending on
    * the value of <tt>locking</tt>. If run inside a transaction, needs to (a) add
    * newly acquired locks to {@link TransactionEntry}'s lock list, (b) add nodes
    * that were created to {@link TransactionEntry}'s node list and (c) create
    * {@link Modification}s and add them to {@link TransactionEntry}'s modification
    * list and (d) create compensating modifications to undo the changes in case
    * of a rollback
    *
    * @param fqn
    * @param data
    * @param create_undo_ops If true, undo operations will be created (default is true).
    * @param erase_contents  Clear the existing hashmap before putting the new data into it
    *                        Otherwise they will not be created (used by rollback()).
    * @jmx.managed-operation
    */
   public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, boolean erase_contents)
           throws CacheException {
      Node n;
      MethodCall undo_op=null;
      Map old_data;

      if(log.isTraceEnabled())
         log.trace(new StringBuffer().append("_put(").append(tx).append(", \"").append(fqn)
                   .append("\", ").append(data).append(")").toString());

      // Find the node. This will lock it (if <tt>locking</tt> is true) and
      // add the temporarily created parent nodes to the TX's node list if tx != null)
      n=findNode(fqn);
      if(n == null) {
         String errStr="node " + fqn + " not found (gtx=" + tx + ", caller=" + Thread.currentThread() + ")";
         if(log.isTraceEnabled())
            log.trace(errStr);
         throw new NodeNotExistsException(errStr);
      }

      // TODO: move creation of undo-operations to separate Interceptor
      // create a compensating method call (reverting the effect of
      // this modification) and put it into the TX's undo list.
      if(tx != null && create_undo_ops) {
         // TODO even if n is brand new, getData can have empty value instead. Need to fix.
         if((old_data=n.getData()) == null) {
            undo_op=new MethodCall(removeDataMethodLocal,
                                   new Object[]{tx, fqn, Boolean.FALSE});
         }
         else {
            undo_op=new MethodCall(putDataEraseMethodLocal,
                                   new Object[]{tx, fqn,
                                                new HashMap(old_data),
                                                Boolean.FALSE,
                                                Boolean.TRUE}); // erase previous hashmap contents
         }
      }

      n.put(data, erase_contents);

      if(tx != null && create_undo_ops) {
         // 1. put undo-op in TX' undo-operations list (needed to rollback TX)
         tx_table.addUndoOperation(tx, undo_op);
      }
      notifyNodeModified(fqn);
   }


   /**
    * @param fqn
    * @param key
    * @param value
    * @return Previous value (if any)
    * @jmx.managed-operation
    */
   public Object _put(GlobalTransaction tx, String fqn, Object key, Object value,  boolean create_undo_ops)
         throws CacheException {
      return _put(tx, Fqn.fromString(fqn), key, value, create_undo_ops);
   }




   /**
    * @param fqn
    * @param key
    * @param value
    * @return Previous value (if any)
    * @jmx.managed-operation
    */
   public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value, boolean create_undo_ops)
         throws CacheException {
      Node n=null;
      MethodCall undo_op=null;
      Object old_value=null;

      if(log.isTraceEnabled()) {
         log.trace(new StringBuffer().append("_put(").append(tx).append(", \"").
                   append(fqn).append("\", ").append(key).append(", ").append(value).append(")").toString());
      }

      n=findNode(fqn);
      if(n == null) {
         String errStr="node " + fqn + " not found (gtx=" + tx + ", caller=" + Thread.currentThread() + ")";
         if(log.isTraceEnabled())
            log.trace(errStr);
         throw new NodeNotExistsException(errStr);
      }

      old_value=n.get(key);
      n.put(key, value);

      // create a compensating method call (reverting the effect of
      // this modification) and put it into the TX's undo list.
      if(tx != null && create_undo_ops) {
         if(old_value == null) {
            undo_op=new MethodCall(removeKeyMethodLocal,
                                   new Object[]{tx, fqn, key, Boolean.FALSE});
         }
         else {
            undo_op=new MethodCall(putKeyValMethodLocal,
                                   new Object[]{tx, fqn, key, old_value,
                                                Boolean.FALSE});
         }
         // 1. put undo-op in TX' undo-operations list (needed to rollback TX)
         tx_table.addUndoOperation(tx, undo_op);
      }

      notifyNodeModified(fqn);
      return old_value;
   }


   public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value,  boolean create_undo_ops, long timeout)
         throws CacheException {
      return _put(tx, fqn, key, value, create_undo_ops);
   }

   /**
    * @param fqn
    * @jmx.managed-operation
    */
   public void _remove(GlobalTransaction tx, String fqn, boolean create_undo_ops) throws CacheException {
      _remove(tx, Fqn.fromString(fqn), create_undo_ops);
   }

   /**
    * @param fqn
    * @jmx.managed-operation
    */
   public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops) throws CacheException {
      _remove(tx, fqn, create_undo_ops, true);
   }

   public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent)
           throws CacheException {
      _remove(tx, fqn, create_undo_ops, sendNodeEvent, false);
   }

   /**
    * Remove a node
    * @param tx
    * @param fqn
    * @param create_undo_ops
    * @param sendNodeEvent
    */
   public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction)
         throws CacheException {
      Node n, parent_node;
      MethodCall undo_op=null;

      if(log.isTraceEnabled())
         log.trace("_remove(" + tx + ", \"" + fqn + "\")");

      if(fqn.size() == 0) {
         Set children=getChildrenNames(fqn);
         if(children != null) {
            Object[] kids=children.toArray();

            for(int i=0; i < kids.length; i++) {
               Object s=kids[i];
               Fqn tmp=new Fqn(fqn, s);
               try {
                  _remove(tx, tmp, create_undo_ops, true, eviction);
               }
               catch(Exception e) {
                  log.error("failure removing node " + tmp);
               }
            }
         }
         return;
      }

      // Find the node. This will add the temporarily created parent nodes to the TX's node list if tx != null)
      n=findNode(fqn);
      if(n == null) {
         log.warn("node " + fqn + " not found");
         return;
      }
      parent_node=n.getParent();

      // remove subtree from parent
      parent_node.removeChild(n.getName());

      // release all locks for the entire subtree
      n.releaseAll(tx != null? tx : (Object)Thread.currentThread());

      // create a compensating method call (reverting the effect of
      // this modification) and put it into the TX's undo list.
      if(tx != null && create_undo_ops && n != null && eviction == false) {
         undo_op=new MethodCall(addChildMethodLocal, new Object[]{tx, parent_node.getFqn(), n.getName(), n});

         // 1. put undo-op in TX' undo-operations list (needed to rollback TX)
         tx_table.addUndoOperation(tx, undo_op);
      }

      if(sendNodeEvent)
         notifyNodeRemoved(fqn);
      else
         notifyNodeEvicted(fqn);
   }

   /**
    * @param fqn
    * @param key
    * @return Object
    * @jmx.managed-operation
    */
   public Object _remove(GlobalTransaction tx, String fqn, Object key, boolean create_undo_ops)
           throws CacheException {
      return _remove(tx, Fqn.fromString(fqn), key, create_undo_ops);
   }

   /**
    * @param fqn
    * @param key
    * @return Object
    * @jmx.managed-operation
    */
   public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops)
           throws CacheException {
      return _remove(tx, fqn, key, create_undo_ops, false); // din't node remove notification event
   }

   public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops, boolean sendNodeEvent)
           throws CacheException {
      Node n=null;
      MethodCall undo_op=null;
      Object old_value=null;

      if(log.isTraceEnabled())
         log.trace(new StringBuffer().append("_remove(").append(tx).append(", \"")
                   .append(fqn).append("\", ").append(key).append(")").toString());

      // Find the node. This will lock it (if <tt>locking</tt> is true) and
      // add the temporarily created parent nodes to the TX's node list if tx != null)
      n=findNode(fqn);
      if(n == null) {
         log.warn("node " + fqn + " not found");
         return null;
      }
      old_value=n.remove(key);

      // create a compensating method call (reverting the effect of
      // this modification) and put it into the TX's undo list.
      if(tx != null && create_undo_ops && old_value != null) {
         undo_op=new MethodCall(putKeyValMethodLocal,
                                new Object[]{tx, fqn, key, old_value,
                                             Boolean.FALSE});
         // 1. put undo-op in TX' undo-operations list (needed to rollback TX)
         tx_table.addUndoOperation(tx, undo_op);
      }

      if(sendNodeEvent)
         notifyNodeModified(fqn); // changed from notifyNodeRemoved() - Jimmy Wilson
      return old_value;
   }

   /**
    * @param fqn
    */
   public void _removeData(GlobalTransaction tx, String fqn, boolean create_undo_ops)
           throws CacheException {
      _removeData(tx, Fqn.fromString(fqn), create_undo_ops);
   }

   /**
    * @param fqn
    */
   public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops)
           throws CacheException {
      _removeData(tx, fqn, create_undo_ops, true);
   }


   public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent)
           throws CacheException {
      _removeData(tx, fqn, create_undo_ops, sendNodeEvent, false);
   }

   public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction)
           throws CacheException {
      Node n=null;
      MethodCall undo_op=null;
      Map old_data=null;

      if(log.isTraceEnabled())
         log.trace("_removeData(" + tx + ", \"" + fqn + "\")");

      // Find the node. This will lock it (if <tt>locking</tt> is true) and
      // add the temporarily created parent nodes to the TX's node list if tx != null)
      n=findNode(fqn);
      if(n == null) {
         log.warn("node " + fqn + " not found");
         return;
      }

      // create a compensating method call (reverting the effect of
      // this modification) and put it into the TX's undo list.
      if(tx != null && create_undo_ops && (old_data=n.getData()) != null && !eviction) {
         undo_op=new MethodCall(putDataMethodLocal, new Object[]{tx, fqn, new HashMap(old_data), Boolean.FALSE});
      }

      n.clear();
      if(eviction)
         n.put(UNINITIALIZED, null); // required by cache loader to subsequently load the element again

      if(sendNodeEvent) {
         notifyNodeVisisted(fqn);
      }
      else { // FIXME Bela did this so GUI view can refresh the view after node is evicted. But this breaks eviction policy, especially AOP!!!!
         if(eviction)
            notifyNodeEvicted(fqn);
         else
            notifyNodeModified(fqn); // todo: merge these 2 notifications back into 1 !
      }

      // put undo-op in TX' undo-operations list (needed to rollback TX)
      if(tx != null && create_undo_ops) {
         tx_table.addUndoOperation(tx, undo_op);
      }
   }


   /**
     * Called by eviction policy provider. Note that eviction is done only in local mode,
     * that is, it doesn't replicate the node removal. This is will cause the replcation nodes
     * not synchronizing, but it is ok since user is supposed to add the node again when get is
     * null. After that, the contents will be in sync.
     * @param fqn Will remove everythign assoicated with this fqn.
     * @throws CacheException
     */
    public void _evict(Fqn fqn) throws CacheException {
       if(!exists(fqn)) return;   // node does not exist. Maybe it has been recursively removed.
       // use remove method now if there is a child node. Otherwise, it is removed
       boolean create_undo_ops = false;
       boolean sendNodeEvent = false;
       boolean eviction=true;
      if(log.isTraceEnabled())
         log.trace("_evict(" + fqn + ")");
       if(hasChild(fqn)) {
          _removeData(null, fqn, create_undo_ops, sendNodeEvent, eviction);
       }
       else {
          _remove(null, fqn, create_undo_ops, sendNodeEvent, eviction);
       }
    }


    /**
     * Evicts a key/value pair from a node's attributes. Note that this is <em>local</em>, will not be replicated.
     * @param fqn
     * @param key
     * @throws CacheException
     */
//    public void _evict(Fqn fqn, Object key) throws CacheException {
//       if(!exists(fqn)) return;
//       boolean create_undo_ops = false;
//       boolean sendNodeEvent = false;
//       boolean eviction=true;
//       _removeData(null, fqn, create_undo_ops, sendNodeEvent, eviction);
//    }




   /**
    * Compensating method to {@link #_remove(GlobalTransaction,Fqn,boolean)}. This
    * is package-private on purpose because it is only supposed to be called inside
    * the same VM (by {@link #rollback(GlobalTransaction)}).
    *
    * @param parent_fqn
    * @param child_name
    * @param old_node
    */
   public void _addChild(GlobalTransaction tx, Fqn parent_fqn, Object child_name, Node old_node)
           throws CacheException {
      if(log.isTraceEnabled())
         log.trace("_addChild(" + tx + ", \"" + parent_fqn + "\", \"" + child_name + "\")");

      if(parent_fqn == null || child_name == null || old_node == null) {
         log.error("parent_fqn or child_name or node was null");
         return;
      }
      Node tmp=findNode(parent_fqn);
      if(tmp == null) {
         log.warn("node " + parent_fqn + " not found");
         return;
      }

      tmp.addChild(child_name, old_node);
   }




   /**
    * Invoked by the ReplicationInterceptor of other nodes in the cluster, to replicate their changes across the
    * cluster. Needs to be forwarded to the ReplicationInterceptor in this interceptor chain.<br/>
    * This method will later be moved entirely into the ReplicationInterceptor.
    * @param method_call
    * @return
    * @throws Throwable
    */
   public Object _replicate(MethodCall method_call) throws Throwable {
      if(replication_handler != null) {
         return replication_handler.replicate(method_call);
      }
      else {
         throw new UnsupportedOperationException("no replication handler is installed");
      }
   }

   public void _replicate(List method_calls) throws Throwable {
      if(replication_handler != null) {
         replication_handler.replicate(method_calls);
      }
      else {
         throw new UnsupportedOperationException("no replication handler is installed");
      }
   }

   public void _releaseAllLocks(Fqn fqn) {
      Node n;

      try {
         n=findNode(fqn);
         if(n == null) {
            log.error("releaseAllLocks(): node " + fqn + " not found");
            return;
         }
         n.releaseAllForce();
      }
      catch(Throwable t) {
         log.error("releaseAllLocks(): failed", t);
      }
   }


   public String _print(Fqn fqn) {
      try {
         Node n=findNode(fqn);
         if(n == null) return null;
         return n.toString();
      }
      catch(Throwable t) {
         return null;
      }
   }


   public void _lock(Fqn fqn, int lock_type, boolean recursive)
         throws TimeoutException, LockingException {
      log.warn("method _lock() should not be invoked on TreeCache");
   }

   // todo: these methods can be removed once we move 2PC entirely into {Replication/Lock}Interceptor
   public void prepare(GlobalTransaction global_tx, List modifications, Address coord, boolean commit) {
      throw new UnsupportedOperationException("prepare() should not be called on TreeCache directly");
   }

   public void commit(GlobalTransaction tx) {
      throw new UnsupportedOperationException("commit() should not be called on TreeCache directly");
   }

   public void rollback(GlobalTransaction tx) {
      throw new UnsupportedOperationException("rollback() should not be called on TreeCache directly");
   }


   /* ----------------- End of  Callbacks ---------------------- */


   public void addNode(GlobalTransaction gtx, Fqn node) {
      tx_table.addNode(gtx, node);
   }



   /*-------------------- MessageListener ----------------------*/

   class MessageListenerAdaptor implements MessageListener {
      final TreeCache cache;
      final Logger log; // Need this to run under jdk1.3

      MessageListenerAdaptor(TreeCache cache, Logger log) {
         this.cache = cache;
         this.log = log;
      }

      /**
       * Callback (no-op)
       *
       * @param msg
       */
      public void receive(Message msg) {
      }

      /**
       * Return a copy of the current cache (tree). It actually returns a 2 element array of byte[], element 0
       * being the transient state (or null) and element 1 being the persistent state (or null)
       */
      public byte[] getState() {
         Object owner=null;
         boolean fetch_persistent_state=cache_loader != null &&
               cache_loader_shared == false &&
               cache_loader_fetch_persistent_state;
         byte[] transient_state=null;
         byte[] persistent_state=null;
         byte[][] states=new byte[2][];
         byte[] retval=null;
         boolean locked=false;

         owner=getCurrentTransaction();
         if(owner == null) owner=Thread.currentThread();

         states[0]=states[1]=null;
         try {
            if(cache_loader_fetch_transient_state) {
               log.info("locking the tree to obtain transient state");
               root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_READ);
               locked=true;
               transient_state=Util.objectToByteBuffer(root);
               states[0]=transient_state;
               log.info("returning the transient state (" + transient_state.length + " bytes)");
            }
         }
         catch(Throwable t) {
            log.error("failed getting the transient state", t);
         }
         try {
            if(fetch_persistent_state) {
               if(!locked) {
                  log.info("locking the tree to obtain persistent state");
                  root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_READ);
                  locked=true;
               }
               log.info("getting the persistent state");
               persistent_state=cache_loader.loadEntireState();
               states[1]=persistent_state;
               log.info("returning the persistent state (" + persistent_state.length + " bytes)");
            }
         }
         catch(Throwable t) {
            log.error("failed getting the persistent state", t);
         }

         try {
            retval=Util.objectToByteBuffer(states);
            return retval;
         }
         catch(Throwable t) {
            log.error("failed serializing transient and persistent state", t);
            return retval;
         }

         finally {
            root.releaseAll(owner);
         }
      }


      public void setState(byte[] new_state) {
         try {
            _setState(new_state);
         }
         finally {
            synchronized(stateLock) {
               // Notify wait that state has been set.
               stateLock.notifyAll();
               if(root != null)
                  root.releaseAllForce();
            }
         }
      }

      /**
       * Set the cache (tree) to this value. The new_state is a byt[][] array, element 0 is the transient state
       * (or null) , and element 1 is the persistent state (or null)
       */
      void _setState(byte[] new_state) {
         Node new_root=null, old_root=null;
         Object obj;
         Object owner=null;
         byte[][] states=null;
         byte[] transient_state=null;
         byte[] persistent_state=null;
         boolean locked=false;

         if(new_state == null) {
            log.info("new cache is null (maybe first member in cluster)");
            return;
         }

         // 1. Unserialize the states into transient and persistent state
         try {
            log.info("received the state (size=" + new_state.length + " bytes)");
            states=(byte[][])Util.objectFromByteBuffer(new_state);
            transient_state=states[0];
            persistent_state=states[1];
            if(transient_state != null)
               log.info("transient state: " + transient_state.length + " bytes");
            if(persistent_state != null)
               log.info("persistent state: " + persistent_state.length + " bytes");
            owner=getCurrentTransaction();
            if(owner == null) owner=Thread.currentThread();
         }
         catch(Throwable t) {
            log.error("failed unserializing state", t);
         }

         // 2. If transient state is available: set it (lock tree first)
         if(transient_state != null) {
            try {
               log.info("setting transient state");
               obj=Util.objectFromByteBuffer(transient_state);
               new_root=(Node)obj;
               new_root.setRecursiveTreeCacheInstance(cache);  // need to set this at root and set it recursively
               log.info("locking the old tree");
               root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_WRITE);
               locked=true;
               log.info("locking the old tree was successful");
               old_root=root;
               root=new_root;
               log.info("setting the transient state was successful");
               notifyAllNodesCreated(root);
            }
            catch(Throwable t) {
               log.error("failed setting transient state", t);
            }
         }

         // 3. Set the persistent state
         if(persistent_state != null) {
            if(cache_loader == null) {
               log.error("cache loader is null, cannot set persistent state");
            }
            else {
               try {
                  if(!locked) {
                     log.info("locking the old tree");
                     root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_WRITE);
                     old_root=root; // to unlock later
                     locked=true;
                     root=new Node(SEPARATOR, Fqn.fromString(SEPARATOR), null, null, cache);
                     log.info("locking the old tree was successful");
                  }
                  log.info("setting the persistent state");
                  // cache_loader.remove(Fqn.fromString("/"));
                  cache_loader.storeEntireState(persistent_state);
                  log.info("setting the persistent state was successful");
               }
               catch(Throwable t) {
                  log.error("failed setting persistent state", t);
               }
            }
         }

         if(old_root != null) {
            log.info("forcing release of all locks in old tree");
            try {old_root.releaseAllForce();} catch(Throwable t) {log.error("failed releasing locks", t);}
         }

         isStateSet=true;
      }



   }



   /*-------------------- End of MessageListener ----------------------*/


   /*----------------------- MembershipListener ------------------------*/

   public void viewAccepted(View new_view) {
      Vector new_mbrs=new_view.getMembers();

      // todo: if MergeView, fetch and reconcile state from coordinator
      // actually maybe this is best left up to the application ? we just notify them and let
      // the appl handle it ?

      log.info("viewAccepted(): new members: " + new_mbrs);
      if(new_mbrs != null) {
         members.removeAllElements();
         members.addAll(new_view.getMembers());
         notifyViewChange(new_view);
      }

      if(cache_loader_shared) {
         // boolean old_coord=coordinator;
         coordinator=determineCoordinator();
      }
   }


   /**
    * Called when a member is suspected
    */
   public void suspect(Address suspected_mbr) {
   }


   /**
    * Block sending and receiving of messages until viewAccepted() is called
    */
   public void block() {
   }

   /*------------------- End of MembershipListener ----------------------*/


   /* ------------------------------ Private methods --------------------------- */

   /**
    * Returns the transaction associated with the current thread. We get the initial context
    * and a reference to the TransactionManager to get the transaction. This method is used
    * by {@link #getCurrentTransaction()}
    *
    * @return
    */
   protected Transaction getLocalTransaction() {
      if(tm == null) {
         return null;
      }
      try {
         return tm.getTransaction();
      }
      catch(Throwable t) {
         return null;
      }
   }



   
   /** Returns true if transaction is ACTIVE or PREPARING, false otherwise */
   boolean isValid(Transaction tx) {
      if(tx == null) return false;
      int status=-1;
      try {
         status=tx.getStatus();
         return status == Status.STATUS_ACTIVE || status == Status.STATUS_PREPARING;
      }
      catch(SystemException e) {
         log.error("failed getting transaction status", e);
         return false;
      }
   }


   /**
    * Get the transaction associated with the current thread and maps it onto a
    * {@link GlobalTransaction}. The mapping is maintained in the <tt>tx_map</tt> table.
    * If a local transaction exists, but doesn't yet have a mapping to a GlobalTransaction,
    * a new GlobalTransaction will be created and mapped to the local transaction.
    * Note that if a local transaction exists, but is not ACTIVE or PREPARING, null is returned too.
    * @return A GlobalTransaction, or null if no (local) transaction was associated with the current thread
    */
   public GlobalTransaction getCurrentTransaction() {
      Transaction tx;

      if((tx=getLocalTransaction()) == null) { // no transaction is associated with the current thread
         return null;
      }

      if(!isValid(tx)) { // we got a non-null transaction, but it is not active anymore
         int status=-1;
         try {status=tx.getStatus();} catch(SystemException e) {}
         log.warn("status is " + status + " (not ACTIVE or PREPARING); returning null)");
         return null;
      }

      return getCurrentTransaction(tx);
   }

   public GlobalTransaction getCurrentTransaction(Transaction tx) {
      synchronized(tx_table) {
         GlobalTransaction gtx=tx_table.get(tx);
         if(gtx == null) {
            Address addr=(Address)getLocalAddress();
            gtx=GlobalTransaction.create(addr);
            tx_table.put(tx, gtx);
            TransactionEntry ent=new TransactionEntry();
            ent.setTransaction(tx);
            tx_table.put(gtx, ent);
            if(log.isTraceEnabled())
               log.trace("created new GTX: " + gtx + ", local TX=" + tx);
         }
         return gtx;
      }
   }



   /**
    * Invokes a method against this object. Contains the logger_ic for handling the various
    * use cases, e.g. mode (local, repl_async, repl_sync), transaction (yes or no) and
    * locking (yes or no).
    *
    * @param m
    * @return
    * @throws CacheException
    */
   protected Object invokeMethod(MethodCall m) throws CacheException {
      // if(log.isTraceEnabled()) log.trace("calling method " + m.getName());
      try {
         return interceptor_chain.invoke(m);
      }
      catch(Throwable t) {
         if(t instanceof CacheException)
            throw (CacheException)t;
         throw new NestedRuntimeException(t);
      }
   }


   /**
    * Finds a node given a fully qualified name
    * Whenever nodes are created, and the global transaction is not null, the created
    * nodes have to be added to the transaction's {@link TransactionEntry#nodes}
    * field.<br>
    * When a lock is acquired on a node, a reference to the lock has to be added to the list
    * of locked nodes in the {@link TransactionEntry#locks} field.
    * <p>This operation will also apply different locking to the tree nodes, depending on
    * <tt>operation_type</tt>. If it is <tt>read</tt> type, all nodes will be acquired with
    * read lock. Otherwise, the operation is <tt>write</tt> type, all parent nodes will be acquired
    * with read lock while the destination node acquires write lock.</p>
    *
    * @param fqn                  Fully qualified name for the corresponding node.
    * @return Node
    */
   private Node findNode(Fqn fqn) {
      Node n, child_node=null;
      Object  child_name;
      int     treeNodeSize;
      Fqn     tmp_fqn=new Fqn();

      if(fqn == null) return null;
      if((treeNodeSize=fqn.size()) == 0)
         return root;

      n=root;
      for(int i=0; i < treeNodeSize; i++) {
         child_name=fqn.get(i);
         tmp_fqn=new Fqn(tmp_fqn, child_name);
         child_node=n.getChild(child_name);
         if(child_node == null)
            return null;
         n=child_node;
      }
      return child_node;
   }



   public void notifyNodeCreated(Fqn fqn) {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).nodeCreated(fqn);
   }

   public void notifyNodeLoaded(Fqn fqn) {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).nodeLoaded(fqn);
   }

   protected void notifyNodeRemoved(Fqn fqn) {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).nodeRemoved(fqn);
   }

   protected void notifyNodeEvicted(Fqn fqn) {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).nodeEvicted(fqn);
   }

   protected void notifyNodeModified(Fqn fqn) {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).nodeModified(fqn);
   }

   protected void notifyNodeVisisted(Fqn fqn) {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).nodeVisited(fqn);
   }

   protected void notifyCacheStarted() {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).cacheStarted(this);
   }

   protected void notifyCacheStopped() {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).cacheStopped(this);
   }

   protected void notifyViewChange(View v) {
      for(int i=0; i < listeners.size(); i++)
         ((TreeCacheListener)listeners.elementAt(i)).viewChange(v);
   }

   /**
    * Generates NodeAdded notifications for all nodes of the tree. This is called whenever the tree is
    * initially retrieved (state transfer)
    */
   protected void notifyAllNodesCreated(Node curr) {
      Node n;
      Map children;

      if(curr == null) return;
      notifyNodeCreated(curr.fqn);
      if((children=curr.getChildren()) != null) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            n=(Node)it.next();
            notifyAllNodesCreated(n);
         }
      }
   }

   protected String getDefaultProperties() {
      return "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
              "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
              "PING(timeout=1000;num_initial_members=2):" +
              "MERGE2(min_interval=5000;max_interval=10000):" +
              "FD_SOCK:" +
              "VERIFY_SUSPECT(timeout=1500):" +
              "pbcast.NAKACK(gc_lag=50;max_xmit_size=8192;retransmit_timeout=600,1200,2400,4800):" +
              "UNICAST(timeout=600,1200,2400,4800):" +
              "pbcast.STABLE(desired_avg_gossip=20000):" +
              "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +
              "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
              "shun=false;print_local_addr=true):" +
              "pbcast.STATE_TRANSFER";
   }


   protected int string2Mode(String mode) {
      if(mode == null) return -1;
      String m=mode.toLowerCase().trim();
      if(m.equals("local"))
         return LOCAL;
      else
         if(m.equals("repl_async") || m.equals("repl-async"))
            return REPL_ASYNC;
         else
            if(m.equals("repl_sync") || m.equals("repl-sync"))
               return REPL_SYNC;
            else
               return -1;
   }



}