package org.jboss.mq.il.oil;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.rmi.RemoteException;
import org.jboss.mq.Connection;
import org.jboss.mq.ReceiveRequest;
import org.jboss.mq.SpyDestination;
public final class OILClientILService
implements java.lang.Runnable,
org.jboss.mq.il.ClientILService
{
private final static org.jboss.logging.Logger cat = org.jboss.logging.Logger.getLogger(OILClientILService.class);
private OILClientIL clientIL;
private Thread worker;
private Socket socket = null;
private Connection connection;
private boolean running;
private ServerSocket serverSocket;
private static int threadNumber= 0;
private boolean enableTcpNoDelay=false;
public org.jboss.mq.il.ClientIL getClientIL()
throws java.lang.Exception
{
return clientIL;
}
public void init(org.jboss.mq.Connection connection, java.util.Properties props)
throws java.lang.Exception
{
this.connection = connection;
serverSocket = new ServerSocket(0);
String t = props.getProperty(OILServerILFactory.OIL_TCPNODELAY_KEY);
if (t != null)
enableTcpNoDelay = t.equals("yes");
clientIL = new OILClientIL(java.net.InetAddress.getLocalHost(), serverSocket.getLocalPort(), enableTcpNoDelay);
}
public void run()
{
int code = 0;
ObjectOutputStream out = null;
ObjectInputStream in = null;
socket = null;
int serverPort = serverSocket.getLocalPort();
try
{
if( cat.isDebugEnabled() )
cat.debug("Waiting for the server to connect to me on port " +serverSocket.getLocalPort());
serverSocket.setSoTimeout(1000);
while (running && socket == null)
{
try
{
socket = serverSocket.accept();
}
catch (java.io.InterruptedIOException e)
{
continue;
}
catch (IOException e)
{
if (running)
connection.asynchFailure("Error accepting connection from server in OILClientILService.", e);
return; }
}
if(running)
{
socket.setTcpNoDelay(enableTcpNoDelay);
socket.setSoTimeout(0);
out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
out.flush();
in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
}
else
{
return;
}
}
catch (IOException e)
{
connection.asynchFailure("Could not initialize the OILClientIL Service.", e);
return;
}
finally
{
try
{
serverSocket.close();
serverSocket = null;
}
catch (IOException e)
{
if(cat.isDebugEnabled())
cat.debug("run: an error occured closing the server socket", e);
}
}
while (running)
{
try
{
code = in.readByte();
}
catch (java.io.InterruptedIOException e)
{
continue;
}
catch (IOException e)
{
break;
}
try
{
switch (code)
{
case OILConstants.RECEIVE:
int numReceives = in.readInt();
org.jboss.mq.ReceiveRequest[] messages = new org.jboss.mq.ReceiveRequest[numReceives];
for (int i = 0; i < numReceives; ++i)
{
messages[i] = new ReceiveRequest();
messages[i].readExternal(in);
}
connection.asynchDeliver(messages);
break;
case OILConstants.DELETE_TEMPORARY_DESTINATION:
connection.asynchDeleteTemporaryDestination((SpyDestination)in.readObject());
break;
case OILConstants.CLOSE:
connection.asynchClose();
break;
case OILConstants.PONG:
connection.asynchPong(in.readLong());
break;
default:
throw new RemoteException("Bad method code !");
}
try
{
out.writeByte(OILConstants.SUCCESS);
out.flush();
}
catch (IOException e)
{
connection.asynchFailure("Connection failure(1)", e);
break; }
}
catch (Exception e)
{
if (!running)
{
break;
}
try
{
cat.error("Exception handling server request", e);
out.writeByte(OILConstants.EXCEPTION);
out.writeObject(e);
out.reset();
out.flush();
}
catch (IOException e2)
{
connection.asynchFailure("Connection failure(2)", e2);
break;
}
}
}
try
{
cat.debug("Closing receiver connections on port: " + serverPort);
out.close();
in.close();
socket.close();
socket = null;
}
catch (IOException e)
{
connection.asynchFailure("Connection failure", e);
}
running = false;
}
public void start()
throws java.lang.Exception
{
running = true;
worker = new Thread(Connection.getThreadGroup(), this, "OILClientILService-" +threadNumber++);
worker.setDaemon(true);
worker.start();
}
public void stop()
throws java.lang.Exception
{
cat.trace("Stop called on OILClientService");
running = false;
worker.interrupt();
}
}