/***************************************
 *                                     *
 *  JBoss: The OpenSource J2EE WebOS   *
 *                                     *
 *  Distributable under LGPL license.  *
 *  See terms of license at gnu.org.   *
 *                                     *
 ***************************************/

package org.jboss.remoting.transport.socket;

import org.jboss.logging.Logger;
import org.jboss.remoting.marshal.MarshalFactory;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.LinkedList;

/**
 * This Thread object holds a single Socket connection to a client
 * and is kept alive until a timeout happens, or it is aged out of the
 * SocketServerInvoker's LRU cache.
 * <p/>
 * There is also a separate thread pool that is used if the client disconnects.
 * This thread/object is re-used in that scenario and that scenario only.
 * <p/>
 * This is a customization of the same ServerThread class used with the PooledInvoker.
 * The customization was made to allow for remoting marshaller/unmarshaller.
 *
 * @author <a href="mailto:bill@jboss.org">Bill Burke</a>
 * @author <a href="mailto:tom@jboss.org">Tom Elrod</a>
 * @version $Revision: 1.4.2.2 $
 */
public class ServerThread extends Thread
{
   final static private Logger log = Logger.getLogger(ServerThread.class);

   /** Buffered wrapper around the socket's input stream; recreated on every dorun(). */
   protected BufferedInputStream bis;
   /** Buffered wrapper around the socket's output stream; recreated on every dorun(). */
   protected BufferedOutputStream bos;
   /** Object stream the invocations (and ACK bytes) are read from. */
   protected ObjectInputStream in;
   /** Object stream the responses (and ACK bytes) are written to. */
   protected ObjectOutputStream out;
   /** Socket currently being served; null while no connection is assigned. */
   protected Socket socket;
   /** Invoker that the unmarshalled invocations are dispatched to. */
   protected SocketServerInvoker invoker;
   /** Pool this thread removes itself from when it stops serving its socket. */
   protected LRUPool clientpool;
   /** Pool this thread adds itself to when it becomes available for re-use. */
   protected LinkedList threadpool;
   /** Loop flag for the re-use loop in dorun(); cleared by shutdown(), evict() and on failures. */
   protected volatile boolean running = true;
   protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
   /** Once set by shutdown(), run() exits instead of returning this thread to the thread pool. */
   protected volatile boolean shutdown = false;
   /** Counter backing nextID(); only accessed from that synchronized method. */
   protected static int id = 0;

   /**
    * Set by wakeup() and reset by run() just before it waits. Only read and
    * written while holding this object's monitor. It lets run() tell a real
    * wakeup() apart from a spurious return of wait().
    */
   private boolean awakened = false;

   /**
    * Hands out the sequence number used as the suffix of the thread name.
    *
    * @return the current counter value; the counter is incremented afterwards
    */
   public static synchronized int nextID()
   {
      return id++;
   }

