| SocketServerInvoker.java |
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.remoting.transport.socket;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
import org.jboss.remoting.transport.PortUtil;
import org.jboss.util.propertyeditor.PropertyEditors;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
/**
* SocketServerInvoker is the server-side of a SOCKET based transport
*
* @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
* @version $Revision: 1.6.6.3 $
* @jmx:mbean
*/
public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
{
/** Address resolved from the locator's host in {@link #setup()}. */
private InetAddress addr;
/** Port taken from the locator, or a free port picked in {@link #setup()} when the locator has none. */
private int port;
// Not referenced anywhere in this class; presumably maintained by ServerThread -- TODO confirm.
static int clientCount = 0;
/** Backlog applied when none is configured or a negative value is passed to {@link #setBacklog(int)}. */
private static final int BACKLOG_DEFAULT = 200;
/** Pool size applied by {@link #start()} when the configured value is not positive. */
private static final int MAX_POOL_SIZE_DEFAULT = 300;
/** Listening socket, created in {@link #start()} and closed in {@link #stop()}. */
protected ServerSocket serverSocket = null;
/**
 * True while the invoker is started. Declared volatile because it is written
 * inside the synchronized {@link #start()}/{@link #stop()} methods but polled
 * by the accept threads in {@link #run()} without holding that lock.
 */
protected volatile boolean running = false;
/** Backlog handed to the {@link ServerSocket} constructor. */
protected int backlog = BACKLOG_DEFAULT;
/** Threads executing {@link #run()}; allocated in {@link #start()}. */
protected Thread[] acceptThreads;
/** Number of accept threads {@link #start()} will create. */
protected int numAcceptThreads = 1;
/** Upper bound compared against the client pool size in {@link #run()}; zeroed by {@link #stop()}. */
protected int maxPoolSize = MAX_POOL_SIZE_DEFAULT;
/** Pool of ServerThreads that currently have a socket assigned; created in {@link #start()}. */
protected LRUPool clientpool;
/** List of idle ServerThreads available for reuse; created in {@link #start()}. */
protected LinkedList threadpool;
/** Timeout value passed to each ServerThread, in milliseconds (default 60 seconds). */
protected int timeout = 60000;
/**
 * The logging trace level flag; refreshed from the logger on every {@link #start()}.
 */
protected boolean trace = false;
/**
 * The port a client will use to connect to the server.
 */
protected int clientConnectPort = 0;
/**
 * The internet address a client will use to connect to the server.
 */
protected String clientConnectAddress = null;
/**
 * The internet address to bind to by default.
 */
protected String serverBindAddress = null;
/**
 * The server port to bind to.
 */
protected int serverBindPort = 0;
/**
 * Creates the invoker for the given locator and derives its bind settings
 * by calling {@link #setup()}.
 *
 * @param locator locator handed to the superclass and read by {@link #setup()}
 * @throws RuntimeException if {@link #setup()} fails; the message is that of the
 *                          original exception, which is attached as the cause
 */
public SocketServerInvoker(InvokerLocator locator)
{
    super(locator);
    try
    {
        setup();
    }
    catch(Exception ex)
    {
        // Same message as before, but chain the cause so its type and stack trace survive.
        throw new RuntimeException(ex.getMessage(), ex);
    }
}
/**
 * Creates the invoker for the given locator and configuration and derives its
 * bind settings by calling {@link #setup()}.
 *
 * @param locator       locator handed to the superclass and read by {@link #setup()}
 * @param configuration configuration map handed to the superclass
 * @throws RuntimeException if {@link #setup()} fails; the message is that of the
 *                          original exception, which is attached as the cause
 */
public SocketServerInvoker(InvokerLocator locator, Map configuration)
{
    super(locator, configuration);
    try
    {
        setup();
    }
    catch(Exception ex)
    {
        // Same message as before, but chain the cause so its type and stack trace survive.
        throw new RuntimeException(ex.getMessage(), ex);
    }
}
/**
 * Returns the address that {@link #setup()} resolved from the locator's host.
 *
 * @return the resolved address
 */
public InetAddress getAddress()
{
    return this.addr;
}
/**
 * Returns the port established by {@link #setup()}: the locator's port, or a
 * free port when the locator's port was not positive.
 *
 * @return the port number
 */
public int getPort()
{
    return this.port;
}
/**
 * Derives the address/port state of this invoker from the locator and the
 * configuration.
 * <p>
 * The bind address is chosen in this order: the "serverBindAddress" property;
 * else the local host address when "clientConnectAddress" is configured (the
 * locator host is then meant for clients only); else the locator's host.
 * The bind port follows the same pattern with "serverBindPort" and
 * "clientConnectPort", falling back to a free port or the locator port.
 *
 * @throws Exception if host resolution, port parsing, or any helper call fails
 */
protected void setup()
      throws Exception
{
    final Properties config = new Properties();
    config.putAll(getConfiguration());
    // presumably applies matching configuration entries to this bean's setters -- confirm against PropertyEditors
    PropertyEditors.mapJavaBeanProperties(this, config, false);

    addr = InetAddress.getByName(locator.getHost());
    port = locator.getPort();
    if(port <= 0)
    {
        port = PortUtil.findFreePort();
        // re-write locator since the port is different
        this.locator = new InvokerLocator(locator.getProtocol(), locator.getHost(), port, locator.getPath(), locator.getParameters());
    }

    // set the bind address
    final String configuredBindAddress = config.getProperty("serverBindAddress");
    if(configuredBindAddress != null)
    {
        serverBindAddress = configuredBindAddress;
    }
    else if(config.getProperty("clientConnectAddress") != null)
    {
        // can't use uri address, as is for client only
        serverBindAddress = InetAddress.getLocalHost().getHostAddress();
    }
    else
    {
        serverBindAddress = addr.getHostAddress();
    }

    // set the bind port
    final String configuredBindPort = config.getProperty("serverBindPort");
    if(configuredBindPort != null)
    {
        serverBindPort = Integer.parseInt(configuredBindPort);
    }
    else if(config.getProperty("clientConnectPort") != null)
    {
        // can't use uri port, as is for client only
        serverBindPort = PortUtil.findFreePort();
    }
    else
    {
        serverBindPort = port;
    }
}
/**
 * Stops the invoker when the object is being reclaimed.
 *
 * @throws Throwable whatever {@link #stop()} or the superclass finalizer throws
 */
protected void finalize() throws Throwable
{
    try
    {
        stop();
    }
    finally
    {
        // Always chain to the superclass finalizer, even if stop() fails.
        super.finalize();
    }
}
/**
 * Starts the invoker.
 * <p>
 * When not already running this resolves the bind address, defaults the client
 * connect address to the local host name, creates the client and thread pools,
 * binds the server socket and launches {@code numAcceptThreads} threads that
 * execute {@link #run()}. The superclass {@code start()} is invoked in every case.
 * <p>
 * The invoker is only flagged as running after the server socket has been bound,
 * so a failed bind leaves it in the stopped state and {@code start()} can be retried.
 *
 * @throws IOException if the bind address cannot be resolved or the server socket cannot be opened
 * @jmx.managed-operation
 * description = "Start sets up the ServerInvoker we are wrapping."
 * impact = "ACTION"
 */
public synchronized void start() throws IOException
{
    trace = log.isTraceEnabled();
    if(!running)
    {
        InetAddress bindAddress =
              (serverBindAddress == null || serverBindAddress.length() == 0)
              ? null
              : InetAddress.getByName(serverBindAddress);
        clientConnectAddress =
              (clientConnectAddress == null || clientConnectAddress.length() == 0)
              ? InetAddress.getLocalHost().getHostName()
              : clientConnectAddress;
        if(maxPoolSize <= 0)
        {
            //need to reset to default
            maxPoolSize = MAX_POOL_SIZE_DEFAULT;
        }
        clientpool = new LRUPool(2, maxPoolSize);
        clientpool.create();
        threadpool = new LinkedList();
        try
        {
            serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);
        }
        catch(IOException e)
        {
            // Log the cause too; the message alone does not say why the bind failed.
            log.error("Error starting ServerSocket. Bind port: " + serverBindPort + ", bind address: " + bindAddress, e);
            throw e;
        }
        // Flag as running only once the socket exists. The accept threads poll this
        // flag, so it must be set before they are started below.
        running = true;
        // Port 0 asks the OS for an ephemeral port, so read back what was actually bound.
        serverBindPort = serverSocket.getLocalPort();
        clientConnectPort = (clientConnectPort == 0) ? serverSocket.getLocalPort() : clientConnectPort;
        acceptThreads = new Thread[numAcceptThreads];
        for(int i = 0; i < numAcceptThreads; i++)
        {
            String name = "SocketServerInvoker#" + i + "-" + serverBindPort;
            acceptThreads[i] = new Thread(this, name);
            acceptThreads[i].start();
        }
    }
    super.start();
}
/**
 * Stops the invoker.
 * <p>
 * When running this clears the running flag, interrupts the accept threads,
 * flushes the client pool, shuts down every idle thread held in the thread pool
 * and closes the server socket. The superclass {@code stop()} is invoked in every case.
 *
 * @jmx.managed-operation
 * description = "Stops the invoker."
 * impact = "ACTION"
 */
