/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 * Created on March 25 2003
 */
package org.jboss.cache;


import org.jboss.cache.lock.IdentityLock;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.lock.UpgradeException;
import org.jboss.logging.Logger;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;

/**
 * Represents a node in the tree. Has a relative name and a Fqn. Maintains a hashmap.
 * If the node is created in a replicated cache, the relative and fully qualified name,
 * and the keys and values of the hashmap have to be serializable.
 * <p>Note that the current version supports different levels of transaction locking, such
 * as simple locking (<code>TreeCache.SERIALIZABLE</code>) or R/W locking with upgrade
 * (<code>REPEATABLE_READ</code>) -- that is, the read lock
 * will be automatically upgraded to a write lock when the same owner intends to modify
 * the data after a read.</p>
 * This class is <em>not</em> synchronized, so access to instances of Node needs to be run under an
 * isolation level &gt; NONE.
 * <br/>
 * @author Bela Ban March 25 2003
 * @author Ben Wang
 * @version $Revision: 1.39.2.4 $
 */
public class Node implements Externalizable {

   /** Serialization version id of this class. */
   static final long serialVersionUID = -5040432493172658796L;

   /** Relative name of the node, e.g. "c" */
   Object name=null;

   /** Fully qualified name (e.g. "/a/b/c") */
   Fqn fqn=null;

   /** Parent node. For navigation purposes */
   Node parent=null;

   /** Children. Map<Object,Node>. Keys: child name (Object), value: Node. Created lazily by {@link #children()} */
   Map children=null;

   /** Guards the lazy creation of {@link #children} */
   final transient Object childrenMutex=new Object();

   /** Data for the current node. Map<Object,Object>. Created lazily by {@link #data()} */
   Map data=null;

   /** Guards the lazy creation of {@link #data} */
   final transient Object dataMutex=new Object();


   /** Lock manager that manages locks to be acquired when accessing the node inside a transaction.
    * Created by {@link #init()}; it is still null on an instance built with the no-arg constructor
    * until {@link #readExternal(ObjectInput)} has run */
   transient IdentityLock lock_=null;

   /** Declare locking type with the intention to read or write data */
   public static final int LOCK_TYPE_READ=1;
   public static final int LOCK_TYPE_WRITE=2;

   static Logger log=Logger.getLogger(Node.class);

   /** Keep a reference of the TreeCache instance */
   transient TreeCache cache;

   /** Number of spaces added per tree level by {@link #print} and {@link #printLockInfo} */
   protected static final int INDENT=4;

   /** Whether trace output of lock operations includes lock details (system property "print_lock_details") */
   public static final boolean PRINT_LOCK_DETAILS=Boolean.getBoolean("print_lock_details");


   /**
    * Creates an empty, uninitialised node. Used by externalization; the state is filled in
    * by {@link #readExternal(ObjectInput)}.
    */
   public Node() {
      // used by externalization
   }

   /**
    * Creates a node and copies the given attributes into it.
    * @param child_name Relative name of this node
    * @param fqn        Fully qualified name of this node
    * @param parent     Parent node, may be null
    * @param data       Initial attributes; copied, may be null
    * @param cache      Owning TreeCache instance
    */
   public Node(Object child_name, Fqn fqn, Node parent, Map data, TreeCache cache) {
      init(child_name, fqn, parent, cache);
      if(data != null)
         this.data().putAll(data);
   }

   /**
    * Creates a node holding a single attribute.
    * @param child_name Relative name of this node
    * @param fqn        Fully qualified name of this node
    * @param parent     Parent node, may be null
    * @param key        Attribute key
    * @param value      Attribute value
    * @param cache      Owning TreeCache instance
    */
   public Node(Object child_name, Fqn fqn, Node parent, Object key, Object value, TreeCache cache) {
      init(child_name, fqn, parent, cache);
      data().put(key, value);
   }

   /** Stores the identifying fields of the node, then runs the default initialisation. */
   protected void init(Object child_name, Fqn fqn, Node parent, TreeCache cache) {
      this.cache = cache;
      name=child_name;
      this.fqn=fqn;
      this.parent=parent;
      init();
   }

