/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq.il.oil2;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Iterator;

import org.jboss.logging.Logger;

import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.Slot;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;

/**
 * The OIL2 implementation of the ServerIL object
 *
 * @author    <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
 * @version   $Revision: 1$
 */
public final class OIL2SocketHandler implements java.lang.Cloneable, Runnable
{
   final static private Logger log = Logger.getLogger(OIL2SocketHandler.class);

   /**
    * Messages will be read from the input stream
    */
   private ObjectInputStream in;

   /**
    * Messages will be writen to the output stream
    */
   private ObjectOutputStream out;

   /** 
    * Should we be receiving messages??
    */
   private boolean running;

   /** 
    * The thread group that the reader thread should join.
    */
   private final ThreadGroup partentThreadGroup;

   /** 
    * Reader thread.
    */
   private Thread worker;

   /**
    * Number of OIL2 Worker threads started.
    */
   private static int threadNumber = 0;

   /**
    * Requst create slots to wait for responses,
    * those slots are stored in this hashmap.
    * 
    * This field uses copy on write semantics.
    */
   volatile ConcurrentHashMap responseSlots = new ConcurrentHashMap();

   /**
    * The request listner is notified of new requests
    * and of asyncronous IO errors.
    */
   OIL2RequestListner requestListner;

   /**
    * If the socket handler is currently pumping messages.
    */
   private volatile boolean pumpingData = false;

   /**
    * Pump mutex
    */
   private Object pumpMutex = new Object();

   /**
    * The that new request get placed into when they arrived.
    */
   LinkedQueue requestQueue = new LinkedQueue();

   /**
    * The thread pool used to service incoming requests..
    */
   PooledExecutor pool;

   /**
    * Constructor for the OILServerIL object
    *
    * @param a     Description of Parameter
    * @param port  Description of Parameter
    */
   public OIL2SocketHandler(ObjectInputStream in, ObjectOutputStream out, ThreadGroup partentThreadGroup)
   {
      this.in = in;
      this.out = out;
      this.partentThreadGroup = partentThreadGroup;

      synchronized (OIL2SocketHandler.class)
      {
         if (pool == null)
         {
            pool = new PooledExecutor(50);
            // supply a ThreadFactory to the pool to create daemon threads
            log.debug("Setting the OIL2SocketHandler's thread factory");
            pool.setThreadFactory(
               new ThreadFactory()
               {
                  private int threadNo = 0;
                  public Thread newThread(Runnable r)
                  {
                     Thread t = new Thread(OIL2SocketHandler.this.partentThreadGroup, r, "OIL2SocketHandler Thread-" + threadNo++);
                     t.setDaemon(true);
                     return t;
                  }
               }      
            );
            pool.setMinimumPoolSize(1);
            pool.setKeepAliveTime(1000 * 60);
            pool.runWhenBlocked();
            pool.createThreads(1);
         }
      }
   }

   /**
    * #Description of the Method
    *
    * @return               Description of the Returned Value
    * @exception Exception  Description of Exception
    */
   public void sendRequest(OIL2Request request) throws IOException
   {
//      if (log.isTraceEnabled())
//         log.trace("Sending request: " + request);

      try
      {
         synchronized (out)
         {
            out.writeByte(1);
            request.writeExternal(out);
            out.reset();
            out.flush();
         }
      }
      catch (IOException e)
      {
         throw e;
      }

   }

   /**
    * #Description of the Method
    */
   private void registerResponseSlot(OIL2Request request, Slot responseSlot) throws IOException
   {
      responseSlots.put(request.requestId, responseSlot);
   }

   /**
    * #Description of the Method
    */
   public void setRequestListner(OIL2RequestListner requestListner)
   {
      this.requestListner = requestListner;
   }

   /**
    * #Description of the Method
    *
    * @return               Description of the Returned Value
    * @exception Exception  Description of Exception
    */
   public void sendResponse(OIL2Response response) throws IOException
   {
//      if (log.isTraceEnabled())
//         log.trace("Sending response: " + response);

      try
      {
         synchronized (out)
         {
            out.writeByte(2);
            response.writeExternal(out);
            out.reset();
            out.flush();
         }
      }
      catch (IOException e)
      {
         throw e;
      }
   }

