package org.jboss.mq.il.oil2;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Iterator;
import org.jboss.logging.Logger;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.Slot;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
public final class OIL2SocketHandler implements java.lang.Cloneable, Runnable
{
final static private Logger log = Logger.getLogger(OIL2SocketHandler.class);
private ObjectInputStream in;
private ObjectOutputStream out;
private boolean running;
private final ThreadGroup partentThreadGroup;
private Thread worker;
private static int threadNumber = 0;
volatile ConcurrentHashMap responseSlots = new ConcurrentHashMap();
OIL2RequestListner requestListner;
private volatile boolean pumpingData = false;
private Object pumpMutex = new Object();
LinkedQueue requestQueue = new LinkedQueue();
PooledExecutor pool;
public OIL2SocketHandler(ObjectInputStream in, ObjectOutputStream out, ThreadGroup partentThreadGroup)
{
this.in = in;
this.out = out;
this.partentThreadGroup = partentThreadGroup;
synchronized (OIL2SocketHandler.class)
{
if (pool == null)
{
pool = new PooledExecutor(50);
log.debug("Setting the OIL2SocketHandler's thread factory");
pool.setThreadFactory(
new ThreadFactory()
{
private int threadNo = 0;
public Thread newThread(Runnable r)
{
Thread t = new Thread(OIL2SocketHandler.this.partentThreadGroup, r, "OIL2SocketHandler Thread-" + threadNo++);
t.setDaemon(true);
return t;
}
}
);
pool.setMinimumPoolSize(1);
pool.setKeepAliveTime(1000 * 60);
pool.runWhenBlocked();
pool.createThreads(1);
}
}
}
public void sendRequest(OIL2Request request) throws IOException
{
try
{
synchronized (out)
{
out.writeByte(1);
request.writeExternal(out);
out.reset();
out.flush();
}
}
catch (IOException e)
{
throw e;
}
}
private void registerResponseSlot(OIL2Request request, Slot responseSlot) throws IOException
{
responseSlots.put(request.requestId, responseSlot);
}
public void setRequestListner(OIL2RequestListner requestListner)
{
this.requestListner = requestListner;
}
public void sendResponse(OIL2Response response) throws IOException
{
try
{
synchronized (out)
{
out.writeByte(2);
response.writeExternal(out);
out.reset();
out.flush();
}
}
catch (IOException e)
{
throw e;
}
}
private Object pumpMessages(OIL2Request request, Channel mySlot)
throws IOException, ClassNotFoundException, InterruptedException
{
synchronized (pumpMutex)
{
if (pumpingData)
{
return null;
}
else
pumpingData = true;
}
try
{
while (true)
{
if (mySlot != null)
{
Object o;
while ((o = mySlot.peek()) != null)
{
o = mySlot.take();
if (o != this)
{
return o;
}
}
}
byte code = in.readByte();
switch (code)
{
case 1 :
OIL2Request newRequest = new OIL2Request();
newRequest.readExternal(in);
if (request == null)
{
return newRequest;
}
else
{
requestQueue.put(newRequest);
}
break;
case 2 :
OIL2Response response = new OIL2Response();
response.readExternal(in);
if (response.correlationRequestId == null)
continue;
if (request != null && request.requestId.equals(response.correlationRequestId))
{
return response;
}
else
{
Slot slot = (Slot) responseSlots.remove(response.correlationRequestId);
if (slot != null)
{
slot.put(response);
}
else
{
if (log.isTraceEnabled())
log.warn("No slot registered for: " + response);
}
}
break;
} } }
finally
{
synchronized (pumpMutex)
{
pumpingData = false;
}
Thread thread = Thread.currentThread();
boolean interrupted = thread.isInterrupted();
Iterator i = responseSlots.values().iterator();
while (i.hasNext())
{
Slot s = (Slot) i.next();
if (s != mySlot)
s.offer(this, 0);
}
if (request != null)
{
requestQueue.put(this);
}
if (interrupted)
thread.interrupt();
}
}
public OIL2Response synchRequest(OIL2Request request)
throws IOException, InterruptedException, ClassNotFoundException
{
Slot slot = new Slot();
registerResponseSlot(request, slot);
sendRequest(request);
Object o = null;
while (true)
{
if (o != null)
{
if (o != this)
{
return (OIL2Response) o;
}
o = slot.peek();
if (o != null)
o = slot.take();
}
else
{
o = pumpMessages(request, slot);
if (o == null)
{
o = slot.take();
}
}
} }
public class RequestRunner implements Runnable
{
OIL2Request request;
RequestRunner(OIL2Request request)
{
this.request = request;
}
public void run()
{
requestListner.handleRequest(request);
}
}
public void run()
{
try
{
Object o = null;
while (running)
{
if (o != null)
{
if (o != this)
{
pool.execute(new RequestRunner((OIL2Request) o));
}
o = requestQueue.peek();
if (o != null)
o = requestQueue.take();
}
else
{
o = pumpMessages(null, requestQueue);
if (o == null)
{
o = requestQueue.take();
}
}
}
}
catch (InterruptedException e)
{
if (log.isTraceEnabled())
log.trace("Stopped due to interruption");
}
catch (Exception e)
{
if (log.isTraceEnabled())
log.trace("Stopping due to unexcpected exception: ", e);
requestListner.handleConnectionException(e);
}
running = false;
if (log.isTraceEnabled())
log.trace("Stopped");
}
public void start() {
if (log.isTraceEnabled())
log.trace("Starting");
running = true;
worker = new Thread(partentThreadGroup, this, "OIL2 Worker-" + threadNumber++);
worker.setDaemon(true);
worker.start();
}
public void stop()
{
if (log.isTraceEnabled())
log.trace("Stopping");
running = false;
worker.interrupt();
}
}