/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq.il.oil;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;

import java.net.Socket;
import javax.jms.Destination;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.net.SocketFactory;

import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.il.ServerIL;

/**
 * The JVM implementation of the ServerIL object
 *
 * @author    Hiram Chirino (Cojonudo14@hotmail.com)
 * @author    Norbert Lataille (Norbert.Lataille@m4x.org)
 * @version   $Revision: 1.12.6.1 $
 * @created   August 16, 2001
 */
public final class OILServerIL
   implements java.io.Serializable, 
      java.lang.Cloneable,
      org.jboss.mq.il.ServerIL
{
   static final long serialVersionUID = 5576846920031604128L;
   private static Logger log = Logger.getLogger(OILServerIL.class);

   /** The org.jboss.mq.il.oil2.localAddr system property allows a client to
    *define the local interface to which its sockets should be bound
    */
   private final static String LOCAL_ADDR = "org.jboss.mq.il.oil.localAddr";
   /** The org.jboss.mq.il.oil2.localPort system property allows a client to
    *define the local port to which its sockets should be bound
    */
   private final static String LOCAL_PORT = "org.jboss.mq.il.oil.localPort";

   /** The server host name/IP to connect to
    */
   private InetAddress addr;
   /** The server port to connect to.
    */
   private int port;
   /** The name of the class implementing the javax.net.SocketFactory to
    * use for creating the client socket.
    */
   private String socketFactoryName;

   /**
    * If the TcpNoDelay option should be used on the socket.
    */
   private boolean enableTcpNoDelay = false;
   /** The local interface name/IP to use for the client
    */
   private transient InetAddress localAddr;
   /** The local port to use for the client
    */
   private transient int localPort;

   /**
    * Description of the Field
    */
   private transient ObjectInputStream in;

   /**
    * Description of the Field
    */
   private transient ObjectOutputStream out;
   
   /**
    * Description of the Field
    */
   private transient Socket socket;

   /**
    * Constructor for the OILServerIL object
    *
    * @param a     Description of Parameter
    * @param port  Description of Parameter
    */
   public OILServerIL(InetAddress addr, int port, 
      String socketFactoryName, boolean enableTcpNoDelay)
   {
      this.addr = addr;
      this.port = port;
      this.socketFactoryName = socketFactoryName;
      this.enableTcpNoDelay = enableTcpNoDelay;
   }

   /**
    * Sets the ConnectionToken attribute of the OILServerIL object
    *
    * @param dest           The new ConnectionToken value
    * @exception Exception  Description of Exception
    */
   public synchronized void setConnectionToken(ConnectionToken dest)
          throws Exception
   {
      checkConnection();
      out.writeByte(OILConstants.SET_SPY_DISTRIBUTED_CONNECTION);
      out.writeObject(dest);
      waitAnswer();
   }

   /**
    * Sets the Enabled attribute of the OILServerIL object
    *
    * @param dc                The new Enabled value
    * @param enabled           The new Enabled value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void setEnabled(ConnectionToken dc, boolean enabled)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.SET_ENABLED);
      out.writeBoolean(enabled);
      waitAnswer();
   }

   /**
    * Gets the ID attribute of the OILServerIL object
    *
    * @return               The ID value
    * @exception Exception  Description of Exception
    */
   public synchronized String getID()
          throws Exception
   {
      checkConnection();
      out.writeByte(OILConstants.GET_ID);
      return (String)waitAnswer();
   }

   /**
    * Gets the TemporaryQueue attribute of the OILServerIL object
    *
    * @param dc                Description of Parameter
    * @return                  The TemporaryQueue value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized TemporaryQueue getTemporaryQueue(ConnectionToken dc)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.GET_TEMPORARY_QUEUE);
      return (TemporaryQueue)waitAnswer();
   }

   /**
    * Gets the TemporaryTopic attribute of the OILServerIL object
    *
    * @param dc                Description of Parameter
    * @return                  The TemporaryTopic value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized TemporaryTopic getTemporaryTopic(ConnectionToken dc)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.GET_TEMPORARY_TOPIC);
      return (TemporaryTopic)waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param item              Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void acknowledge(ConnectionToken dc, AcknowledgementRequest item)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.ACKNOWLEDGE);
      item.writeExternal(out);
      waitAnswer();
   }

   /**
    * Adds a feature to the Message attribute of the OILServerIL object
    *
    * @param dc             The feature to be added to the Message attribute
    * @param val            The feature to be added to the Message attribute
    * @exception Exception  Description of Exception
    */
   public synchronized void addMessage(ConnectionToken dc, SpyMessage val)
          throws Exception
   {
      checkConnection();
      out.writeByte(OILConstants.ADD_MESSAGE);
      SpyMessage.writeMessage(val, out);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param dest              Description of Parameter
    * @param selector          Description of Parameter
    * @return                  Description of the Returned Value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.BROWSE);
      out.writeObject(dest);
      out.writeObject(selector);
      return (SpyMessage[])waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param ID                Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void checkID(String ID)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.CHECK_ID);
      out.writeObject(ID);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param userName          Description of Parameter
    * @param password          Description of Parameter
    * @return                  Description of the Returned Value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized String checkUser(String userName, String password)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.CHECK_USER);
      out.writeObject(userName);
      out.writeObject(password);
      return (String)waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param userName          Description of Parameter
    * @param password          Description of Parameter
    * @return                  Description of the Returned Value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized String authenticate(String userName, String password)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.AUTHENTICATE);
      out.writeObject(userName);
      out.writeObject(password);
      return (String)waitAnswer();
   }
   /**
    * #Description of the Method
    *
    * @return                                Description of the Returned Value
    * @exception CloneNotSupportedException  Description of Exception
    */
   public Object clone()
          throws CloneNotSupportedException
   {
      return super.clone();
   }

   /**
    * Need to clone because there are instance variables tha can get clobbered.
    * All Multiple connections can NOT share the same JVMServerIL object
    *
    * @return               Description of the Returned Value
    * @exception Exception  Description of Exception
    */
   public ServerIL cloneServerIL()
          throws Exception
   {
      return (ServerIL)clone();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void connectionClosing(ConnectionToken dc)
          throws JMSException, Exception
   {
      try
      {
         checkConnection();
         out.writeByte(OILConstants.CONNECTION_CLOSING);
         waitAnswer();
      }
      finally
      {
         destroyConnection();
      }
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param dest              Description of Parameter
    * @return                  Description of the Returned Value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized Queue createQueue(ConnectionToken dc, String dest)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.CREATE_QUEUE);
      out.writeObject(dest);
      return (Queue)waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param dest              Description of Parameter
    * @return                  Description of the Returned Value
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized Topic createTopic(ConnectionToken dc, String dest)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.CREATE_TOPIC);
      out.writeObject(dest);
      return (Topic)waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param dest              Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.DELETE_TEMPORARY_DESTINATION);
      out.writeObject(dest);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param id                Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void destroySubscription(ConnectionToken dc,DurableSubscriptionID id)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.DESTROY_SUBSCRIPTION);
      out.writeObject(id);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc             Description of Parameter
    * @param clientTime     Description of Parameter
    * @exception Exception  Description of Exception
    */
   public synchronized void ping(ConnectionToken dc, long clientTime)
          throws Exception
   {
      checkConnection();
      out.writeByte(OILConstants.PING);
      out.writeLong(clientTime);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc             Description of Parameter
    * @param subscriberId   Description of Parameter
    * @param wait           Description of Parameter
    * @return               Description of the Returned Value
    * @exception Exception  Description of Exception
    */
   public synchronized SpyMessage receive(ConnectionToken dc, int subscriberId, long wait)
          throws Exception, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.RECEIVE);
      out.writeInt(subscriberId);
      out.writeLong(wait);
      return (SpyMessage)waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param s                 Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.SUBSCRIBE);
      out.writeObject(s);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param t                 Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void transact(org.jboss.mq.ConnectionToken dc, TransactionRequest t)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.TRANSACT);
      t.writeExternal(out);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @param dc                Description of Parameter
    * @param subscriptionId    Description of Parameter
    * @exception JMSException  Description of Exception
    * @exception Exception     Description of Exception
    */
   public synchronized void unsubscribe(ConnectionToken dc, int subscriptionId)
          throws JMSException, Exception
   {
      checkConnection();
      out.writeByte(OILConstants.UNSUBSCRIBE);
      out.writeInt(subscriptionId);
      waitAnswer();
   }

   /**
    * #Description of the Method
    *
    * @exception Exception  Description of Exception
    */
   private void checkConnection()
          throws Exception
   {
      if (socket == null)
      {
         createConnection();
      }
   }

   /**
    * Used to establish a new connection to the server
    *
    * @exception Exception  Description of Exception
    */
   private void createConnection()
          throws Exception
   {
      boolean tracing = log.isTraceEnabled();
      if( tracing )
         log.trace("Connecting to : "+addr+":"+port);

      /** Attempt to load the socket factory and if this fails, use the
       * default socket factory impl.
       */
      SocketFactory socketFactory = null;
      if( socketFactoryName != null )
      {
         try
         {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            Class factoryClass = loader.loadClass(socketFactoryName);
            socketFactory = (SocketFactory) factoryClass.newInstance();
         }
         catch(Exception e)
         {
            log.debug("Failed to load socket factory: "+socketFactoryName, e);
         }
      }
      // Use the default socket factory
      if( socketFactory == null )
      {
         socketFactory = SocketFactory.getDefault();
      }

      // Look for a local address and port as properties
      String tmp = System.getProperty(LOCAL_ADDR);
      if( tmp != null )
         this.localAddr = InetAddress.getByName(tmp);
      tmp = System.getProperty(LOCAL_PORT);
      if( tmp != null )
         this.localPort = Integer.parseInt(tmp);
      if( tracing )
      {
         log.trace("Connecting with addr="+addr+", port="+port
            + ", localAddr="+localAddr+", localPort="+localPort
            + ", socketFactory="+socketFactory);
      }

      if( localAddr != null )
         socket = socketFactory.createSocket(addr, port, localAddr, localPort);
      else
         socket = socketFactory.createSocket(addr, port);

      socket.setTcpNoDelay(enableTcpNoDelay);
      in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
      out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
      out.flush();
   }
   
   /**
    * Used to close the current connection with the server
    *
    * @exception Exception  Description of Exception
    */
   private void destroyConnection()
          throws Exception
   {
      try
      {
         out.close();
         in.close();
      }
      finally
      {
         socket.close();
      }
   }

   /**
    * #Description of the Method
    *
    * @return               Description of the Returned Value
    * @exception Exception  Description of Exception
    */
   private Object waitAnswer()
          throws Exception
   {
      out.reset();
      out.flush();
      int val = in.readByte();
      if (val == OILConstants.SUCCESS)
      {
         return null;
      }
      if (val == OILConstants.SUCCESS_OBJECT)
      {
         return in.readObject();
      }
      else
      {
         Exception e = (Exception)in.readObject();
         throw e;
      }
   }
}