package org.jboss.remoting;
import org.jboss.logging.Logger;
import org.jboss.remoting.invocation.InternalInvocation;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ServerInvokerCallbackHandler implements InvokerCallbackHandler
{
private InvocationRequest invocation;
private Client callBackClient;
private ArrayList callbacks = new ArrayList();
private String sessionId;
private InvokerLocator serverLocator;
private SerializableStore callbackStore = null;
public static final String CALLBACK_STORE_KEY = "callbackStore";
public static final String CALLBACK_MEM_CEILING = "callbackMemCeiling";
private double memPercentCeiling = 20;
private static final Logger log = Logger.getLogger(ServerInvokerCallbackHandler.class);
public ServerInvokerCallbackHandler(InvocationRequest invocation, InvokerLocator serverLocator, ServerInvoker owner) throws Exception
{
if(invocation == null)
{
throw new Exception("Can not construct ServerInvokerCallbackHandler with null InvocationRequest.");
}
this.invocation = invocation;
this.serverLocator = serverLocator;
init(invocation, owner);
}
private void init(InvocationRequest invocation, ServerInvoker owner) throws Exception
{
sessionId = invocation.getSessionId();
if(invocation.getLocator() != null)
{
callBackClient = new Client(invocation.getLocator(), invocation.getSubsystem());
callBackClient.connect();
}
else
{
createCallbackStore(owner, sessionId);
}
if(log.isDebugEnabled())
{
log.debug("Session id for callback handler is " + sessionId);
}
}
public void setMemPercentCeiling(Double ceiling)
{
if(ceiling != null)
{
memPercentCeiling = ceiling.doubleValue();
}
}
public Double getMemPercentCeiling()
{
return new Double(memPercentCeiling);
}
private void createCallbackStore(ServerInvoker owner, String sessionId) throws Exception
{
Map config = owner.getConfiguration();
if(config != null)
{
String storeName = (String) config.get(CALLBACK_STORE_KEY);
if(storeName != null)
{
try
{
MBeanServer server = owner.getMBeanServer();
ObjectName storeObjectName = new ObjectName(storeName);
if(server != null)
{
callbackStore = (SerializableStore)
MBeanServerInvocationHandler.newProxyInstance(server,
storeObjectName,
SerializableStore.class,
false);
}
}
catch(Exception ex)
{
log.debug("Could not create callback store from the configration value given (" + storeName + ") as an MBean.");
if(log.isTraceEnabled())
{
log.trace("Error is: " + ex.getMessage(), ex);
}
callbackStore = null;
}
if(callbackStore == null)
{
try
{
Class storeClass = Class.forName(storeName);
callbackStore = (SerializableStore) storeClass.newInstance();
}
catch(Exception e)
{
log.debug("Could not create callback store from the configuration value given (" + storeName + ") as a fully qualified class name.");
if(log.isTraceEnabled())
{
log.trace("Error is: " + e.getMessage(), e);
}
}
}
}
}
if(callbackStore == null)
{
callbackStore = new NullCallbackStore();
}
else
{
Map storeConfig = new HashMap();
storeConfig.putAll(owner.getConfiguration());
String newFilePath = null;
String filePath = (String) storeConfig.get(CallbackStore.FILE_PATH_KEY);
if(filePath == null)
{
newFilePath = System.getProperty("jboss.server.data.dir", "data");
}
newFilePath = newFilePath + System.getProperty("file.separator") + "remoting" +
System.getProperty("file.separator") + sessionId;
storeConfig.put(CallbackStore.FILE_PATH_KEY, newFilePath);
callbackStore.setConfig(storeConfig);
}
callbackStore.create();
callbackStore.start();
configureMemCeiling(owner.getConfiguration());
}
private void configureMemCeiling(Map configuration)
{
if(configuration != null)
{
String ceiling = (String) configuration.get(CALLBACK_MEM_CEILING);
if(ceiling != null)
{
try
{
double newCeiling = Double.parseDouble(ceiling);
setMemPercentCeiling(new Double(newCeiling));
}
catch(NumberFormatException e)
{
log.warn("Found new store memory ceiling seting (" + ceiling + "), but can not convert to type double.", e);
}
}
}
}
public Client getCallbackClient()
{
return callBackClient;
}
public static String getId(InvocationRequest invocation)
{
String sessionId = invocation.getSessionId();
return sessionId;
}
public String getId()
{
return getId(invocation);
}
public List getCallbacks()
{
List callbackList = null;
synchronized(callbacks)
{
callbackList = (List) callbacks.clone();
callbacks.clear();
}
List persistedCallbacks = null;
try
{
persistedCallbacks = getPersistedCallbacks();
}
catch(IOException e)
{
log.error("Can not get persisted callbacks.", e);
throw new RuntimeException("Error getting callbacks", e);
}
callbackList.addAll(persistedCallbacks);
return callbackList;
}
private List getPersistedCallbacks() throws IOException
{
List callbacks = new ArrayList();
int size = callbackStore.size();
for(int x = 0; x < size; x++)
{
callbacks.add(callbackStore.getNext());
if(isMemLow())
{
new Thread()
{
public void run()
{
System.gc();
}
}.start();
break;
}
}
return callbacks;
}
public boolean isPullCallbackHandler()
{
return (callBackClient == null);
}
public void handleCallback(InvocationRequest callback)
throws HandleCallbackException
{
try
{
if(callBackClient == null)
{
if(shouldPersist())
{
try
{
persistCallback(callback);
callback = null;
new Thread()
{
public void run()
{
System.gc();
}
}.start();
}
catch(IOException e)
{
log.error("Unable to persist callback.", e);
throw new HandleCallbackException("Unable to persist callback and will not be able to deliver.", e);
}
}
else
{
synchronized(callbacks)
{
if(log.isDebugEnabled())
{
log.debug("pull callback. adding to callback list");
}
callbacks.add(callback);
}
}
}
else
{
try
{
if(!callBackClient.isConnected())
{
callBackClient.connect();
}
if(callBackClient.isConnected())
{
if(log.isDebugEnabled())
{
log.debug("push callback. Calling client now.");
}
if(callback != null)
{
Map returnPayload = callback.getReturnPayload();
if(returnPayload == null)
{
returnPayload = new HashMap();
}
returnPayload.put(Callback.SERVER_LOCATOR_KEY, serverLocator);
callback.setReturnPayload(returnPayload);
}
InternalInvocation internalInvocation = new InternalInvocation(InternalInvocation.HANDLECALLBACK,
new Object[]{callback});
callBackClient.setSessionId(sessionId);
callBackClient.invoke(internalInvocation,
callback.getRequestPayload());
}
else
{
log.error("Can not handle callback since can not connect to client invoker.");
throw new HandleCallbackException("Can not handle callback since can not connect to client invoker.");
}
}
catch(Throwable ex)
{
log.debug("Error dispatching callback to handler.", ex);
throw new HandleCallbackException("Error dispatching callback to handler.", ex);
}
}
}
catch(Throwable thr)
{
log.error("Error handling callback.", thr);
throw new HandleCallbackException("Error handling callback.", thr);
}
}
private void persistCallback(InvocationRequest callback) throws IOException
{
callbackStore.add(callback);
}
private boolean shouldPersist()
{
return isMemLow();
}
private boolean isMemLow()
{
Runtime runtime = Runtime.getRuntime();
long max = runtime.maxMemory();
long total = runtime.totalMemory();
long free = runtime.freeMemory();
float percentage = 100 * free / total;
if(max == total && memPercentCeiling >= percentage)
{
return true;
}
else
{
return false;
}
}
public String toString()
{
return getClass().getName() + " - id: " + getId();
}
public void destroy()
{
if(callBackClient != null)
{
callBackClient.disconnect();
callBackClient = null;
}
if(callbackStore != null)
{
callbackStore.purgeFiles();
}
}
}