   /** Creates the lock for this node from the current <code>cache</code> and <code>fqn</code>. */
   protected void init() {
      lock_ = new IdentityLock(cache, fqn);
   }

   /** @return A new, empty map to hold the children of a node */
   protected Map initChildren() {
      return new ConcurrentReaderHashMap();
   }

   /** @return A new, empty map to hold the attributes of a node */
   protected Map initData() {
      return new HashMap();
   }

   /** @return The children map, creating it on first use. Never null */
   protected Map children() {
      synchronized(childrenMutex) {
         if(children == null)
            children=initChildren();
         return children;
      }
   }

   /** @return The attribute map, creating it on first use. Never null */
   protected Map data() {
      synchronized(dataMutex) {
         if(data == null)
            data=initData();
         return data;
      }
   }

   /** Sets the TreeCache reference of this node only. */
   void setTreeCacheInstance(TreeCache cache) {
      this.cache = cache;
   }

   /**
    * Set the tree cache instance recursively down to the children as well.
    * Note that this method is not currently thread safe.
    * @param cache
    */
   void setRecursiveTreeCacheInstance(TreeCache cache) {
      this.cache = cache;

      if(children != null) {
         // Walk the values directly; a key lookup per entry is unnecessary and could yield null
         // if an entry disappeared between keySet() and get()
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            ((Node)it.next()).setRecursiveTreeCacheInstance(cache);
         }
      }
   }

   /** @return The relative name of this node */
   public Object getName() {
      return name;
   }

   /** @return The fully qualified name of this node */
   public Fqn getFqn() {
      return fqn;
   }

   /** @return The value stored under <code>key</code>, or null if there is none (or no data at all) */
   public Object get(Object key) {
      return data == null? null : data.get(key);
   }

   /** @return True if the node has an attribute stored under <code>key</code> */
   public boolean containsKey(Object key) {
      return data != null && data.containsKey(key);
   }

   /** @return The child with the given name, or null if the name is null or no such child exists */
   public Node getChild(Object child_name) {
      if(child_name == null) return null;
      return children == null ? null : (Node)children.get(child_name);
   }

   /** @return The parent node, may be null */
   public Node getParent() {
      return parent;
   }

   /**
    * @return The key set of the internal attribute map (not a copy), or null if the node
    * has no attribute map
    */
   public Set getDataKeys() {
      return data != null? data.keySet() : null;
   }


   /** @return True if a child is registered under the given (non-null) name */
   public boolean childExists(Object child_name) {
      if(child_name == null) return false;
      return children != null && children.containsKey(child_name);
   }

   boolean isReadLocked() {
      return lock_ != null && lock_.isReadLocked();
   }

   boolean isWriteLocked() {
      return lock_ != null && lock_.isWriteLocked();
   }

   boolean isLocked() {
      return isWriteLocked() || isReadLocked();
   }

   /**
    * @deprecated Use getLock() instead
    * @return The lock of this node (the same object that {@link #getLock()} returns)
    */
   public IdentityLock getImmutableLock() {
      return lock_;
   }

   /** @return The lock of this node; null if the node has not been initialised */
   public IdentityLock getLock() {
      return lock_;
   }

   /** @return Map<Object,Node>. This is the internal map (not a copy); null if no child was ever added */
   public Map getChildren() {
      return children;
   }

   /** Returns a copy of the attributes, or null if the node has no attribute map */
   public Map getData() {
      if(data == null)
         return null;
      return new HashMap(data);
   }

   /** @return The number of attributes of this node */
   public int numAttributes() {
      return data != null? data.size() : 0;
   }

   /**
    * @deprecated Use {@link #hasChildren} instead
    * @return True if this node has at least one child
    */
   public boolean hasChild() {
      return hasChildren();
   }

   /** @return True if this node has at least one child */
   public boolean hasChildren() {
      return children != null && !children.isEmpty();
   }

   /**
    * Adds all entries of <code>data</code> to the attributes of this node.
    * @param data  Attributes to add; null adds nothing
    * @param erase If true, the existing attributes are removed first (even when <code>data</code> is null)
    */
   public void put(Map data, boolean erase) {
      if(erase && this.data != null)
         this.data.clear();
      if(data == null) return;
      this.data().putAll(data);
   }

   /** Adds all entries of <code>data</code> to the attributes of this node, keeping the existing ones. */
   public void put(Map data) {
      put(data, false);
   }


   /**
    * Stores an attribute.
    * @return The value returned by the underlying map's <code>put()</code>, i.e. the previous value, if any
    */
   public Object put(Object key, Object value) {
      return this.data().put(key, value);
   }

   /**
    * Returns the child named <code>child_name</code>, creating an empty one if it does not exist yet.
    * @return The existing or newly created child; null if <code>child_name</code> is null
    */
   public Node createChild(Object child_name, Fqn fqn, Node parent) {
      if(child_name == null) return null;
      Map kids=children();
      Node child=(Node)kids.get(child_name);
      if(child == null) {
         child=new Node(child_name, fqn, parent, null, cache);
         kids.put(child_name, child);
      }
      traceCreateChild(child_name, fqn);
      return child;
   }

   /**
    * Returns the child named <code>child_name</code>, creating it if needed. The given attributes are
    * added to an existing child, or used as the initial attributes of a new one.
    * @return The existing or newly created child; null if <code>child_name</code> is null
    */
   public Node createChild(Object child_name, Fqn fqn, Node parent, Map data) {
      if(child_name == null) return null;
      Map kids=children();
      Node child=(Node)kids.get(child_name);
      if(child != null)
         child.put(data);
      else {
         child=new Node(child_name, fqn, parent, data, cache);
         kids.put(child_name, child);
      }
      traceCreateChild(child_name, fqn);
      return child;
   }

   /**
    * Returns the child named <code>child_name</code>, creating it if needed. The given key/value pair is
    * stored in the existing or new child.
    * @return The existing or newly created child; null if <code>child_name</code> is null
    */
   public Node createChild(Object child_name, Fqn fqn, Node parent, Object key, Object value) {
      if(child_name == null) return null;
      Map kids=children();
      Node child=(Node)kids.get(child_name);
      if(child != null)
         child.put(key, value);
      else {
         child=new Node(child_name, fqn, parent, key, value, cache);
         kids.put(child_name, child);
      }
      traceCreateChild(child_name, fqn);
      return child;
   }

   /** Trace output shared by the createChild() variants. */
   private void traceCreateChild(Object child_name, Fqn fqn) {
      if(log.isTraceEnabled())
         log.trace("createChild: fqn=" + fqn + ", child_name=" + child_name);
   }

   /**
    * Adds the (already created) child node. Replaces existing node if present.
    * @param child_name
    * @param n
    */
   void addChild(Object child_name, Node n) {
      if(child_name == null)
         return;
      children().put(child_name, n);
   }

   /** @return The value removed from the attributes, or null if there was none */
   public Object remove(Object key) {
      return data != null?  data.remove(key) : null;
   }

   /** Removes all attributes and drops the attribute map; it is re-created on the next write. */
   public void clear() {
      if(data != null) {
         data.clear();
         data=null;
      }
   }

   /** Removes the child registered under <code>child_name</code>, if a children map exists. */
   public void removeChild(Object child_name) {
      if(children != null) {
         children.remove(child_name);
         if(log.isTraceEnabled())
            log.trace("removed child " + child_name);
      }
   }

   /** Removes all children of this node. */
   public void removeAllChildren() {
      if(children != null)
         children.clear();
   }

   /**
    * Appends the names of this node and its subtree to <code>sb</code>, one node per line,
    * each level indented by {@link #INDENT} more spaces.
    */
   public void print(StringBuffer sb, int indent) {
      printIndent(sb, indent);
      sb.append(TreeCache.SEPARATOR).append(name);
      if(children != null && children.size() > 0) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            sb.append("\n");
            ((Node)it.next()).print(sb, indent + INDENT);
         }
      }
   }

   /**
    * Appends the name and the attributes ("key: value" lines) of this node and its subtree to <code>sb</code>.
    */
   public void printDetails(StringBuffer sb, int indent) {
      printIndent(sb, indent);
      sb.append(TreeCache.SEPARATOR).append(name);
      sb.append("\n");
      if(data != null) {
         for(Iterator it=data.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry=(Map.Entry)it.next();
            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
         }
      }
      if(children != null && children.size() > 0) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            sb.append("\n");
            // NOTE(review): unlike print() and printLockInfo(), the indent is not increased for
            // children here. Kept as is, since the output format may be relied upon -- confirm.
            ((Node)it.next()).printDetails(sb, indent);
         }
      }
   }

   /**
    * Appends the names of this node and its subtree to <code>sb</code>; a node that is locked is
    * followed by a description of its lock in parentheses.
    */
   public void printLockInfo(StringBuffer sb, int indent) {
      boolean locked=lock_ != null && lock_.isLocked();

      printIndent(sb, indent);
      sb.append(TreeCache.SEPARATOR).append(name);
      if(locked) {
         sb.append("\t(");
         lock_.toString(sb);
         sb.append(")");
      }

      if(children != null && children.size() > 0) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            sb.append("\n");
            ((Node)it.next()).printLockInfo(sb, indent + INDENT);
         }
      }
   }

   /** Appends <code>indent</code> spaces to <code>sb</code>. Does nothing if <code>sb</code> is null. */
   public void printIndent(StringBuffer sb, int indent) {
      if(sb != null) {
         for(int i=0; i < indent; i++)
            sb.append(" ");
      }
   }

   /** @return Name, fqn, attributes and lock state of this node, one item per line; unset items are omitted */
   public String toString() {
      StringBuffer sb=new StringBuffer();
      if(name != null) sb.append("\nname=").append(name);
      if(fqn != null) sb.append("\nfqn=").append(fqn);
      if(data != null)
         sb.append("\ndata=").append(data);
      if(lock_ != null) {
         sb.append("\n read locked=").append(isReadLocked());
         sb.append("\n write locked=").append(isWriteLocked());
      }
      return sb.toString();
   }


   /**
    * Creates a copy of this node. The fqn and the parent are cloned (so the whole chain of ancestors
    * is cloned as well), the attributes are copied into a new map, and the children map is copied
    * shallowly: the copy refers to the same child nodes.
    * @return The new Node
    */
   public Object clone() throws CloneNotSupportedException {
      Fqn fqnCopy=fqn != null? (Fqn)fqn.clone() : null;
      Node parentCopy=parent != null? (Node)parent.clone() : null;
      Node n=new Node(name, fqnCopy, parentCopy, data, cache);
      if(children != null) {
         // Copy through initChildren() instead of casting to HashMap: the children map is a
         // ConcurrentReaderHashMap by default, so the cast fails with a ClassCastException
         Map childrenCopy=initChildren();
         childrenCopy.putAll(children);
         n.children=childrenCopy;
      }
      return n;
   }

   /**
    * Acquire a node object by locking it first. The lock type is specified via
    * <code>lock_type</code>.
    *
    * @param caller     Transaction context owner.
    * @param timeout   Timeout in milliseconds
    * @param lock_type {@link #LOCK_TYPE_READ} for a read lock; any other value acquires a write lock
    * @throws LockingException
    * @throws TimeoutException
    * @return boolean True if node could be acquired, false if owner already held the lock. An exception is
    * thrown if the owner didn't already hold the lock, but it couldn't be acquired
    */
   public boolean acquire(Object caller, long timeout, int lock_type) throws LockingException, TimeoutException, InterruptedException {
      // Note that we rely on IdentityLock for synchronization
      try {
         if(lock_type == LOCK_TYPE_READ)
            return acquireReadLock(caller, timeout);
         else
            return acquireWriteLock(caller, timeout);
      }
      catch(UpgradeException e) {
         debugLockFailure("failure upgrading lock", caller);
         throw e;
      }
      catch(LockingException e) {
         debugLockFailure("failure acquiring lock", caller);
         throw e;
      }
      catch(TimeoutException e) {
         debugLockFailure("failure acquiring lock", caller);
         throw e;
      }
   }

   /** Logs a failed lock acquisition at debug level, including the detailed state of the lock. */
   private void debugLockFailure(String what, Object caller) {
      if(log.isDebugEnabled())
         log.debug(what + ": fqn=" + fqn + ", caller=" + caller + ", lock=" + lock_.toString(true));
   }

   /** Logs a lock operation at trace level; <code>prefix</code> is the message text up to and including "fqn=". */
   private void traceLock(String prefix, Object caller) {
      if(log.isTraceEnabled()) {
         StringBuffer sb=new StringBuffer();
         sb.append(prefix).append(fqn).append(", caller=").append(caller);
         sb.append(", lock=").append(lock_.toString(PRINT_LOCK_DETAILS));
         log.trace(sb.toString());
      }
   }

   /**
    * Acquires the read lock of this node for <code>caller</code>, tracing the lock state before and after.
    * @return The result of <code>IdentityLock.acquireReadLock()</code>
    */
   protected boolean acquireReadLock(Object caller, long timeout) throws LockingException, TimeoutException, InterruptedException {
      traceLock("acquiring RL: fqn=", caller);
      boolean flag=lock_.acquireReadLock(caller, timeout);
      traceLock("acquired RL: fqn=", caller);
      return flag;
   }

   /**
    * Acquires the write lock of this node for <code>caller</code>, tracing the lock state before and after.
    * @return The result of <code>IdentityLock.acquireWriteLock()</code>
    */
   protected boolean acquireWriteLock(Object caller, long timeout) throws LockingException, TimeoutException, InterruptedException {
      traceLock("acquiring WL: fqn=", caller);
      boolean flag=lock_.acquireWriteLock(caller, timeout);
      traceLock("acquired WL: fqn=", caller);
      return flag;
   }

   /**
    * Acquires locks for the entire subtree
    * @return The locks for which {@link #acquire} returned true, in acquisition order (this node first)
    */
   public Set acquireAll(Object caller, long timeout, int lock_type) throws LockingException, TimeoutException, InterruptedException {
      Set retval=new LinkedHashSet();
      if(acquire(caller, timeout, lock_type))
         retval.add(getLock());

      // NOTE(review): if acquiring a lock further down fails, the exception propagates and the locks
      // collected so far are not reported to the caller -- verify that callers release them by other means.
      if(children != null) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            Node child=(Node)it.next();
            retval.addAll(child.acquireAll(caller, timeout, lock_type));
         }
      }
      return retval;
   }

   /** Releases the lock of this node held by <code>caller</code>. */
   public void release(Object caller) {
      lock_.release(caller);
   }

   /** Releases the lock of this node regardless of owner. */
   public void releaseForce() {
      lock_.releaseForce();
   }

   /**
    * Releases locks for the entire subtree. Children are released before this node
    */
   public void releaseAll(Object owner) {
      if(children != null) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            ((Node)it.next()).releaseAll(owner);
         }
      }
      release(owner);
   }

   /**
    * Releases locks for the entire subtree. Forces release regardless of owner
    */
   public void releaseAllForce() {
      if(children != null) {
         for(Iterator it=children.values().iterator(); it.hasNext();) {
            ((Node)it.next()).releaseAllForce();
         }
      }
      releaseForce();
   }


   /**
    * Writes name, fqn, parent, children and data, in this order. The lock and the TreeCache
    * reference are not written.
    */
   public void writeExternal(ObjectOutput out) throws IOException {
      out.writeObject(name);
      out.writeObject(fqn);
      out.writeObject(parent);
      out.writeObject(children);
      out.writeObject(data);
   }

   /**
    * Reads the fields in the order written by {@link #writeExternal(ObjectOutput)}, then creates a new lock.
    */
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
      name=in.readObject();
      fqn=(Fqn)in.readObject();
      parent=(Node)in.readObject();
      children=(Map)in.readObject();
      data=(Map)in.readObject();

      // Perform default initialisation now that we have all of the required data
      init();
   }


}