package org.jboss.invocation.pooled.server;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.LinkedList;
import org.jboss.invocation.Invocation;
import org.jboss.invocation.pooled.interfaces.OptimizedObjectInputStream;
import org.jboss.invocation.pooled.interfaces.OptimizedObjectOutputStream;
import org.jboss.logging.Logger;
/**
 * Worker thread that serves invocations arriving on one client socket.
 *
 * <p>{@link #run()} repeatedly calls {@link #dorun()} to service the current
 * socket. When the connection ends the thread removes itself from the client
 * pool, adds itself to the idle thread pool and waits on its own monitor until
 * {@link #wakeup(Socket, int)} hands it a new socket, or until it is
 * interrupted.
 */
public class ServerThread extends Thread
{
   private static final Logger log = Logger.getLogger(ServerThread.class);

   /** Object stream reading from the current socket; null between connections. */
   protected ObjectInputStream in;
   /** Object stream writing to the current socket; null between connections. */
   protected ObjectOutputStream out;
   /** Socket currently being served; null between connections. */
   protected Socket socket;
   /** Target that invocations read from the socket are dispatched to. */
   protected PooledInvoker invoker;
   /** Pool this thread removes itself from when its connection ends. */
   protected LRUPool clientpool;
   /** List this thread adds itself to while waiting for a new socket. */
   protected LinkedList threadpool;
   /** Loop flag for {@link #dorun()}; cleared by shutdown(), evict() and on errors. */
   protected volatile boolean running = true;
   /** True while an invocation is in progress; such a thread is not interrupted. */
   protected volatile boolean handlingResponse = true;
   /** Set by {@link #shutdown()}; makes {@link #run()} return after the current connection. */
   protected volatile boolean shutdown = false;
   /** Counter behind {@link #nextID()}; only modified inside that synchronized method. */
   protected static int id = 0;

   /**
    * Set by {@link #wakeup(Socket, int)} and consumed by {@link #run()}.
    * Only read or written while holding this thread's monitor. It lets the
    * wait loop tell a real wakeup from a spurious return of Object.wait().
    */
   private boolean wakeupPending = false;

   /**
    * Returns the next value of the shared counter used in thread names.
    *
    * @return the counter value before it was incremented
    */
   public static synchronized int nextID()
   {
      return id++;
   }

