/***************************************
 *                                     *
 *  JBoss: The OpenSource J2EE WebOS   *
 *                                     *
 *  Distributable under LGPL license.  *
 *  See terms of license at gnu.org.   *
 *                                     *
 ***************************************/
package org.jboss.remoting;

import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import org.jboss.remoting.invocation.InternalInvocation;
import org.jboss.remoting.invocation.OnewayInvocation;
import org.jboss.remoting.loading.ClassBytes;

import javax.management.MBeanServer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * ServerInvoker is the server-side part of a remote Invoker.  The ServerInvoker implementation is
 * responsible for calling transport, depending on how the protocol receives the incoming data.
 *
 * @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
 * @version $Revision: 1.12.8.2 $
 */
public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
{
   /**
    * The max number of worker threads to be used in the
    * pool for processing one way calls on the server side.
    * Value is 100.
    */
   public static final int MAX_NUM_ONEWAY_THREADS = 100;

   /**
    * Invocation handlers keyed by upper-cased subsystem name.  The public accessor and mutator
    * methods of this class touch it while synchronized on this invoker.
    */
   protected Map handlers = new HashMap();

   /**
    * Server side callback handlers keyed by the id from ServerInvokerCallbackHandler.getId().
    * Accessed within this class only while synchronized on the map itself.
    */
   protected Map callbackHandlers = new HashMap();

   /**
    * Client side callback containers keyed by session id.  Accessed only while
    * synchronized on the map itself, mirroring how callbackHandlers is guarded.
    */
   private final Map clientCallbackListener = new HashMap();

   private boolean started = false;

   /** Lazily created pool shared by all server invokers for running oneway invocations. */
   private static PooledExecutor onewayExecutor;

   /** Running number appended to the names of the oneway worker threads. */
   private static int onewayThreadCounter = 0;

   private MBeanServer mbeanServer = null;

   /** Transport specific configuration; the locator parameters are merged into it on construction. */
   private Map configuration = new HashMap();

   /** Cached marshalling data type, resolved on the first call to getDataType(). */
   private String dataType;


   /**
    * Creates a server invoker whose configuration consists of the locator's parameters (if any).
    *
    * @param locator the locator this invoker is bound to
    */
   public ServerInvoker(InvokerLocator locator)
   {
      super(locator);
      mergeLocatorParameters(locator);
   }

   /**
    * Creates a server invoker using the given configuration map, into which the locator's
    * parameters (if any) are merged.  The map passed in is used directly (not copied), so the
    * locator parameters are written into the caller's map.
    *
    * @param locator       the locator this invoker is bound to
    * @param configuration transport specific configuration; if null an empty map is used instead
    */
   public ServerInvoker(InvokerLocator locator, Map configuration)
   {
      super(locator);
      // Fall back to an empty map so the locator parameters are still merged and
      // getConfiguration() does not hand out null after construction.
      this.configuration = (configuration != null) ? configuration : new HashMap();
      mergeLocatorParameters(locator);
   }

   /**
    * Copies the locator's parameters, when it has any, into the current configuration map.
    */
   private void mergeLocatorParameters(InvokerLocator locator)
   {
      Map locatorParams = locator.getParameters();
      if(locatorParams != null)
      {
         configuration.putAll(locatorParams);
      }
   }

   /**
    * Returns the shared executor used for oneway invocations, creating it on first use.
    */
   private synchronized static Executor getOnewayExecutor()
   {
      if(onewayExecutor == null)
      {
         onewayExecutor = new PooledExecutor(MAX_NUM_ONEWAY_THREADS);
         onewayExecutor.setKeepAliveTime(3000);
         onewayExecutor.waitWhenBlocked();
         onewayExecutor.setThreadFactory(new ThreadFactory()
         {
            public Thread newThread(Runnable runnable)
            {
               return new Thread(runnable, "Remoting server oneway " + onewayThreadCounter++);
            }
         });
      }
      return onewayExecutor;
   }

   /**
    * @return the MBeanServer previously supplied via setMBeanServer(), or null if none was set
    */
   public MBeanServer getMBeanServer()
   {
      return mbeanServer;
   }

   /**
    * Sets the MBeanServer made available through getMBeanServer().
    *
    * @param server the MBeanServer to expose
    */
   public void setMBeanServer(MBeanServer server)
   {
      /*
       * This has been added in order to support mbean service configuration.
       * Now supporting classes, such as the ServerInvokerCallbackHandler can find and use
       * resources such as CallbackStore, which can be run as a service mbean (and specified
       * via object name within config).  The use of JMX throughout remoting is a problem as
       * now have to tie it in all throughout the code for service configuration as is being
       * done here.  When migrate to use under new server model, which does not depend on JMX,
       * can rip out code such as this.
       */
      this.mbeanServer = server;
   }

   /**
    * Returns true if a server invocation handler has been registered for this subsystem.
    * The lookup is case-insensitive in the same way as the add/get/remove methods, which
    * all upper-case the subsystem name before using it as key.
    *
    * @param subsystem the subsystem name; null yields false
    * @return true if a handler is registered under the upper-cased subsystem name
    */
   public synchronized boolean hasInvocationHandler(String subsystem)
   {
      return subsystem != null && handlers.containsKey(subsystem.toUpperCase());
   }

   /**
    * Returns an array of keys for each subsystem this invoker can handle.
    *
    * @return the (upper-cased) subsystem names handlers are currently registered under
    */
   public synchronized String[] getSupportedSubsystems()
   {
      String subsystems [] = new String[handlers.size()];
      return (String[]) handlers.keySet().toArray(subsystems);
   }

   /**
    * Returns an array of the server invocation handlers.
    *
    * @return the currently registered handlers
    */
   public synchronized ServerInvocationHandler[] getInvocationHandlers()
   {
      ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
      return (ServerInvocationHandler[]) handlers.values().toArray(ih);
   }

   /**
    * Adds a server invocation handler for a particular subsystem.  Typically, subsystems are defined
    * in org.jboss.remoting.Subsystem, however, this can be any string that the caller knows about.
    * The handler is told about this invoker (setInvoker) before it is registered.
    *
    * @param subsystem the subsystem name; stored upper-cased
    * @param handler   the handler to register
    */
   public synchronized void addInvocationHandler(String subsystem, ServerInvocationHandler handler)
   {
      handler.setInvoker(this);
      handlers.put(subsystem.toUpperCase(), handler);
   }

   /**
    * Removes a subsystem invocation handler.
    *
    * @param subsystem the subsystem name; upper-cased for the lookup
    * @return the handler that was registered, or null if there was none
    */
   public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
   {
      return (ServerInvocationHandler) handlers.remove(subsystem.toUpperCase());
   }

   /**
    * Gets the ServerInvocationHandler for a given subsystem type.
    *
    * @param subsystem the subsystem name; upper-cased for the lookup
    * @return the registered handler, or null if there is none
    */
   public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
   {
      return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
   }

   /**
    * Entry point for an incoming invocation object.  Answers "$PING$" requests directly,
    * otherwise dispatches to invoke(InvocationRequest) and wraps the result, or any Throwable
    * raised while processing, in an InvocationResponse.
    *
    * @param invoke the received object; expected to be a non-null InvocationRequest
    * @return an InvocationResponse; it is flagged as an exception response when processing failed
    *         or when the received object was null or not an InvocationRequest
    * @throws IOException declared by the signature; nothing in this method throws it directly
    */
   public Object invoke(Object invoke) throws IOException
   {
      if(log.isTraceEnabled())
      {
         log.trace("server received invocation =>" + invoke);
      }

      // instanceof is false for null, so this rejects both null and wrongly typed objects
      if(!(invoke instanceof InvocationRequest))
      {
         log.error("server invoker received " + invoke + " as invocation.  Must not be null and must be of type InvocationRequest.");
         // There is no request to take a session id or return payload from, so both stay unset.
         String noSessionId = null;
         Map noPayload = null;
         return new InvocationResponse(noSessionId,
                                       new Exception("Error processing invocation request on " + getLocator() + ".  Either invocation was null or of wrong type."),
                                       true, noPayload);
      }

      InvocationRequest request = (InvocationRequest) invoke;
      InvocationResponse response = null;
      try
      {
         // first check to see if this is a is alive ping
         if("$PING$".equals(request.getParameter()))
         {
            // if this is an invocation ping, just pong back
            return new InvocationResponse(request.getSessionId(), Boolean.TRUE, false, null);
         }

         Object result = invoke(request);

         response = new InvocationResponse(request.getSessionId(),
                                           result, false, request.getReturnPayload());
      }
      catch(Throwable throwable)
      {
         if(log.isDebugEnabled())
         {
            // route the stack trace through the logger rather than dumping it on stderr
            log.debug("Error processing invocation request: " + request, throwable);
         }
         // the failure travels back to the caller as an exception response
         response = new InvocationResponse(request.getSessionId(),
                                           throwable, true, request.getReturnPayload());
      }
      return response;
   }

   /**
    * Will get the data type for the marshaller factory so know which marshaller to
    * get to marshal the data.  Will first check the locator uri for a 'datatype'
    * parameter and take that value if it exists.  Otherwise, will use the
    * default datatype returned by getDefaultDataType().  The resolved value is cached.
    *
    * @return the data type to use
    */
   public String getDataType()
   {
      if(dataType == null)
      {
         dataType = getDataType(getLocator());
         if(dataType == null)
         {
            dataType = getDefaultDataType();
         }
      }
      return dataType;
   }

   //TODO: -TME getting of datatype is duplicated in both the RemoteClientInvoker and the ServerInvoker
   /**
    * Reads the InvokerLocator.DATATYPE parameter from the given locator.
    *
    * @return the parameter value, or null if the locator or its parameters are null or the
    *         parameter is not present
    */
   private String getDataType(InvokerLocator locator)
   {
      if(locator == null)
      {
         return null;
      }
      Map params = locator.getParameters();
      if(params == null)
      {
         return null;
      }
      return (String) params.get(InvokerLocator.DATATYPE);
   }

   /**
    * @return the data type to use when the locator does not specify one
    */
   protected abstract String getDefaultDataType();

   /**
    * Processes invocation request depending on the invocation type (internal, name based, oneway, etc).
    * Can be called on directly when client and server are local to one another (by-passing serialization)
    *
    * @param invocation the request to process
    * @return the handler's result; always null for oneway invocations, which are only queued here
    * @throws Throwable whatever the handler throws; SubsystemNotSupported if no handler is registered
    *                   for the request's subsystem; RuntimeException if no subsystem was given and
    *                   no handler is registered at all
    */
   public Object invoke(InvocationRequest invocation) throws Throwable
   {
      Object param = invocation.getParameter();
      //TODO: -TME both oneway and internal invocation will be broken since have not
      // deserialized the para yet (removed ClassUtil.deserialize() so would let handler do it).
      if(param instanceof OnewayInvocation)
      {
         // no point in delaying return to client if oneway
         handleOnewayInvocation((OnewayInvocation) param, invocation);
         return null;
      }

      String subsystem = invocation.getSubsystem();
      InvokerLocator client = invocation.getLocator();

      // too bad we can't optimize this a little better, since we take a lookup hit for
      // each invocation -JGH
      // NOTE(review): unlike the public handler accessors, this lookup reads 'handlers'
      // without synchronizing on this invoker - confirm whether that is intentional.
      ServerInvocationHandler handler = null;
      if(subsystem != null)
      {
         handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
      }
      else if(!handlers.isEmpty())
      {
         // subsystem not specified, so will hope for a default one being set
         handler = (ServerInvocationHandler) handlers.values().iterator().next();
      }
      else
      {
         throw new RuntimeException("Can not find a subsystem handler to take invocation request.");
      }
      if(handler == null)
      {
         throw new SubsystemNotSupported(subsystem, locator);
      }

      Object result = null;
      if(param instanceof InternalInvocation)
      {
         result = handleInternalInvocation((InternalInvocation) param, invocation, handler);
      }
      else
      {
         if(log.isTraceEnabled())
         {
            log.trace("dispatching invocation: " + invocation + " to subsystem: " + subsystem + " from client: " + client);
         }

         result = handler.invoke(invocation);
      }
      if(log.isTraceEnabled())
      {
         log.trace("dispatch invocation, returning back: " + result + " from subsystem: " + subsystem +
                   " to client: " + client);
      }
      return result;
   }

   /**
    * Takes the real invocation from the client out of the OnewayInvocation and then executes the invoke()
    * with the real invocation on a separate thread.
    *
    * @param onewayInvocation wrapper whose first parameter is the real invocation parameter
    * @param invocation       the request; its parameter is replaced by the unwrapped one
    * @throws Throwable if the unwrapping or the hand-off to the executor fails
    */
   private void handleOnewayInvocation(OnewayInvocation onewayInvocation, InvocationRequest invocation) throws Throwable
   {
      Object[] objs = onewayInvocation.getParameters();
      // The oneway invocation should contain the real param as it's only param in parameter array
      Object realParam = objs[0];
      invocation.setParameter(realParam);
      final InvocationRequest newInvocation = invocation;

      Runnable onewayRun = new Runnable()
      {
         public void run()
         {
            try
            {
               invoke(newInvocation);
            }
            catch(Throwable e)
            {
               // throw away exception since can't get it back to original caller
               log.error("Error executing server oneway invocation request: " + newInvocation, e);
            }
         }
      };
      getOnewayExecutor().execute(onewayRun);
   }

   /**
    * Handles internal invocations (internal meaning only to be used within remoting), i.e. the
    * callback listener management and callback delivery requests.
    *
    * @param param      the internal invocation naming the method to perform
    * @param invocation the request the internal invocation arrived in
    * @param handler    the subsystem handler that listeners are added to / removed from
    * @return the callbacks for GETCALLBACKS, otherwise null
    * @throws Throwable RuntimeException for an unknown method name or invalid parameters, or
    *                   whatever the involved handlers throw
    */
   private Object handleInternalInvocation(InternalInvocation param,
                                           InvocationRequest invocation,
                                           ServerInvocationHandler handler)
         throws Throwable
   {
      Object result = null;
      String methodName = param.getMethodName();
      if(log.isTraceEnabled())
      {
         log.trace("handling InternalInvocation where method name = " + methodName);
      }
      // check if the invocation is for callback handling
      if(InternalInvocation.ADDLISTENER.equals(methodName))
      {
         InvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
         handler.addListener(callbackHandler);
      }
      else if(InternalInvocation.REMOVELISTENER.equals(methodName))
      {
         ServerInvokerCallbackHandler callbackHandler = removeCallbackHandler(invocation);
         if(callbackHandler != null)
         {
            handler.removeListener(callbackHandler);
            if(log.isTraceEnabled())
            {
               log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + ".");
            }

            callbackHandler.destroy();
         }
      }
      else if(InternalInvocation.GETCALLBACKS.equals(methodName))
      {
         ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
         if(log.isTraceEnabled())
         {
            log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + ".");
         }

         result = callbackHandler.getCallbacks();
      }
      else if(InternalInvocation.ADDCLIENTLISTENER.equals(methodName))
      {
         addClientListener(param, invocation);
      }
      else if(InternalInvocation.REMOVECLIENTLISTENER.equals(methodName))
      {
         String sessionId = invocation.getSessionId();
         synchronized(clientCallbackListener)
         {
            clientCallbackListener.remove(sessionId);
         }
         if(log.isDebugEnabled())
         {
            log.debug("ServerInvoker (" + this + ") removing client callback handler with session id of " + sessionId + ".");
         }
      }
      else if(InternalInvocation.HANDLECALLBACK.equals(methodName))
      {
         deliverClientCallback(param, invocation);
      }
      else
      {
         log.error("Error processing InternalInvocation.  Unable to process method " +
                   methodName + ".  Please make sure this should be an InternalInvocation.");
         throw new RuntimeException("Error processing InternalInvocation.  Unable to process method " +
                                    methodName);
      }
      return result;
   }

   /**
    * Registers the client callback handler carried by an ADDCLIENTLISTENER invocation under the
    * request's session id.  The first parameter must be the InvokerCallbackHandler; the second,
    * when present, is the callback handle object (null is stored when it is absent).
    *
    * @throws RuntimeException if the parameter array is null, empty or has more than three elements
    */
   private void addClientListener(InternalInvocation param, InvocationRequest invocation)
   {
      String sessionId = invocation.getSessionId();
      Object[] params = param.getParameters();
      // the only elements should be the callback handler and possibly the callback handle object
      if(params == null || params.length < 1 || params.length > 3)
      {
         log.error("Recieved addClientListener InternalInvocation, but getParameters() " +
                   "returned: " + params);
         throw new RuntimeException("InvokerCallbackHandler and callback handle object (optional) must be supplied as the only " +
                                    "parameter objects within the InternalInvocation when " +
                                    "calling addClientListener.");
      }
      InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler) params[0];
      // the handle object is optional, so only read it when it was actually sent
      Object callbackHandleObject = (params.length > 1) ? params[1] : null;
      CallbackContainer callbackContainer = new CallbackContainer(callbackHandler, callbackHandleObject);
      synchronized(clientCallbackListener)
      {
         clientCallbackListener.put(sessionId, callbackContainer);
      }
      if(log.isDebugEnabled())
      {
         log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler + " with session id of " + sessionId +
                   " and callback handle object of " + callbackHandleObject + ".");
      }
   }

   /**
    * Delivers the callback carried by a HANDLECALLBACK invocation to the client callback handler
    * registered for the request's session id.  The registered callback handle object is put into
    * the callback's return payload under Callback.CALLBACK_HANDLE_OBJECT_KEY before delivery.
    * If no handler is registered for the session id, an error is logged and nothing is delivered.
    *
    * @throws Throwable whatever the callback handler throws
    */
   private void deliverClientCallback(InternalInvocation param, InvocationRequest invocation) throws Throwable
   {
      String sessionId = invocation.getSessionId();
      if(log.isTraceEnabled())
      {
         log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + ".");
      }
      CallbackContainer callbackContainer = null;
      synchronized(clientCallbackListener)
      {
         callbackContainer = (CallbackContainer) clientCallbackListener.get(sessionId);
      }
      if(callbackContainer == null || callbackContainer.getCallbackHandler() == null)
      {
         log.error("Could not find callback handler to call upon for handleCallback " +
                   "where session id equals " + sessionId);
         return;
      }

      Object[] params = param.getParameters();
      InvocationRequest callbackRequest = (InvocationRequest) params[0];
      //TODO: -TME (JBREM-57) this needs to be changed to be
      // Callback type instead of InvocationRequest.  But for regression, will use
      // InvocationRequest.
      Map callbackHandleObject = callbackRequest.getReturnPayload();
      if(callbackHandleObject == null)
      {
         callbackHandleObject = new HashMap();
      }
      callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY, callbackContainer.getCallbackHandleObject());
      callbackRequest.setReturnPayload(callbackHandleObject);
      // the handler is invoked outside of any lock held by this class
      InvokerCallbackHandler callbackHandler = callbackContainer.getCallbackHandler();
      callbackHandler.handleCallback(callbackRequest);
   }

   //TODO: Do we really want to keep track of callback handlers here as well as
   // in the handler?  Seems like duplicate effort.  Might should just leave up
   // to handler to figure out duplicates and what to do with them? -TME
   /**
    * Returns the server callback handler registered under the id derived from the invocation,
    * creating and registering a new one if none exists yet.
    */
   private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation) throws Exception
   {
      ServerInvokerCallbackHandler callbackHandler = null;
      String id = ServerInvokerCallbackHandler.getId(invocation);

      synchronized(callbackHandlers)
      {
         callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.get(id);
         // if does not exist, create it
         if(callbackHandler == null)
         {
            callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
            callbackHandlers.put(id, callbackHandler);
         }
      }
      if(log.isTraceEnabled())
      {
         log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + ".");
      }

      return callbackHandler;
   }

   /**
    * Unregisters the server callback handler stored under the id derived from the invocation.
    *
    * @return the removed handler, or null if none was registered under that id
    */
   private ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
   {
      String id = ServerInvokerCallbackHandler.getId(invocation);

      synchronized(callbackHandlers)
      {
         return (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
      }
   }


   /**
    * Hook intended to be called prior to an invocation.  Does nothing in this class.
    *
    * @param sessionId
    * @param arg
    * @param payload
    * @param locator
    * @todo is sending in the arg appropriate???
    */
   protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
   {
   }

   /**
    * Hook intended to be called after an invocation.  Does nothing in this class.
    *
    * @param sessionId
    * @param param
    * @param payload
    * @param locator
    * @todo is sending in the arg appropriate???
    */
   protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
   {
   }

   /**
    * Marks the invoker as started.  Subclasses should override to provide any specific start logic.
    *
    * @throws IOException declared for overriding subclasses; not thrown by this implementation
    */
   public void start() throws IOException
   {
      started = true;
   }

   /**
    * Returns true if the server invoker is started, false if not.
    *
    * @return the started flag as last set by start() / stop()
    */
   public boolean isStarted()
   {
      return started;
   }

   /**
    * Marks the invoker as stopped.  Subclasses should override to provide any specific stop logic.
    */
   public void stop()
   {
      started = false;
   }

   /**
    * Destroys the invoker permanently; destroys the class byte loader if there is one.
    */
   public void destroy ()
   {
      if(classbyteloader != null)
      {
         classbyteloader.destroy();
      }
   }

   /**
    * Sets the server invoker's transport specific configuration.  Will need to set before calling
    * start() method (or at least stop() and start() again) before configurations will take effect.
    * The map is stored as given (no copy, no null check).
    *
    * @param configuration the new configuration map
    */
   public void setConfigration(Map configuration)
   {
      this.configuration = configuration;
   }

   /**
    * Gets the server invoker's transport specific configuration.
    *
    * @return the configuration map currently held by this invoker (not a copy)
    */
   public Map getConfiguration()
   {
      return configuration;
   }

   /**
    * Returns the String for the object name to be used for the invoker.
    *
    * @return the MBean object name
    */
   public abstract String getMBeanObjectName();

   /**
    * Immutable pairing of a client callback handler with its (optional) callback handle object.
    */
   private static class CallbackContainer
   {
      private final InvokerCallbackHandler handler;
      private final Object handleObject;

      public CallbackContainer(InvokerCallbackHandler handler, Object handleObject)
      {
         this.handler = handler;
         this.handleObject = handleObject;
      }

      public InvokerCallbackHandler getCallbackHandler()
      {
         return handler;
      }

      public Object getCallbackHandleObject()
      {
         return handleObject;
      }
   }
}