 * JBoss, the OpenSource J2EE webOS
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
package org.jboss.util.threadpool;

import java.util.Collections;
import java.util.Map;

import org.jboss.util.collection.WeakValueHashMap;
import org.jboss.logging.Logger;

import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import EDU.oswego.cs.dl.util.concurrent.Heap;

 * A basic thread pool.
 * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
 * @author Scott.Stark@jboss.org
 * @version $Revision: $
public class BasicThreadPool implements ThreadPool, BasicThreadPoolMBean
   // Constants -----------------------------------------------------

   /** The jboss thread group */
   private static final ThreadGroup JBOSS_THREAD_GROUP = new ThreadGroup("JBoss Pooled Threads");

   /** The thread groups */
   private static final Map threadGroups = Collections.synchronizedMap(new WeakValueHashMap());

   /** The internal pool number */
   private static final SynchronizedInt lastPoolNumber = new SynchronizedInt(0);

   private static Logger log = Logger.getLogger(BasicThreadPool.class);

   // Attributes ----------------------------------------------------

   /** The thread pool name */
   private String name;

   /** The internal pool number */
   private int poolNumber;

   /** The blocking mode */
   private BlockingMode blockingMode = BlockingMode.ABORT;

   /** The pooled executor */
   private MinPooledExecutor executor;

   /** The queue */
   private BoundedLinkedQueue queue;

   /** The thread group */
   private ThreadGroup threadGroup = JBOSS_THREAD_GROUP;

   /** The last thread number */
   private SynchronizedInt lastThreadNumber = new SynchronizedInt(0);

   /** Has the pool been stopped? */
   private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
   /** The Heap<TimeoutInfo> of tasks ordered by their completion timeout */
   private Heap tasksWithTimeouts = new Heap(13);
   /** The task completion timeout monitor runnable */
   private TimeoutMonitor timeoutTask;
   /** The trace level logging flag */
   private boolean trace;

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

    * Create a new thread pool
   public BasicThreadPool()

    * Create a new thread pool with a default queue size of 1024, min/max pool
    * sizes of 100 and a keep alive of 60 seconds.
    * @param name the pool name
   public BasicThreadPool(String name)
      trace = log.isTraceEnabled();
      ThreadFactory factory = new ThreadPoolThreadFactory();

      queue = new BoundedLinkedQueue(1024);

      executor = new MinPooledExecutor(queue, 100);
      executor.setKeepAliveTime(60 * 1000);

      poolNumber = lastPoolNumber.increment();

   // Public --------------------------------------------------------

   // ThreadPool ----------------------------------------------------

   public void stop(boolean immediate)
      log.debug("stop, immediate="+immediate);
      if (immediate)

   public void waitForTasks() throws InterruptedException
   public void waitForTasks(long maxWaitTime) throws InterruptedException

   public void runTaskWrapper(TaskWrapper wrapper)
      if( trace )
         log.trace("runTaskWrapper, wrapper="+wrapper);
      if (stopped.get())
         wrapper.rejectTask(new ThreadPoolStoppedException("Thread pool has been stopped"));


      long completionTimeout = wrapper.getTaskCompletionTimeout();
      TimeoutInfo info = null;
      if( completionTimeout > 0 )
         // Install the task in the
         info = new TimeoutInfo(wrapper, completionTimeout);
      int waitType = wrapper.getTaskWaitType();
      switch (waitType)
         case Task.WAIT_FOR_COMPLETE:

   public void runTask(Task task)
      BasicTaskWrapper wrapper = new BasicTaskWrapper(task);

   public void run(Runnable runnable)
      run(runnable, 0, 0);
   public void run(Runnable runnable, long startTimeout, long completeTimeout)
      RunnableTaskWrapper wrapper = new RunnableTaskWrapper(runnable, startTimeout, completeTimeout);

   // ThreadPoolMBean implementation --------------------------------

   public String getName()
      return name;

   public void setName(String name)
      this.name = name;

   public int getPoolNumber()
      return poolNumber;

   public String getThreadGroupName()
      return threadGroup.getName();

   public void setThreadGroupName(String threadGroupName)
      ThreadGroup group;
         group = (ThreadGroup) threadGroups.get(threadGroupName);
         if (group == null)
            group = new ThreadGroup(JBOSS_THREAD_GROUP, threadGroupName);
            threadGroups.put(threadGroupName, group);
      threadGroup = group;

   public int getQueueSize()
      return queue.size();

   public int getMaximumQueueSize()
      return queue.capacity();

   public void setMaximumQueueSize(int size)

   public int getPoolSize()
      return executor.getPoolSize();

   public int getMinimumPoolSize()
      return executor.getMinimumPoolSize();

   public void setMinimumPoolSize(int size)
      synchronized (executor)
         // Don't let the min size > max size
         if (executor.getMaximumPoolSize() < size)

   public int getMaximumPoolSize()
      return executor.getMaximumPoolSize();
   public void setMaximumPoolSize(int size)
      synchronized (executor)
         // Don't let the min size > max size
         if (executor.getKeepAliveSize() > size)

   public long getKeepAliveTime()
      return executor.getKeepAliveTime();

   public void setKeepAliveTime(long time)

   public BlockingMode getBlockingMode()
      return blockingMode;

   public void setBlockingMode(BlockingMode mode)
      blockingMode = mode;
      if( blockingMode == BlockingMode.RUN )
      else if( blockingMode == BlockingMode.WAIT )
      else if( blockingMode == BlockingMode.DISCARD )
      else if( blockingMode == BlockingMode.DISCARD_OLDEST )
      else if( blockingMode == BlockingMode.ABORT )
         throw new IllegalArgumentException("Failed to recognize mode: "+mode);

    * For backward compatibility with the previous string based mode
    * @param name - the string form of the mode enum
   public void setBlockingMode(String name)
      blockingMode = BlockingMode.toBlockingMode(name);
      if( blockingMode == null )
         blockingMode = BlockingMode.ABORT;

   public ThreadPool getInstance()
      return this;

   public void stop()

   // Object overrides ----------------------------------------------

   public String toString()
      return name + '(' + poolNumber + ')';

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

    * Execute a task on the same thread
    * @param wrapper the task wrapper
   protected void executeOnThread(TaskWrapper wrapper)
      if( trace )
         log.trace("executeOnThread, wrapper="+wrapper);

    * Execute a task
    * @param wrapper the task wrapper
   protected void execute(TaskWrapper wrapper)
      if( trace )
         log.trace("execute, wrapper="+wrapper);
      catch (Throwable t)
         wrapper.rejectTask(new ThreadPoolFullException(t.toString()));

    * Wait for a task
    * @param wrapper the task wrapper
   protected void waitForTask(TaskWrapper wrapper)

    * Used to lazily create the task completion timeout thread and monitor
   protected synchronized void checkTimeoutMonitor()
      if( timeoutTask == null )
         timeoutTask = new TimeoutMonitor(name, log);      
   protected TimeoutInfo getNextTimeout()
      TimeoutInfo info = (TimeoutInfo) this.tasksWithTimeouts.extract();
      return info;

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

    * A factory for threads
   private class ThreadPoolThreadFactory implements ThreadFactory
      public Thread newThread(Runnable runnable)
         String threadName = BasicThreadPool.this.toString() + "-" + lastThreadNumber.increment();
         Thread thread = new Thread(threadGroup, runnable, threadName);
         return thread;

   /** An encapsulation of a task and its completion timeout
   private static class TimeoutInfo implements Comparable
      long start;
      long timeoutMS;
      TaskWrapper wrapper;
      boolean firstStop;
      TimeoutInfo(TaskWrapper wrapper, long timeout)
         this.start = System.currentTimeMillis();
         this.timeoutMS = start + timeout;
         this.wrapper = wrapper;
      public void setTimeout(long timeout)
         this.start = System.currentTimeMillis();
         this.timeoutMS = start + timeout;         
      /** Order TimeoutInfo based on the timestamp at which the task needs to
       * be completed by.
       * @param o a TimeoutInfo
       * @return the diff between this timeoutMS and the argument timeoutMS
      public int compareTo(Object o)
         TimeoutInfo ti = (TimeoutInfo) o;
         long to0 = timeoutMS;
         long to1 = ti.timeoutMS;
         int diff = (int) (to0 - to1);
         return diff;
      TaskWrapper getTaskWrapper()
         return wrapper;
      public long getTaskCompletionTimeout()
         return wrapper.getTaskCompletionTimeout();
      /** Get the time remaining to the complete timeout timestamp in MS.
       * @param now - the current System.currentTimeMillis value
       * @return the time remaining to the complete timeout timestamp in MS.
      public long getTaskCompletionTimeout(long now)
         return timeoutMS - now;
      /** Invoke stopTask on the wrapper and indicate whether this was the first
       * time the task has been notified to stop.
       * @return true if this is the first stopTask, false on the second.
      public boolean stopTask()
         boolean wasFirstStop = firstStop == false;
         firstStop = true;
         return wasFirstStop;
    * The monitor runnable which validates that threads are completing within
    * the task completion timeout limits.
   private class TimeoutMonitor implements Runnable
      final Logger log;
      TimeoutMonitor(String name, Logger log)
         this.log = log;
         Thread t = new Thread(this, name+" TimeoutMonitor");
      /** The monitor thread loops until the pool is shutdown. It waits for
       * tasks with completion timeouts and sleeps until the next completion
       * timeout and then interrupts the associated task thread, and invokes
       * stopTask on the TaskWrapper. A new timeout check is then inserted with
       * a 1 second timeout to validate that the TaskWrapper has exited the
       * run method. If it has not, then the associated task thread is stopped
       * using the deprecated Thread.stop method since this is the only way to
       * abort a thread that is in spin loop for example.
       * @todo this is not responsive to new tasks with timeouts smaller than
       * the current shortest completion expiration. We probably should interrupt
       * the thread on each insertion into the timeout heap to ensure better
       * responsiveness.
      public void run()
         boolean isStopped = stopped.get();
         while( isStopped == false )
            boolean trace = log.isTraceEnabled();
               TimeoutInfo info = getNextTimeout();
               if( info != null )
                  long now = System.currentTimeMillis();
                  long timeToTimeout = info.getTaskCompletionTimeout(now);
                  if( timeToTimeout > 0 )
                     if( trace )
                        log.trace("Will check wrapper="+info.getTaskWrapper()
                           +" after "+timeToTimeout);
                  // Check the status of the task
                  TaskWrapper wrapper = info.getTaskWrapper();
                  if( wrapper.isComplete() == false )
                     if( trace )
                        log.trace("Failed completion check for wrapper="+wrapper);
                     if( info.stopTask() == true )
                        // Requeue the TimeoutInfo to see that the task exits run
                        if( trace )
                           log.trace("Rescheduled completion check for wrapper="+wrapper);
            catch(InterruptedException e)
               log.debug("Timeout monitor has been interrupted", e);
            catch(Throwable e)
               log.debug("Timeout monitor saw unexpected error", e);               
            isStopped = stopped.get();            