   /**
    * Creates a thread that will serve the given socket once started.
    *
    * @param socket     the client connection to serve; its SO_TIMEOUT is set to timeout
    * @param invoker    the invoker that invocations are dispatched to
    * @param clientpool the pool this thread removes itself from when done with the socket
    * @param threadpool the pool this thread adds itself to when it can be re-used
    * @param timeout    the socket read timeout passed to Socket.setSoTimeout()
    * @throws Exception if the socket timeout can not be set
    */
   public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
                       LinkedList threadpool, int timeout) throws Exception
   {
      super("SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID());
      this.socket = socket;
      this.invoker = invoker;
      this.clientpool = clientpool;
      this.threadpool = threadpool;
      socket.setSoTimeout(timeout);
   }

   /**
    * Asks this thread to stop serving and to exit instead of being pooled again.
    * The thread is interrupted only if it is not currently handling a response.
    */
   public void shutdown()
   {
      shutdown = true;
      running = false;
      // This is a race and there is a chance
      // that a invocation is going on at the time
      // of the interrupt.  But I see no way right
      // now to protect for this.

      // NOTE ALSO!:
      // Shutdown should never be synchronized.
      // We don't want to hold up accept() thread! (via LRUpool)
      interruptIfIdle();
   }

   /**
    * Asks this thread to stop serving its current socket. Unlike shutdown(),
    * the shutdown flag is left untouched, so run() will return the thread to
    * the thread pool afterwards.
    */
   public void evict()
   {
      running = false;
      // This is a race and there is a chance
      // that a invocation is going on at the time
      // of the interrupt.  But I see no way right
      // now to protect for this.
      // There may not be a problem because interrupt only affects
      // threads blocking on IO.

      // NOTE ALSO!:
      // Evict should never be synchronized.
      // We don't want to hold up accept() thread! (via LRUpool)
      interruptIfIdle();
   }

   /**
    * Interrupts this thread unless it is in the middle of handling a response.
    * Failures to interrupt are ignored (best effort).
    */
   private void interruptIfIdle()
   {
      if(handlingResponse)
      {
         return;
      }
      try
      {
         this.interrupt();
         // Thread.interrupted() always acts on the CALLING thread. Only clear
         // the status when the caller is this thread itself; otherwise we would
         // swallow an interrupt that was meant for the caller.
         if(Thread.currentThread() == this)
         {
            Thread.interrupted();
         }
      }
      catch(Exception ignored)
      {
         // best effort only, nothing more we can do here
      }
   }

   /**
    * Assigns a new socket to this (pooled) thread and notifies it so that
    * run() leaves its wait and serves the new connection.
    *
    * @param socket  the new client connection to serve
    * @param timeout the socket read timeout passed to Socket.setSoTimeout()
    * @throws Exception if the socket timeout can not be set; in that case the
    *                   thread is not notified
    */
   public synchronized void wakeup(Socket socket, int timeout) throws Exception
   {
      this.socket = socket;
      String name = "SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID();
      super.setName(name);
      socket.setSoTimeout(timeout);
      running = true;
      handlingResponse = true;
      awakened = true;
      this.notify();
   }

   /**
    * Serves the current socket via dorun(). Afterwards the thread either exits
    * (if shutdown was requested) or moves itself from the client pool to the
    * thread pool and waits until wakeup() hands it a new socket. Any exception,
    * including an interrupt while waiting, ends the thread.
    */
   public void run()
   {
      try
      {
         while(true)
         {
            dorun();
            if(shutdown)
            {
               synchronized(clientpool)
               {
                  clientpool.remove(this);
               }
               return; // exit thread
            }
            else
            {
               // Lock order is this -> clientpool -> threadpool. Holding our own
               // monitor until wait() releases it means wakeup() can not run
               // before we are actually waiting.
               synchronized(this)
               {
                  synchronized(clientpool)
                  {
                     synchronized(threadpool)
                     {
                        clientpool.remove(this);
                        threadpool.add(this);
                        Thread.interrupted(); // clear any interruption so that we can be pooled.
                        clientpool.notify();
                     }
                  }
                  log.debug("begin thread wait");
                  // wait() is allowed to return without a notify (spurious wakeup),
                  // so only leave the wait once wakeup() has really been called.
                  awakened = false;
                  while(!awakened)
                  {
                     this.wait();
                  }
                  log.debug("WAKEUP in SERVER THREAD");
               }
            }
         }
      }
      catch(Exception e)
      {
         log.debug("Exiting run on exception", e);
      }
   }

   /**
    * Reads a single acknowledgement byte from the client and echoes it back,
    * which tells the client that the socket is still active.
    *
    * @throws Exception if reading or writing the byte fails; EOFException and
    *                   other IOExceptions from the read are logged and rethrown
    */
   protected void acknowledge() throws Exception
   {
      // Perform acknowledgement to convince client
      // that the socket is still active
      byte ack = 0;
      long startWait = System.currentTimeMillis();
      try
      {
         ack = in.readByte();
      }
      catch(EOFException eof)
      {
         if(log.isTraceEnabled())
         {
            log.trace("socket timeout is set to : " + socket.getSoTimeout());
            log.trace("EOFException waiting on ACK in readByte(). Time waited was " + (System.currentTimeMillis() - startWait));
         }
         throw eof;
      }
      catch(IOException e)
      {
         log.debug("IOException when reading in ACK", e);
         throw e;
      }

      if(log.isDebugEnabled())
      {
         log.debug("***acknowledge read byte" + Thread.currentThread());
      }

      // HERE IS THE RACE between ACK received and handlingResponse = true
      // We can't synchronize because readByte blocks and client is expecting
      // a response and we don't want to hang client.
      // see shutdown and evict for more details
      // There may not be a problem because interrupt only affects
      // threads blocking on IO. and this thread will just continue.
      handlingResponse = true;

      out.writeByte(ack);
      out.flush();

      out.reset();
   }

   /**
    * Reads one invocation from the input stream, dispatches it to the invoker
    * and writes the result back. An exception thrown by the invoker is not
    * propagated; it is sent to the client as the response instead.
    * handlingResponse is true for the duration of the call and is set back to
    * false only when the response was written successfully.
    *
    * @throws Exception if unmarshalling the request or marshalling the response fails
    */
   protected void processInvocation() throws Exception
   {
      handlingResponse = true;
      // Ok, now read invocation and invoke

      //TODO: -TME Need better way to get the unmarshaller (via config)
      UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(invoker.getLocator(), this.getClass().getClassLoader());
      if(unmarshaller == null)
      {
         // nothing registered for the locator, fall back to the invoker's data type
         unmarshaller = MarshalFactory.getUnMarshaller(invoker.getDataType());
      }
      Object obj = unmarshaller.read(in, null);

      Object resp = null;
      try
      {
         // Make absolutely sure thread interrupted is cleared.
         Thread.interrupted();
         // call transport on the subclass, get the result to handback
         resp = invoker.invoke(obj);
      }
      catch(Exception ex)
      {
         // the exception itself becomes the response handed back to the client
         resp = ex;
      }

      Thread.interrupted(); // clear interrupted state so we don't fail on socket writes

      Marshaller marshaller = MarshalFactory.getMarshaller(invoker.getLocator(), this.getClass().getClassLoader());
      if(marshaller == null)
      {
         // nothing registered for the locator, fall back to the invoker's data type
         marshaller = MarshalFactory.getMarshaller(invoker.getDataType());
      }
      marshaller.write(resp, out);

      handlingResponse = false;
   }

   /**
    * Serves the current socket until it times out, fails, or this thread is
    * evicted or shut down, then closes the streams and the socket.
    * <p/>
    * New object streams are created on every call.
    * This is needed because Object*Streams leak
    */
   protected void dorun()
   {
      log.debug("beginning dorun");
      running = true;
      handlingResponse = true;
      boolean initialized = false;
      try
      {
         bos = new BufferedOutputStream(socket.getOutputStream());
         out = new ObjectOutputStream(bos);
         // push the stream header out before the input stream is opened
         out.flush();
         bis = new BufferedInputStream(socket.getInputStream());
         in = new ObjectInputStream(bis);
         initialized = true;
      }
      catch(Exception e)
      {
         log.error("Failed to initialize", e);
         // without usable streams there is nothing to process, go straight to cleanup
         running = false;
      }

      if(initialized)
      {
         // Always do first one without an ACK because its not needed
         try
         {
            processInvocation();
         }
         catch(Exception ex)
         {
            log.debug("failed to process invocation.", ex);
            running = false;
         }
      }

      // Re-use loop
      while(running)
      {
         try
         {
            acknowledge();
            processInvocation();
         }
         catch(InterruptedIOException e)
         {
            log.debug("socket timed out", e);
            running = false;
         }
         catch(InterruptedException e)
         {
            // running is left as is; shutdown() and evict() clear it themselves
            log.debug("interrupted", e);
         }
         catch(Exception ex)
         {
            log.debug("failed", ex);
            running = false;
         }
         // clear any interruption so that thread can be pooled.
         Thread.interrupted();
      }

      // Ok, we've been shutdown.  Do appropriate cleanups.
      // Each resource is closed on its own so that a failure to close one
      // does not prevent the others from being closed.
      if(in != null)
      {
         try
         {
            in.close();
         }
         catch(Exception ex)
         {
            if(log.isTraceEnabled())
            {
               log.trace("Ignoring failure closing input stream: " + ex);
            }
         }
      }
      if(out != null)
      {
         try
         {
            out.close();
         }
         catch(Exception ex)
         {
            if(log.isTraceEnabled())
            {
               log.trace("Ignoring failure closing output stream: " + ex);
            }
         }
      }
      if(socket != null)
      {
         try
         {
            socket.close();
         }
         catch(Exception ex)
         {
            log.error("Failed cleanup", ex);
         }
      }
      socket = null;
      in = null;
      out = null;
   }
}