package org.jboss.util.threadpool;
import java.util.Collections;
import java.util.Map;
import org.jboss.util.collection.WeakValueHashMap;
import org.jboss.logging.Logger;
import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import EDU.oswego.cs.dl.util.concurrent.Heap;
/**
 * A thread pool that runs {@link TaskWrapper}s on a {@link MinPooledExecutor}
 * fed by a {@link BoundedLinkedQueue}, and exposes its tuning knobs through
 * {@link BasicThreadPoolMBean}.
 *
 * <p>Tasks that declare a positive completion timeout are additionally tracked
 * in a heap which a lazily started daemon thread (the timeout monitor) polls,
 * asking overdue tasks to stop.
 */
public class BasicThreadPool implements ThreadPool, BasicThreadPoolMBean
{
   /** Default group for pool threads and parent of every group created by {@link #setThreadGroupName(String)} */
   private static final ThreadGroup JBOSS_THREAD_GROUP = new ThreadGroup("JBoss Pooled Threads");

   /** Named thread groups created so far, keyed by group name */
   private static final Map threadGroups = Collections.synchronizedMap(new WeakValueHashMap());

   /** Source of the per-pool sequence numbers */
   private static final SynchronizedInt lastPoolNumber = new SynchronizedInt(0);

   /** Class logger */
   private static final Logger log = Logger.getLogger(BasicThreadPool.class);

   /** The pool name, used in {@link #toString()} and in thread names */
   private String name;

   /** This pool's sequence number, taken from {@link #lastPoolNumber} at construction */
   private int poolNumber;

   /** The blocking mode last applied through {@link #setBlockingMode(BlockingMode)} */
   private BlockingMode blockingMode = BlockingMode.ABORT;

   /** The executor that runs queued tasks */
   private MinPooledExecutor executor;

   /** The queue handed to the executor */
   private BoundedLinkedQueue queue;

   /** The group new pool threads are created in */
   private ThreadGroup threadGroup = JBOSS_THREAD_GROUP;

   /** Source of the per-thread sequence numbers used in thread names */
   private SynchronizedInt lastThreadNumber = new SynchronizedInt(0);

   /** Set once {@link #stop(boolean)} has been called */
   private SynchronizedBoolean stopped = new SynchronizedBoolean(false);

   /** Tasks with a completion timeout, ordered by {@link TimeoutInfo#compareTo(Object)}; 13 is presumably the initial capacity */
   private Heap tasksWithTimeouts = new Heap(13);

   /** The timeout monitor, created on first use by {@link #checkTimeoutMonitor()} */
   private TimeoutMonitor timeoutTask;

   /** Whether trace logging was enabled when this pool was constructed */
   private boolean trace;

   /**
    * Create a pool named "ThreadPool".
    */
   public BasicThreadPool()
   {
      this("ThreadPool");
   }

   /**
    * Create a pool with the given name, a queue bounded at 1024 entries, an
    * executor sized at 100 (both in the constructor argument and the minimum
    * pool size), a keep-alive of 60 * 1000 (presumably milliseconds) and the
    * abort-when-blocked policy.
    *
    * @param name the pool name
    */
   public BasicThreadPool(String name)
   {
      trace = log.isTraceEnabled();
      ThreadFactory factory = new ThreadPoolThreadFactory();

      queue = new BoundedLinkedQueue(1024);

      executor = new MinPooledExecutor(queue, 100);
      executor.setMinimumPoolSize(100);
      executor.setKeepAliveTime(60 * 1000);
      executor.setThreadFactory(factory);
      // Matches the initial value of the blockingMode field
      executor.abortWhenBlocked();

      poolNumber = lastPoolNumber.increment();
      setName(name);
   }

   /**
    * Mark the pool as stopped and shut the executor down. Once stopped,
    * {@link #runTaskWrapper(TaskWrapper)} rejects every new task.
    *
    * @param immediate true to call the executor's shutdownNow, false to let
    *        it shut down after processing the currently queued tasks
    */
   public void stop(boolean immediate)
   {
      log.debug("stop, immediate="+immediate);
      stopped.set(true);
      if (immediate)
         executor.shutdownNow();
      else
         executor.shutdownAfterProcessingCurrentlyQueuedTasks();
   }

