package org.jboss.mq.il.oil;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.rmi.RemoteException;
import java.util.Properties;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.net.ServerSocketFactory;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.il.Invoker;
import org.jboss.mq.il.ServerIL;
import org.jboss.security.SecurityDomain;
import org.jboss.system.server.ServerConfigUtil;
public final class OILServerILService
extends org.jboss.mq.il.ServerILJMXService
implements java.lang.Runnable, OILServerILServiceMBean
{
/** Logger shared by the service, its accept loop and all client workers. */
final static private Logger log = Logger.getLogger(OILServerILService.class);
/**
 * Timeout, in milliseconds, set on the listening socket in startService().
 * The accept loop in run() treats the resulting InterruptedIOException as a
 * cue to re-check the running flag.
 */
private final static int SO_TIMEOUT = 5000;
/** Invoker obtained from lookupJMSServer(); every client request is forwarded to it. */
private Invoker server;
/** TcpNoDelay setting applied to each accepted socket and advertised to clients. */
private boolean enableTcpNoDelay = false;
/** Read timeout set on each accepted client socket via setSoTimeout(); defaults to 0. */
private int readTimeout = 0;
/** JNDI name of the SecurityDomain handed to the server socket factory; null disables that step. */
private String securityDomain;
/** Client socket factory class name, passed through to the OILServerIL constructor. */
private String clientSocketFactoryName;
/** Factory used to create the listening socket; startService() falls back to the default factory when null. */
private ServerSocketFactory serverSocketFactory;
/** Listening socket created in startService() and closed in stopService() and at the end of run(). */
private ServerSocket serverSocket;
/** Server IL stub created in startService() and returned by getServerIL(). */
private OILServerIL serverIL;
/** Set to true by startService() and false by stopService(); polled by the accept loop and the client loops. */
private volatile boolean running;
/** Port passed to createServerSocket(); defaults to 0. */
private int serverBindPort = 0;
/** Address passed to createServerSocket(); null is reported as "0.0.0.0" by getBindAddress(). */
private InetAddress bindAddress = null;
/** Counter used to give each worker thread a distinct "OIL Worker-N" name. */
private int threadNumber = 0;
/** Client connection properties populated in startService(). */
private Properties connectionProperties;
/**
 * Serves a single accepted client connection.
 *
 * Each request is one method-code byte followed by that method's arguments.
 * The request is dispatched to the enclosing service's {@link Invoker} and
 * answered with a status byte ({@code SUCCESS}, {@code SUCCESS_OBJECT} plus
 * the result, or {@code EXCEPTION} plus the exception).
 *
 * NOTE(review): arguments are read with Java native deserialization straight
 * from the peer socket. If untrusted peers can reach this port, that is a
 * deserialization risk and should be reviewed separately.
 */
private final class Client implements Runnable
{
   private Socket sock;
   private ObjectOutputStream out;
   private ObjectInputStream in;

   /**
    * Wraps the socket in object streams and applies the TcpNoDelay setting.
    *
    * @param s the accepted client socket
    * @throws IOException if the streams cannot be created or the socket
    *         option cannot be set
    */
   Client(Socket s) throws IOException
   {
      this.sock = s;
      // Create and flush the output stream first so its stream header is on
      // the wire before the ObjectInputStream constructor waits for the
      // peer's header.
      this.out = new ObjectOutputStream(new BufferedOutputStream(this.sock.getOutputStream()));
      this.out.flush();
      this.in = new ObjectInputStream(new BufferedInputStream(this.sock.getInputStream()));
      sock.setTcpNoDelay(enableTcpNoDelay);
      if (log.isTraceEnabled())
         log.trace("Setting TcpNoDelay Option to:" + enableTcpNoDelay);
   }

   /**
    * Request loop: reads, dispatches and answers requests until the client
    * closes the connection, the stream fails, or the service stops. Then
    * notifies the server (unless the client already did) and releases the
    * streams and the socket.
    */
   public void run()
   {
      int code = 0;
      boolean closed = false;
      ConnectionToken connectionToken = null;
      while (!closed && running)
      {
         try
         {
            code = in.readByte();
         }
         catch (EOFException e)
         {
            // Peer closed the stream; leave the loop without a warning.
            break;
         }
         catch (IOException e)
         {
            // Stay quiet when the service is stopping or the client has closed.
            if (!closed && running)
               log.warn("Connection failure (1).", e);
            break;
         }
         try
         {
            Object result = null;
            switch (code)
            {
               case OILConstants.SET_SPY_DISTRIBUTED_CONNECTION :
                  connectionToken = (ConnectionToken) in.readObject();
                  break;
               case OILConstants.ACKNOWLEDGE :
                  AcknowledgementRequest ack = new AcknowledgementRequest();
                  ack.readExternal(in);
                  server.acknowledge(connectionToken, ack);
                  break;
               case OILConstants.ADD_MESSAGE :
                  server.addMessage(connectionToken, SpyMessage.readMessage(in));
                  break;
               case OILConstants.BROWSE :
                  result = server.browse(connectionToken, (Destination) in.readObject(), (String) in.readObject());
                  break;
               case OILConstants.CHECK_ID :
                  String ID = (String) in.readObject();
                  server.checkID(ID);
                  if (connectionToken != null)
                     connectionToken.setClientID(ID);
                  break;
               case OILConstants.CONNECTION_CLOSING :
                  server.connectionClosing(connectionToken);
                  // Ends the loop after the reply below has been sent.
                  closed = true;
                  break;
               case OILConstants.CREATE_QUEUE :
                  result = server.createQueue(connectionToken, (String) in.readObject());
                  break;
               case OILConstants.CREATE_TOPIC :
                  result = server.createTopic(connectionToken, (String) in.readObject());
                  break;
               case OILConstants.DELETE_TEMPORARY_DESTINATION :
                  server.deleteTemporaryDestination(connectionToken, (SpyDestination) in.readObject());
                  break;
               case OILConstants.GET_ID :
                  result = server.getID();
                  if (connectionToken != null)
                     connectionToken.setClientID((String) result);
                  break;
               case OILConstants.GET_TEMPORARY_QUEUE :
                  result = server.getTemporaryQueue(connectionToken);
                  break;
               case OILConstants.GET_TEMPORARY_TOPIC :
                  result = server.getTemporaryTopic(connectionToken);
                  break;
               case OILConstants.RECEIVE :
                  result = server.receive(connectionToken, in.readInt(), in.readLong());
                  break;
               case OILConstants.SET_ENABLED :
                  server.setEnabled(connectionToken, in.readBoolean());
                  break;
               case OILConstants.SUBSCRIBE :
                  server.subscribe(connectionToken, (Subscription) in.readObject());
                  break;
               case OILConstants.TRANSACT :
                  TransactionRequest trans = new TransactionRequest();
                  trans.readExternal(in);
                  server.transact(connectionToken, trans);
                  break;
               case OILConstants.UNSUBSCRIBE :
                  server.unsubscribe(connectionToken, in.readInt());
                  break;
               case OILConstants.DESTROY_SUBSCRIPTION :
                  server.destroySubscription(connectionToken, (DurableSubscriptionID) in.readObject());
                  break;
               case OILConstants.CHECK_USER :
                  result = server.checkUser((String) in.readObject(), (String) in.readObject());
                  break;
               case OILConstants.PING :
                  server.ping(connectionToken, in.readLong());
                  break;
               case OILConstants.AUTHENTICATE :
                  result = server.authenticate((String) in.readObject(), (String) in.readObject());
                  break;
               default :
                  throw new RemoteException("Bad method code !");
            }
            try
            {
               if (result == null)
               {
                  out.writeByte(OILConstants.SUCCESS);
               }
               else
               {
                  out.writeByte(OILConstants.SUCCESS_OBJECT);
                  out.writeObject(result);
                  // Drop the stream's back-references so sent objects are
                  // not retained for the life of the connection.
                  out.reset();
               }
               out.flush();
            }
            catch (IOException e)
            {
               if (!closed)
                  log.warn("Connection failure (2).", e);
               break;
            }
         }
         catch (Exception e)
         {
            if (closed)
               break;
            log.warn("Client request resulted in a server exception: ", e);
            try
            {
               // Ship the failure back so the client can rethrow it.
               out.writeByte(OILConstants.EXCEPTION);
               out.writeObject(e);
               out.reset();
               out.flush();
            }
            catch (IOException e2)
            {
               // Log the I/O failure itself (e2), not the request exception
               // that was already logged above.
               log.warn("Connection failure (3).", e2);
               break;
            }
         }
      }
      try
      {
         if (!closed)
         {
            // The client went away without CONNECTION_CLOSING; tell the
            // server on its behalf.
            try
            {
               server.connectionClosing(connectionToken);
            }
            catch (JMSException e)
            {
               // Best effort: the connection is being torn down regardless.
               log.debug("Ignoring error while closing the connection on the server", e);
            }
         }
         in.close();
         out.close();
      }
      catch (IOException e)
      {
         log.warn("Connection failure during connection close.", e);
      }
      finally
      {
         try
         {
            sock.close();
         }
         catch (IOException e)
         {
            log.warn("Connection failure during connection close.", e);
         }
      }
   }
}
/**
 * Returns the connection properties assembled by startService().
 *
 * @return the stored properties object (not a copy); null if it has not
 *         been assigned yet
 */
public java.util.Properties getClientConnectionProperties()
{
   return this.connectionProperties;
}
/**
 * Returns the fixed name of this service.
 *
 * @return always "JBossMQ-OILServerIL"
 */
public String getName()
{
   final String serviceName = "JBossMQ-OILServerIL";
   return serviceName;
}
/**
 * Returns the server IL stub created by startService().
 *
 * @return the current OILServerIL; null if it has not been created yet
 */
public ServerIL getServerIL()
{
   return this.serverIL;
}
/**
 * Accept loop of the service.
 *
 * Accepts client connections on the listening socket while the service is
 * running and starts one "OIL Worker-N" thread per connection. A connection
 * that fails during set-up is closed and dropped without stopping the loop.
 * When the loop ends for any other reason the listening socket is closed.
 */
public void run()
{
   try
   {
      while (running)
      {
         Socket socket = null;
         try
         {
            socket = serverSocket.accept();
         }
         catch (java.io.InterruptedIOException e)
         {
            // accept() timed out; loop round to re-check the running flag.
            continue;
         }
         if (!running)
         {
            // The service was stopped while we were blocked in accept():
            // refuse the connection.
            if (socket != null)
            {
               try
               {
                  socket.close();
               }
               catch (Exception e)
               {
               }
            }
            return;
         }
         try
         {
            socket.setSoTimeout(readTimeout);
            new Thread(new Client(socket), "OIL Worker-" + threadNumber++).start();
         }
         catch (IOException ie)
         {
            if (log.isDebugEnabled())
            {
               log.debug("IOException processing client connection", ie);
               log.debug("Dropping client connection, server will not terminate");
            }
            // No worker owns this socket, so release it here; otherwise
            // every failed set-up leaks a descriptor.
            try
            {
               socket.close();
            }
            catch (IOException closeFailure)
            {
               log.debug("Error closing dropped client connection", closeFailure);
            }
         }
      }
   }
   catch (SocketException e)
   {
      // Expected when stopService() closes the listening socket; only
      // report it if the service is still meant to be running.
      if (running)
         log.warn("SocketException occured (Connection reset by peer?). Cannot initialize the OILServerILService.");
   }
   catch (IOException e)
   {
      if (running)
         log.warn("IOException occured. Cannot initialize the OILServerILService.", e);
   }
   catch (Throwable t)
   {
      log.warn("Unexpected error occured. Cannot initialize the OILServerILService.", t);
   }
   try
   {
      serverSocket.close();
   }
   catch (Exception e)
   {
      log.debug("error closing server socket", e);
   }
   return;
}
/**
 * Starts the service: looks up the JMS server, opens the listening socket,
 * starts the accept thread, and publishes the client connection properties
 * and JNDI references.
 *
 * If a security domain is configured, it is handed to the server socket
 * factory through a reflective call to setSecurityDomain(SecurityDomain).
 * A failure there is logged with its cause and start-up continues.
 *
 * @throws Exception if the superclass start, the server lookup, the socket
 *         creation or the JNDI binding fails
 */
public void startService() throws Exception
{
   super.startService();
   running = true;
   this.server = lookupJMSServer();
   if (serverSocketFactory == null)
      serverSocketFactory = ServerSocketFactory.getDefault();
   if (securityDomain != null)
   {
      try
      {
         InitialContext ctx = new InitialContext();
         Class ssfClass = serverSocketFactory.getClass();
         SecurityDomain domain = (SecurityDomain) ctx.lookup(securityDomain);
         // Reflection is used because ServerSocketFactory itself declares
         // no setSecurityDomain method.
         Class[] parameterTypes = { SecurityDomain.class };
         Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes);
         Object[] args = { domain };
         m.invoke(serverSocketFactory, args);
      }
      catch (NoSuchMethodException e)
      {
         // Pass the exception so the log shows which factory class lacks the method.
         log.error("Socket factory does not support setSecurityDomain(SecurityDomain)", e);
      }
      catch (Exception e)
      {
         // Pass the exception so the cause (JNDI lookup, cast, invocation) is not lost.
         log.error("Failed to setSecurityDomain=" + securityDomain + " on socket factory", e);
      }
   }
   serverSocket = serverSocketFactory.createServerSocket(serverBindPort, 50, bindAddress);
   // A finite accept timeout lets the accept loop re-check the running flag.
   serverSocket.setSoTimeout(SO_TIMEOUT);
   InetAddress socketAddress = serverSocket.getInetAddress();
   if (log.isInfoEnabled())
      log.info("JBossMQ OIL service available at : " + socketAddress + ":" + serverSocket.getLocalPort());
   new Thread(server.getThreadGroup(), this, "OIL Worker Server").start();
   // NOTE(review): presumably replaces a wildcard bind address with one
   // that clients can reach; confirm against ServerConfigUtil.
   socketAddress = ServerConfigUtil.fixRemoteAddress(socketAddress);
   serverIL = new OILServerIL(socketAddress, serverSocket.getLocalPort(),
         clientSocketFactoryName, enableTcpNoDelay);
   connectionProperties = super.getClientConnectionProperties();
   connectionProperties.setProperty(
         OILServerILFactory.CLIENT_IL_SERVICE_KEY,
         "org.jboss.mq.il.oil.OILClientILService");
   connectionProperties.setProperty(OILServerILFactory.OIL_PORT_KEY, "" + serverSocket.getLocalPort());
   connectionProperties.setProperty(OILServerILFactory.OIL_ADDRESS_KEY, "" + socketAddress.getHostAddress());
   connectionProperties.setProperty(OILServerILFactory.OIL_TCPNODELAY_KEY, enableTcpNoDelay ? "yes" : "no");
   bindJNDIReferences();
}
/**
 * Stops the service: removes the JNDI references, clears the running flag
 * and closes the listening socket. Failures in either step are logged and
 * not propagated.
 */
