/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq.il.oil;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.rmi.RemoteException;
import java.util.Properties;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.net.ServerSocketFactory;

import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.il.Invoker;
import org.jboss.mq.il.ServerIL;
import org.jboss.security.SecurityDomain;
import org.jboss.system.server.ServerConfigUtil;

/**
 * Implements the ServerILJMXService which is used to manage the OIL IL.
 *
 * @author    Hiram Chirino (Cojonudo14@hotmail.com)
 * @version   $Revision: 1.30 $
 *
 * @jmx:mbean extends="org.jboss.mq.il.ServerILJMXServiceMBean"
 */
public final class OILServerILService
   extends org.jboss.mq.il.ServerILJMXService
   implements java.lang.Runnable, OILServerILServiceMBean
{
   /**
    * Logger instance shared by the service and its client worker threads.
    */
   final static private Logger log = Logger.getLogger(OILServerILService.class);

   /**
    * The default timeout, in milliseconds, for the server socket. This is
    * set so that accept() periodically returns and the accept loop can
    * check the running flag.
    */
   private final static int SO_TIMEOUT = 5000;

   /**
    * The JMS server where requests are forwarded to.
    */
   private Invoker server;

   /**
    * If the TcpNoDelay option should be used on the socket.
    */
   private boolean enableTcpNoDelay = false;

   /**
    * The read timeout, in milliseconds, applied with setSoTimeout() to
    * every accepted client socket.
    */
   private int readTimeout = 0;

   /** The security domain name to use with SSL aware socket factories.
    */
   private String securityDomain;

   /** The javax.net.SocketFactory implementation class to use on the client.
    */
   private String clientSocketFactoryName;
   /** The socket factory used to obtain the server socket.
    */
   private ServerSocketFactory serverSocketFactory;
   /**
    * The listening socket that receives incoming connections
    * for servicing.
    */
   private ServerSocket serverSocket;

   /**
    * The managed serverIL.
    */
   private OILServerIL serverIL;

   /**
    * The running flag that all worker and server
    * threads check to determine if the service should
    * be stopped.
    */
   private volatile boolean running;

   /**
    * The server port to bind to.
    */
   private int serverBindPort = 0;

   /**
    * The internet address to bind to by
    * default. A null value is reported as "0.0.0.0" by getBindAddress().
    */
   private InetAddress bindAddress = null;

   /**
    * Number of OIL Worker threads started. Also used as the numeric
    * suffix of each worker thread's name.
    */
   private int threadNumber = 0;

   /**
    * The connection properties passed to the client to connect to this IL
    */
   private Properties connectionProperties;

   /**
    * Services one connected client. An instance wraps the accepted socket
    * in object streams, and its run() method reads request codes from the
    * client, forwards each request to the outer class' JMS server instance
    * and writes the outcome back on the same socket.
    *
    * @author Brian Weaver (weave@opennms.org)
    */
   private final class Client implements Runnable
   {
      /**
       * The TCP/IP socket for communications.
       */
      private final Socket sock;

      /**
       * The object output stream running on top of the
       * socket's output stream.
       */
      private final ObjectOutputStream out;

      /**
       * The object input stream running on top of the
       * socket's input stream.
       */
      private final ObjectInputStream in;

      /**
       * Allocates a new runnable instance to process requests from
       * one client and pass them to the JMS server. The passed socket
       * is used for all communications between the service and the
       * client.
       *
       * @param s The socket used for communications.
       *
       * @throws java.io.IOException Thrown if an I/O error occurs
       *    constructing the object streams or configuring the socket.
       */
      Client(Socket s) throws IOException
      {
         this.sock = s;
         // The output stream is created and flushed first so that its
         // stream header is sent before the input stream is opened.
         this.out = new ObjectOutputStream(new BufferedOutputStream(this.sock.getOutputStream()));
         this.out.flush();
         this.in = new ObjectInputStream(new BufferedInputStream(this.sock.getInputStream()));
         sock.setTcpNoDelay(enableTcpNoDelay);
         if (log.isTraceEnabled())
            log.trace("Setting TcpNoDelay Option to:" + enableTcpNoDelay);
      }

      /**
       * The worker thread's processing routine. Each iteration reads a
       * one byte request code, reads the request arguments, invokes the
       * matching method on the JMS server and answers with either
       * SUCCESS, SUCCESS_OBJECT followed by the result, or EXCEPTION
       * followed by the exception that was raised.
       *
       * The loop ends when the client sends CONNECTION_CLOSING, the
       * stream reaches end of file, an I/O error occurs, or the service
       * is stopped. On exit the streams and the socket are closed, and
       * the server is told the connection is closing if the client did
       * not announce that itself.
       */
      public void run()
      {
         int code = 0;
         boolean closed = false;
         ConnectionToken connectionToken = null;

         while (!closed && running)
         {
            try
            {
               // read in the next request/directive from the client
               code = in.readByte();
            }
            catch (EOFException e)
            {
               // end of file, exit processing loop
               break;
            }
            catch (IOException e)
            {
               // Only report the failure if it is unexpected, i.e. the
               // connection is still open and the service still running.
               if (!closed && running)
                  log.warn("Connection failure (1).", e);
               break;
            }

            // Now, based upon the request code, perform the requested
            // action. Any exceptions are processed and potentially
            // returned to the client.
            try
            {
               Object result = null;

               switch (code)
               {
                  case OILConstants.SET_SPY_DISTRIBUTED_CONNECTION :
                     // assert connectionToken == null
                     connectionToken = (ConnectionToken) in.readObject();
                     break;

                  case OILConstants.ACKNOWLEDGE :
                     AcknowledgementRequest ack = new AcknowledgementRequest();
                     ack.readExternal(in);
                     server.acknowledge(connectionToken, ack);
                     break;

                  case OILConstants.ADD_MESSAGE :
                     server.addMessage(connectionToken, SpyMessage.readMessage(in));
                     break;

                  case OILConstants.BROWSE :
                     result = server.browse(connectionToken, (Destination) in.readObject(), (String) in.readObject());
                     break;

                  case OILConstants.CHECK_ID :
                     String clientId = (String) in.readObject();
                     server.checkID(clientId);
                     if (connectionToken != null)
                        connectionToken.setClientID(clientId);
                     break;

                  case OILConstants.CONNECTION_CLOSING :
                     server.connectionClosing(connectionToken);
                     // the loop terminates after the reply has been sent
                     closed = true;
                     break;

                  case OILConstants.CREATE_QUEUE :
                     result = server.createQueue(connectionToken, (String) in.readObject());
                     break;

                  case OILConstants.CREATE_TOPIC :
                     result = server.createTopic(connectionToken, (String) in.readObject());
                     break;

                  case OILConstants.DELETE_TEMPORARY_DESTINATION :
                     server.deleteTemporaryDestination(connectionToken, (SpyDestination) in.readObject());
                     break;

                  case OILConstants.GET_ID :
                     result = server.getID();
                     if (connectionToken != null)
                        connectionToken.setClientID((String) result);
                     break;

                  case OILConstants.GET_TEMPORARY_QUEUE :
                     result = server.getTemporaryQueue(connectionToken);
                     break;

                  case OILConstants.GET_TEMPORARY_TOPIC :
                     result = server.getTemporaryTopic(connectionToken);
                     break;

                  case OILConstants.RECEIVE :
                     result = server.receive(connectionToken, in.readInt(), in.readLong());
                     break;

                  case OILConstants.SET_ENABLED :
                     server.setEnabled(connectionToken, in.readBoolean());
                     break;

                  case OILConstants.SUBSCRIBE :
                     server.subscribe(connectionToken, (Subscription) in.readObject());
                     break;

                  case OILConstants.TRANSACT :
                     TransactionRequest trans = new TransactionRequest();
                     trans.readExternal(in);
                     server.transact(connectionToken, trans);
                     break;

                  case OILConstants.UNSUBSCRIBE :
                     server.unsubscribe(connectionToken, in.readInt());
                     break;

                  case OILConstants.DESTROY_SUBSCRIPTION :
                     server.destroySubscription(connectionToken, (DurableSubscriptionID) in.readObject());
                     break;

                  case OILConstants.CHECK_USER :
                     result = server.checkUser((String) in.readObject(), (String) in.readObject());
                     break;

                  case OILConstants.PING :
                     server.ping(connectionToken, in.readLong());
                     break;

                  case OILConstants.AUTHENTICATE :
                     result = server.authenticate((String) in.readObject(), (String) in.readObject());
                     break;

                  default :
                     throw new RemoteException("Bad method code !");
               }

               // Everything was OK, otherwise an exception would have
               // prevented us from getting to this point in the code.
               try
               {
                  if (result == null)
                  {
                     out.writeByte(OILConstants.SUCCESS);
                  }
                  else
                  {
                     out.writeByte(OILConstants.SUCCESS_OBJECT);
                     out.writeObject(result);
                     // discard the stream's record of written objects
                     out.reset();
                  }
                  out.flush();
               }
               catch (IOException e)
               {
                  // a failed reply to CONNECTION_CLOSING is not reported
                  if (!closed)
                     log.warn("Connection failure (2).", e);
                  break;
               }

            }
            catch (Exception e)
            {
               // this processes any exceptions from the actual switch
               // statement processing
               if (closed)
                  break;

               log.warn("Client request resulted in a server exception: ", e);

               try
               {
                  out.writeByte(OILConstants.EXCEPTION);
                  out.writeObject(e);
                  out.reset();
                  out.flush();
               }
               catch (IOException e2)
               {
                  // Report the I/O failure itself (e2); the request
                  // exception (e) has already been logged above.
                  if (!closed)
                     log.warn("Connection failure (3).", e2);
                  break;
               }
            } // end catch of try { switch(code) ... }
         } // end while loop

         try
         {
            if (!closed)
            {
               // The client went away without sending CONNECTION_CLOSING,
               // so notify the server on its behalf.
               try
               {
                  server.connectionClosing(connectionToken);
               }
               catch (JMSException e)
               {
                  // do nothing
               }
            }
            in.close();
            out.close();
         }
         catch (IOException e)
         {
            log.warn("Connection failure during connection close.", e);
         }
         finally
         {
            try
            {
               sock.close();
            }
            catch (IOException e)
            {
               log.warn("Connection failure during connection close.", e);
            }
         }
      } // end run method
   }

   /**
    * Returns the properties a client needs in order to connect through
    * this IL. The properties are populated by startService(), so the
    * value is null until the service has been started.
    *
    * @return   The ClientConnectionProperties value
    */
   public java.util.Properties getClientConnectionProperties()
   {
      return this.connectionProperties;
   }

   /**
    * Returns the fixed name of this JMX service.
    *
    * @return   The Name value, always "JBossMQ-OILServerIL"
    */
   public String getName()
   {
      final String serviceName = "JBossMQ-OILServerIL";
      return serviceName;
   }

   /**
    * Returns the ServerIL managed by this service. The instance is
    * created by startService(), so the value is null until the service
    * has been started.
    *
    * @return    The ServerIL value
    */
   public ServerIL getServerIL()
   {
      return this.serverIL;
   }

   /**
    * Accept loop of the OILServerILService. While the running flag is
    * set, waits for client connections on the server socket and starts
    * one "OIL Worker" thread per accepted connection. The server socket
    * is closed when the loop ends.
    */
   public void run()
   {
      // Work on the socket that was current when this thread started. If
      // the service is stopped and started again the field refers to the
      // new listener, which this (old) thread must not close.
      final ServerSocket listener = serverSocket;
      try
      {
         while (running)
         {
            Socket socket = null;
            try
            {
               socket = listener.accept();
            }
            catch (java.io.InterruptedIOException e)
            {
               // accept() timed out; go around and re-check the flag
               continue;
            }

            // it's possible that the service is no longer
            // running but it got a connection, no point in
            // starting up a thread!
            if (!running)
            {
               closeQuietly(socket);
               break;
            }

            boolean handedOff = false;
            try
            {
               socket.setSoTimeout(readTimeout);
               new Thread(new Client(socket), "OIL Worker-" + threadNumber++).start();
               handedOff = true;
            }
            catch (IOException ie)
            {
               if (log.isDebugEnabled())
               {
                  log.debug("IOException processing client connection", ie);
                  log.debug("Dropping client connection, server will not terminate");
               }
            }
            finally
            {
               // No worker owns the socket if setup failed, so release
               // it here instead of leaking the connection.
               if (!handedOff)
                  closeQuietly(socket);
            }
         }
      }
      catch (SocketException e)
      {
         // There is no easy way (other than string comparison) to
         // determine if the socket exception is caused by connection
         // reset by peer. In this case, it's okay to ignore both
         // SocketException and IOException.
         if (running)
            log.warn("SocketException occured (Connection reset by peer?). Cannot initialize the OILServerILService.");
      }
      catch (IOException e)
      {
         if (running)
            log.warn("IOException occured. Cannot initialize the OILServerILService.", e);
      }
      catch (Throwable t)
      {
         log.warn("Unexpected error occured. Cannot initialize the OILServerILService.", t);
      }
      try
      {
         listener.close();
      }
      catch (Exception e)
      {
         log.debug("error closing server socket", e);
      }
   }

   /**
    * Closes a client socket, ignoring a null argument and any failure.
    *
    * @param socket The socket to close, may be null.
    */
   private void closeQuietly(Socket socket)
   {
      if (socket == null)
         return;
      try
      {
         socket.close();
      }
      catch (Exception ignored)
      {
         // best effort only, the connection is being dropped anyway
      }
   }

   /**
    * Starts this IL, and binds it to JNDI. The method looks up the JMS
    * server, optionally passes the configured security domain to the
    * server socket factory, opens the listening socket, starts the
    * accept thread, and finally builds the ServerIL and the client
    * connection properties that are bound by bindJNDIReferences().
    *
    * @exception Exception  Thrown if the base service fails to start,
    *    the server socket cannot be created or the JNDI binding fails.
    */
   public void startService() throws Exception
   {
      super.startService();

      running = true;
      this.server = lookupJMSServer();

      // Use the default javax.net.ServerSocketFactory if none was set
      if (serverSocketFactory == null)
         serverSocketFactory = ServerSocketFactory.getDefault();

      /* See if the server socket supports setSecurityDomain(SecurityDomain)
      if an securityDomain was specified
      */
      if (securityDomain != null)
      {
         try
         {
            InitialContext ctx = new InitialContext();
            Class ssfClass = serverSocketFactory.getClass();
            SecurityDomain domain = (SecurityDomain) ctx.lookup(securityDomain);
            Class[] parameterTypes = { SecurityDomain.class };
            // The setter is located reflectively because it is not part
            // of the javax.net.ServerSocketFactory API.
            Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes);
            Object[] args = { domain };
            m.invoke(serverSocketFactory, args);
         }
         catch (NoSuchMethodException e)
         {
            log.error("Socket factory does not support setSecurityDomain(SecurityDomain)");
         }
         catch (Exception e)
         {
            // Log the exception as well: it carries the JNDI lookup or
            // reflective invocation failure needed to diagnose the problem.
            log.error("Failed to setSecurityDomain=" + securityDomain + " on socket factory", e);
         }
      }

      // Create the server socket using the socket factory
      serverSocket = serverSocketFactory.createServerSocket(serverBindPort, 50, bindAddress);
      // Make accept() time out so the accept loop can notice a shutdown
      serverSocket.setSoTimeout(SO_TIMEOUT);

      InetAddress socketAddress = serverSocket.getInetAddress();
      if (log.isInfoEnabled())
         log.info("JBossMQ OIL service available at : " + socketAddress + ":" + serverSocket.getLocalPort());

      new Thread(server.getThreadGroup(), this, "OIL Worker Server").start();

      /* We need to check the socketAddress against "0.0.0.0/0.0.0.0"
         because this is not a valid address on Win32 while it is for
         *NIX. See BugParade bug #4343286.
         */
      socketAddress = ServerConfigUtil.fixRemoteAddress(socketAddress);

      serverIL = new OILServerIL(socketAddress, serverSocket.getLocalPort(),
         clientSocketFactoryName, enableTcpNoDelay);

      // Initialize the connection properties using the base class.
      connectionProperties = super.getClientConnectionProperties();
      connectionProperties.setProperty(
         OILServerILFactory.CLIENT_IL_SERVICE_KEY,
         "org.jboss.mq.il.oil.OILClientILService");
      connectionProperties.setProperty(OILServerILFactory.OIL_PORT_KEY, "" + serverSocket.getLocalPort());
      connectionProperties.setProperty(OILServerILFactory.OIL_ADDRESS_KEY, "" + socketAddress.getHostAddress());
      connectionProperties.setProperty(OILServerILFactory.OIL_TCPNODELAY_KEY, enableTcpNoDelay ? "yes" : "no");

      bindJNDIReferences();

   }

   /**
    * Stops this IL. The JNDI references are unbound first, then the
    * running flag is cleared and the listening socket is closed.
    * Failures in either step are logged and not propagated.
    */
   public void stopService()
   {
      try
      {
         unbindJNDIReferences();
      }
      catch (Exception unbindFailure)
      {
         log.error("Exception unbinding from JNDI", unbindFailure);
      }

      // Tell the accept loop and the worker threads to finish.
      running = false;

      final ServerSocket listener = serverSocket;
      if (listener == null)
         return;

      try
      {
         listener.close();
      }
      catch (Exception closeFailure)
      {
         log.debug("Exception stopping server thread", closeFailure);
      }
   }

   /**
    * Returns the port the OIL server socket is configured to bind to.
    *
    * @return Value of property serverBindPort.
    *
    * @jmx:managed-attribute
    */
   public int getServerBindPort()
   {
      return this.serverBindPort;
   }

   /**
    * Configures the port the OIL server socket binds to. The value is
    * read when the server socket is created in startService().
    *
    * @param serverBindPort New value of property serverBindPort.
    *
    * @jmx:managed-attribute
    */
   public void setServerBindPort(int serverBindPort)
   {
      final int port = serverBindPort;
      this.serverBindPort = port;
   }

   /**
    * Returns the interface address the OIL server binds its listening
    * port on.
    *
    * @return The host name of the configured bind address, or "0.0.0.0"
    *    when no bind address has been set.
    *
    * @jmx:managed-attribute
    */
   public String getBindAddress()
   {
      if (bindAddress == null)
         return "0.0.0.0";
      return bindAddress.getHostName();
   }
   /**
    * Configures the interface address the OIL server binds its listening
    * port on. A null or empty host clears the bind address.
    *
    * @param host The host address to bind to, if any.
    *
    * @throws java.net.UnknownHostException Thrown if the hostname cannot
    *    be resolved to an InetAddress object.
    *
    * @jmx:managed-attribute
    */
   public void setBindAddress(String host) throws UnknownHostException
   {
      // No host given means "do not restrict the bind address"
      final boolean anyAddress = (host == null) || (host.length() == 0);
      bindAddress = anyAddress ? null : InetAddress.getByName(host);
   }

   /**
    * Reports whether the TcpNoDelay option is applied to client sockets.
    * @return Returns a boolean
    *
    * @jmx:managed-attribute
    */
   public boolean getEnableTcpNoDelay()
   {
      return this.enableTcpNoDelay;
   }

   /**
    * Chooses whether the TcpNoDelay option is applied to client sockets.
    * @param enableTcpNoDelay The enableTcpNoDelay to set
    *
    * @jmx:managed-attribute
    */
   public void setEnableTcpNoDelay(boolean enableTcpNoDelay)
   {
      final boolean noDelay = enableTcpNoDelay;
      this.enableTcpNoDelay = noDelay;
   }

   /**
    * Returns the read timeout applied to accepted client sockets.
    * @return Returns the read timeout in milli-seconds
    *
    * @jmx:managed-attribute
    */
   public int getReadTimeout()
   {
      return this.readTimeout;
   }

   /**
    * Configures the read timeout applied to accepted client sockets.
    * @param timeout The read time out in milli seconds
    *
    * @jmx:managed-attribute
    */
   public void setReadTimeout(int timeout)
   {
      final int millis = timeout;
      this.readTimeout = millis;
   }

   /** Returns the name of the javax.net.SocketFactory implementation class
    * that clients are told to use.
    * @jmx:managed-attribute
    */
   public String getClientSocketFactory()
   {
      return this.clientSocketFactoryName;
   }
   /** Configures the name of the javax.net.SocketFactory implementation
    * class that clients are told to use.
    * @jmx:managed-attribute
    */
   public void setClientSocketFactory(String name)
   {
      final String factoryName = name;
      this.clientSocketFactoryName = factoryName;
   }

   /** Loads the named javax.net.ServerSocketFactory implementation class
    * through the thread context class loader, instantiates it with its
    * no-argument constructor and uses it to create the server socket.
    *@jmx:managed-attribute
    */
   public void setServerSocketFactory(String name) throws Exception
   {
      Class factoryClass = Thread.currentThread().getContextClassLoader().loadClass(name);
      Object factory = factoryClass.newInstance();
      serverSocketFactory = (ServerSocketFactory) factory;
   }
   /** Returns the class name of the javax.net.ServerSocketFactory used to
    * create the server socket, or null if no factory has been set yet.
    *@jmx:managed-attribute
    */
   public String getServerSocketFactory()
   {
      return (serverSocketFactory == null)
         ? null
         : serverSocketFactory.getClass().getName();
   }

   /** Configures the JNDI name of the security domain to use with SSL
    * aware socket factories. The name is looked up in startService().
    *@jmx:managed-attribute
    */
   public void setSecurityDomain(String domainName)
   {
      final String jndiName = domainName;
      this.securityDomain = jndiName;
   }
   /** Returns the security domain name to use with SSL aware socket
    * factories, or null if none has been configured.
    *@jmx:managed-attribute
    */
   public String getSecurityDomain()
   {
      return securityDomain;
   }
}
// vim:expandtab:tabstop=3:shiftwidth=3