   /**
    * Delegate to the executor's awaitTerminationAfterShutdown().
    *
    * @throws InterruptedException if the executor's wait is interrupted
    */
   public void waitForTasks() throws InterruptedException
   {
      executor.awaitTerminationAfterShutdown();
   }

   /**
    * Delegate to the executor's awaitTerminationAfterShutdown(long).
    *
    * @param maxWaitTime the wait limit passed through to the executor
    * @throws InterruptedException if the executor's wait is interrupted
    */
   public void waitForTasks(long maxWaitTime) throws InterruptedException
   {
      executor.awaitTerminationAfterShutdown(maxWaitTime);
   }

   /**
    * Accept and run a task wrapper, or reject it if the pool has been stopped.
    *
    * <p>A wrapper with a positive completion timeout is registered with the
    * timeout monitor before it is started. A wrapper whose wait type is
    * {@link Task#WAIT_FOR_COMPLETE} runs on the calling thread; any other wait
    * type is handed to the executor. Finally the wrapper's own waitForTask()
    * is invoked.
    *
    * @param wrapper the task wrapper to run
    */
   public void runTaskWrapper(TaskWrapper wrapper)
   {
      if( trace )
         log.trace("runTaskWrapper, wrapper="+wrapper);
      if (stopped.get())
      {
         wrapper.rejectTask(new ThreadPoolStoppedException("Thread pool has been stopped"));
         return;
      }

      wrapper.acceptTask();

      long completionTimeout = wrapper.getTaskCompletionTimeout();
      TimeoutInfo info = null;
      if( completionTimeout > 0 )
      {
         // Make sure the monitor thread exists before queueing work for it
         checkTimeoutMonitor();
         info = new TimeoutInfo(wrapper, completionTimeout);
         tasksWithTimeouts.insert(info);
      }

      int waitType = wrapper.getTaskWaitType();
      switch (waitType)
      {
         case Task.WAIT_FOR_COMPLETE:
         {
            executeOnThread(wrapper);
            break;
         }
         default:
         {
            execute(wrapper);
         }
      }
      waitForTask(wrapper);
   }

   /**
    * Wrap the task in a {@link BasicTaskWrapper} and run it.
    *
    * @param task the task to run
    */
   public void runTask(Task task)
   {
      BasicTaskWrapper wrapper = new BasicTaskWrapper(task);
      runTaskWrapper(wrapper);
   }

   /**
    * Run a runnable with start and completion timeouts of 0.
    *
    * @param runnable the work to run
    */
   public void run(Runnable runnable)
   {
      run(runnable, 0, 0);
   }

   /**
    * Wrap the runnable in a {@link RunnableTaskWrapper} and run it.
    *
    * @param runnable the work to run
    * @param startTimeout passed to the wrapper as its start timeout
    * @param completeTimeout passed to the wrapper as its completion timeout
    */
   public void run(Runnable runnable, long startTimeout, long completeTimeout)
   {
      RunnableTaskWrapper wrapper = new RunnableTaskWrapper(runnable, startTimeout, completeTimeout);
      runTaskWrapper(wrapper);
   }

   /**
    * @return the pool name
    */
   public String getName()
   {
      return name;
   }

   /**
    * @param name the new pool name
    */
   public void setName(String name)
   {
      this.name = name;
   }

   /**
    * @return this pool's sequence number
    */
   public int getPoolNumber()
   {
      return poolNumber;
   }

   /**
    * @return the name of the thread group new pool threads are created in
    */
   public String getThreadGroupName()
   {
      return threadGroup.getName();
   }