public synchronized void stop()
{
    if(running)
    {
        running = false;
        maxPoolSize = 0; // so ServerThreads don't reinsert themselves
        // acceptThreads is only allocated after a successful bind in start().
        if(acceptThreads != null)
        {
            for(int i = 0; i < acceptThreads.length; i++)
            {
                try
                {
                    acceptThreads[i].interrupt();
                }
                catch(Exception ignored)
                {
                    // Best effort: a thread that cannot be interrupted must not abort shutdown.
                }
            }
        }
        clientpool.flush();
        // Drain until empty. Comparing an increasing index against the shrinking
        // size() would stop half way and leave idle threads running.
        while(true)
        {
            ServerThread idle;
            // Same monitor run() uses when it takes a thread from the pool.
            synchronized(threadpool)
            {
                if(threadpool.isEmpty())
                {
                    break;
                }
                idle = (ServerThread) threadpool.removeFirst();
            }
            // Shut down outside the monitor so it is not held across foreign code.
            idle.shutdown();
        }
        try
        {
            serverSocket.close();
        }
        catch(Exception e)
        {
            // Nothing to recover during shutdown; leave a trace so the failure is not invisible.
            if(trace)
            {
                log.trace("Ignoring error while closing server socket: " + e);
            }
        }
    }
    super.stop();
}
/**
 * Returns the fixed JMX object name string used for this invoker.
 *
 * @return the object name string
 */
