package org.jboss.mq.il.oil2;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.rmi.RemoteException;
import java.util.Properties;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.net.ServerSocketFactory;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.il.Invoker;
import org.jboss.mq.il.ServerIL;
import org.jboss.security.SecurityDomain;
public final class OIL2ServerILService
extends org.jboss.mq.il.ServerILJMXService
implements java.lang.Runnable, OIL2ServerILServiceMBean
{
final static private Logger log = Logger.getLogger(OIL2ServerILService.class);
private final static int SO_TIMEOUT = 5000;
private Invoker server;
private boolean enableTcpNoDelay = false;
private String securityDomain;
private String clientSocketFactoryName;
private ServerSocketFactory serverSocketFactory;
private ServerSocket serverSocket;
private OIL2ServerIL serverIL;
private volatile boolean running;
private int serverBindPort = 0;
private InetAddress bindAddress = null;
private Properties connectionProperties;
public class RequestListner implements OIL2RequestListner
{
Socket socket;
ObjectInputStream in;
ObjectOutputStream out;
OIL2SocketHandler socketHandler;
ConnectionToken connectionToken;
boolean closing = false;
RequestListner(Socket socket) throws IOException
{
socket.setSoTimeout(0);
socket.setTcpNoDelay(enableTcpNoDelay);
out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
out.flush();
in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
}
public void handleRequest(OIL2Request request)
{
if (closing)
{
log.trace("A connection that is closing received another request. Droping request.");
return;
}
Object result = null;
Exception resultException = null;
try
{
switch (request.operation)
{
case OIL2Constants.SERVER_SET_SPY_DISTRIBUTED_CONNECTION :
connectionToken = (ConnectionToken) request.arguments[0];
((OIL2ClientIL) connectionToken.clientIL).setRequestListner(this);
break;
case OIL2Constants.SERVER_ACKNOWLEDGE :
server.acknowledge(connectionToken, (AcknowledgementRequest) request.arguments[0]);
break;
case OIL2Constants.SERVER_ADD_MESSAGE :
server.addMessage(connectionToken, (SpyMessage) request.arguments[0]);
break;
case OIL2Constants.SERVER_BROWSE :
result =
server.browse(connectionToken, (Destination) request.arguments[0], (String) request.arguments[1]);
break;
case OIL2Constants.SERVER_CHECK_ID :
server.checkID((String) request.arguments[0]);
if (connectionToken != null)
connectionToken.setClientID((String) request.arguments[0]);
break;
case OIL2Constants.SERVER_CONNECTION_CLOSING :
beginClose();
break;
case OIL2Constants.SERVER_CREATE_QUEUE :
result = server.createQueue(connectionToken, (String) request.arguments[0]);
break;
case OIL2Constants.SERVER_CREATE_TOPIC :
result = server.createTopic(connectionToken, (String) request.arguments[0]);
break;
case OIL2Constants.SERVER_DELETE_TEMPORARY_DESTINATION :
server.deleteTemporaryDestination(connectionToken, (SpyDestination) request.arguments[0]);
break;
case OIL2Constants.SERVER_GET_ID :
result = server.getID();
if (connectionToken != null)
connectionToken.setClientID((String) result);
break;
case OIL2Constants.SERVER_GET_TEMPORARY_QUEUE :
result = server.getTemporaryQueue(connectionToken);
break;
case OIL2Constants.SERVER_GET_TEMPORARY_TOPIC :
result = server.getTemporaryTopic(connectionToken);
break;
case OIL2Constants.SERVER_RECEIVE :
result =
server.receive(
connectionToken,
((Integer) request.arguments[0]).intValue(),
((Long) request.arguments[1]).longValue());
break;
case OIL2Constants.SERVER_SET_ENABLED :
server.setEnabled(connectionToken, ((Boolean) request.arguments[0]).booleanValue());
break;
case OIL2Constants.SERVER_SUBSCRIBE :
server.subscribe(connectionToken, (Subscription) request.arguments[0]);
break;
case OIL2Constants.SERVER_TRANSACT :
server.transact(connectionToken, (TransactionRequest) request.arguments[0]);
break;
case OIL2Constants.SERVER_UNSUBSCRIBE :
server.unsubscribe(connectionToken, ((Integer) request.arguments[0]).intValue());
break;
case OIL2Constants.SERVER_DESTROY_SUBSCRIPTION :
server.destroySubscription(connectionToken, (DurableSubscriptionID) request.arguments[0]);
break;
case OIL2Constants.SERVER_CHECK_USER :
result = server.checkUser((String) request.arguments[0], (String) request.arguments[1]);
break;
case OIL2Constants.SERVER_PING :
server.ping(connectionToken, ((Long) request.arguments[0]).longValue());
break;
case OIL2Constants.SERVER_AUTHENTICATE :
result = server.authenticate((String) request.arguments[0], (String) request.arguments[1]);
break;
default :
throw new RemoteException("Bad method code !");
} }
catch (Exception e)
{
resultException = e;
}
try
{
OIL2Response response = new OIL2Response(request);
response.result = result;
response.exception = resultException;
socketHandler.sendResponse(response);
}
catch (IOException e)
{
handleConnectionException(e);
}
}
public void handleConnectionException(Exception e)
{
if (!closing)
log.info("Client Disconnected: " + e);
beginClose();
}
void beginClose()
{
closing = true;
try
{
if (connectionToken != null)
server.connectionClosing(connectionToken);
}
catch (JMSException ignore)
{
}
finally
{
close();
}
}
void close()
{
try
{
if (socket != null)
{
socketHandler.stop();
in.close();
out.close();
socket.close();
socket = null;
}
}
catch (IOException e)
{
log.debug("Exception occured while closing opened resources: ", e);
}
}
public OIL2SocketHandler getSocketHandler()
{
return socketHandler;
}
}
public java.util.Properties getClientConnectionProperties()
{
return connectionProperties;
}
public String getName()
{
return "JBossMQ-OILServerIL";
}
public ServerIL getServerIL()
{
return serverIL;
}
public void run()
{
try
{
while (running)
{
Socket socket = null;
try
{
socket = serverSocket.accept();
if (log.isTraceEnabled())
log.trace("Accepted connection: " + socket);
}
catch (java.io.InterruptedIOException e)
{
continue;
}
if (!running)
{
if (socket != null)
{
try
{
socket.close();
}
catch (Exception ignore)
{
}
}
return;
}
try
{
if (log.isTraceEnabled())
log.trace("Initializing RequestListner for socket: " + socket);
RequestListner requestListner = new RequestListner(socket);
OIL2SocketHandler socketHandler =
new OIL2SocketHandler(requestListner.in, requestListner.out, Thread.currentThread().getThreadGroup());
requestListner.socketHandler = socketHandler;
socketHandler.setRequestListner(requestListner);
socketHandler.start();
}
catch (IOException ie)
{
log.debug("Client connection could not be accepted: ", ie);
}
}
}
catch (SocketException e)
{
if (running)
log.warn("SocketException occured (Connection reset by peer?). Cannot initialize the OIL2ServerILService.");
}
catch (IOException e)
{
if (running)
log.warn("IOException occured. Cannot initialize the OIL2ServerILService.");
}
catch (Throwable t)
{
log.warn("Unexpected error occured. Cannot initialize the OIL2ServerILService.", t);
}
try
{
serverSocket.close();
}
catch (Exception e)
{
log.debug("error closing server socket", e);
}
return;
}
public void startService() throws Exception
{
super.startService();
running = true;
this.server = lookupJMSServer();
if (serverSocketFactory == null)
serverSocketFactory = ServerSocketFactory.getDefault();
if (securityDomain != null)
{
try
{
InitialContext ctx = new InitialContext();
Class ssfClass = serverSocketFactory.getClass();
SecurityDomain domain = (SecurityDomain) ctx.lookup(securityDomain);
Class[] parameterTypes = { SecurityDomain.class };
Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes);
Object[] args = { domain };
m.invoke(serverSocketFactory, args);
}
catch (NoSuchMethodException e)
{
log.error("Socket factory does not support setSecurityDomain(SecurityDomain)");
}
catch (Exception e)
{
log.error("Failed to setSecurityDomain=" + securityDomain + " on socket factory");
}
}
serverSocket = serverSocketFactory.createServerSocket(serverBindPort, 50, bindAddress);
serverSocket.setSoTimeout(SO_TIMEOUT);
InetAddress socketAddress = serverSocket.getInetAddress();
log.info("JBossMQ OIL2 service available at : " + socketAddress + ":" + serverSocket.getLocalPort());
new Thread(server.getThreadGroup(), this, "OIL2 Worker Server").start();
if (socketAddress.toString().equals("0.0.0.0/0.0.0.0"))
socketAddress = InetAddress.getLocalHost();
serverIL =
new OIL2ServerIL(
socketAddress.getHostAddress(),
serverSocket.getLocalPort(),
clientSocketFactoryName,
enableTcpNoDelay);
connectionProperties = super.getClientConnectionProperties();
connectionProperties.setProperty(
OIL2ServerILFactory.CLIENT_IL_SERVICE_KEY,
"org.jboss.mq.il.oil2.OIL2ClientILService");
connectionProperties.setProperty(OIL2ServerILFactory.OIL2_PORT_KEY, "" + serverSocket.getLocalPort());
connectionProperties.setProperty(OIL2ServerILFactory.OIL2_ADDRESS_KEY, "" + socketAddress.getHostAddress());
connectionProperties.setProperty(OIL2ServerILFactory.OIL2_TCPNODELAY_KEY, enableTcpNoDelay ? "yes" : "no");
bindJNDIReferences();
}
public void stopService()
{
try
{
unbindJNDIReferences();
}
catch (Exception e)
{
log.error("Exception unbinding from JNDI", e);
}
try
{
running = false;
if (serverSocket != null)
serverSocket.close();
}
catch (Exception e)
{
log.debug("Exception stopping server thread", e);
}
}
public int getServerBindPort()
{
return serverBindPort;
}
public void setServerBindPort(int serverBindPort)
{
this.serverBindPort = serverBindPort;
}
public String getBindAddress()
{
String addr = "0.0.0.0";
if (bindAddress != null)
addr = bindAddress.getHostName();
return addr;
}
public void setBindAddress(String host) throws UnknownHostException
{
if (host == null || host.length() == 0)
bindAddress = null;
else
bindAddress = InetAddress.getByName(host);
}
public boolean getEnableTcpNoDelay()
{
return enableTcpNoDelay;
}
public void setEnableTcpNoDelay(boolean enableTcpNoDelay)
{
this.enableTcpNoDelay = enableTcpNoDelay;
}
public String getClientSocketFactory()
{
return clientSocketFactoryName;
}
public void setClientSocketFactory(String name)
{
this.clientSocketFactoryName = name;
}
public void setServerSocketFactory(String name) throws Exception
{
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Class ssfClass = loader.loadClass(name);
serverSocketFactory = (ServerSocketFactory) ssfClass.newInstance();
}
public String getServerSocketFactory()
{
String name = null;
if (serverSocketFactory != null)
name = serverSocketFactory.getClass().getName();
return name;
}
public void setSecurityDomain(String domainName)
{
this.securityDomain = domainName;
}
public String getSecurityDomain()
{
return this.securityDomain;
}
}