| ThreadPool.java |
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.mq.threadpool;
import java.lang.Thread;
import java.lang.ThreadGroup;
import java.util.ArrayList;
import java.util.LinkedList;
/**
* This is an implementation of a simple thread pool with
* an embedded work queue.
*
* @author Ole Husgaard (osh@sparre.dk)
* @version $Revision: 1.1 $
*/
public class ThreadPool
{
   /**
    * The name of this thread pool.
    * Used as the prefix of the names given to its worker threads.
    */
   private final String name;

   /**
    * Flags that worker threads should be created as daemon threads.
    */
   private final boolean daemon;

   /**
    * The ThreadGroup of threads in this pool.
    */
   private final ThreadGroup threadGroup;

   /**
    * The worker threads that have been started and have not yet terminated.
    * The monitor of this list also guards {@link #idleWorkers}, and it is
    * the monitor {@link #shutdown} waits on for workers to die.
    */
   private final ArrayList workers;

   /**
    * Maximum number of worker threads.
    */
   private final int maxWorkers;

   /**
    * Count of idle worker threads.
    * Synchronized on the {@link #workers} field.
    */
   private int idleWorkers;

   /**
    * Flags that we are shutting down the pool.
    * This field is volatile: it is set by {@link #shutdown} and read by
    * {@link #enqueueWork} and the worker threads without a common monitor.
    * Once set to <code>true</code> it is never reset.
    */
   private volatile boolean stopping;

   /**
    * The work queue, holding the enqueued {@link Work} in FIFO order.
    * Its monitor guards the list itself, and idle worker threads wait on it.
    */
   private final LinkedList queue;

   /**
    * Create a new thread pool instance.
    * No worker threads are started until work is enqueued.
    *
    * @param name The name of this thread pool. This is used for naming its
    *             worker threads.
    * @param threadGroup The <code>ThreadGroup</code> that worker threads
    *                    in this pool should belong to.
    * @param maxWorkers The maximum number of worker threads in this pool.
    * @param daemon If <code>true</code>, worker threads will be created as
    *               daemon threads.
    * @throws IllegalArgumentException If <code>name</code> or
    *         <code>threadGroup</code> is <code>null</code>, or if
    *         <code>maxWorkers</code> is not positive.
    */
   public ThreadPool(String name, ThreadGroup threadGroup, int maxWorkers,
                     boolean daemon)
   {
      if (name == null || threadGroup == null || maxWorkers <= 0)
         throw new IllegalArgumentException(
            "name and threadGroup must be non-null and maxWorkers must be positive");

      this.name = name;
      this.daemon = daemon;
      this.threadGroup = threadGroup;
      this.maxWorkers = maxWorkers;
      workers = new ArrayList();
      queue = new LinkedList();
      idleWorkers = 0;
      stopping = false;
   }

   /**
    * Shutdown this thread pool.
    * This will not return until all enqueued work has been cancelled,
    * and all worker threads have done any work they started and have
    * died.
    * <p>
    * If the calling thread is interrupted while waiting, the wait is
    * continued, and the interrupt status of the calling thread is set
    * again before this method returns.
    * <p>
    * This must not be called from one of the worker threads of this pool:
    * a worker is only removed from the pool after it has left its work
    * loop, so it would wait for itself forever.
    */
   public void shutdown()
   {
      stopping = true;

      synchronized (queue) {
         // Remove all queued work
         queue.clear();
         // Wake up all idle workers, so they see the stopping flag
         queue.notifyAll();
      }

      // Wait for all worker threads to die.
      boolean interrupted = false;
      synchronized (workers) {
         while (workers.size() > 0) {
            try {
               // Each dying worker notifies on this monitor.
               workers.wait();
            } catch (InterruptedException ignored) {
               // Keep waiting as documented, but remember the interrupt
               // so that the caller can still observe it.
               interrupted = true;
            }
         }
      }
      if (interrupted)
         Thread.currentThread().interrupt();
   }

   /**
    * Enqueue a piece of work for this thread pool to handle.
    * As soon as a thread becomes available, it will call
    * {@link Work#doWork} of the argument.
    * If the pool is shutting down, this method will not enqueue the
    * work, but instead simply return.
    *
    * @param work The piece of work to be enqueued.
    */
   public void enqueueWork(Work work)
   {
      // We may want to start a worker thread.
      // NOTE: idleWorkers still counts a worker that has been notified but
      // has not yet re-acquired the queue monitor, so a burst of enqueues
      // may be served by fewer threads than maxWorkers would allow.
      synchronized (workers) {
         if (idleWorkers == 0 && !stopping && workers.size() < maxWorkers) {
            WorkerThread worker =
               new WorkerThread(name + "-" + (workers.size() + 1));
            worker.start();
            // Register only after start() succeeded, so that a thread that
            // never ran cannot keep shutdown() waiting. The new thread
            // cannot unregister itself before this, as that requires the
            // workers monitor held here.
            workers.add(worker);
         }
      }

      synchronized (queue) {
         if (stopping)
            return; // we are shutting down, cannot take new work.
         queue.addLast(work);
         // Wake up one idle worker, if any.
         queue.notify();
      }
   }

   /**
    * Cancel a piece of enqueued work.
    * Every occurrence of the work in the queue is removed. Work that a
    * worker thread has already taken from the queue is not affected.
    *
    * @param work The piece of work to be cancelled.
    */
   public void cancelWork(Work work)
   {
      synchronized (queue) {
         // It may be enqueued several times.
         while (queue.remove(work)) {
            // keep removing until no occurrence is left
         }
      }
   }

   /**
    * The threads that do the actual work.
    */
   private class WorkerThread
      extends Thread
   {
      /**
       * Create a new WorkerThread in the thread group of the pool.
       * The thread is neither started nor added to the list of workers
       * here; {@link ThreadPool#enqueueWork} does both.
       *
       * @param name The name of the new thread.
       */
      WorkerThread(String name)
      {
         super(threadGroup, name);
         setDaemon(daemon);
      }

      /**
       * Wait for work to do.
       * This must be called when holding the queue monitor.
       * This will temporarily increment the count of idle workers
       * while waiting.
       */
      private void idle()
      {
         try {
            synchronized (workers) {
               ++idleWorkers;
            }
            queue.wait();
         } catch (InterruptedException ignored) {
            // An interrupt only ends the wait; the caller rechecks the
            // queue and the stopping flag.
         } finally {
            synchronized (workers) {
               --idleWorkers;
            }
         }
      }

      /**
       * Take work from the queue and do it, until the pool is stopping.
       * When this method ends, normally or because a piece of work threw,
       * the thread removes itself from the list of workers and notifies
       * the threads waiting in {@link ThreadPool#shutdown}.
       */
      public void run()
      {
         try {
            while (!stopping) {
               Work work = null;

               synchronized (queue) {
                  if (queue.size() == 0)
                     idle();
                  // Recheck after waiting: we may have been woken by
                  // shutdown(), or other threads may have emptied the queue.
                  if (!stopping && queue.size() > 0)
                     work = (Work)queue.removeFirst();
               }

               // Do the work outside the queue monitor.
               if (work != null)
                  work.doWork();
            }
         } finally {
            // Always unregister, even if doWork() threw. Otherwise the pool
            // would lose this slot for good and shutdown() would never
            // return. The exception itself still propagates.
            synchronized (workers) {
               workers.remove(this);
               // Notify every thread waiting in shutdown().
               workers.notifyAll();
            }
         }
      }
   }
}
| ThreadPool.java |