public String getMBeanObjectName()
{
    final String objectName = "jboss.remoting:service=invoker,transport=socket";
    return objectName;
}
/**
 * Returns the timeout value that is passed to each ServerThread.
 *
 * @return current value of the timeout property
 * @jmx:managed-attribute
 */
public int getSocketTimeout()
{
    return this.timeout;
}
/**
 * Changes the timeout value that is passed to each ServerThread.
 *
 * @param time new value of the timeout property
 * @jmx:managed-attribute
 */
public void setSocketTimeout(int time)
{
    timeout = time;
}
/**
 * Reports how many idle threads are currently held in the thread pool list.
 *
 * @return number of entries in the thread pool, or 0 when the pool has not been
 *         created yet (it is only created by {@link #start()})
 * @jmx:managed-attribute
 */
public int getCurrentThreadPoolSize()
{
    // Read the field once; it is null until start() runs, which used to surface as an NPE.
    final LinkedList pool = threadpool;
    return (pool == null) ? 0 : pool.size();
}
/**
 * Reports how many entries are currently held in the client pool.
 *
 * @return number of entries in the client pool, or 0 when the pool has not been
 *         created yet (it is only created by {@link #start()})
 * @jmx:managed-attribute
 */
public int getCurrentClientPoolSize()
{
    // Read the field once; it is null until start() runs, which used to surface as an NPE.
    final LRUPool pool = clientpool;
    return (pool == null) ? 0 : pool.size();
}
/**
 * Returns the address clients are expected to use when connecting to the server.
 *
 * @return the client connect address; may be null until {@link #start()} assigns a default
 * @jmx:managed-attribute
 */
public String getClientConnectAddress()
{
    return this.clientConnectAddress;
}
/**
 * Sets the address clients should use when connecting to the server.
 * <p>
 * This method should only be called by the service controller when this invoker is
 * specified within the Connector configuration of a service xml. Calling it directly
 * afterwards has no practical effect: the value is used to build the locator uri that
 * is published for detection, which happens when the invoker is first created and
 * started, so nobody will be aware of a later change.
 *
 * @param clientConnectAddress the address clients should connect to
 * @jmx:managed-attribute
 */
public void setClientConnectAddress(String clientConnectAddress)
{
    this.clientConnectAddress = clientConnectAddress;
}
/**
 * Returns the configured number of accept threads.
 *
 * @return The number of threads that exist for accepting client connections
 * @jmx:managed-attribute
 */
public int getNumAcceptThreads()
{
    return this.numAcceptThreads;
}
/**
 * Sets the number of accept threads; the value is read by {@link #start()} when
 * it creates the threads.
 *
 * @param size The number of threads that exist for accepting client connections
 * @jmx:managed-attribute
 */
public void setNumAcceptThreads(int size)
{
    numAcceptThreads = size;
}
/**
 * Getter for max pool size.
 * The number of server threads for processing client. The default is 300.
 *
 * @jmx:managed-attribute
 * @return the current maximum pool size
 */
public int getMaxPoolSize()
{
    return this.maxPoolSize;
}
/**
 * Setter for max pool size.
 * The number of server threads for processing client. The default is 300;
 * {@link #start()} falls back to that default when the value is not positive.
 *
 * @jmx:managed-attribute
 * @param maxPoolSize the new maximum pool size
 */
public void setMaxPoolSize(int maxPoolSize)
{
    this.maxPoolSize = maxPoolSize;
}
/**
 * Returns the address the server socket binds to.
 *
 * @return the server bind address
 * @jmx:managed-attribute
 */