   /**
    * Select the thread group for threads created from now on, creating the
    * group as a child of the "JBoss Pooled Threads" group if no group with
    * that name is currently registered.
    *
    * @param threadGroupName the name of the group to use
    */
   public void setThreadGroupName(String threadGroupName)
   {
      ThreadGroup group;
      // Lookup and registration must be one atomic step so two pools
      // asking for the same name end up sharing one group
      synchronized(threadGroups)
      {
         group = (ThreadGroup) threadGroups.get(threadGroupName);
         if (group == null)
         {
            group = new ThreadGroup(JBOSS_THREAD_GROUP, threadGroupName);
            threadGroups.put(threadGroupName, group);
         }
      }
      threadGroup = group;
   }

   /**
    * @return the queue's current size
    */
   public int getQueueSize()
   {
      return queue.size();
   }

   /**
    * @return the queue's capacity
    */
   public int getMaximumQueueSize()
   {
      return queue.capacity();
   }

   /**
    * @param size the new queue capacity
    */
   public void setMaximumQueueSize(int size)
   {
      queue.setCapacity(size);
   }

   /**
    * @return the executor's current pool size
    */
   public int getPoolSize()
   {
      return executor.getPoolSize();
   }

   /**
    * @return the executor's minimum pool size
    */
   public int getMinimumPoolSize()
   {
      return executor.getMinimumPoolSize();
   }

   /**
    * Set the executor's keep-alive size, raising the executor's minimum and
    * maximum pool sizes to the same value when the current maximum is smaller.
    *
    * @param size the new size
    */
   public void setMinimumPoolSize(int size)
   {
      synchronized (executor)
      {
         executor.setKeepAliveSize(size);
         // Don't let the requested size exceed the maximum
         if (executor.getMaximumPoolSize() < size)
         {
            executor.setMinimumPoolSize(size);
            executor.setMaximumPoolSize(size);
         }
      }
   }

   /**
    * @return the executor's maximum pool size
    */
   public int getMaximumPoolSize()
   {
      return executor.getMaximumPoolSize();
   }

   /**
    * Set the executor's minimum and maximum pool sizes to the given value,
    * lowering the keep-alive size to match when it is currently larger.
    *
    * @param size the new size
    */
   public void setMaximumPoolSize(int size)
   {
      synchronized (executor)
      {
         executor.setMinimumPoolSize(size);
         executor.setMaximumPoolSize(size);
         // Don't let the keep-alive size exceed the maximum
         if (executor.getKeepAliveSize() > size)
            executor.setKeepAliveSize(size);
      }
   }

   /**
    * @return the executor's keep-alive time
    */
   public long getKeepAliveTime()
   {
      return executor.getKeepAliveTime();
   }

   /**
    * @param time the new keep-alive time, passed through to the executor
    */
   public void setKeepAliveTime(long time)
   {
      executor.setKeepAliveTime(time);
   }

   /**
    * @return the blocking mode last applied to the executor
    */
   public BlockingMode getBlockingMode()
   {
      return blockingMode;
   }

   /**
    * Apply a blocking mode to the executor and remember it.
    *
    * <p>The mode is recorded only after the executor has been configured, so
    * a rejected mode leaves the previously reported mode untouched.
    *
    * @param mode one of RUN, WAIT, DISCARD, DISCARD_OLDEST or ABORT
    * @throws IllegalArgumentException if the mode is null or not one of the
    *         modes listed above
    */
   public void setBlockingMode(BlockingMode mode)
   {
      if( mode == BlockingMode.RUN )
      {
         executor.runWhenBlocked();
      }
      else if( mode == BlockingMode.WAIT )
      {
         executor.waitWhenBlocked();
      }
      else if( mode == BlockingMode.DISCARD )
      {
         executor.discardWhenBlocked();
      }
      else if( mode == BlockingMode.DISCARD_OLDEST )
      {
         executor.discardOldestWhenBlocked();
      }
      else if( mode == BlockingMode.ABORT )
      {
         executor.abortWhenBlocked();
      }
      else
      {
         throw new IllegalArgumentException("Failed to recognize mode: "+mode);
      }
      // Only now is the mode actually in force
      blockingMode = mode;
   }

