| ServerThread.java |
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.remoting.transport.socket;
import org.jboss.logging.Logger;
import org.jboss.remoting.marshal.MarshalFactory;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.LinkedList;
/**
* This Thread object holds a single Socket connection to a client
* and is kept alive until a timeout happens, or it is aged out of the
* SocketServerInvoker's LRU cache.
* <p/>
* There is also a separate thread pool that is used if the client disconnects.
* This thread/object is re-used in that scenario and that scenario only.
* <p/>
* This is a customization of the same ServerThread class used with the PooledInvoker.
* The customization was made to allow for remoting marshaller/unmarshaller.
*
* @author <a href="mailto:bill@jboss.org">Bill Burke</a>
* @author <a href="mailto:tom@jboss.org">Tom Elrod</a>
* @version $Revision: 1.4.2.2 $
*/
public class ServerThread extends Thread
{
   final static private Logger log = Logger.getLogger(ServerThread.class);

   /** Buffered view of the socket's input stream; wrapped by {@link #in}. */
   protected BufferedInputStream bis;
   /** Buffered view of the socket's output stream; wrapped by {@link #out}. */
   protected BufferedOutputStream bos;
   /** Object stream requests (and ACK bytes) are read from. */
   protected ObjectInputStream in;
   /** Object stream responses (and ACK bytes) are written to. */
   protected ObjectOutputStream out;
   /** Socket currently being served; null while this thread is parked in the thread pool. */
   protected Socket socket;
   /** Invoker that performs the actual invocation and supplies locator/data type for marshalling. */
   protected SocketServerInvoker invoker;
   /** Pool of threads actively serving a client; this thread removes itself when it finishes. */
   protected LRUPool clientpool;
   /** Pool of idle threads; this thread adds itself here before waiting for {@link #wakeup}. */
   protected LinkedList threadpool;
   /** Loop flag for the re-use loop in {@link #dorun()}. */
   protected volatile boolean running = true;
   /** While true, {@link #shutdown()} and {@link #evict()} will not interrupt this thread. */
   protected volatile boolean handlingResponse = true; // start off as true so that nobody can interrupt us
   /** Set by {@link #shutdown()}; makes {@link #run()} exit instead of returning to the pool. */
   protected volatile boolean shutdown = false;
   /** Counter backing {@link #nextID()}. */
   protected static int id = 0;

   /**
    * True once {@link #wakeup} has handed over a new socket. Guarded by this object's monitor.
    * Lets {@link #run()} tell a real wakeup apart from a spurious return of wait().
    */
   private boolean awakened = false;

   /**
    * Returns the next value of the class-wide thread id counter.
    *
    * @return the counter value before it is incremented
    */
   public static synchronized int nextID()
   {
      return id++;
   }