public void stopService()
{
   try
   {
      unbindJNDIReferences();
   }
   catch (Exception unbindFailure)
   {
      log.error("Exception unbinding from JNDI", unbindFailure);
   }

   // Clear the flag first so the accept loop sees the stop when the close
   // below interrupts accept().
   running = false;
   if (serverSocket == null)
   {
      return;
   }
   try
   {
      serverSocket.close();
   }
   catch (Exception closeFailure)
   {
      log.debug("Exception stopping server thread", closeFailure);
   }
}
/**
 * Returns the configured bind port.
 *
 * @return the port that startService() passes to the server socket factory
 */
public int getServerBindPort()
{
   return this.serverBindPort;
}
/**
 * Sets the port used for the listening socket on the next startService().
 *
 * @param serverBindPort the port to bind
 */
public void setServerBindPort(int serverBindPort)
{
   final int requestedPort = serverBindPort;
   this.serverBindPort = requestedPort;
}
/**
 * Returns the configured bind address as a string.
 *
 * @return the host name of the bind address, or "0.0.0.0" when none is set
 */
public String getBindAddress()
{
   if (bindAddress == null)
   {
      return "0.0.0.0";
   }
   return bindAddress.getHostName();
}
/**
 * Sets the address the listening socket binds to.
 *
 * @param host host name or literal address; null or an empty string clears
 *        the setting
 * @throws UnknownHostException if the host cannot be resolved
 */
