/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.cache.lock;

import EDU.oswego.cs.dl.util.concurrent.Sync;
import org.jboss.cache.TreeCache;
import org.jboss.cache.Fqn;
import org.jboss.logging.Logger;

import java.util.Collection;
import java.util.List;
import java.util.ArrayList;

/**
 * Lock object which grants and releases locks, and associates locks with <em>owners</em>.
 * The methods to acquire and release a lock require an owner (Object). When a lock is acquired,
 * we store the current owner with the lock. When the same owner (<em>but possibly
 * running in a different thread</em>) wants to acquire the lock, it is immediately
 * granted. When an owner different from the one currently owning the lock wants to release
 * the lock, we do nothing (no-op).<p>
 * Consider the following example:
 * <pre>
 * FIFOSemaphore lock=new FIFOSemaphore(1);
 * lock.acquire();
 * lock.acquire();
 * lock.release();
 * </pre>
 * This would block on the second <tt>acquire()</tt> (although we currently already hold
 * the lock) because the lock only has 1 permit. So <tt>IdentityLock</tt> will allow the
 * following code to work properly:
 * <pre>
 * IdentityLock lock=new IdentityLock(cache, fqn);
 * lock.acquireReadLock(this, timeout);
 * lock.acquireWriteLock(this, timeout);
 * lock.release(this);
 * </pre>
 * The owner must not be null: <tt>acquireReadLock()</tt>, <tt>acquireWriteLock()</tt> and
 * <tt>release()</tt> all reject a null owner. For example, each call in the following
 * code throws an <tt>IllegalArgumentException</tt>:
 * <pre>
 * IdentityLock lock=new IdentityLock(cache, fqn);
 * lock.acquireReadLock(null, timeout);
 * lock.acquireWriteLock(null, timeout);
 * lock.release(null);
 * </pre>
 * <br/>
 * Note that the Object given as owner is required to implement {@link Object#equals}. For
 * a use case see the examples in the testsuite.
 *
 * @author <a href="mailto:bela@jboss.org">Bela Ban</a> Apr 11, 2003
 * @author Ben Wang July 2003
 * @version $Revision: 1.13.2.2 $
 */
public class IdentityLock
{
   private static Logger log = Logger.getLogger(IdentityLock.class);
   private LockStrategy lock_;
   private LockMap map_;
   private TreeCache cache_;
   private transient Fqn fqn;

   public IdentityLock(TreeCache cache, Fqn fqn)
   {
      cache_ = cache;
      this.fqn=fqn;
      if(cache_ == null) {
         if(log.isDebugEnabled()) {
            log.debug("Cache instance is null. Use default lock strategy");
         }
         lock_ = LockStrategyFactory.getLockStrategy();
      } else {
         lock_ = LockStrategyFactory.getLockStrategy(cache_.getIsolationLevelClass());
      }
      map_ = new LockMap();
      cache_ = cache;
   }


   public Fqn getFqn() {
      return fqn;
   }

   /**
    * Return a copy of the reader lock owner in List. Size is zero is not available. Note that this list
    * is synchronized.
    *
    * @return
    */
   public List getReaderOwners()
   {
      return map_.readerOwners();
   }

   /**
    * Return the writer lock owner object. Null if not available.
    *
    * @return
    */
   public Object getWriterOwner()
   {
      return map_.writerOwner();
   }

   /**
    * Acquire a write lock with a timeout of <code>timeout</code> milliseconds.
    * Note that if the current owner owns a read lock, it will be upgraded
    * automatically. However, if upgrade fails, i.e., timeout, the read lock will
    * be released automatically.
    *
    * @param caller   Can't be null.
    * @param timeout
    * @throws LockingException
    * @throws TimeoutException
    * @return boolean True if lock was acquired and was not held before, false if lock was held
    */
   public boolean acquireWriteLock(Object caller, long timeout) throws LockingException, TimeoutException, InterruptedException
   {
      boolean rc;

      if (caller == null) {
         throw new IllegalArgumentException("acquireWriteLock(): null caller");
      }

      synchronized(this) {
         if (map_.isOwner(caller, LockMap.OWNER_WRITE)) {
            if (log.isTraceEnabled())
               log.trace("acquireWriteLock(): caller already owns lock for " + fqn + " (caller=" + caller + ")");
            return false; // owner already has the lock
         }
      }

      // Check first if we need to upgrade
      if (map_.isOwner(caller, LockMap.OWNER_READ)) {
         // Currently is a reader owner. Obtain the writer ownership then.
         Sync wLock = null;
         try {
            if(log.isTraceEnabled())
               log.trace("upgrading RL to WL for " + caller + ", timeout=" + timeout + ", locks: " + map_.printInfo());
            wLock = lock_.upgradeLockAttempt(timeout);
         } catch (UpgradeException ue) {
            String errStr="acquireWriteLock(): lock upgrade failed for " + fqn + " (caller=" + caller + ")";
            log.error(errStr, ue);
            throw new UpgradeException(errStr, ue);
         }
         if (wLock == null) {
            release(caller);   // bug fix: remember to release the read lock before throwing the exception
            map_.removeReader(caller);
            String errStr="upgrade lock for " + fqn + " could not be acquired after " + timeout + " ms." +
                  " Lock map ownership " + map_.printInfo() + " (caller=" + caller + ")";
            log.error(errStr);
            throw new UpgradeException(errStr);
         }
         try {
            if (log.isTraceEnabled())
               log.trace("upgrading lock for " + fqn);
            map_.upgrade(caller);
         } catch (OwnerNotExistedException ex) {
            throw new UpgradeException("Can't upgrade lock to WL for " + fqn + ", error in LockMap.upgrade(): " + ex);
         }
      }
      else {
         // Not a current reader owner. Obtain the writer ownership then.
         rc = lock_.writeLock().attempt(timeout);

         // we don't need to synchronize from here on because we own the semaphore
         if (rc == false) {
            String errStr = "write lock for " + fqn + " could not be acquired after " + timeout + " ms. " +
                  "Locks: " + map_.printInfo() + " (caller=" + caller + ", lock info: " + toString(true) + ")";
            log.error(errStr);
            throw new TimeoutException(errStr);
         }
         map_.addWriter(caller);
      }
      return true;
   }