public String getServerBindAddress()
{
    return this.serverBindAddress;
}
/**
 * Sets the address the server socket binds to; read by {@link #start()} when
 * the socket is created.
 *
 * @param serverBindAddress the address to bind to
 * @jmx:managed-attribute
 */
public void setServerBindAddress(String serverBindAddress)
{
    this.serverBindAddress = serverBindAddress;
}
/**
 * Getter for property serverBindPort. After {@link #start()} this holds the
 * local port reported by the server socket.
 *
 * @return Value of property serverBindPort.
 * @jmx:managed-attribute
 */
public int getServerBindPort()
{
    return this.serverBindPort;
}
/**
 * Setter for property serverBindPort; read by {@link #start()} when the
 * server socket is created.
 *
 * @param serverBindPort New value of property serverBindPort.
 * @jmx:managed-attribute
 */
public void setServerBindPort(int serverBindPort)
{
    this.serverBindPort = serverBindPort;
}
/**
 * Returns the backlog value handed to the server socket.
 *
 * @return the configured backlog
 * @jmx:managed-attribute
 */
public int getBacklog()
{
    return this.backlog;
}
/**
 * Sets the backlog value handed to the server socket. A negative value
 * selects the built-in default instead.
 *
 * @param backlog requested backlog; negative means "use the default"
 * @jmx:managed-attribute
 */
public void setBacklog(int backlog)
{
    this.backlog = (backlog < 0) ? BACKLOG_DEFAULT : backlog;
}
/**
 * Accept loop executed by each accept thread while the invoker is running.
 * <p>
 * For every accepted socket a worker is obtained in this order: an idle
 * ServerThread taken from {@code threadpool}; otherwise a new ServerThread if
 * the client pool holds fewer than {@code maxPoolSize} entries; otherwise the
 * loop asks the client pool to evict and waits on its monitor before retrying
 * (presumably notified by a ServerThread when one frees up -- confirm against
 * ServerThread). The worker is then registered in the client pool and either
 * started (new) or woken up with the socket (reused).
 * <p>
 * Any failure is logged while the invoker is still running. A socket that was
 * accepted but never handed to a worker is closed so it does not leak.
 */
public void run()
{
    while(running)
    {
        Socket socket = null;
        // Becomes true once a worker has taken responsibility for the socket.
        boolean handedOff = false;
        try
        {
            socket = serverSocket.accept();
            if(trace)
            {
                log.trace("Accepted: " + socket);
            }
            ServerThread thread = null;
            boolean newThread = false;
            while(thread == null)
            {
                synchronized(threadpool)
                {
                    if(threadpool.size() > 0)
                    {
                        thread = (ServerThread) threadpool.removeFirst();
                    }
                }
                if(thread == null)
                {
                    synchronized(clientpool)
                    {
                        if(clientpool.size() < maxPoolSize)
                        {
                            thread = new ServerThread(socket, this, clientpool, threadpool, timeout);
                            newThread = true;
                        }
                        if(thread == null)
                        {
                            clientpool.evict();
                            if(trace)
                            {
                                log.trace("Waiting for a thread...");
                            }
                            clientpool.wait();
                            if(trace)
                            {
                                log.trace("Notified of available thread");
                            }
                        }
                    }
                }
            }
            synchronized(clientpool)
            {
                clientpool.insert(thread, thread);
            }
            if(newThread)
            {
                if(trace)
                {
                    log.trace("Created a new thread, t=" + thread);
                }
                thread.start();
            }
            else
            {
                if(trace)
                {
                    log.trace("Reusing thread t=" + thread);
                }
                thread.wakeup(socket, timeout);
            }
            handedOff = true;
        }
        catch(Throwable ex)
        {
            if(running)
            {
                log.error("Failed to accept socket connection", ex);
            }
        }
        finally
        {
            // E.g. interrupted while waiting for a worker during stop(), or the worker
            // could not be started: nobody owns the connection, so release it here.
            if(socket != null && !handedOff)
            {
                try
                {
                    socket.close();
                }
                catch(IOException ignored)
                {
                    // Nothing further can be done for a socket that fails to close.
                }
            }
        }
    }
}
/**
 * Tells whether the transport is bi-directional in nature. For example,
 * SOAP is unidirectional while sockets are bi-directional (unless, for
 * instance, they sit behind a firewall).
 *
 * @return always true for this socket transport
 */
public boolean isTransportBiDirectional()
{
    final boolean biDirectional = true;
    return biDirectional;
}
/**
 * Supplies the data type this invoker falls back to when the invoker
 * locator uri does not specify one.
 *
 * @return the data type constant of the serializable marshaller
 */
protected String getDefaultDataType()
{
    final String defaultType = SerializableMarshaller.DATATYPE;
    return defaultType;
}
}
| SocketServerInvoker.java |