public void setBindAddress(String host) throws UnknownHostException
{
   boolean hostGiven = (host != null) && (host.length() != 0);
   bindAddress = hostGiven ? InetAddress.getByName(host) : null;
}
/**
 * Returns the TcpNoDelay setting applied to accepted client sockets.
 *
 * @return true if TcpNoDelay is enabled
 */
public boolean getEnableTcpNoDelay()
{
   return this.enableTcpNoDelay;
}
/**
 * Sets the TcpNoDelay option applied to accepted client sockets.
 *
 * @param enableTcpNoDelay true to enable TcpNoDelay
 */
public void setEnableTcpNoDelay(boolean enableTcpNoDelay)
{
   final boolean requested = enableTcpNoDelay;
   this.enableTcpNoDelay = requested;
}
/**
 * Returns the read timeout set on accepted client sockets.
 *
 * @return the value the accept loop passes to Socket.setSoTimeout()
 */
public int getReadTimeout()
{
   return this.readTimeout;
}
/**
 * Sets the read timeout for client sockets accepted from now on.
 *
 * @param timeout the value to pass to Socket.setSoTimeout()
 */
public void setReadTimeout(int timeout)
{
   readTimeout = timeout;
}
/**
 * Returns the configured client socket factory class name.
 *
 * @return the name passed to the OILServerIL stub; null if not set
 */