   /**
    * Creates a worker for the given socket. The thread is not started here.
    *
    * @param socket     the client connection to serve first
    * @param invoker    the invoker that invocations are dispatched to
    * @param clientpool pool the thread removes itself from after a connection
    * @param threadpool list the thread adds itself to while idle
    * @param timeout    value passed to {@link Socket#setSoTimeout(int)}
    * @throws Exception if the socket timeout cannot be set
    */
   public ServerThread(Socket socket, PooledInvoker invoker, LRUPool clientpool,
                       LinkedList threadpool, int timeout) throws Exception
   {
      super("PooledInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID());
      this.socket = socket;
      this.invoker = invoker;
      this.clientpool = clientpool;
      this.threadpool = threadpool;
      socket.setSoTimeout(timeout);
   }

   /**
    * Asks the thread to terminate: sets the shutdown flag, stops the
    * processing loop and interrupts the thread unless it is in the middle of
    * an invocation. Deliberately not synchronized, so it never blocks on this
    * thread's monitor.
    */
   public void shutdown()
   {
      shutdown = true;
      running = false;
      interruptIfIdle();
   }

   /**
    * Asks the thread to drop its current connection: stops the processing
    * loop and interrupts the thread unless it is in the middle of an
    * invocation. Unlike {@link #shutdown()} the thread may be reused.
    */
   public void evict()
   {
      running = false;
      interruptIfIdle();
   }

   /**
    * Interrupts this worker unless it is currently handling a response.
    *
    * <p>Thread.interrupted() is intentionally NOT called here: it is a static
    * method that clears the interrupt status of the calling thread, so it
    * would swallow a pending interrupt of whoever calls shutdown()/evict()
    * rather than affect this worker.
    */
   private void interruptIfIdle()
   {
      if (handlingResponse)
      {
         return;
      }
      try
      {
         this.interrupt();
      }
      catch (Exception e)
      {
         // interrupt() may throw SecurityException; the request is best-effort
         log.debug("could not interrupt " + getName(), e);
      }
   }

   /**
    * Hands a new socket to a waiting thread and wakes it up.
    *
    * @param socket  the new client connection
    * @param timeout value passed to {@link Socket#setSoTimeout(int)}
    * @throws Exception if the socket address or timeout cannot be accessed;
    *                   in that case the thread is not notified
    */
   public synchronized void wakeup(Socket socket, int timeout) throws Exception
   {
      this.socket = socket;
      String name = "PooledInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID();
      super.setName(name);
      socket.setSoTimeout(timeout);
      running = true;
      handlingResponse = true;
      wakeupPending = true;
      this.notify();
   }

   /**
    * Main loop: serve a connection, then either exit (after shutdown) or move
    * from the client pool to the idle thread pool and wait for
    * {@link #wakeup(Socket, int)}. Any exception, including the
    * InterruptedException raised while waiting, ends the thread.
    */
   public void run()
   {
      try
      {
         while (true)
         {
            dorun();
            if (shutdown)
            {
               synchronized (clientpool)
               {
                  clientpool.remove(this);
               }
               return;
            }

            synchronized (this)
            {
               // Cleared while holding the monitor and before this thread is
               // published in threadpool, so a wakeup() cannot be lost.
               wakeupPending = false;
               // Lock order: this -> clientpool -> threadpool.
               synchronized (clientpool)
               {
                  synchronized (threadpool)
                  {
                     clientpool.remove(this);
                     threadpool.add(this);
                     Thread.interrupted(); // clear any stale interrupt of this thread
                     clientpool.notify();
                  }
               }
               log.debug("begin thread wait");
               // Object.wait() may return spuriously; only proceed once
               // wakeup() has really supplied a new socket.
               while (!wakeupPending)
               {
                  this.wait();
               }
               log.debug("WAKEUP in SERVER THREAD");
            }
         }
      }
      catch (Exception ignored)
      {
         // NOTE(review): if this was an interrupt during wait(), the thread may
         // still be listed in threadpool - confirm the owner removes it.
         log.debug("Exiting run on exception", ignored);
      }
   }

   /**
    * Reads one byte from the client and echoes it back.
    *
    * @throws Exception if reading or writing fails, including a read timeout
    */
   protected void acknowledge() throws Exception
   {
      byte ACK = in.readByte();
      handlingResponse = true;
      out.writeByte(ACK);
      out.flush();
   }

   /**
    * Reads one invocation from the socket, dispatches it to the invoker and
    * writes back the result (or the exception thrown by the invoker).
    *
    * @throws Exception if reading the request or writing the response fails
    */
   protected void processInvocation() throws Exception
   {
      handlingResponse = true;
      Invocation invocation = (Invocation) in.readObject();
      // A second object follows each invocation on the wire; it is read and discarded.
      in.readObject();
      Object response = null;
      try
      {
         Thread.interrupted(); // start the invocation with a clean interrupt status
         response = invoker.invoke(invocation);
      }
      catch (Exception ex)
      {
         // The exception itself is sent back as the response.
         response = ex;
      }
      Thread.interrupted(); // clear any interrupt raised during the invocation
      out.writeObject(response);
      out.reset();
      // Counterpart of the extra object discarded on the read side.
      out.writeObject(Boolean.TRUE);
      out.flush();
      out.reset();
      handlingResponse = false;
   }

   /**
    * Serves the current socket: opens the object streams, processes a first
    * invocation, then alternates acknowledge/invocation until an error,
    * timeout, evict() or shutdown() stops the loop. Finally closes the streams
    * and the socket and clears the per-connection fields.
    */
   protected void dorun()
   {
      log.debug("beginning dorun");
      running = true;
      handlingResponse = true;

      boolean initialized = false;
      try
      {
         BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream());
         out = new OptimizedObjectOutputStream(bos);
         out.flush();
         BufferedInputStream bis = new BufferedInputStream(socket.getInputStream());
         in = new OptimizedObjectInputStream(bis);
         initialized = true;
      }
      catch (Exception e)
      {
         log.error("Failed to initialize", e);
         running = false;
      }

      // Without usable streams there is nothing to process; go straight to cleanup.
      if (initialized)
      {
         try
         {
            processInvocation();
         }
         catch (Exception ex)
         {
            log.debug("failed", ex);
            running = false;
         }
      }

      while (running)
      {
         try
         {
            acknowledge();
            processInvocation();
         }
         catch (InterruptedIOException e)
         {
            log.debug("socket timed out", e);
            running = false;
         }
         catch (InterruptedException e)
         {
            // evict()/shutdown() have already cleared 'running' if we must stop
            log.debug("interrupted", e);
         }
         catch (Exception ex)
         {
            log.debug("failed", ex);
            running = false;
         }
         Thread.interrupted();
      }

      // Close each resource independently so one failure cannot skip the others.
      try
      {
         if (in != null) in.close();
      }
      catch (Exception ignored)
      {
         // best effort: the socket is closed explicitly below
      }
      try
      {
         if (out != null) out.close();
      }
      catch (Exception ignored)
      {
         // best effort: the socket is closed explicitly below
      }
      try
      {
         if (socket != null) socket.close();
      }
      catch (Exception ex)
      {
         log.error("Failed cleanup", ex);
      }
      socket = null;
      in = null;
      out = null;
      // The connection is finished, so no response is in flight any more. Without
      // this reset a connection that failed mid-invocation would leave the flag
      // set and shutdown() would never interrupt the thread while it waits.
      handlingResponse = false;
   }
}