/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.util.timeout;

import org.jboss.util.ThrowableHandler;
import org.jboss.util.threadpool.BasicThreadPool;
import org.jboss.util.threadpool.ThreadPool;

/**
 *  The timeout factory.
 *
 *  This is written with performance in mind. In case of <code>n</code>
 *  active timeouts, creating, cancelling and firing timeouts all operate
 *  in time <code>O(log(n))</code>.
 *
 *  If a timeout is cancelled, the timeout is not discarded. Instead the
 *  timeout is saved to be reused for another timeout. This means that if
 *  no timeouts are fired, this class will eventually operate without
 *  allocating anything on the heap.
 *
 *  @author <a href="osh@sparre.dk">Ole Husgaard</a>
 *  @version $Revision: 1.2.6.1 $
 */
public class TimeoutFactory
{
   //  Code commented out with the mark "INV:" are runtime checks
   //  of invariants that are not needed for a production system.
   //  For problem solving, you can remove these comments.
   //  Multithreading notes:
   //
   //  While a TimeoutImpl is enqueued, its index field contains the index
   //  of the instance in the queue; that is, for 1 <= n <= size,
   //  q[n].index = n.
   //  Modifications of an enqueued TimeoutImpl instance may only happen
   //  in code synchronized on the TimeoutFactory instance that has it
   //  enqueued.
   //  Modifications on the priority queue may only happen while running in
   //  code synchronized on the TimeoutFactory instance that holds the queue.
   //  When a TimeoutImpl instance is no longer enqueued, its index field
   //  changes to one of the negative constants declared in the TimeoutImpl
   //  class.
   //  When a TimeoutImpl is not in use, its index field is TimeoutImpl.DONE
   //  and it is on the freeList.
   //
   //  Cancellation may race with the timeout.
   //  To avoid problems with this, the TimeoutImpl index field is set to
   //  TimeoutImpl.TIMEOUT when the TimeoutImpl is taken out of the queue.
   //  Finally the index field is set to TimeoutImpl.DONE, and
   //  the TimeoutImpl instance is discarded.

   /** The threadpool used to invoke timeouts */
   private ThreadPool threadPool = new BasicThreadPool("Timeouts");

   /**
    *  Our private Timeout implementation.
    */
   private class TimeoutImpl implements Timeout
   {
      static final int DONE = -1; // done, may be finalized or reused

      static final int TIMEOUT = -2; // target being called

      int index; // index in queue, or one of constants above.

      long time; // time to fire

      TimeoutTarget target; // target to fire at

      TimeoutImpl nextFree; // next on free list

      public void cancel()
      {
         TimeoutFactory.this.dropTimeout(this);
      }
   }

   /**
    *  A worker thread that fires the timeout.
    */
   private static class TimeoutWorker implements Runnable
   {
      private TimeoutImpl work;

      /**
       *  Create a new instance.
       *
       *  @param work The timeout that should be fired.
       */
      TimeoutWorker(TimeoutImpl work)
      {
         this.work = work;
      }

      /**
       *  Override to fire the timeout.
       */
      public void run()
      {
         try
         {
            work.target.timedOut(work);
         }
         catch (Throwable t)
         {
            ThrowableHandler.add(ThrowableHandler.Type.ERROR, t);
         }
         synchronized (work)
         {
            work.index = TimeoutImpl.DONE;
         }
      }
   }

   /** Linked list of free TimeoutImpl instances. */
   private TimeoutImpl freeList;

   /** The size of the timeout queue. */
   private int size;

