package org.jboss.remoting;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import org.jboss.remoting.invocation.InternalInvocation;
import org.jboss.remoting.invocation.OnewayInvocation;
import org.jboss.remoting.loading.ClassBytes;
import javax.management.MBeanServer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * Base class for server side invokers.  It keeps a registry of
 * {@link ServerInvocationHandler}s keyed by upper-cased subsystem name, wraps the
 * outcome of every incoming {@link InvocationRequest} in an
 * {@link InvocationResponse}, and takes care of the internal invocations used for
 * callback listener management as well as oneway (fire and forget) invocations.
 * <p/>
 * Transport specific subclasses supply {@link #getDefaultDataType()} and
 * {@link #getMBeanObjectName()}.
 */
public abstract class ServerInvoker extends AbstractInvoker implements ServerInvokerMBean
{
   /**
    * Value passed to the constructor of the shared oneway <code>PooledExecutor</code>.
    */
   public static final int MAX_NUM_ONEWAY_THREADS = 100;

   /**
    * Registered invocation handlers, keyed by the upper-cased subsystem name.
    */
   protected Map handlers = new HashMap();

   /**
    * Server side callback handlers, keyed by the id computed by
    * <code>ServerInvokerCallbackHandler.getId(InvocationRequest)</code>.
    * Accessed only while synchronized on the map itself.
    */
   protected Map callbackHandlers = new HashMap();

   /**
    * Client callback listeners ({@link CallbackContainer} values), keyed by session id.
    */
   private Map clientCallbackListener = new HashMap();

   private boolean started = false;

   /**
    * Executor shared by all server invokers for oneway invocations; created lazily
    * by {@link #getOnewayExecutor()}.
    */
   private static PooledExecutor onewayExecutor;

   /**
    * Sequence number appended to the names of the oneway worker threads.
    */
   private static int onewayThreadCounter = 0;

   private MBeanServer mbeanServer = null;

   private Map configuration = new HashMap();

   /**
    * Lazily resolved data type, see {@link #getDataType()}.
    */
   private String dataType;

   /**
    * Creates the invoker and seeds its configuration with the locator's parameters
    * (when the locator has any).
    *
    * @param locator locator this invoker is bound to
    */
   public ServerInvoker(InvokerLocator locator)
   {
      super(locator);
      Map locatorParams = locator.getParameters();
      if(locatorParams != null)
      {
         configuration.putAll(locatorParams);
      }
   }

   /**
    * Creates the invoker using the given configuration map.  The locator's
    * parameters (when present) are added to that same map, overriding entries with
    * equal keys.  The map is used as passed in, it is not copied.
    *
    * @param locator       locator this invoker is bound to
    * @param configuration initial configuration; when <code>null</code> an empty map
    *                      is used so that the locator parameters are not lost
    */
   public ServerInvoker(InvokerLocator locator, Map configuration)
   {
      super(locator);
      // Fall back to a fresh map instead of leaving the field null, otherwise the
      // locator parameters would be silently dropped.
      this.configuration = (configuration != null) ? configuration : new HashMap();
      Map locatorParams = locator.getParameters();
      if(locatorParams != null)
      {
         this.configuration.putAll(locatorParams);
      }
   }

   /**
    * Returns the shared executor used for oneway invocations, creating and
    * configuring it on first use.
    *
    * @return the shared oneway executor
    */
   private synchronized static Executor getOnewayExecutor()
   {
      if(onewayExecutor == null)
      {
         onewayExecutor = new PooledExecutor(MAX_NUM_ONEWAY_THREADS);
         onewayExecutor.setKeepAliveTime(3000);
         onewayExecutor.waitWhenBlocked();
         onewayExecutor.setThreadFactory(new ThreadFactory()
         {
            public Thread newThread(Runnable runnable)
            {
               // numbered names make the oneway worker threads easy to spot in thread dumps
               return new Thread(runnable, "Remoting server oneway " + onewayThreadCounter++);
            }
         });
      }
      return onewayExecutor;
   }

   /**
    * @return the MBeanServer previously set, or <code>null</code> if none was set
    */
   public MBeanServer getMBeanServer()
   {
      return mbeanServer;
   }

   /**
    * @param server the MBeanServer to associate with this invoker
    */
   public void setMBeanServer(MBeanServer server)
   {
      this.mbeanServer = server;
   }

   /**
    * Tells whether a handler is registered for the given subsystem.  The lookup is
    * case insensitive, consistent with
    * {@link #addInvocationHandler(String, ServerInvocationHandler)},
    * {@link #getInvocationHandler(String)} and {@link #removeInvocationHandler(String)},
    * which all store/look up the upper-cased name.
    *
    * @param subsystem subsystem name, may be <code>null</code>
    * @return <code>true</code> if a handler is registered under that name;
    *         <code>false</code> otherwise or when <code>subsystem</code> is <code>null</code>
    */
   public synchronized boolean hasInvocationHandler(String subsystem)
   {
      if(subsystem == null)
      {
         return false;
      }
      return handlers.containsKey(subsystem.toUpperCase());
   }

   /**
    * @return the (upper-cased) names of all subsystems that currently have a handler
    */
   public synchronized String[] getSupportedSubsystems()
   {
      String subsystems [] = new String[handlers.size()];
      return (String[]) handlers.keySet().toArray(subsystems);
   }

   /**
    * @return all currently registered invocation handlers
    */
   public synchronized ServerInvocationHandler[] getInvocationHandlers()
   {
      ServerInvocationHandler ih [] = new ServerInvocationHandler[handlers.size()];
      return (ServerInvocationHandler[]) handlers.values().toArray(ih);
   }

   /**
    * Registers a handler for a subsystem, replacing any handler already stored
    * under the same (upper-cased) name, and hands this invoker to the handler.
    *
    * @param subsystem subsystem name; stored upper-cased
    * @param handler   handler that will receive invocations for the subsystem
    */
   public synchronized void addInvocationHandler(String subsystem, ServerInvocationHandler handler)
   {
      handler.setInvoker(this);
      handlers.put(subsystem.toUpperCase(), handler);
   }

   /**
    * Removes the handler registered for a subsystem.
    *
    * @param subsystem subsystem name; looked up upper-cased
    * @return the removed handler, or <code>null</code> if none was registered
    */
   public synchronized ServerInvocationHandler removeInvocationHandler(String subsystem)
   {
      return (ServerInvocationHandler) handlers.remove(subsystem.toUpperCase());
   }

   /**
    * Looks up the handler registered for a subsystem.
    *
    * @param subsystem subsystem name; looked up upper-cased
    * @return the handler, or <code>null</code> if none is registered
    */
   public synchronized ServerInvocationHandler getInvocationHandler(String subsystem)
   {
      return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
   }

   /**
    * Entry point for raw invocations.  The argument is expected to be an
    * {@link InvocationRequest}; the outcome is always wrapped in an
    * {@link InvocationResponse}:
    * <ul>
    * <li>a request whose parameter equals <code>"$PING$"</code> is answered with
    * <code>Boolean.TRUE</code> without being dispatched;</li>
    * <li>any other request is dispatched through {@link #invoke(InvocationRequest)}
    * and its result (or the Throwable it raised, flagged as an exception) is
    * returned together with the request's return payload;</li>
    * <li>a <code>null</code> or wrongly typed argument yields an exception response
    * with a <code>null</code> session id and payload, since there is no request to
    * take them from.</li>
    * </ul>
    *
    * @param invoke the incoming invocation
    * @return the response to send back, never <code>null</code>
    * @throws IOException declared for callers/subclasses; not thrown by this implementation itself
    */
   public Object invoke(Object invoke) throws IOException
   {
      if(log.isTraceEnabled())
      {
         log.trace("server received invocation =>" + invoke);
      }

      // instanceof is false for null, so this also covers the null case
      if(!(invoke instanceof InvocationRequest))
      {
         log.error("server invoker received " + invoke + " as invocation. Must not be null and must be of type InvocationRequest.");
         // There is no usable request here, so no session id or return payload can be echoed back.
         String noSessionId = null;
         Map noPayload = null;
         return new InvocationResponse(noSessionId,
                                       new Exception("Error processing invocation request on " + getLocator() + ". Either invocation was null or of wrong type."),
                                       true, noPayload);
      }

      InvocationRequest request = (InvocationRequest) invoke;
      InvocationResponse response = null;
      try
      {
         if("$PING$".equals(request.getParameter()))
         {
            return new InvocationResponse(request.getSessionId(), Boolean.TRUE, false, null);
         }
         Object result = invoke(request);
         response = new InvocationResponse(request.getSessionId(),
                                           result, false, request.getReturnPayload());
      }
      catch(Throwable throwable)
      {
         // The failure travels back to the caller inside the response; locally it is
         // only of interest when debugging, and goes to the log rather than stderr.
         if(log.isDebugEnabled())
         {
            log.debug("Error processing invocation request: " + request, throwable);
         }
         response = new InvocationResponse(request.getSessionId(),
                                           throwable, true, request.getReturnPayload());
      }
      return response;
   }

   /**
    * Returns the data type of this invoker.  It is resolved once, from the locator's
    * <code>InvokerLocator.DATATYPE</code> parameter, falling back to
    * {@link #getDefaultDataType()}, and the resolved value is kept for later calls.
    *
    * @return the data type
    */
   public String getDataType()
   {
      if(dataType == null)
      {
         dataType = getDataType(getLocator());
         if(dataType == null)
         {
            dataType = getDefaultDataType();
         }
      }
      return dataType;
   }

   /**
    * Reads the <code>InvokerLocator.DATATYPE</code> parameter from a locator.
    *
    * @param locator locator to inspect, may be <code>null</code>
    * @return the parameter value, or <code>null</code> when the locator or its
    *         parameter map is <code>null</code> or the parameter is not set
    */
   private String getDataType(InvokerLocator locator)
   {
      if(locator == null)
      {
         return null;
      }
      Map params = locator.getParameters();
      if(params == null)
      {
         return null;
      }
      return (String) params.get(InvokerLocator.DATATYPE);
   }

   /**
    * @return the data type to use when the locator does not specify one
    */
   protected abstract String getDefaultDataType();

   /**
    * Dispatches a request.
    * <ul>
    * <li>If the request's parameter is a {@link OnewayInvocation}, it is queued for
    * asynchronous execution and <code>null</code> is returned immediately.</li>
    * <li>Otherwise the handler is chosen by the request's subsystem (upper-cased
    * lookup); when the request names no subsystem, the first registered handler
    * (in the map's iteration order) is used.</li>
    * <li>An {@link InternalInvocation} parameter is processed by this invoker
    * itself, anything else is passed to the handler.</li>
    * </ul>
    *
    * @param invocation the request to dispatch
    * @return the result of the invocation; <code>null</code> for oneway invocations
    * @throws SubsystemNotSupported if the request names a subsystem without a handler
    * @throws RuntimeException      if the request names no subsystem and no handler
    *                               is registered at all
    * @throws Throwable             whatever the handler or the internal processing throws
    */
   public Object invoke(InvocationRequest invocation) throws Throwable
   {
      Object param = invocation.getParameter();
      if(param instanceof OnewayInvocation)
      {
         handleOnewayInvocation((OnewayInvocation) param, invocation);
         return null;
      }

      String subsystem = invocation.getSubsystem();
      InvokerLocator client = invocation.getLocator();
      ServerInvocationHandler handler = null;
      if(subsystem != null)
      {
         handler = (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
      }
      else if(!handlers.isEmpty())
      {
         // no subsystem given: fall back to whichever handler the map yields first
         handler = (ServerInvocationHandler) handlers.values().iterator().next();
      }
      else
      {
         throw new RuntimeException("Can not find a subsystem handler to take invocation request.");
      }

      if(handler == null)
      {
         throw new SubsystemNotSupported(subsystem, locator);
      }

      Object result = null;
      if(param instanceof InternalInvocation)
      {
         result = handleInternalInvocation((InternalInvocation) param, invocation, handler);
      }
      else
      {
         if(log.isTraceEnabled())
         {
            log.trace("dispatching invocation: " + invocation + " to subsystem: " + subsystem + " from client: " + client);
         }
         result = handler.invoke(invocation);
      }

      if(log.isTraceEnabled())
      {
         log.trace("dispatch invocation, returning back: " + result + " from subsystem: " + subsystem +
                   " to client: " + client);
      }
      return result;
   }

   /**
    * Unwraps a oneway invocation and schedules it on the shared oneway executor.
    * The request's parameter is replaced by the first element of the oneway
    * invocation's parameters, and the request is then dispatched through
    * {@link #invoke(InvocationRequest)} from the executor.  Failures of that
    * dispatch are logged and not reported back.
    *
    * @param onewayInvocation wrapper carrying the real parameter as its first element
    * @param invocation       the request that carried the wrapper; modified in place
    * @throws Throwable if unwrapping or handing the task to the executor fails
    */
   private void handleOnewayInvocation(OnewayInvocation onewayInvocation, InvocationRequest invocation) throws Throwable
   {
      Object[] objs = onewayInvocation.getParameters();
      Object realParam = objs[0];
      invocation.setParameter(realParam);

      final InvocationRequest newInvocation = invocation;
      Runnable onewayRun = new Runnable()
      {
         public void run()
         {
            try
            {
               invoke(newInvocation);
            }
            catch(Throwable e)
            {
               // nobody is waiting for the outcome, so logging is all that can be done
               log.error("Error executing server oneway invocation request: " + newInvocation, e);
            }
         }
      };
      getOnewayExecutor().execute(onewayRun);
   }

   /**
    * Processes an internal invocation, selected by its method name:
    * adding/removing a server side listener on the handler, fetching the callbacks
    * of a callback handler, adding/removing a client callback listener, or
    * delivering a callback to a client callback listener.
    *
    * @param param      the internal invocation
    * @param invocation the request that carried it
    * @param handler    the handler selected for the request's subsystem
    * @return the callbacks for <code>InternalInvocation.GETCALLBACKS</code>,
    *         <code>null</code> for every other method
    * @throws RuntimeException if the method name is not recognised, or if the
    *                          parameters of an add-client-listener call are invalid
    * @throws Throwable        whatever the handler or callback handler throws
    */
   private Object handleInternalInvocation(InternalInvocation param,
                                           InvocationRequest invocation,
                                           ServerInvocationHandler handler)
         throws Throwable
   {
      Object result = null;
      String methodName = param.getMethodName();
      if(log.isTraceEnabled())
      {
         log.trace("handling InternalInvocation where method name = " + methodName);
      }

      if(InternalInvocation.ADDLISTENER.equals(methodName))
      {
         InvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
         handler.addListener(callbackHandler);
      }
      else if(InternalInvocation.REMOVELISTENER.equals(methodName))
      {
         ServerInvokerCallbackHandler callbackHandler = removeCallbackHandler(invocation);
         if(callbackHandler != null)
         {
            handler.removeListener(callbackHandler);
            if(log.isTraceEnabled())
            {
               log.trace("ServerInvoker (" + this + ") removing server callback handler " + callbackHandler + ".");
            }
            callbackHandler.destroy();
         }
      }
      else if(InternalInvocation.GETCALLBACKS.equals(methodName))
      {
         ServerInvokerCallbackHandler callbackHandler = getCallbackHandler(invocation);
         if(log.isTraceEnabled())
         {
            log.trace("ServerInvoker (" + this + ") getting callbacks for callback handler " + callbackHandler + ".");
         }
         result = callbackHandler.getCallbacks();
      }
      else if(InternalInvocation.ADDCLIENTLISTENER.equals(methodName))
      {
         addClientCallbackListener(param, invocation);
      }
      else if(InternalInvocation.REMOVECLIENTLISTENER.equals(methodName))
      {
         String sessionId = invocation.getSessionId();
         clientCallbackListener.remove(sessionId);
         if(log.isDebugEnabled())
         {
            log.debug("ServerInvoker (" + this + ") removing client callback handler with session id of " + sessionId + ".");
         }
      }
      else if(InternalInvocation.HANDLECALLBACK.equals(methodName))
      {
         deliverClientCallback(param, invocation);
      }
      else
      {
         log.error("Error processing InternalInvocation. Unable to process method " +
                   methodName + ". Please make sure this should be an InternalInvocation.");
         throw new RuntimeException("Error processing InternalInvocation. Unable to process method " +
                                    methodName);
      }
      return result;
   }

   /**
    * Registers a client callback listener under the request's session id.  The
    * internal invocation must carry the InvokerCallbackHandler as its first
    * parameter; the callback handle object in the second position is optional and
    * treated as <code>null</code> when absent.
    *
    * @param param      internal invocation carrying the listener parameters
    * @param invocation the request that carried it; supplies the session id
    * @throws RuntimeException if no parameters, or more than three, were supplied
    */
   private void addClientCallbackListener(InternalInvocation param, InvocationRequest invocation)
   {
      String sessionId = invocation.getSessionId();
      Object[] params = param.getParameters();
      // At least the callback handler itself is required; without this check an
      // empty array would fail below with an ArrayIndexOutOfBoundsException.
      if(params == null || params.length < 1 || params.length > 3)
      {
         log.error("Recieved addClientListener InternalInvocation, but getParameters() " +
                   "returned: " + params);
         throw new RuntimeException("InvokerCallbackHandler and callback handle object (optional) must be supplied as the only " +
                                    "parameter objects within the InternalInvocation when " +
                                    "calling addClientListener.");
      }

      InvokerCallbackHandler callbackHandler = (InvokerCallbackHandler) params[0];
      Object callbackHandleObject = (params.length > 1) ? params[1] : null;
      CallbackContainer callbackContainer = new CallbackContainer(callbackHandler, callbackHandleObject);
      clientCallbackListener.put(sessionId, callbackContainer);

      if(log.isDebugEnabled())
      {
         log.debug("ServerInvoker (" + this + ") added client callback handler " + callbackHandler + " with session id of " + sessionId +
                   " and callback handle object of " + callbackHandleObject + ".");
      }
   }

   /**
    * Delivers a callback to the client callback listener registered for the
    * request's session id.  The callback request is taken from the first parameter
    * of the internal invocation; the listener's callback handle object is added to
    * the callback request's return payload (a new map is created when it has none)
    * under <code>Callback.CALLBACK_HANDLE_OBJECT_KEY</code> before the listener is
    * called.  When no listener is registered for the session an error is logged and
    * nothing is delivered.
    *
    * @param param      internal invocation carrying the callback request
    * @param invocation the request that carried it; supplies the session id
    * @throws Throwable whatever the listener throws while handling the callback
    */
   private void deliverClientCallback(InternalInvocation param, InvocationRequest invocation) throws Throwable
   {
      String sessionId = invocation.getSessionId();
      if(log.isTraceEnabled())
      {
         log.trace("ServerInvoker (" + this + ") is being asked to deliver callback on client callback handler with session id of " + sessionId + ".");
      }

      CallbackContainer callbackContainer = (CallbackContainer) clientCallbackListener.get(sessionId);
      if(callbackContainer == null || callbackContainer.getCallbackHandler() == null)
      {
         log.error("Could not find callback handler to call upon for handleCallback " +
                   "where session id equals " + sessionId);
         return;
      }

      Object[] params = param.getParameters();
      InvocationRequest callbackRequest = (InvocationRequest) params[0];
      Map callbackHandleObject = callbackRequest.getReturnPayload();
      if(callbackHandleObject == null)
      {
         callbackHandleObject = new HashMap();
      }
      callbackHandleObject.put(Callback.CALLBACK_HANDLE_OBJECT_KEY, callbackContainer.getCallbackHandleObject());
      callbackRequest.setReturnPayload(callbackHandleObject);

      InvokerCallbackHandler callbackHandler = callbackContainer.getCallbackHandler();
      callbackHandler.handleCallback(callbackRequest);
   }

   /**
    * Returns the server callback handler for the id derived from the request,
    * creating and storing a new one when none exists yet.
    *
    * @param invocation request identifying the callback handler
    * @return the existing or newly created callback handler
    * @throws Exception if the callback handler can not be created
    */
   private ServerInvokerCallbackHandler getCallbackHandler(InvocationRequest invocation) throws Exception
   {
      ServerInvokerCallbackHandler callbackHandler = null;
      String id = ServerInvokerCallbackHandler.getId(invocation);
      // lookup and creation happen under one lock so that only one handler is created per id
      synchronized(callbackHandlers)
      {
         callbackHandler = (ServerInvokerCallbackHandler) callbackHandlers.get(id);
         if(callbackHandler == null)
         {
            callbackHandler = new ServerInvokerCallbackHandler(invocation, getLocator(), this);
            callbackHandlers.put(id, callbackHandler);
         }
      }
      if(log.isTraceEnabled())
      {
         log.trace("ServerInvoker (" + this + ") adding server callback handler " + callbackHandler + " with id of " + id + ".");
      }
      return callbackHandler;
   }

   /**
    * Removes the server callback handler for the id derived from the request.
    *
    * @param invocation request identifying the callback handler
    * @return the removed callback handler, or <code>null</code> if none was stored
    */
   private ServerInvokerCallbackHandler removeCallbackHandler(InvocationRequest invocation)
   {
      String id = ServerInvokerCallbackHandler.getId(invocation);
      synchronized(callbackHandlers)
      {
         return (ServerInvokerCallbackHandler) callbackHandlers.remove(id);
      }
   }

   /**
    * Hook for subclasses; this implementation does nothing.
    */
   protected void preProcess(String sessionId, ClassBytes arg, Map payload, InvokerLocator locator)
   {
   }

   /**
    * Hook for subclasses; this implementation does nothing.
    */
   protected void postProcess(String sessionId, Object param, Map payload, InvokerLocator locator)
   {
   }

   /**
    * Marks the invoker as started.
    *
    * @throws IOException declared for subclasses; not thrown by this implementation
    */
   public void start() throws IOException
   {
      started = true;
   }

   /**
    * @return <code>true</code> between a call to {@link #start()} and the next call to {@link #stop()}
    */
   public boolean isStarted()
   {
      return started;
   }

   /**
    * Marks the invoker as stopped.
    */
   public void stop()
   {
      started = false;
   }

   /**
    * Destroys the class byte loader, if there is one.
    */
   public void destroy()
   {
      if(classbyteloader != null)
      {
         classbyteloader.destroy();
      }
   }

   /**
    * Replaces the configuration map.  The map is stored as given (no copy, no
    * null check).  The method name is misspelled; it is kept as is because it is
    * part of the public interface of this class.
    *
    * @param configuration the new configuration map
    */
   public void setConfigration(Map configuration)
   {
      this.configuration = configuration;
   }

   /**
    * @return the configuration map currently held by this invoker (not a copy)
    */
   public Map getConfiguration()
   {
      return configuration;
   }

   /**
    * @return the JMX object name under which this invoker is to be registered
    */
   public abstract String getMBeanObjectName();

   /**
    * Immutable pair of a client callback handler and the handle object that is
    * passed back to it with every callback.
    */
   private static final class CallbackContainer
   {
      private final InvokerCallbackHandler handler;
      private final Object handleObject;

      public CallbackContainer(InvokerCallbackHandler handler, Object handleObject)
      {
         this.handler = handler;
         this.handleObject = handleObject;
      }

      public InvokerCallbackHandler getCallbackHandler()
      {
         return handler;
      }

      public Object getCallbackHandleObject()
      {
         return handleObject;
      }
   }
}