| TimeoutFactory.java |
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.util.timeout;
import org.jboss.util.ThrowableHandler;
import org.jboss.util.threadpool.BasicThreadPool;
import org.jboss.util.threadpool.ThreadPool;
/**
* The timeout factory.
*
* This is written with performance in mind. In case of <code>n</code>
* active timeouts, creating, cancelling and firing timeouts all operate
* in time <code>O(log(n))</code>.
*
* If a timeout is cancelled, the timeout is not discarded. Instead the
* timeout is saved to be reused for another timeout. This means that if
* no timeouts are fired, this class will eventually operate without
* allocating anything on the heap.
*
* @author <a href="osh@sparre.dk">Ole Husgaard</a>
* @version $Revision: 1.2.6.1 $
*/
public class TimeoutFactory
{
// Code commented out with the mark "INV:" are runtime checks
// of invariants that are not needed for a production system.
// For problem solving, you can remove these comments.
// Multithreading notes:
//
// While a TimeoutImpl is enqueued, its index field contains the index
// of the instance in the queue; that is, for 1 <= n <= size,
// q[n].index = n.
// Modifications of an enqueued TimeoutImpl instance may only happen
// in code synchronized on the TimeoutFactory instance that has it
// enqueued.
// Modifications on the priority queue may only happen while running in
// code synchronized on the TimeoutFactory instance that holds the queue.
// When a TimeoutImpl instance is no longer enqueued, its index field
// changes to one of the negative constants declared in the TimeoutImpl
// class.
// When a TimeoutImpl is not in use, its index field is TimeoutImpl.DONE
// and it is on the freeList.
//
// Cancellation may race with the timeout.
// To avoid problems with this, the TimeoutImpl index field is set to
// TimeoutImpl.TIMEOUT when the TimeoutImpl is taken out of the queue.
// Finally the index field is set to TimeoutImpl.DONE, and
// the TimeoutImpl instance is discarded.
/** The threadpool used to invoke timeouts */
private ThreadPool threadPool = new BasicThreadPool("Timeouts");
/**
* Our private Timeout implementation.
*/
private class TimeoutImpl implements Timeout
{
static final int DONE = -1; // done, may be finalized or reused
static final int TIMEOUT = -2; // target being called
int index; // index in queue, or one of constants above.
long time; // time to fire
TimeoutTarget target; // target to fire at
TimeoutImpl nextFree; // next on free list
public void cancel()
{
TimeoutFactory.this.dropTimeout(this);
}
}
/**
* A worker thread that fires the timeout.
*/
private static class TimeoutWorker implements Runnable
{
private TimeoutImpl work;
/**
* Create a new instance.
*
* @param work The timeout that should be fired.
*/
TimeoutWorker(TimeoutImpl work)
{
this.work = work;
}
/**
* Override to fire the timeout.
*/
public void run()
{
try
{
work.target.timedOut(work);
}
catch (Throwable t)
{
ThrowableHandler.add(ThrowableHandler.Type.ERROR, t);
}
synchronized (work)
{
work.index = TimeoutImpl.DONE;
}
}
}
/** Linked list of free TimeoutImpl instances. */
private TimeoutImpl freeList;
/** The size of the timeout queue. */
private int size;
/**
* Our priority queue.
*
* This is a balanced binary tree. If nonempty, the root is at index 1,
* and all nodes are at indices 1..size. Nodes with index greater than
* size are null. Index 0 is never used.
* Children of the node at index <code>j</code> are at <code>j*2</code>
* and <code>j*2+1</code>. The children of a node always fire the timeout
* no earlier than the node.
*
*
* Or, more formally:
*
* Only indices <code>1</code>..<code>size</code> of this array are used.
* All other indices contain the null reference.
* This array represent a balanced binary tree.
*
* If <code>size</code> is <code>0</code> the tree is empty, otherwise
* the root of the tree is at index <code>1</code>.
*
* Given an arbitrary node at index <code>n</code> that is not the root
* node, the parent node of <code>n</code> is at index <code>n/2</code>.
*
* Given an arbitrary node at index <code>n</code>; if
* <code>2*n <= size</code> the node at <code>n</code> has its left child
* at index <code>2*n</code>, otherwise the node at <code>n</code> has
* no left child.
*
* Given an arbitrary node at index <code>n</code>; if
* <code>2*n+1 <= size</code> the node at <code>n</code> has its right child
* at index <code>2*n+1</code>, otherwise the node at <code>n</code> has
* no right child.
*
* The priority function is called T. Given a node <code>n</code>,
* <code>T(n)</code> denotes the absolute time (in milliseconds since
* the epoch) that the timeout for node <code>n</code> should happen.
* Smaller values of <code>T</code> means higher priority.
*
* The tree satisfies the following invariant:
* <i>
* For any node <code>n</code> in the tree:
* If node <code>n</code> has a left child <code>l</code>,
* <code>T(n) <= T(l)</code>.
* If node <code>n</code> has a right child <code>r</code>,
* <code>T(n) <= T(r)</code>.
* </i>
*
*
* The invariant may be temporarily broken while executing synchronized
* on <code>this</code> instance, but is always reestablished before
* leaving the synchronized code.
*
* The node at index <code>1</code> is always the first node to timeout,
* as can be deduced from the invariant.
*
* For the following algorithm pseudocode, the operation
* <code>swap(n,m)</code> denotes the exchange of the nodes at indices
* <code>n</code> and <code>m</code> in the tree.
*
* Insertion of a new node happend as follows:
* <pre>
* IF size = q.length THEN
* "expand q array to be larger";
* ENDIF
* size <- size + 1;
* q[size] <- "new node";
* n <- size;
* WHILE n > 1 AND T(n/2) > T(n) DO
* swap(n/2, n);
* n <- n/2;
* ENDWHILE
* </pre>
* Proof that this insertion algorithm respects the invariant is left to
* the interested reader.
*
* The removal algorithm is a bit more complicated. To remove the node
* at index <code>n</code>:
* <pre>
* swap(n, size);
* size <- size - 1;
* IF n > 1 AND T(n/2) > T(n) THEN
* WHILE n > 1 AND T(n/2) > T(n) DO
* swap(n/2, n);
* n <- n/2;
* ENDWHILE
* ELSE
* WHILE 2*n <= size DO
* IF 2*n+1 <= size THEN
* // Both children present
* IF T(2*n) <= T(2*n+1) THEN
* IF T(n) <= T(2*n) THEN
* EXIT;
* ENDIF
* swap(n, 2*n);
* n <- 2*n;
* ELSE
* IF T(n) <= T(2*n+1) THEN
* EXIT;
* ENDIF
* swap(n, 2*n+1);
* n <- 2*n+1;
* ENDIF
* ELSE
* // Only left child, right child not present.
* IF T(n) <= T(2*n) THEN
* EXIT;
* ENDIF
* swap(n, 2*n);
* n <- 2*n;
* ENDIF
* ENDWHILE
* ENDIF
* </pre>
* Proof that this removal algorithm respects the invariant is left to
* the interested reader. Really, I am not going to prove it here.
*
* If you are interested, you can find this data structure and its
* associated operations in most textbooks on algorithmics.
*
* @see checkTree
*/
private TimeoutImpl[] q;
/**
* Debugging helper.
*/
private void assertExpr(boolean expr)
{
if (!expr)
{
RuntimeException ex = new RuntimeException("***** assert failed *****");
try
{
Thread.sleep(30000);
}
catch (Exception e)
{
}
}
}
/**
* Check invariants of the queue.
*/
private void checkTree()
{
assertExpr(size >= 0);
assertExpr(size < q.length);
assertExpr(q[0] == null);
if (size > 0)
{
assertExpr(q[1] != null);
assertExpr(q[1].index == 1);
for (int i = 2; i <= size; ++i)
{
assertExpr(q[i] != null);
assertExpr(q[i].index == i);
assertExpr(q[i >> 1].time <= q[i].time); // parent fires first
}
for (int i = size + 1; i < q.length; ++i)
assertExpr(q[i] == null);
}
}
/**
* Check invariants of the free list.
*/
private void checkFreeList()
{
TimeoutImpl to = freeList;
while (to != null)
{
assertExpr(to.index == TimeoutImpl.DONE);
to = to.nextFree;
}
}
/**
* Swap two nodes in the tree.
*/
private void swap(int a, int b)
{
// INV: assertExpr(a > 0);
// INV: assertExpr(a <= size);
// INV: assertExpr(b > 0);
// INV: assertExpr(b <= size);
// INV: assertExpr(q[a] != null);
// INV: assertExpr(q[b] != null);
// INV: assertExpr(q[a].index == a);
// INV: assertExpr(q[b].index == b);
TimeoutImpl temp = q[a];
q[a] = q[b];
q[a].index = a;
q[b] = temp;
q[b].index = b;
}
/**
* A new node has been added at index <code>index</code>.
* Normalize the tree by moving the new node up the tree.
*
* @return True iff the tree was modified.
*/
private boolean normalizeUp(int index)
{
// INV: assertExpr(index > 0);
// INV: assertExpr(index <= size);
// INV: assertExpr(q[index] != null);
if (index == 1)
return false; // at root
boolean ret = false;
long t = q[index].time;
int p = index >> 1;
while (q[p].time > t)
{
// INV: assertExpr(q[index].time == t);
swap(p, index);
ret = true;
if (p == 1)
break; // at root
index = p;
p >>= 1;
}
return ret;
}
/**
* Remove a node from the tree and normalize.
*
* @return The removed node.
*/
private TimeoutImpl removeNode(int index)
{
// INV: assertExpr(index > 0);
// INV: assertExpr(index <= size);
TimeoutImpl res = q[index];
// INV: assertExpr(res != null);
// INV: assertExpr(res.index == index);
if (index == size)
{
--size;
q[index] = null;
return res;
}
swap(index, size); // Exchange removed node with last leaf node
--size;
// INV: assertExpr(res.index == size + 1);
q[res.index] = null;
if (normalizeUp(index))
return res; // Node moved up, so it shouldn't move down
long t = q[index].time;
int c = index << 1;
while (c <= size)
{
// INV: assertExpr(q[index].time == t);
TimeoutImpl l = q[c];
// INV: assertExpr(l != null);
// INV: assertExpr(l.index == c);
if (c + 1 <= size)
{
// two children, swap with smallest
TimeoutImpl r = q[c + 1];
// INV: assertExpr(r != null);
// INV: assertExpr(r.index == c+1);
if (l.time <= r.time)
{
if (t <= l.time)
break; // done
swap(index, c);
index = c;
}
else
{
if (t <= r.time)
break; // done
swap(index, c + 1);
index = c + 1;
}
}
else
{ // one child
if (t <= l.time)
break; // done
swap(index, c);
index = c;
}
c = index << 1;
}
return res;
}
/**
* Create a new timeout.
*/
private synchronized Timeout newTimeout(long time, TimeoutTarget target)
{
// INV: checkTree();
// INV: assertExpr(size < q.length);
if (++size == q.length)
{
TimeoutImpl[] newQ = new TimeoutImpl[2 * q.length];
System.arraycopy(q, 0, newQ, 0, q.length);
q = newQ;
}
// INV: assertExpr(size < q.length);
// INV: assertExpr(q[size] == null);
TimeoutImpl timeout;
if (freeList != null)
{
timeout = q[size] = freeList;
freeList = timeout.nextFree;
// INV: checkFreeList();
// INV: assertExpr(timeout.index == TimeoutImpl.DONE);
}
else
timeout = q[size] = new TimeoutImpl();
timeout.index = size;
timeout.time = time;
timeout.target = target;
normalizeUp(size);
if (timeout.index == 1)
notify();
// INV: checkTree();
return timeout;
}
/**
* Cancel a timeout.
*/
private void dropTimeout(TimeoutImpl timeout)
{
synchronized (this)
{
if (timeout.index > 0)
{
// Active timeout, remove it.
// INV: assertExpr(q[timeout.index] == timeout);
// INV: checkTree();
removeNode(timeout.index);
// INV: checkTree();
timeout.index = TimeoutImpl.DONE;
timeout.nextFree = freeList;
freeList = timeout;
// INV: checkFreeList();
return;
}
}
}
/**
* Timeout worker method.
* This method never returns. Whenever it is time to do a timeout,
* the callback method is called from here.
*/
private void doWork()
{
while (true)
{
TimeoutImpl work = null;
// Look for work
synchronized (this)
{
if (size == 0)
{
try
{
wait();
}
catch (InterruptedException ex)
{
}
}
else
{
long now = System.currentTimeMillis();
if (q[1].time > now)
{
try
{
wait(q[1].time - now);
}
catch (InterruptedException ex)
{
}
}
if (size > 0 && q[1].time <= System.currentTimeMillis())
{
work = removeNode(1);
work.index = TimeoutImpl.TIMEOUT;
}
}
}
// Do work, if any
if (work != null)
{
// Create a new thread to do the callback.
TimeoutWorker worker = new TimeoutWorker(work);
threadPool.run(worker);
}
}
}
/** Our singleton instance. */
static private TimeoutFactory singleton;
/** Our private constructor. */
private TimeoutFactory()
{
freeList = null;
size = 0;
q = new TimeoutImpl[16];
}
/**
* Initialize class.
* The will initialize the singleton and create a single
* worker thread.
*/
static
{
singleton = new TimeoutFactory();
Thread thread = new Thread("TimeoutFactory")
{
public void run()
{
singleton.doWork();
}
};
thread.setDaemon(true);
thread.start();
}
/**
* Schedule a new timeout.
*/
static public Timeout createTimeout(long time, TimeoutTarget target)
{
if (time <= 0)
throw new IllegalArgumentException("Time not positive");
if (target == null)
throw new IllegalArgumentException("Null target");
return singleton.newTimeout(time, target);
}
}| TimeoutFactory.java |