package org.jboss.invocation.pooled.server;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.LinkedList;
import java.security.PrivilegedExceptionAction;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.io.IOException;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.jboss.invocation.Invocation;
import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy;
import org.jboss.invocation.pooled.interfaces.ServerAddress;
import org.jboss.invocation.pooled.interfaces.PooledMarshalledInvocation;
import org.jboss.logging.Logger;
import org.jboss.proxy.TransactionInterceptor;
import org.jboss.system.Registry;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.tm.TransactionPropagationContextFactory;
import org.jboss.tm.TransactionPropagationContextImporter;
import org.jboss.tm.TransactionPropagationContextUtil;
public class PooledInvoker extends ServiceMBeanSupport
implements PooledInvokerMBean, Runnable
{
final static protected Logger log = Logger.getLogger(PooledInvoker.class);
protected boolean enableTcpNoDelay = false;
protected String serverBindAddress = null;
protected int serverBindPort = 0;
protected String clientConnectAddress = null;
protected int clientConnectPort = 0;
protected int backlog = 200;
protected ServerSocket serverSocket = null;
protected int timeout = 60000;
protected int maxPoolSize = 300;
protected int clientMaxPoolSize = 300;
protected int numAcceptThreads = 1;
protected Thread[] acceptThreads;
protected LRUPool clientpool;
protected LinkedList threadpool;
protected boolean running = true;
protected boolean trace = false;
protected ObjectName transactionManagerService;
protected PooledInvokerProxy optimizedInvokerProxy = null;
private MBeanServerAction serverAction = new MBeanServerAction();
protected static TransactionPropagationContextFactory tpcFactory;
protected static TransactionPropagationContextImporter tpcImporter;
protected void jmxBind()
{
Registry.bind(getServiceName(), optimizedInvokerProxy);
}
public void startService() throws Exception
{
trace = log.isTraceEnabled();
InitialContext ctx = new InitialContext();
tpcFactory = TransactionPropagationContextUtil.getTPCFactory();
tpcImporter = TransactionPropagationContextUtil.getTPCImporter();
TransactionInterceptor.setTransactionManager((TransactionManager)ctx.lookup("java:/TransactionManager"));
InetAddress bindAddress =
(serverBindAddress == null || serverBindAddress.length() == 0)
? null
: InetAddress.getByName(serverBindAddress);
clientConnectAddress =
(clientConnectAddress == null || clientConnectAddress.length() == 0)
? InetAddress.getLocalHost().getHostName()
: clientConnectAddress;
clientpool = new LRUPool(2, maxPoolSize);
clientpool.create();
threadpool = new LinkedList();
try
{
serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);
}catch( java.net.BindException be)
{
throw new Exception("Port "+serverBindPort+" is already in use",be);
}
serverBindPort = serverSocket.getLocalPort();
clientConnectPort = (clientConnectPort == 0) ? serverSocket.getLocalPort() : clientConnectPort;
ServerAddress sa = new ServerAddress(clientConnectAddress, clientConnectPort, enableTcpNoDelay, timeout);
optimizedInvokerProxy = new PooledInvokerProxy(sa, clientMaxPoolSize);
jmxBind();
log.debug("Bound invoker for JMX node");
ctx.close();
acceptThreads = new Thread[numAcceptThreads];
for (int i = 0; i < numAcceptThreads; i++)
{
String name = "PooledInvokerAcceptor#"+i+"-"+serverBindPort;
acceptThreads[i] = new Thread(this, name);
acceptThreads[i].start();
}
}
public void run()
{
while (running)
{
try
{
Socket socket = serverSocket.accept();
if( trace )
log.trace("Accepted: "+socket);
ServerThread thread = null;
boolean newThread = false;
while (thread == null)
{
synchronized(threadpool)
{
if (threadpool.size() > 0)
{
thread = (ServerThread)threadpool.removeFirst();
}
}
if (thread == null)
{
synchronized(clientpool)
{
if (clientpool.size() < maxPoolSize)
{
thread = new ServerThread(socket, this, clientpool, threadpool, timeout);
newThread = true;
}
if (thread == null)
{
clientpool.evict();
if( trace )
log.trace("Waiting for a thread...");
clientpool.wait();
if( trace )
log.trace("Notified of available thread");
}
}
}
}
synchronized(clientpool)
{
clientpool.insert(thread, thread);
}
if (newThread)
{
if( trace )
log.trace("Created a new thread, t="+thread);
thread.start();
}
else
{
if( trace )
log.trace("Reusing thread t="+thread);
thread.wakeup(socket, timeout);
}
}
catch (Throwable ex)
{
if (running)
log.error("Failed to accept socket connection", ex);
}
}
}
public void stopService() throws Exception
{
running = false;
maxPoolSize = 0; for (int i = 0; i < acceptThreads.length; i++)
{
try
{
acceptThreads[i].interrupt();
}
catch (Exception ignored){}
}
clientpool.flush();
for (int i = 0; i < threadpool.size(); i++)
{
ServerThread thread = (ServerThread)threadpool.removeFirst();
thread.shutdown();
}
try
{
serverSocket.close();
}
catch(Exception e)
{
}
}
protected void destroyService() throws Exception
{
Registry.unbind(getServiceName());
}
public Object invoke(Invocation invocation) throws Exception
{
Thread currentThread = Thread.currentThread();
ClassLoader oldCl = currentThread.getContextClassLoader();
try
{
PooledMarshalledInvocation mi = (PooledMarshalledInvocation) invocation;
invocation.setTransaction(importTPC(mi.getTransactionPropagationContext()));
ObjectName mbean = (ObjectName) Registry.lookup(invocation.getObjectName());
Object obj = serverAction.invoke(mbean, "invoke",
new Object[] { invocation }, Invocation.INVOKE_SIGNATURE);
return obj;
}
catch (Exception e)
{
org.jboss.mx.util.JMXExceptionDecoder.rethrow(e);
throw new org.jboss.util.UnreachableStatementException();
}
finally
{
currentThread.setContextClassLoader(oldCl);
}
}
protected Transaction importTPC(Object tpc)
{
if (tpc != null)
return tpcImporter.importTransactionPropagationContext(tpc);
return null;
}
public int getNumAcceptThreads()
{
return numAcceptThreads;
}
public void setNumAcceptThreads(int size)
{
this.numAcceptThreads = size;
}
public int getMaxPoolSize()
{
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize)
{
this.maxPoolSize = maxPoolSize;
}
public int getClientMaxPoolSize()
{
return clientMaxPoolSize;
}
public void setClientMaxPoolSize(int clientMaxPoolSize)
{
this.clientMaxPoolSize = clientMaxPoolSize;
}
public int getSocketTimeout()
{
return timeout;
}
public void setSocketTimeout(int time)
{
this.timeout = time;
}
public int getCurrentClientPoolSize()
{
return clientpool.size();
}
public int getCurrentThreadPoolSize()
{
return threadpool.size();
}
public int getServerBindPort()
{
return serverBindPort;
}
public void setServerBindPort(int serverBindPort)
{
this.serverBindPort = serverBindPort;
}
public String getClientConnectAddress()
{
return clientConnectAddress;
}
public void setClientConnectAddress(String clientConnectAddress)
{
this.clientConnectAddress = clientConnectAddress;
}
public int getClientConnectPort()
{
return clientConnectPort;
}
public void setClientConnectPort(int clientConnectPort)
{
this.clientConnectPort = clientConnectPort;
}
public int getBacklog()
{
return backlog;
}
public void setBacklog(int backlog)
{
this.backlog = backlog;
}
public boolean isEnableTcpNoDelay()
{
return enableTcpNoDelay;
}
public void setEnableTcpNoDelay(boolean enableTcpNoDelay)
{
this.enableTcpNoDelay = enableTcpNoDelay;
}
public String getServerBindAddress()
{
return serverBindAddress;
}
public void setServerBindAddress(String serverBindAddress)
{
this.serverBindAddress = serverBindAddress;
}
public ObjectName getTransactionManagerService()
{
return transactionManagerService;
}
public void setTransactionManagerService(ObjectName transactionManagerService)
{
this.transactionManagerService = transactionManagerService;
}
public PooledInvokerProxy getOptimizedInvokerProxy()
{
return optimizedInvokerProxy;
}
class MBeanServerAction implements PrivilegedExceptionAction
{
private ObjectName target;
String method;
Object[] args;
String[] sig;
MBeanServerAction()
{
}
MBeanServerAction(ObjectName target, String method, Object[] args, String[] sig)
{
this.target = target;
this.method = method;
this.args = args;
this.sig = sig;
}
public Object run() throws Exception
{
Object rtnValue = server.invoke(target, method, args, sig);
return rtnValue;
}
Object invoke(ObjectName target, String method, Object[] args, String[] sig)
throws Exception
{
SecurityManager sm = System.getSecurityManager();
Object rtnValue = null;
if( sm == null )
{
rtnValue = server.invoke(target, method, args, sig);
}
else
{
try
{
MBeanServerAction action = new MBeanServerAction(target, method, args, sig);
rtnValue = AccessController.doPrivileged(action);
}
catch (PrivilegedActionException e)
{
Exception ex = e.getException();
throw ex;
}
}
return rtnValue;
}
}
}