public String getClientSocketFactory()
{
   return this.clientSocketFactoryName;
}
/**
 * Sets the client socket factory class name. It is stored as given, with
 * no class loading or validation here.
 *
 * @param name the class name to pass to the OILServerIL stub
 */
public void setClientSocketFactory(String name)
{
   clientSocketFactoryName = name;
}
/**
 * Loads the named class through the thread context class loader, creates
 * an instance with its no-argument constructor and installs it as the
 * server socket factory.
 *
 * @param name fully qualified class name of a ServerSocketFactory
 * @throws Exception if the class cannot be loaded or instantiated, or is
 *         not a ServerSocketFactory
 */
public void setServerSocketFactory(String name) throws Exception
{
   Class factoryClass = Thread.currentThread().getContextClassLoader().loadClass(name);
   Object factory = factoryClass.newInstance();
   serverSocketFactory = (ServerSocketFactory) factory;
}
/**
 * Returns the class name of the current server socket factory.
 *
 * @return the factory's class name, or null when no factory is set
 */
public String getServerSocketFactory()
{
   if (serverSocketFactory == null)
   {
      return null;
   }
   return serverSocketFactory.getClass().getName();
}
/**
 * Sets the JNDI name of the security domain that startService() hands to
 * the server socket factory.
 *
 * @param domainName the JNDI name; null skips the security domain set-up
 */
public void setSecurityDomain(String domainName)
{
   securityDomain = domainName;
}
/**
 * Returns the configured security domain name.
 *
 * @return the JNDI name of the security domain; null if not set
 */
public String getSecurityDomain()
{
   return securityDomain;
}
}