   /**
    *  Pumps messages from the input stream.
    *  
    *  If the request object is not null, then the target message is 
    *  the response object for the request argument.  The target
    *  message is returned.
    * 
    *  If the request object is null, then the target message is 
    *  the first new request that is encountered.  The new request 
    *  messag is returned.
    * 
    *  All message received before the target message are pumped.
    *  A pumped message is placed in either Response Slots or
    *  the Request Queue depending on if the message is a response
    *  or requests.
    * 
    * @param request The request object that is waiting for a response.
    * @return the request or reponse object that this method was looking for
    * @exception  IOException  Description of Exception
    */
   private Object pumpMessages(OIL2Request request, Channel mySlot)
      throws IOException, ClassNotFoundException, InterruptedException
   {

      synchronized (pumpMutex)
      {
         // Is somebody else pumping data??
         if (pumpingData)
         {
            return null;
         }
         else
            pumpingData = true;
      }

      try
      {
         while (true)
         {
            if (mySlot != null)
            {
               // Do we have our response sitting in our slot allready??
               Object o;
               while ((o = mySlot.peek()) != null)
               {
                  o = mySlot.take();
                  if (o != this)
                  {
                     return o;
                  }
               }
            }

            byte code = in.readByte();
            switch (code)
            {
               // Request received... pass it up
               case 1 :
                  OIL2Request newRequest = new OIL2Request();
                  newRequest.readExternal(in);

                  // Are we looking for a request??
                  if (request == null)
                  {
                     return newRequest;
                  }
                  else
                  {
                     requestQueue.put(newRequest);
                  }

                  break;

                  // Response received... find the response slot
               case 2 :

                  OIL2Response response = new OIL2Response();
                  response.readExternal(in);

                  // No reponse id to response to..
                  if (response.correlationRequestId == null)
                     continue;

                  // Is this the response object we are looking for
                  if (request != null && request.requestId.equals(response.correlationRequestId))
                  {
                     return response;
                  }
                  else
                  {

                     Slot slot = (Slot) responseSlots.remove(response.correlationRequestId);

                     if (slot != null)
                     {
                        slot.put(response);
                     }
                     else
                     {
                        // This should not happen...
                        if (log.isTraceEnabled())
                           log.warn("No slot registered for: " + response);
                     }
                  }
                  break;
            } // switch
         } // while         
      }
      finally
      {
         synchronized (pumpMutex)
         {
            pumpingData = false;
         }

         Thread thread = Thread.currentThread();
         boolean interrupted = thread.isInterrupted();

         // We are done, let somebody know that they can 
         // start pumping us again.
         Iterator i = responseSlots.values().iterator();
         while (i.hasNext())
         {
            Slot s = (Slot) i.next();
            if (s != mySlot)
               s.offer(this, 0);
         }

         // Only notify the request waiter if we are not
         // giving him a message on this method call.
         if (request != null)
         {
            requestQueue.put(this);
         }

         if (interrupted)
            thread.interrupt();
      }
   }

   public OIL2Response synchRequest(OIL2Request request)
      throws IOException, InterruptedException, ClassNotFoundException
   {

      //      if (log.isTraceEnabled())
      //         log.trace("Sending request: "+request);

      Slot slot = new Slot();
      registerResponseSlot(request, slot);
      sendRequest(request);

      Object o = null;
      while (true)
      {
         // Do we have something in our queue??
         if (o != null)
         {
            // was is a request message??
            if (o != this)
            {
               //               if (log.isTraceEnabled())
               //                  log.trace("Got response: "+o);
               return (OIL2Response) o;
            }
            // See if we have another message in the queue.
            o = slot.peek();
            if (o != null)
               o = slot.take();
         }
         else
         {
            // We did not have any messages in the slot,
            // so we have to go pumping..
            o = pumpMessages(request, slot);
            if (o == null)
            {
               // Somebody else is in the pump, wait till we 
               // are notified to get in.
               o = slot.take();
            }
         }
      } // end while
   }

   public class RequestRunner implements Runnable
   {
      OIL2Request request;
      RequestRunner(OIL2Request request)
      {
         this.request = request;
      }
      public void run()
      {
         requestListner.handleRequest(request);
      }
   }

   /**
    * Main processing method for the OILClientILService object
    */
   public void run()
   {
      try
      {

         Object o = null;
         while (running)
         {
            // Do we have something in our queue??
            if (o != null)
            {
               // was is a request message??
               if (o != this)
               {
                  pool.execute(new RequestRunner((OIL2Request) o));
               }
               // See if we have another message in the queue.
               o = requestQueue.peek();
               if (o != null)
                  o = requestQueue.take();
            }
            else
            {
               // We did not have any messages in the queue,
               // so we have to go pumping..
               o = pumpMessages(null, requestQueue);
               if (o == null)
               {
                  // Somebody else is in the pump, wait till we 
                  // are notified to get in.
                  o = requestQueue.take();
               }
            }
         } // end while

      }
      catch (InterruptedException e)
      {
         if (log.isTraceEnabled())
            log.trace("Stopped due to interruption");
      }
      catch (Exception e)
      {
         if (log.isTraceEnabled())
            log.trace("Stopping due to unexcpected exception: ", e);
         requestListner.handleConnectionException(e);
      }

      // ensure the flag is set correctly
      running = false;
      if (log.isTraceEnabled())
         log.trace("Stopped");
   }

   public void start() //throws java.lang.Exception
   {
      if (log.isTraceEnabled())
         log.trace("Starting");

      running = true;
      worker = new Thread(partentThreadGroup, this, "OIL2 Worker-" + threadNumber++);
      worker.setDaemon(true);
      worker.start();

   }

   public void stop()
   {
      if (log.isTraceEnabled())
         log.trace("Stopping");
      running = false;
      worker.interrupt();
   }

}