   /**
    *  Our priority queue.
    *
    *  This is a balanced binary tree. If nonempty, the root is at index 1,
    *  and all nodes are at indices 1..size. Nodes with index greater than
    *  size are null. Index 0 is never used.
    *  Children of the node at index <code>j</code> are at <code>j*2</code>
    *  and <code>j*2+1</code>. The children of a node always fire the timeout
    *  no earlier than the node.
    *
    *
    *  Or, more formally:
    *
    *  Only indices <code>1</code>..<code>size</code> of this array are used.
    *  All other indices contain the null reference.
    *  This array represent a balanced binary tree.
    *
    *  If <code>size</code> is <code>0</code> the tree is empty, otherwise
    *  the root of the tree is at index <code>1</code>.
    *
    *  Given an arbitrary node at index <code>n</code> that is not the root
    *  node, the parent node of <code>n</code> is at index <code>n/2</code>.
    *
    *  Given an arbitrary node at index <code>n</code>; if
    *  <code>2*n <= size</code> the node at <code>n</code> has its left child
    *  at index <code>2*n</code>, otherwise the node at <code>n</code> has
    *  no left child.
    *
    *  Given an arbitrary node at index <code>n</code>; if
    *  <code>2*n+1 <= size</code> the node at <code>n</code> has its right child
    *  at index <code>2*n+1</code>, otherwise the node at <code>n</code> has
    *  no right child.
    *
    *  The priority function is called T. Given a node <code>n</code>,
    *  <code>T(n)</code> denotes the absolute time (in milliseconds since
    *  the epoch) that the timeout for node <code>n</code> should happen.
    *  Smaller values of <code>T</code> means higher priority.
    *
    *  The tree satisfies the following invariant:
    *  <i>
    *  For any node <code>n</code> in the tree:
    *  If node <code>n</code> has a left child <code>l</code>,
    *  <code>T(n) <= T(l)</code>.
    *  If node <code>n</code> has a right child <code>r</code>,
    *  <code>T(n) <= T(r)</code>.
    *  </i>
    *
    *
    *  The invariant may be temporarily broken while executing synchronized
    *  on <code>this</code> instance, but is always reestablished before
    *  leaving the synchronized code.
    *
    *  The node at index <code>1</code> is always the first node to timeout,
    *  as can be deduced from the invariant.
    *
    *  For the following algorithm pseudocode, the operation
    *  <code>swap(n,m)</code> denotes the exchange of the nodes at indices
    *  <code>n</code> and <code>m</code> in the tree.
    *
    *  Insertion of a new node happend as follows:
    *  <pre>
    *    IF size = q.length THEN
    *      "expand q array to be larger";
    *    ENDIF
    *    size <- size + 1;
    *    q[size] <- "new node";
    *    n <- size;
    *    WHILE n > 1 AND T(n/2) > T(n) DO
    *      swap(n/2, n);
    *      n <- n/2;
    *    ENDWHILE
    *  </pre>
    *  Proof that this insertion algorithm respects the invariant is left to
    *  the interested reader.
    *
    *  The removal algorithm is a bit more complicated. To remove the node
    *  at index <code>n</code>:
    *  <pre>
    *    swap(n, size);
    *    size <- size - 1;
    *    IF n > 1 AND T(n/2) > T(n) THEN
    *      WHILE n > 1 AND T(n/2) > T(n) DO
    *        swap(n/2, n);
    *        n <- n/2;
    *      ENDWHILE
    *    ELSE
    *      WHILE 2*n <= size DO
    *        IF 2*n+1 <= size THEN
    *          // Both children present
    *          IF T(2*n) <= T(2*n+1) THEN
    *            IF T(n) <= T(2*n) THEN
    *              EXIT;
    *            ENDIF
    *            swap(n, 2*n);
    *            n <- 2*n;
    *          ELSE
    *            IF T(n) <= T(2*n+1) THEN
    *              EXIT;
    *            ENDIF
    *            swap(n, 2*n+1);
    *            n <- 2*n+1;
    *          ENDIF
    *        ELSE
    *          // Only left child, right child not present.
    *          IF T(n) <= T(2*n) THEN
    *            EXIT;
    *          ENDIF
    *          swap(n, 2*n);
    *          n <- 2*n;
    *        ENDIF
    *      ENDWHILE
    *    ENDIF
    *  </pre>
    *  Proof that this removal algorithm respects the invariant is left to
    *  the interested reader. Really, I am not going to prove it here.
    *
    *  If you are interested, you can find this data structure and its
    *  associated operations in most textbooks on algorithmics.
    *
    *  @see checkTree
    */
   private TimeoutImpl[] q;

