/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq.threadpool;

import java.lang.Thread;
import java.lang.ThreadGroup;

import java.util.ArrayList;
import java.util.LinkedList;


/**
 *  This is an implementation of a simple thread pool with
 *  an embedded work queue.
 *
 * @author    Ole Husgaard (osh@sparre.dk)
 * @version   $Revision: 1.1 $
 */
public class ThreadPool
{
   /**
    *  The name of this thread pool
    */
   private String name;

   /**
    *  Flags that worker threads should be created as daemon threads.
    */
   private boolean daemon;

   /**
    *  The ThreadGroup of threads in this pool.
    */
   private ThreadGroup threadGroup;

   /**
    *  The worker threads.
    */
   private ArrayList workers;

   /**
    *  Maximum number of worker threads.
    */
   private int maxWorkers;

   /**
    *  Count of idle worker threads.
    *  Synchronized on the [@link #workers} field.
    */
   private int idleWorkers;

   /**
    *  Flags that we are shutting down the pool.
    *  Synchronized on the [@link #workers} field.
    */
   private volatile boolean stopping;

   /**
    *  The work queue.
    */
   private LinkedList queue;


   /**
    *  Create a new thread pool instance.
    *
    *  @param Name The name of this thread pool. This is used for naming its
    *         worker threads.
    *  @param threadGroup The <code>ThreadGroup</code> that worker threads
    *         in this pool should belong to.
    *  @param maxWorkers The maximum number of worker threads in this pool.
    *  @param daemon If <code>true</code>, worker threads will be created as
    *         daemon threads.
    */
   public ThreadPool(String name, ThreadGroup threadGroup, int maxWorkers,
                     boolean daemon)
   {
      if (name == null || threadGroup == null || maxWorkers <= 0)
         throw new IllegalArgumentException();

      this.name = name;
      this.daemon = daemon;
      this.threadGroup = threadGroup;
      workers = new ArrayList();
      this.maxWorkers = maxWorkers;
      idleWorkers = 0;
      stopping = false;
      queue = new LinkedList();
   }

   /**
    *  Shutdown this thread pool.
    *  This will not return until all enqueued work has been cancelled,
    *  and all worker threads have done any work they started and have
    *  died.
    */
    public void shutdown()
    {
       stopping = true;

       synchronized (queue) {
          // Remove all queued work
          queue.clear();
          // Notify all waiting threads
          queue.notifyAll();
       }

       // wait for all worker threads to die.
       synchronized (workers) {
          while (workers.size() > 0) {
             try {
                // wait for some worker threads to die.
                workers.wait();
             } catch (InterruptedException ex) {
                // ignore
             }
          }
       }
    }


   /**
    *  Enqueue a piece of work for this thread to handle.
    *  As soon as a thread becomes available, it will call
    *  {@link Work#doWork} of the argument.
    *  If the pool is shutting down, this method will not enqueue the
    *  work, but instead simply return.
    *
    *  @param work The piece of work to be enqueued.
    */
   public void enqueueWork(Work work)
   {
//System.err.println("ThreadPool("+name+"): enqueueWork() entered.");
      // We may want to start a worker thread
      synchronized (workers) {
//System.err.println("ThreadPool("+name+"): enqueueWork(): idleWorkers="+idleWorkers+" stopping="+stopping+".");
//System.err.println("ThreadPool("+name+"): enqueueWork(): workers.size()="+workers.size()+" maxWorkers="+maxWorkers+".");
         if (idleWorkers == 0 && !stopping && workers.size() < maxWorkers)
{
            new WorkerThread(name + "-" + (workers.size() + 1)).start();
//System.err.println("ThreadPool("+name+"): started new WorkerThread.");
}
      }

      synchronized (queue) {
         if (stopping)
            return; // we are shutting down, cannot take new work.

         queue.addLast(work);
//System.err.println("ThreadPool("+name+"): enqueueWork(): enqueued work..");
         queue.notify();
      }
   }

   /**
    *  Cancel a piece of enqueued work.
    *
    *  @param work The piece of work to be cancel.
    */
   public void cancelWork(Work work)
   {
      synchronized (queue) {
         // It may be enqueued several times.
         while (queue.remove(work))
            ;
      }
   }


   /**
    *  The threads that do the actual work.
    */
   private class WorkerThread
      extends Thread
   {
      /**
       *  Create a new WorkerThread.
       *  This must be called when holding the workers monitor.
       */
      WorkerThread(String name)
      {
         super(threadGroup, name);
         setDaemon(daemon);
         workers.add(this);
//System.err.println("ThreadPool("+name+"): " + getName() + " created.");
      }

      /**
       *  Wait for work do to.
       *  This must be called when holding the queue monitor.
       *  This will temporarily increment the count of idle workers
       *  while waiting.
       */
      private void idle()
      {
         try {
            synchronized (workers) {
              ++idleWorkers;
            }
            //System.err.println("ThreadPool("+name+"): " + getName() + " starting to wait.");
            queue.wait();
         } catch (InterruptedException ex) {
            // ignore
         } finally {
            //System.err.println("ThreadPool("+name+"): " + getName() + " done waiting.");
            synchronized (workers) {
              --idleWorkers;
            }
         }
      }

      public void run()
      {
//System.err.println("ThreadPool("+name+"): " + getName() + " started to run.");
         while (!stopping) {
            Work work = null;

            synchronized (queue) {
               if (queue.size() == 0)
                  idle();
               if (!stopping && queue.size() > 0)
                  work = (Work)queue.removeFirst();
            }

            if (work != null)
               work.doWork();
         }
         synchronized (workers) {
            workers.remove(this);
            // Notify the shutdown thread.
            workers.notify();
         }
      }
   }
}