   /**
    * Apply a blocking mode given by name. A name that
    * {@link BlockingMode#toBlockingMode(String)} maps to null falls back to
    * ABORT.
    *
    * <p>The resolved mode is applied through
    * {@link #setBlockingMode(BlockingMode)} so that the executor's policy
    * always agrees with what {@link #getBlockingMode()} reports.
    *
    * @param name the name of the mode
    * @throws IllegalArgumentException if the name resolves to a mode that
    *         {@link #setBlockingMode(BlockingMode)} does not recognize
    */
   public void setBlockingMode(String name)
   {
      BlockingMode mode = BlockingMode.toBlockingMode(name);
      if( mode == null )
         mode = BlockingMode.ABORT;
      setBlockingMode(mode);
   }

   /**
    * @return this pool
    */
   public ThreadPool getInstance()
   {
      return this;
   }

   /**
    * Stop the pool without the immediate flag, i.e. {@code stop(false)}.
    */
   public void stop()
   {
      stop(false);
   }

   /**
    * @return the pool name followed by the pool number in parentheses
    */
   public String toString()
   {
      return name + '(' + poolNumber + ')';
   }

   /**
    * Run the wrapper on the calling thread.
    *
    * @param wrapper the task wrapper to run
    */
   protected void executeOnThread(TaskWrapper wrapper)
   {
      if( trace )
         log.trace("executeOnThread, wrapper="+wrapper);
      wrapper.run();
   }

   /**
    * Hand the wrapper to the executor. Anything the executor throws is turned
    * into a rejection of the task with a {@link ThreadPoolFullException}
    * carrying the throwable's string form.
    *
    * @param wrapper the task wrapper to run
    */
   protected void execute(TaskWrapper wrapper)
   {
      if( trace )
         log.trace("execute, wrapper="+wrapper);
      try
      {
         executor.execute(wrapper);
      }
      catch (Throwable t)
      {
         wrapper.rejectTask(new ThreadPoolFullException(t.toString()));
      }
   }

   /**
    * Delegate to the wrapper's waitForTask().
    *
    * @param wrapper the task wrapper to wait on
    */
   protected void waitForTask(TaskWrapper wrapper)
   {
      wrapper.waitForTask();
   }

   /**
    * Create the timeout monitor if it does not exist yet. Constructing the
    * monitor also starts its daemon thread.
    */
   protected synchronized void checkTimeoutMonitor()
   {
      if( timeoutTask == null )
         timeoutTask = new TimeoutMonitor(name, log);
   }

   /**
    * Remove and return the next entry from the timeout heap.
    *
    * @return the extracted entry; the monitor treats null as "nothing to do"
    */
   protected TimeoutInfo getNextTimeout()
   {
      TimeoutInfo info = (TimeoutInfo) this.tasksWithTimeouts.extract();
      return info;
   }

   /**
    * Creates daemon threads in the pool's current thread group, named after
    * the pool plus a running thread number.
    */
   private class ThreadPoolThreadFactory implements ThreadFactory
   {
      public Thread newThread(Runnable runnable)
      {
         String threadName = BasicThreadPool.this.toString() + "-" + lastThreadNumber.increment();
         Thread thread = new Thread(threadGroup, runnable, threadName);
         thread.setDaemon(true);
         return thread;
      }
   }

   /**
    * Heap entry pairing a task wrapper with the absolute time at which its
    * completion should be checked. Entries order by that time.
    */
   private static class TimeoutInfo implements Comparable
   {
      /** System.currentTimeMillis() when the deadline was last (re)computed */
      long start;
      /** Absolute deadline: start plus the requested timeout */
      long timeoutMS;
      /** The tracked task wrapper */
      TaskWrapper wrapper;
      /** Becomes true once stopTask() has been called at least once */
      boolean firstStop;

      TimeoutInfo(TaskWrapper wrapper, long timeout)
      {
         this.start = System.currentTimeMillis();
         this.timeoutMS = start + timeout;
         this.wrapper = wrapper;
      }

      /**
       * Recompute the deadline as now plus the given timeout.
       *
       * @param timeout the delay from now, in milliseconds
       */
      public void setTimeout(long timeout)
      {
         this.start = System.currentTimeMillis();
         this.timeoutMS = start + timeout;
      }