   /**
    *  Debugging helper.
    */
   private void assertExpr(boolean expr)
   {
      if (!expr)
      {
         RuntimeException ex = new RuntimeException("***** assert failed *****");
         try
         {
            Thread.sleep(30000);
         }
         catch (Exception e)
         {
         }
      }
   }

   /**
    *  Check invariants of the queue.
    */
   private void checkTree()
   {
      assertExpr(size >= 0);
      assertExpr(size < q.length);
      assertExpr(q[0] == null);
      if (size > 0)
      {
         assertExpr(q[1] != null);
         assertExpr(q[1].index == 1);
         for (int i = 2; i <= size; ++i)
         {
            assertExpr(q[i] != null);
            assertExpr(q[i].index == i);
            assertExpr(q[i >> 1].time <= q[i].time); // parent fires first
         }
         for (int i = size + 1; i < q.length; ++i)
            assertExpr(q[i] == null);
      }
   }

   /**
    *  Check invariants of the free list.
    */
   private void checkFreeList()
   {
      TimeoutImpl to = freeList;
      while (to != null)
      {
         assertExpr(to.index == TimeoutImpl.DONE);
         to = to.nextFree;
      }
   }

   /**
    *  Swap two nodes in the tree.
    */
   private void swap(int a, int b)
   {
      // INV: assertExpr(a > 0);
      // INV: assertExpr(a <= size);
      // INV: assertExpr(b > 0);
      // INV: assertExpr(b <= size);
      // INV: assertExpr(q[a] != null);
      // INV: assertExpr(q[b] != null);
      // INV: assertExpr(q[a].index == a);
      // INV: assertExpr(q[b].index == b);
      TimeoutImpl temp = q[a];
      q[a] = q[b];
      q[a].index = a;
      q[b] = temp;
      q[b].index = b;
   }

   /**
    *  A new node has been added at index <code>index</code>.
    *  Normalize the tree by moving the new node up the tree.
    *
    *  @return True iff the tree was modified.
    */
   private boolean normalizeUp(int index)
   {
      // INV: assertExpr(index > 0);
      // INV: assertExpr(index <= size);
      // INV: assertExpr(q[index] != null);
      if (index == 1)
         return false; // at root
      boolean ret = false;
      long t = q[index].time;
      int p = index >> 1;
      while (q[p].time > t)
      {
         // INV: assertExpr(q[index].time == t);
         swap(p, index);
         ret = true;
         if (p == 1)
            break; // at root
         index = p;
         p >>= 1;
      }
      return ret;
   }

   /**
    *  Remove a node from the tree and normalize.
    *
    *  @return The removed node.
    */
   private TimeoutImpl removeNode(int index)
   {
      // INV: assertExpr(index > 0);
      // INV: assertExpr(index <= size);
      TimeoutImpl res = q[index];
      // INV: assertExpr(res != null);
      // INV: assertExpr(res.index == index);
      if (index == size)
      {
         --size;
         q[index] = null;
         return res;
      }
      swap(index, size); // Exchange removed node with last leaf node
      --size;
      // INV: assertExpr(res.index == size + 1);
      q[res.index] = null;
      if (normalizeUp(index))
         return res; // Node moved up, so it shouldn't move down
      long t = q[index].time;
      int c = index << 1;
      while (c <= size)
      {
         // INV: assertExpr(q[index].time == t);
         TimeoutImpl l = q[c];
         // INV: assertExpr(l != null);
         // INV: assertExpr(l.index == c);
         if (c + 1 <= size)
         {
            // two children, swap with smallest
            TimeoutImpl r = q[c + 1];
            // INV: assertExpr(r != null);
            // INV: assertExpr(r.index == c+1);
            if (l.time <= r.time)
            {
               if (t <= l.time)
                  break; // done
               swap(index, c);
               index = c;
            }
            else
            {
               if (t <= r.time)
                  break; // done
               swap(index, c + 1);
               index = c + 1;
            }
         }
         else
         { // one child
            if (t <= l.time)
               break; // done
            swap(index, c);
            index = c;
         }
         c = index << 1;
      }
      return res;
   }

