/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.remoting.transport.http;

import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpParser;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.marshal.MarshalFactory;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.remoting.marshal.http.HTTPMarshaller;
import org.jboss.remoting.marshal.http.HTTPUnMarshaller;
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

/**
 * @author <a href="mailto:telrod@e2technologies.net">Tom Elrod</a>
 */

/**
 * Server invoker implementation based on http protocol.  Is basically
 * a stand alone http server whose request are forwared to the invocation handler
 * and responses from invocation handler are sent back to caller as http response.
 */
public class HTTPServerInvoker extends ServerInvoker implements Runnable
{
   private static int BACKLOG_DEFAULT = 1000;
   private static int MAX_POOL_SIZE_DEFAULT = 250;

   private ServerSocket serverSocket = null;
   private int serverBindPort = 8888;
   private InetAddress bindAddress = null;

   private boolean running = false;

   private Thread[] acceptThreads;
   private int numAcceptThreads = 1;
   private PooledExecutor threadpool;
   private int maxPoolSize = MAX_POOL_SIZE_DEFAULT;


   private String serverBindAddress = "localhost";

   protected int backlog = BACKLOG_DEFAULT;

   // list of content types
   public static String HTML = "text/html";
   public static String PLAIN = "text/plain";
   public static String SOAP = "application/soap+xml";
   public static String BINARY = "application/octet-stream";

   // header constants
   public static String HEADER_SESSION_ID = "sessionId";
   public static String HEADER_SUBSYSTEM = "subsystem";

   public HTTPServerInvoker(InvokerLocator locator)
   {
      super(locator);
   }

   public HTTPServerInvoker(InvokerLocator locator, Map configuration)
   {
      super(locator, configuration);
   }

   protected String getDefaultDataType()
   {
      return SerializableMarshaller.DATATYPE;
   }

   /**
    * returns true if the transport is bi-directional in nature, for example,
    * HTTP in unidirectional and SOCKETs are bi-directional (unless behind a firewall
    * for example).
    *
    * @return false (HTTP is unidirrectional)
    */
   public boolean isTransportBiDirectional()
   {
      return false;
   }