   /**
    * Creates a thread bound to the given client socket.
    *
    * @param socket     the accepted client socket; its SO_TIMEOUT is set to <code>timeout</code>
    * @param invoker    the invoker that will process invocations read from the socket
    * @param clientpool pool this thread removes itself from when it stops serving the socket
    * @param threadpool pool this thread adds itself to when it becomes idle
    * @param timeout    socket read timeout in milliseconds
    * @throws Exception if the socket timeout can not be set
    */
   public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
                       LinkedList threadpool, int timeout) throws Exception
   {
      super("SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID());
      this.socket = socket;
      this.invoker = invoker;
      this.clientpool = clientpool;
      this.threadpool = threadpool;
      socket.setSoTimeout(timeout);
   }

   /**
    * Asks this thread to stop serving and to terminate instead of returning to the thread pool.
    */
   public void shutdown()
   {
      shutdown = true;
      running = false;
      // This is a race and there is a chance
      // that a invocation is going on at the time
      // of the interrupt. But I see no way right
      // now to protect for this.
      // NOTE ALSO!:
      // Shutdown should never be synchronized.
      // We don't want to hold up accept() thread! (via LRUpool)
      interruptIfIdle();
   }

   /**
    * Asks this thread to stop serving its current socket; unlike {@link #shutdown()} the
    * thread is left eligible to go back into the thread pool.
    */
   public void evict()
   {
      running = false;
      // This is a race and there is a chance
      // that a invocation is going on at the time
      // of the interrupt. But I see no way right
      // now to protect for this.
      // There may not be a problem because interrupt only effects
      // threads blocking on IO.
      // NOTE ALSO!:
      // Shutdown should never be synchronized.
      // We don't want to hold up accept() thread! (via LRUpool)
      interruptIfIdle();
   }

   /**
    * Interrupts this thread unless it is currently flagged as handling a response.
    * Shared by {@link #shutdown()} and {@link #evict()}.
    */
   private void interruptIfIdle()
   {
      if(!handlingResponse)
      {
         try
         {
            this.interrupt();
            // NOTE(review): Thread.interrupted() is static, so this clears the interrupt
            // status of the *calling* thread, not of this ServerThread. Kept as-is to
            // preserve existing behaviour - confirm whether it is actually wanted.
            Thread.interrupted(); // clear
         }
         catch(Exception ignored)
         {
            // best effort only; shutdown/evict must never fail
         }
      }
   }

   /**
    * Hands a new client socket to this (pooled) thread and wakes it up.
    *
    * @param socket  the new client socket to serve
    * @param timeout socket read timeout in milliseconds
    * @throws Exception if the socket can not be configured; in that case the thread is not woken
    */
   public synchronized void wakeup(Socket socket, int timeout) throws Exception
   {
      this.socket = socket;
      String name = "SocketServerInvokerThread-" + socket.getInetAddress().getHostAddress() + "-" + nextID();
      super.setName(name);
      socket.setSoTimeout(timeout);
      running = true;
      handlingResponse = true;
      // mark the wakeup as genuine before notifying, see the wait loop in run()
      awakened = true;
      this.notify();
   }

   /**
    * Serves the current socket via {@link #dorun()}, then either terminates (after
    * {@link #shutdown()}) or parks itself in the thread pool until {@link #wakeup} is called.
    * Any exception, including an interrupt while parked, ends the thread.
    */
   public void run()
   {
      try
      {
         while(true)
         {
            dorun();
            if(shutdown)
            {
               synchronized(clientpool)
               {
                  clientpool.remove(this);
               }
               return; // exit thread
            }
            else
            {
               synchronized(this)
               {
                  // wakeup() is synchronized on this, so it can not run until we wait() below
                  awakened = false;
                  synchronized(clientpool)
                  {
                     synchronized(threadpool)
                     {
                        clientpool.remove(this);
                        threadpool.add(this);
                        Thread.interrupted(); // clear any interruption so that we can be pooled.
                        clientpool.notify();
                     }
                  }
                  log.debug("begin thread wait");
                  // Object.wait() may return spuriously; only leave the pool once
                  // wakeup() has really supplied a new socket.
                  while(!awakened)
                  {
                     this.wait();
                  }
                  log.debug("WAKEUP in SERVER THREAD");
               }
            }
         }
      }
      catch(Exception e)
      {
         log.debug("Exiting run on exception", e);
      }
   }

   /**
    * Reads a single ACK byte from the client and echoes it back, to convince the client
    * that the socket is still active.
    *
    * @throws Exception if reading or writing the ACK fails (EOF, socket timeout, other IO error)
    */
   protected void acknowledge() throws Exception
   {
      // Perform acknowledgement to convince client
      // that the socket is still active
      byte ACK = 0;
      long startWait = System.currentTimeMillis();
      try
      {
         ACK = in.readByte();
      }
      catch(EOFException eof)
      {
         if(log.isTraceEnabled())
         {
            log.trace("socket timeout is set to : " + socket.getSoTimeout());
            log.trace("EOFException waiting on ACK in readByte(). Time waited was " + (System.currentTimeMillis() - startWait));
         }
         throw eof;
      }
      catch(IOException e)
      {
         log.debug("IOException when reading in ACK", e);
         throw e;
      }
      if(log.isDebugEnabled())
      {
         log.debug("***acknowledge read byte" + Thread.currentThread());
      }
      // HERE IS THE RACE between ACK received and handlingResponse = true
      // We can't synchronize because readByte blocks and client is expecting
      // a response and we don't want to hang client.
      // see shutdown and evict for more details
      // There may not be a problem because interrupt only effects
      // threads blocking on IO. and this thread will just continue.
      handlingResponse = true;
      out.writeByte(ACK);
      out.flush();
      out.reset();
   }

   /**
    * Reads one invocation from the socket, passes it to the invoker and writes the result
    * (or the exception thrown by the invoker) back to the client.
    *
    * @throws Exception if unmarshalling the request or marshalling the response fails
    */
   protected void processInvocation() throws Exception
   {
      handlingResponse = true;
      // Ok, now read invocation and invoke
      //TODO: -TME Need better way to get the unmarshaller (via config)
      UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(invoker.getLocator(), this.getClass().getClassLoader());
      if(unmarshaller == null)
      {
         unmarshaller = MarshalFactory.getUnMarshaller(invoker.getDataType());
      }
      Object obj = unmarshaller.read(in, null);
      Object resp = null;
      try
      {
         // Make absolutely sure thread interrupted is cleared.
         Thread.interrupted();
         // call transport on the subclass, get the result to handback
         resp = invoker.invoke(obj);
      }
      catch(Exception ex)
      {
         // the exception itself is sent back to the client as the response
         resp = ex;
      }
      Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
      Marshaller marshaller = MarshalFactory.getMarshaller(invoker.getLocator(), this.getClass().getClassLoader());
      if(marshaller == null)
      {
         marshaller = MarshalFactory.getMarshaller(invoker.getDataType());
      }
      marshaller.write(resp, out);
      handlingResponse = false;
   }

   /**
    * Serves the current socket: sets up the object streams, processes the first invocation
    * without an ACK, then loops ACK + invocation until told to stop or an error occurs,
    * and finally closes the streams and the socket and clears the references.
    * <p/>
    * This is needed because Object*Streams leak
    */
   protected void dorun()
   {
      log.debug("beginning dorun");
      running = true;
      handlingResponse = true;
      boolean initialized = false;
      try
      {
         bos = new BufferedOutputStream(socket.getOutputStream());
         out = new ObjectOutputStream(bos);
         // flush right after construction, before the input stream is opened
         out.flush();
         bis = new BufferedInputStream(socket.getInputStream());
         in = new ObjectInputStream(bis);
         initialized = true;
      }
      catch(Exception e)
      {
         log.error("Failed to initialize", e);
         // without working streams there is nothing to process; go straight to cleanup
         running = false;
      }

      if(initialized)
      {
         // Always do first one without an ACK because its not needed
         try
         {
            processInvocation();
         }
         catch(Exception ex)
         {
            log.debug("failed to process invocation.", ex);
            running = false;
         }
      }

      // Re-use loop
      while(running)
      {
         try
         {
            acknowledge();
            processInvocation();
         }
         catch(InterruptedIOException e)
         {
            log.debug("socket timed out", e);
            running = false;
         }
         catch(InterruptedException e)
         {
            log.debug("interrupted", e);
         }
         catch(Exception ex)
         {
            log.debug("failed", ex);
            running = false;
         }
         // clear any interruption so that thread can be pooled.
         Thread.interrupted();
      }

      // Ok, we've been shutdown. Do appropriate cleanups.
      // Each resource is closed on its own so one failure can not skip the others.
      if(in != null)
      {
         try
         {
            in.close();
         }
         catch(Exception ex)
         {
            log.debug("Failed to close input stream", ex);
         }
      }
      if(out != null)
      {
         try
         {
            out.close();
         }
         catch(Exception ex)
         {
            log.debug("Failed to close output stream", ex);
         }
      }
      try
      {
         if(socket != null)
         {
            socket.close();
         }
      }
      catch(Exception ex)
      {
         log.error("Failed cleanup", ex);
      }
      // drop all references to the finished connection while this thread sits in the pool
      socket = null;
      in = null;
      out = null;
      bis = null;
      bos = null;
   }
}
| ServerThread.java |