   /**
    *  Create a new timeout.
    */
   private synchronized Timeout newTimeout(long time, TimeoutTarget target)
   {
      // INV: checkTree();
      // INV: assertExpr(size < q.length);
      if (++size == q.length)
      {
         TimeoutImpl[] newQ = new TimeoutImpl[2 * q.length];
         System.arraycopy(q, 0, newQ, 0, q.length);
         q = newQ;
      }
      // INV: assertExpr(size < q.length);
      // INV: assertExpr(q[size] == null);
      TimeoutImpl timeout;
      if (freeList != null)
      {
         timeout = q[size] = freeList;
         freeList = timeout.nextFree;
         // INV: checkFreeList();
         // INV: assertExpr(timeout.index == TimeoutImpl.DONE);
      }
      else
         timeout = q[size] = new TimeoutImpl();
      timeout.index = size;
      timeout.time = time;
      timeout.target = target;
      normalizeUp(size);
      if (timeout.index == 1)
         notify();
      // INV: checkTree();
      return timeout;
   }

   /**
    *  Cancel a timeout.
    */
   private void dropTimeout(TimeoutImpl timeout)
   {
      synchronized (this)
      {
         if (timeout.index > 0)
         {
            // Active timeout, remove it.
            // INV: assertExpr(q[timeout.index] == timeout);
            // INV: checkTree();
            removeNode(timeout.index);
            // INV: checkTree();
            timeout.index = TimeoutImpl.DONE;
            timeout.nextFree = freeList;
            freeList = timeout;
            // INV: checkFreeList();
            return;
         }
      }
   }

   /**
    *  Timeout worker method.
    *  This method never returns. Whenever it is time to do a timeout,
    *  the callback method is called from here.
    */
   private void doWork()
   {
      while (true)
      {
         TimeoutImpl work = null;
         // Look for work
         synchronized (this)
         {
            if (size == 0)
            {
               try
               {
                  wait();
               }
               catch (InterruptedException ex)
               {
               }
            }
            else
            {
               long now = System.currentTimeMillis();
               if (q[1].time > now)
               {
                  try
                  {
                     wait(q[1].time - now);
                  }
                  catch (InterruptedException ex)
                  {
                  }
               }
               if (size > 0 && q[1].time <= System.currentTimeMillis())
               {
                  work = removeNode(1);
                  work.index = TimeoutImpl.TIMEOUT;
               }
            }
         }
         // Do work, if any
         if (work != null)
         {
            // Create a new thread to do the callback.
            TimeoutWorker worker = new TimeoutWorker(work);
            threadPool.run(worker);
         }
      }
   }

   /** Our singleton instance. */
   static private TimeoutFactory singleton;

   /** Our private constructor. */
   private TimeoutFactory()
   {
      freeList = null;
      size = 0;
      q = new TimeoutImpl[16];
   }

   /**
    *  Initialize class.
    *  The will initialize the singleton and create a single
    *  worker thread.
    */
   static
   {
      singleton = new TimeoutFactory();
      Thread thread = new Thread("TimeoutFactory")
      {
         public void run()
         {
            singleton.doWork();
         }
      };
      thread.setDaemon(true);
      thread.start();
   }

   /**
    *  Schedule a new timeout.
    */
   static public Timeout createTimeout(long time, TimeoutTarget target)
   {
      if (time <= 0)
         throw new IllegalArgumentException("Time not positive");
      if (target == null)
         throw new IllegalArgumentException("Null target");
      return singleton.newTimeout(time, target);
   }
}