   public void start() throws IOException
   {
      if(!running)
      {
         running = true;

         try
         {
            threadpool = new PooledExecutor(new BoundedBuffer(maxPoolSize));
            //threadpool = new PooledExecutor(new LinkedQueue(), maxPoolSize);
            //TODO: -TME setup thread exhausted policy
            threadpool.waitWhenBlocked();
            //threadpool.runWhenBlocked();

            bindAddress = InetAddress.getByName(serverBindAddress);
            serverBindPort = getLocator().getPort();
            serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);

            acceptThreads = new Thread[numAcceptThreads];
            for(int i = 0; i < numAcceptThreads; i++)
            {
               String name = "HTTPServerInvoker#" + i + "-" + serverBindPort;
               acceptThreads[i] = new Thread(this, name);
               acceptThreads[i].start();
            }
         }
         catch(IOException e)
         {
            log.error("Error starting ServerSocket.  Bind port: " + serverBindPort + ", bind address: " + bindAddress);
            throw e;
         }
      }
      super.start();
   }

   public int getCurrentThreadPoolSize()
   {
      return threadpool.getPoolSize();
   }

   public void run()
   {
      while(running)
      {
         try
         {
            Socket socket = serverSocket.accept();

            if(socket != null)
            {
               Runnable worker = new HTTPServerWorker(socket);
               try
               {
                  threadpool.execute(worker);
               }
               catch(InterruptedException e)
               {
                  log.error("HTTP server worker thread interrupted.", e);
               }
            }
         }
         catch(Throwable thr)
         {
            log.error("Error processing incoming request.", thr);
         }
      }
   }

   /**
    * Will write out the object to byte array and check size of byte array.
    * This is VERY expensive, but need for the content length.
    *
    * @param response
    * @return
    */
   private int getContentLength(Object response) throws IOException
   {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      ObjectOutputStream oos = new ObjectOutputStream(bos);
      oos.writeObject(response);
      oos.flush();
      bos.flush();
      byte buffer[] = bos.toByteArray();
      return buffer.length;
   }

   private InvocationRequest getInvocationRequest(Map metadata, Object obj)
   {
      InvocationRequest request = null;

      if(obj instanceof InvocationRequest)
      {
         request = (InvocationRequest) obj;
         if(request.getRequestPayload() == null)
         {
            request.setRequestPayload(metadata);
         }
         else
         {
            request.getRequestPayload().putAll(metadata);
         }
      }
      else
      {
         request = createNewInvocationRequest(metadata, obj);
      }
      return request;
   }

   private InvocationRequest createNewInvocationRequest(Map metadata, Object payload)
   {
      // will try to use the same session id if possible to track
      String sessionId = getSessionId(metadata);
      String subSystem = (String) metadata.get(HEADER_SUBSYSTEM);

      InvocationRequest request = new InvocationRequest(sessionId, subSystem, payload, metadata, null, null);

      return request;
   }

   private String getSessionId(Map metadata)
   {
      String sessionId = (String) metadata.get(HEADER_SESSION_ID);

      if(sessionId == null || sessionId.length() == 0)
      {
         String userAgent = (String) metadata.get("User-Agent");
         String host = (String) metadata.get("Host");
         String idSeed = userAgent + ":" + host;
         sessionId = Integer.toString(idSeed.hashCode());
      }

      return sessionId;
   }

   private boolean isBinary(String requestContentType)
   {
      if(BINARY.equalsIgnoreCase(requestContentType))
      {
         return true;
      }
      else
      {
         return false;
      }
   }

   public void stop()
   {
      if(running)
      {
         running = false;

         maxPoolSize = 0; // so ServerThreads don't reinsert themselves
         for(int i = 0; i < acceptThreads.length; i++)
         {
            try
            {
               acceptThreads[i].interrupt();
            }
            catch(Exception ignored)
            {
            }
         }

         try
         {
            threadpool.shutdownAfterProcessingCurrentlyQueuedTasks();
            threadpool.awaitTerminationAfterShutdown();
         }
         catch(InterruptedException e)
         {
            e.printStackTrace();
         }

         try
         {
            if(serverSocket != null && !serverSocket.isClosed())
            {
               serverSocket.close();
            }
            serverSocket = null;
         }
         catch(Exception e)
         {
         }
      }
      super.stop();

      log.debug("HTTPServerInvoker stopped.");
   }

   public String getMBeanObjectName()
   {
      return "jboss.remoting:service=invoker,transport=http";
   }

   private class HTTPServerWorker implements Runnable
   {
      private Socket socket;

      public HTTPServerWorker(Socket socket)
      {
         this.socket = socket;
      }

      public void run()
      {
         try
         {
            processRequest(this.socket);
         }
         catch(Throwable e)
         {
            log.error("Error processing http request in HTTPServerInvoker::HTTPServerWorker " + Thread.currentThread().getName(), e);
         }
      }

      private void processRequest(Socket socket)
      {
         DataInputStream dataInput = null;
         DataOutputStream dataOutput = null;
         BufferedOutputStream bufferOutput = null;
         try
         {
            Object response = null;
            boolean isError = false;
            String requestContentType = null;
            try
            {
               dataInput = new DataInputStream(socket.getInputStream());

               // Need to parse the header to find Content-Type

               /**
                * Read the first line, as this will be the POST or GET, path, and HTTP version.
                * Then next comes the headers.  (key value seperated by a ': '
                * Then followed by an empty \r\n, which will be followed by the payload
                */
               ByteArrayOutputStream buffer = new ByteArrayOutputStream();
               int ch;
               while((ch = dataInput.read()) >= 0)
               {
                  buffer.write(ch);
                  if(ch == '\n')
                  {
                     break;
                  }
               }

               String methodType = null;
               String path = null;
               String httpVersion = null;

               byte[] firstLineRaw = buffer.toByteArray();
               // now need to backup and make sure the character before the \n was a \r
               if(firstLineRaw[firstLineRaw.length - 2] == '\r')
               {
                  //Got our first line, now to set the variables
                  String firstLine = new String(firstLineRaw).trim();
                  int startIndex = 0;
                  int endIndex = firstLine.indexOf(' ');
                  methodType = firstLine.substring(startIndex, endIndex);
                  startIndex = endIndex + 1;
                  endIndex = firstLine.indexOf(' ', startIndex);
                  path = firstLine.substring(startIndex, endIndex);
                  startIndex = endIndex + 1;
                  httpVersion = firstLine.substring(startIndex);
               }
               else
               {
                  System.err.println("Error processing first line.  Should have ended in \r\n, but did not");
                  throw new RuntimeException("Error processing HTTP request type.  First line of request is invalid.");
               }

               Map metadata = new HashMap();
               Header[] headers = HttpParser.parseHeaders(dataInput);
               for(int x = 0; x < headers.length; x++)
               {
                  String headerName = headers[x].getName();
                  String headerValue = headers[x].getName();
                  metadata.put(headerName, headerValue);
                  // doing this instead of getting from map since ignores case
                  if("Content-Type".equalsIgnoreCase(headerName))
                  {
                     requestContentType = headers[x].getValue();
                  }
               }

               UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(HTTPUnMarshaller.DATATYPE);
               Object obj = unmarshaller.read(dataInput, metadata);

               InvocationRequest request = null;

               if(obj instanceof InvocationRequest)
               {
                  request = (InvocationRequest) obj;
               }
               else
               {
                  if(isBinary(requestContentType))
                  {
                     request = getInvocationRequest(metadata, obj);
                  }
                  else
                  {
                     request = createNewInvocationRequest(metadata, obj);
                  }
               }

               try
               {
                  // call transport on the subclass, get the result to handback
                  response = invoke(request);
               }
               catch(Throwable ex)
               {
                  log.debug("Error thrown calling invoke on server invoker.", ex);
                  response = ex;
                  isError = true;
               }

               dataOutput = new DataOutputStream(socket.getOutputStream());
               bufferOutput = new BufferedOutputStream(dataOutput, 1024);
            }
            catch(Throwable thr)
            {
               log.debug("Error thrown processing request.  Probably error with processing headers.", thr);
               if(thr instanceof Exception)
               {
                  response = (Exception) thr;
               }
               else
               {
                  response = new Exception(thr);
               }
               isError = true;
            }
            if(response == null)
            {
               //TODO: -TME Shoudl this be 1.0 or 1.1?
               bufferOutput.write("HTTP/1.1 ".getBytes());
               bufferOutput.write("200 JBoss Remoting: Request Processed Successfully".getBytes());
               String contentType = "\r\n" + "Content-Type" + ": " + "text/html";
               bufferOutput.write(contentType.getBytes());
               String contentLength = "\r\n" + "Content-Length" + ": " + 0;
               bufferOutput.write(contentLength.getBytes());
               //dataOutput.write(("\r\n" + "\r\n").getBytes());
               //dataOutput.write("NULL return".getBytes());
               bufferOutput.flush();
            }
            else
            {
               //TODO: -TME Shoudl this be 1.0 or 1.1?
               bufferOutput.write("HTTP/1.1 ".getBytes());
               String status = isError ? "500 JBoss Remoting: Error occurred within target application." :
                               "200 JBoss Remoting: Request Processed Succussfully";
               bufferOutput.write(status.getBytes());

               // write return headers
               String contentType = "\r\n" + "Content-Type" + ": " + requestContentType;
               bufferOutput.write(contentType.getBytes());
               int iContentLength = getContentLength(response);
               String contentLength = "\r\n" + "Content-Length" + ": " + iContentLength;
               bufferOutput.write(contentLength.getBytes());
               // content seperator
               bufferOutput.write(("\r\n" + "\r\n").getBytes());

               // write response
               //Marshaller marshaller = getMarshallerByContentType(requestContentType);
               Marshaller marshaller = MarshalFactory.getMarshaller(getLocator(), this.getClass().getClassLoader());
               if(marshaller == null)
               {
                  marshaller = MarshalFactory.getMarshaller(HTTPMarshaller.DATATYPE);
               }
               marshaller.write(response, bufferOutput);

            }
         }
         catch(Exception e)
         {
            log.error("Error processing client request.", e);
         }
         finally
         {
            if(dataInput != null)
            {
               try
               {
                  dataInput.close();
               }
               catch(Exception e)
               {
                  log.warn("Error closing resource.", e);
               }
            }
            if(dataOutput != null)
            {
               try
               {
                  dataOutput.close();
               }
               catch(Exception e)
               {
                  log.warn("Error closing resource.", e);
               }
            }
            try
            {
               socket.close();
            }
            catch(Exception e)
            {
               log.warn("Error closing resource.", e);
            }
         }
      }

   }
}