package org.jboss.mq;
import java.util.ArrayList;
import java.util.LinkedList;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import org.jboss.logging.Logger;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
public class SpyConnectionConsumer implements ConnectionConsumer, SpyConsumer, Runnable
{
static Logger log = Logger.getLogger(SpyConnectionConsumer.class);
static boolean trace = log.isTraceEnabled();
Connection connection;
Destination destination;
javax.jms.ServerSessionPool serverSessionPool;
int maxMessages;
LinkedList queue = new LinkedList();
boolean closed = false;
boolean waitingForMessage = false;
Subscription subscription = new Subscription();
Thread internalThread;
int id;
static SynchronizedInt threadId = new SynchronizedInt(0);
public SpyConnectionConsumer(Connection connection, Destination destination, String messageSelector,
ServerSessionPool serverSessionPool, int maxMessages) throws JMSException
{
trace = log.isTraceEnabled();
this.connection = connection;
this.destination = destination;
this.serverSessionPool = serverSessionPool;
this.maxMessages = maxMessages;
if (this.maxMessages < 1)
this.maxMessages = 1;
subscription.destination = (SpyDestination) destination;
subscription.messageSelector = messageSelector;
subscription.noLocal = false;
connection.addConsumer(this);
id = threadId.increment();
internalThread = new Thread(this, "Connection Consumer for dest " + subscription + " id=" + id);
internalThread.start();
if (trace)
log.trace("New " + this);
}
public Subscription getSubscription()
{
return subscription;
}
public void addMessage(SpyMessage mes) throws JMSException
{
synchronized (queue)
{
if (closed)
{
if (trace)
log.trace("Consumer close nacking message=" + mes.header.jmsMessageID + " " + this);
log.warn("NACK issued. The connection consumer was closed.");
connection.send(mes.getAcknowledgementRequest(false));
return;
}
if (trace)
log.trace("Add message=" + mes.header.jmsMessageID + " " + this);
if (waitingForMessage)
{
queue.addLast(mes);
queue.notifyAll();
}
else
{
if (trace)
log.trace("Consumer not waiting nacking message=" + mes.header.jmsMessageID + " " + this);
connection.send(mes.getAcknowledgementRequest(false));
}
}
}
public ServerSessionPool getServerSessionPool() throws JMSException
{
return serverSessionPool;
}
public void close() throws javax.jms.JMSException
{
synchronized (queue)
{
if (closed)
return;
closed = true;
queue.notifyAll();
}
if (trace)
log.trace("Close " + this);
if (internalThread != null && !internalThread.equals(Thread.currentThread()))
{
try
{
if (trace)
log.trace("Joining thread " + this);
internalThread.join();
}
catch (InterruptedException e)
{
if (trace)
log.trace("Ignoring interrupting while joining thread " + this);
}
}
synchronized (queue)
{
if (trace)
log.trace("Nacking messages on queue " + this);
try
{
while (queue.isEmpty() == false)
{
SpyMessage message = (SpyMessage) queue.removeFirst();
connection.send(message.getAcknowledgementRequest(false));
}
}
catch (Throwable ignore)
{
if (trace)
log.trace("Ignoring error nacking messages in queue " + this, ignore);
}
try
{
connection.removeConsumer(this);
}
catch (Throwable ignore)
{
if (trace)
log.trace("Ignoring error removing consumer from connection " + this, ignore);
}
}
}
public void run()
{
ArrayList mesList = new ArrayList();
try
{
outer : while (true)
{
synchronized (queue)
{
if (closed)
{
if (trace)
log.trace("run() closed " + this);
break outer;
}
}
for (int i = 0; i < maxMessages; i++)
{
SpyMessage mes = connection.receive(subscription, -1);
if (mes == null)
{
if (trace)
log.trace("run() receivedNoWait got no message" + this);
break;
}
else
{
if (trace)
log.trace("run() receivedNoWait message=" + mes.header.jmsMessageID + " " + this);
mesList.add(mes);
}
}
if (mesList.isEmpty())
{
SpyMessage mes = null;
synchronized (queue)
{
mes = connection.receive(subscription, 0);
if (mes == null)
{
waitingForMessage = true;
while (queue.isEmpty() && !closed)
{
if (trace)
log.trace("run() waiting for message " + this);
try
{
queue.wait();
}
catch (InterruptedException e)
{
if (trace)
log.trace("Ignoring interruption waiting for message " + this, e);
}
}
if (closed)
{
if (trace)
log.trace("run() closed while waiting " + this);
waitingForMessage = false;
break outer;
}
mes = (SpyMessage) queue.removeFirst();
waitingForMessage = false;
if (trace)
log.trace("run() got message message=" + mes.header.jmsMessageID + " " + this);
}
}
mesList.add(mes);
}
if (trace)
log.trace("Waiting for serverSesionPool " + this);
ServerSession serverSession = serverSessionPool.getServerSession();
SpySession spySession = (SpySession) serverSession.getSession();
if (trace)
log.trace("Waited serverSesion=" + serverSession + " session=" + spySession + " " + this);
if (spySession.sessionConsumer == null)
{
if (trace)
log.trace("Session did not have a set MessageListner " + spySession + " " + this);
}
else
{
spySession.sessionConsumer.subscription = subscription;
}
for (int i = 0; i < mesList.size(); i++)
spySession.addMessage((SpyMessage) mesList.get(i));
if (trace)
log.trace(" Starting the ServerSession=" + serverSession + " " + this);
serverSession.start();
mesList.clear();
}
}
catch (Throwable t)
{
log.warn("Connection consumer closing due to error in listening thread " + this, t);
try
{
for (int i = 0; i < mesList.size(); i++)
{
SpyMessage msg = (SpyMessage) mesList.get(i);
connection.send(msg.getAcknowledgementRequest(false));
}
}
catch (Throwable ignore)
{
if (trace)
log.trace("Ignoring error nacking message " + this, ignore);
}
try
{
close();
}
catch (Throwable ignore)
{
if (trace)
log.trace("Ignoring error during close " + this, ignore);
}
}
}
public String toString()
{
StringBuffer buffer = new StringBuffer(100);
buffer.append("SpyConnectionConsumer[sub=").append(subscription);
if (closed)
buffer.append(" CLOSED");
buffer.append(" messages=").append(queue.size());
buffer.append(" waitingForMessage=").append(waitingForMessage);
if (internalThread != null)
buffer.append(" internalThread=").append(internalThread);
buffer.append(" sessionPool=").append(serverSessionPool);
buffer.append(" connection=").append(connection);
buffer.append(']');
return buffer.toString();
}
}