      /**
       * Order entries by deadline.
       *
       * <p>The deadlines are compared directly rather than by narrowing their
       * difference to an int: a difference beyond the int range would be
       * truncated and could report the wrong sign, or zero, corrupting the
       * heap order.
       *
       * @param o the other TimeoutInfo
       * @return -1, 0 or 1 as this deadline is before, equal to or after the other
       */
      public int compareTo(Object o)
      {
         TimeoutInfo ti = (TimeoutInfo) o;
         long to0 = timeoutMS;
         long to1 = ti.timeoutMS;
         if( to0 < to1 )
            return -1;
         if( to0 > to1 )
            return 1;
         return 0;
      }

      TaskWrapper getTaskWrapper()
      {
         return wrapper;
      }

      /**
       * @return the completion timeout reported by the wrapper itself
       */
      public long getTaskCompletionTimeout()
      {
         return wrapper.getTaskCompletionTimeout();
      }

      /**
       * @param now the current time in milliseconds
       * @return the time left until the deadline; zero or negative once it has passed
       */
      public long getTaskCompletionTimeout(long now)
      {
         return timeoutMS - now;
      }

      /**
       * Ask the wrapper to stop its task.
       *
       * @return true if this was the first stop request for this entry
       */
      public boolean stopTask()
      {
         wrapper.stopTask();
         boolean wasFirstStop = firstStop == false;
         firstStop = true;
         return wasFirstStop;
      }
   }

   /**
    * Daemon thread body that polls the timeout heap until the pool is
    * stopped, asking tasks that are still incomplete at their deadline to
    * stop.
    */
   private class TimeoutMonitor implements Runnable
   {
      /** The logger supplied by the enclosing pool */
      final Logger log;

      /**
       * Create the monitor and start its daemon thread, named
       * {@code name + " TimeoutMonitor"}.
       */
      TimeoutMonitor(String name, Logger log)
      {
         this.log = log;
         Thread t = new Thread(this, name+" TimeoutMonitor");
         t.setDaemon(true);
         t.start();
      }

      /**
       * Loop until the pool is stopped: take the next heap entry, sleep until
       * its deadline, then check whether the task completed. An incomplete
       * task is asked to stop; after the first stop request the entry is
       * re-queued with a 1000 ms deadline for one more check. With an empty
       * heap the loop sleeps for 1000 ms.
       */
      public void run()
      {
         boolean isStopped = stopped.get();
         while( isStopped == false )
         {
            boolean trace = log.isTraceEnabled();
            try
            {
               TimeoutInfo info = getNextTimeout();
               if( info != null )
               {
                  long now = System.currentTimeMillis();
                  long timeToTimeout = info.getTaskCompletionTimeout(now);
                  if( timeToTimeout > 0 )
                  {
                     if( trace )
                     {
                        log.trace("Will check wrapper="+info.getTaskWrapper()
                           +" after "+timeToTimeout);
                     }
                     Thread.sleep(timeToTimeout);
                  }
                  // Deadline reached: see whether the task finished in time
                  TaskWrapper wrapper = info.getTaskWrapper();
                  if( wrapper.isComplete() == false )
                  {
                     if( trace )
                        log.trace("Failed completion check for wrapper="+wrapper);
                     if( info.stopTask() == true )
                     {
                        // First stop request: look again in a second
                        info.setTimeout(1000);
                        tasksWithTimeouts.insert(info);
                        if( trace )
                           log.trace("Rescheduled completion check for wrapper="+wrapper);
                     }
                  }
               }
               else
               {
                  Thread.sleep(1000);
               }
            }
            catch(InterruptedException e)
            {
               // The interrupt flag is left cleared; the loop only ends
               // when the pool reports it has been stopped
               log.debug("Timeout monitor has been interrupted", e);
            }
            catch(Throwable e)
            {
               log.debug("Timeout monitor saw unexpected error", e);
            }
            isStopped = stopped.get();
         }
      }
   }
}