   /**
    * Acquire a read lock with a timeout period of <code>timeout</code> milliseconds.
    *
    * @param caller   Can't be null.
    * @param timeout
    * @throws LockingException
    * @throws TimeoutException
    * @return boolean True if lock was acquired and was not held before, false if lock was held
    */
   public boolean acquireReadLock(Object caller, long timeout) throws LockingException, TimeoutException, InterruptedException
   {
      boolean rc;

      if (caller == null) {
         throw new IllegalArgumentException("owner is null");
      }

      synchronized(this) {
         if (map_.isOwner(caller, LockMap.OWNER_ANY)) {
            if (log.isTraceEnabled())
               log.trace("acquireReadLock(): caller " + caller + " already owns lock for " + fqn);
            return false; // owner already has the lock
         }
      }

      rc = lock_.readLock().attempt(timeout);

      // we don't need to synchronize from here on because we own the semaphore
      if (rc == false) {
         String errStr = "read lock for " + fqn + " could not be acquired by " + caller +
               " after " + timeout + " ms. " + "Locks: " + map_.printInfo() + ", lock info: " + toString(true);
         log.error(errStr);
         throw new TimeoutException(errStr);
      }

      map_.addReader(caller); // this is synchronized internally, we don't need to synchronize here
      return true;
   }

   /**
    * Release the lock held by the owner.
    *
    * @param caller Can't be null.
    */
   public void release(Object caller)
   {
      if (caller == null) {
         throw new IllegalArgumentException("IdentityLock.release(): null owner object.");
      }

      synchronized(this) {
         // Check whether to release reader or writer lock.
         if (map_.isOwner(caller, LockMap.OWNER_READ)) {
            map_.removeReader(caller);
            lock_.readLock().release();
         }
         else if (map_.isOwner(caller, LockMap.OWNER_WRITE)) {
            map_.removeWriter(caller);
            lock_.writeLock().release();
         }
      }
   }

   /**
    * Release all locks associated with this instance.
    */
   public void releaseAll()
   {
      Object obj = null;
      Collection col = null;
      if ((obj = map_.writerOwner()) != null) {
         // lock_.readLock().release();
         lock_.writeLock().release();
         map_.removeWriter(obj);
      }

      if ((col = map_.readerOwners()).size() != 0) {
         java.util.Iterator it = col.iterator();
         while (it.hasNext()) {
            lock_.readLock().release();
         }
         col.clear();
      }
   }

   /**
    * Same as releaseAll now.
    */
   public void releaseForce()
   {
      releaseAll();
   }

   /**
    * Check if there is a read lock.
    *
    * @return
    */
   public boolean isReadLocked()
   {
      return (map_.readerOwners().size() != 0);
   }

   /**
    * Check if there is a write lock.
    *
    * @return
    */
   public boolean isWriteLocked()
   {
      return (map_.writerOwner() != null);
   }

   /**
    * Check if there is a read or write lock
    * @return
    */
   public boolean isLocked()
   {
      return isReadLocked() || isWriteLocked();
   }

   /**
    * Am I a lock owner?
    *
    * @param o
    * @return
    */
   public boolean isOwner(Object o)
   {
      return map_.isOwner(o, LockMap.OWNER_ANY);
   }

   public String toString()
   {
      return toString(false);
   }

   public String toString(boolean print_lock_details)
   {
      StringBuffer sb=new StringBuffer();
      toString(sb, print_lock_details);
      return sb.toString();
   }

   public void toString(StringBuffer sb)
   {
      toString(sb, false);
   }

   public synchronized void toString(StringBuffer sb, boolean print_lock_details)
   {
      boolean printed_read_owners=false;
      Collection read_owners=lock_ != null ? new ArrayList(getReaderOwners()) : null;
      if(read_owners != null && read_owners.size() > 0) {
         sb.append("read owners=").append(read_owners);
         printed_read_owners=true;
      }
      else
         read_owners=null;

      Object write_owner=lock_ != null ? getWriterOwner() : null;
      if(write_owner != null) {
         if(printed_read_owners)
            sb.append(", ");
         sb.append("write owner=").append(write_owner);
      }
      if(read_owners == null && write_owner == null)
      {
        sb.append("<unlocked>");
      }
      if(print_lock_details)
      {
         sb.append(" (").append(lock_.toString()).append(")");
      }
   }
}