package org.jboss.mq;
import java.util.LinkedList;
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.jboss.logging.Logger;
import org.jboss.util.UnreachableStatementException;
public class SpyMessageConsumer implements MessageConsumer, SpyConsumer, Runnable
{
/** Logger shared by all consumers. */
static Logger log = Logger.getLogger(SpyMessageConsumer.class);
/** Cached trace-enabled flag; re-read from the logger each time a consumer is constructed. */
static boolean trace = log.isTraceEnabled();
/** The session that created this consumer. */
public SpySession session;
/** Holds the destination, message selector and noLocal flag this consumer subscribes with. */
public Subscription subscription = new Subscription();
/** Set to true by close(); checked by the receive methods, addMessage() and the listener thread. */
protected boolean closed;
/** Monitor guarding the receiving/listening flags and the registered message listener. */
protected Object stateLock = new Object();
/** True while a thread is inside one of the receive methods. */
protected boolean receiving = false;
/** True while a receiver or the listener thread is waiting on the message queue; addMessage() only queues when set. */
protected boolean waitingForMessage = false;
/** True while a non-null message listener is registered. */
protected boolean listening = false;
/** Thread executing run() for asynchronous delivery, or null when no such thread is active. */
protected Thread listenerThread = null;
/** The currently registered listener, or null. */
MessageListener messageListener;
/** Queue of SpyMessage instances pushed by addMessage(); also used as the wait/notify monitor. */
LinkedList messages;
/** When true, addMessage() hands messages straight to sessionConsumerProcessMessage() instead of queueing. */
boolean sessionConsumer;
/**
 * Creates a consumer for the given session using the default (empty) subscription.
 *
 * @param s the session that owns this consumer
 * @param sessionConsumer true if incoming messages are processed directly in
 *        addMessage() rather than queued for receive()/the listener thread
 */
SpyMessageConsumer(SpySession s, boolean sessionConsumer)
{
    // Refresh the shared trace flag in case logging was reconfigured.
    trace = log.isTraceEnabled();

    this.session = s;
    this.sessionConsumer = sessionConsumer;
    this.messageListener = null;
    this.closed = false;
    this.messages = new LinkedList();

    if (trace)
    {
        log.trace("New message consumer " + this);
    }
}
/**
 * Creates a consumer for the given session and fills in its subscription.
 *
 * @param s the session that owns this consumer
 * @param sessionConsumer true if incoming messages are processed directly in
 *        addMessage() rather than queued for receive()/the listener thread
 * @param destination the destination to consume from
 * @param selector the message selector expression, or null for none
 * @param noLocal the noLocal flag stored on the subscription
 * @throws InvalidSelectorException presumably raised by Subscription.getSelector()
 *         when the selector cannot be parsed - confirm against Subscription
 */
SpyMessageConsumer(SpySession s, boolean sessionConsumer, SpyDestination destination, String selector, boolean noLocal) throws InvalidSelectorException
{
    // Refresh the shared trace flag in case logging was reconfigured.
    trace = log.isTraceEnabled();

    this.session = s;
    this.sessionConsumer = sessionConsumer;

    subscription.destination = destination;
    subscription.messageSelector = selector;
    subscription.noLocal = noLocal;
    // Touch the selector eagerly so a problem surfaces at construction time.
    if (selector != null)
    {
        subscription.getSelector();
    }

    this.messageListener = null;
    this.closed = false;
    this.messages = new LinkedList();

    if (trace)
    {
        log.trace("New message consumer " + this);
    }
}
/**
 * Returns the subscription describing what this consumer receives.
 *
 * @return the subscription object held by this consumer (not a copy)
 */
public Subscription getSubscription()
{
    return this.subscription;
}
/**
 * Accepts a message pushed to this consumer.
 * <p>
 * The message is NACKed (an acknowledgement request built with <code>false</code>
 * is sent over the connection) when the consumer is closed, when the subscription
 * does not accept the message header, or when nobody is currently waiting for a
 * message. Otherwise it is either processed immediately (session consumers) or
 * appended to the queue and waiting threads are notified.
 *
 * @param message the incoming message
 * @throws JMSException if sending the NACK or processing the message fails
 */
public void addMessage(SpyMessage message) throws JMSException
{
    if (closed)
    {
        if (trace)
        {
            log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID
                  + " The message consumer was closed. " + this);
        }
        session.connection.send(message.getAcknowledgementRequest(false));
        return;
    }

    if (!subscription.accepts(message.header))
    {
        if (trace)
        {
            log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID
                  + " The subscription did not accept the message. " + this);
        }
        session.connection.send(message.getAcknowledgementRequest(false));
        return;
    }

    if (sessionConsumer)
    {
        sessionConsumerProcessMessage(message);
        return;
    }

    synchronized (messages)
    {
        if (!waitingForMessage)
        {
            if (trace)
            {
                log.trace("WARNING: NACK issued. message=" + message.header.jmsMessageID
                      + " The message consumer was not waiting for a message. " + this);
            }
            session.connection.send(message.getAcknowledgementRequest(false));
            return;
        }

        if (trace)
        {
            log.trace("Adding message=" + message.header.jmsMessageID + " " + this);
        }
        messages.addLast(message);
        // Wake the receiver / listener thread blocked on the queue monitor.
        messages.notifyAll();
    }
}
/**
 * Wakes every thread currently waiting on the message queue monitor so that
 * it re-evaluates its wait condition.
 */
public void restartProcessing()
{
    synchronized (messages)
    {
        if (trace)
        {
            log.trace("Restarting processing " + this);
        }
        messages.notifyAll();
    }
}
/**
 * Registers (or, with <code>null</code>, removes) the asynchronous message listener.
 * <p>
 * For non-session consumers a listener thread running {@link #run()} is started
 * the first time a listener is installed, provided no such thread exists already.
 *
 * @param listener the new listener, or null to stop listening
 * @throws JMSException if the consumer is closed (IllegalStateException) or a
 *         thread is currently inside a receive call
 */
public void setMessageListener(MessageListener listener) throws JMSException
{
    if (closed)
    {
        throw new IllegalStateException("The MessageConsumer is closed");
    }

    synchronized (stateLock)
    {
        if (receiving)
        {
            throw new JMSException("Another thread is already in receive.");
        }

        if (trace)
        {
            log.trace("Set message listener=" + listener + " old listener=" + messageListener + " " + this);
        }

        final boolean wasListening = listening;
        messageListener = listener;
        listening = (listener != null);

        // Only a transition from "not listening" to "listening" needs a new thread,
        // and only if a previous listener thread is not still around.
        final boolean startThread = !sessionConsumer && listening && !wasListening && listenerThread == null;
        if (startThread)
        {
            listenerThread = new Thread(this, "MessageListenerThread - " + subscription.destination.getName());
            listenerThread.start();
        }
    }
}
/**
 * Returns the selector expression stored on the subscription.
 *
 * @return the message selector, possibly null
 * @throws JMSException (IllegalStateException) if the consumer is closed
 */
public String getMessageSelector() throws JMSException
{
    if (closed)
    {
        throw new IllegalStateException("The MessageConsumer is closed");
    }
    final String selector = subscription.messageSelector;
    return selector;
}
/**
 * Returns the currently registered message listener.
 *
 * @return the listener, or null if none is registered
 * @throws JMSException (IllegalStateException) if the consumer is closed
 */
public MessageListener getMessageListener() throws JMSException
{
    if (closed)
    {
        throw new IllegalStateException("The MessageConsumer is closed");
    }
    final MessageListener current = messageListener;
    return current;
}
/**
 * Receives the next message, waiting until one arrives or the consumer is closed.
 * <p>
 * Order of attempts: the local queue, then the connection
 * (<code>session.connection.receive(subscription, 0)</code>), then a wait on the
 * queue monitor until addMessage() delivers something or close() is called.
 * <p>
 * The <code>receiving</code> flag is cleared in an outer <code>finally</code> so that
 * an exception thrown while polling the connection or pre-processing a message
 * cannot leave the consumer permanently marked as "in receive".
 *
 * @return the received message, or null if the consumer was closed while waiting
 * @throws JMSException if the consumer is closed on entry (IllegalStateException),
 *         another thread is already receiving, a listener is registered, or the
 *         receive fails or is interrupted
 */
public Message receive() throws JMSException
{
    if (closed)
        throw new IllegalStateException("The MessageConsumer is closed");
    synchronized (stateLock)
    {
        if (receiving)
            throw new JMSException("Another thread is already in receive.");
        if (listening)
            throw new JMSException("A message listener is already registered");
        receiving = true;
        if (trace)
            log.trace("receive() " + this);
    }
    try
    {
        synchronized (messages)
        {
            // 1. Anything already queued locally?
            Message message = getMessage();
            if (message != null)
            {
                synchronized (stateLock)
                {
                    receiving = false;
                    if (trace)
                        log.trace("receive() message in list " + message.getJMSMessageID() + " " + this);
                }
                return message;
            }
            // 2. Poll the connection; keep polling while the returned messages are
            //    discarded by preProcessMessage() (e.g. expired).
            while (true)
            {
                SpyMessage msg = session.connection.receive(subscription, 0);
                if (msg == null)
                    break;
                Message mes = preProcessMessage(msg);
                if (mes != null)
                {
                    synchronized (stateLock)
                    {
                        receiving = false;
                        if (trace)
                            log.trace("receive() message from server " + mes.getJMSMessageID() + " " + this);
                    }
                    return mes;
                }
            }
            if (trace)
                log.trace("No message in receive(), waiting " + this);
            // 3. Wait for addMessage() to push a message or for close().
            try
            {
                waitingForMessage = true;
                while (true)
                {
                    if (closed)
                    {
                        if (trace)
                            log.trace("Consumer closed in receive() " + this);
                        return null;
                    }
                    Message mes = getMessage();
                    if (mes != null)
                    {
                        if (trace)
                            log.trace("receive() message from list after wait " + this);
                        return mes;
                    }
                    messages.wait();
                }
            }
            catch (Throwable t)
            {
                SpyJMSException.rethrowAsJMSException("Receive interupted", t);
                throw new UnreachableStatementException();
            }
            finally
            {
                // Still holding the queue monitor here, as addMessage() reads this flag under it.
                waitingForMessage = false;
            }
        }
    }
    finally
    {
        // Safety net: always leave the "in receive" state, whatever path got us out.
        synchronized (stateLock)
        {
            receiving = false;
        }
    }
}
/**
 * Receives the next message, waiting at most <code>timeOut</code> milliseconds.
 * <p>
 * A timeout of zero delegates to {@link #receive()}. Otherwise the local queue is
 * checked, then the connection is polled
 * (<code>session.connection.receive(subscription, timeOut)</code>), and finally the
 * caller waits on the queue monitor until a message is pushed, the consumer is
 * closed, or the deadline computed on entry has passed.
 * <p>
 * The <code>receiving</code> flag is cleared in an outer <code>finally</code> so that
 * an exception thrown while polling the connection or pre-processing a message
 * cannot leave the consumer permanently marked as "in receive".
 *
 * @param timeOut maximum time to wait in milliseconds; 0 means wait indefinitely
 * @return the received message, or null on timeout or if the consumer was closed
 * @throws JMSException if the consumer is closed on entry (IllegalStateException),
 *         another thread is already receiving, a listener is registered, or the
 *         receive fails or is interrupted
 */
public Message receive(long timeOut) throws JMSException
{
    if (timeOut == 0)
    {
        if (trace)
            log.trace("Timeout is zero in receive(long) using receive() " + this);
        return receive();
    }
    if (closed)
        throw new IllegalStateException("The MessageConsumer is closed");
    synchronized (stateLock)
    {
        if (receiving)
            throw new JMSException("Another thread is already in receive.");
        if (listening)
            throw new JMSException("A message listener is already registered");
        receiving = true;
        if (trace)
            log.trace("receive(long) " + this);
    }
    try
    {
        long endTime = System.currentTimeMillis() + timeOut;
        if (trace)
            log.trace("receive(long) endTime=" + endTime + " " + this);
        synchronized (messages)
        {
            // 1. Anything already queued locally?
            Message message = getMessage();
            if (message != null)
            {
                synchronized (stateLock)
                {
                    receiving = false;
                    if (trace)
                        log.trace("receive(long) message in list " + message.getJMSMessageID() + " " + this);
                }
                return message;
            }
            // 2. Poll the connection; keep polling while the returned messages are
            //    discarded by preProcessMessage() (e.g. expired).
            while (true)
            {
                SpyMessage msg = session.connection.receive(subscription, timeOut);
                if (msg == null)
                    break;
                Message mes = preProcessMessage(msg);
                if (mes != null)
                {
                    synchronized (stateLock)
                    {
                        receiving = false;
                        if (trace)
                            log.trace("receive(long) message from server " + mes.getJMSMessageID() + " " + this);
                    }
                    return mes;
                }
            }
            if (trace)
                log.trace("No message in receive(), waiting " + this);
            // 3. Wait for addMessage() to push a message, for close(), or for the deadline.
            try
            {
                waitingForMessage = true;
                while (true)
                {
                    if (closed)
                    {
                        if (trace)
                            log.trace("Consumer closed in receive(long) " + this);
                        return null;
                    }
                    Message mes = getMessage();
                    if (mes != null)
                    {
                        if (trace)
                            log.trace("receive(long) message from list after wait " + this);
                        return mes;
                    }
                    // Remaining time is recomputed on every wake-up so spurious or
                    // unrelated notifications do not extend the overall deadline.
                    long att = endTime - System.currentTimeMillis();
                    if (att <= 0)
                    {
                        if (trace)
                            log.trace("receive(long) timed out endTime=" + endTime + " " + this);
                        return null;
                    }
                    messages.wait(att);
                }
            }
            catch (Throwable t)
            {
                SpyJMSException.rethrowAsJMSException("Receive interupted", t);
                throw new UnreachableStatementException();
            }
            finally
            {
                // Still holding the queue monitor here, as addMessage() reads this flag under it.
                waitingForMessage = false;
            }
        }
    }
    finally
    {
        // Safety net: always leave the "in receive" state, whatever path got us out.
        synchronized (stateLock)
        {
            receiving = false;
        }
    }
}
/**
 * Receives the next message if one is immediately available.
 * <p>
 * The local queue is checked first, then the connection is polled with
 * <code>session.connection.receive(subscription, -1)</code>; messages discarded by
 * preProcessMessage() cause another poll.
 * <p>
 * The <code>receiving</code> flag is cleared in a <code>finally</code> so that an
 * exception thrown while polling the connection or pre-processing a message
 * cannot leave the consumer permanently marked as "in receive".
 *
 * @return the received message, or null if none is available
 * @throws JMSException if the consumer is closed (IllegalStateException), another
 *         thread is already receiving, a listener is registered, or the receive fails
 */
public Message receiveNoWait() throws JMSException
{
    if (closed)
        throw new IllegalStateException("The MessageConsumer is closed");
    synchronized (stateLock)
    {
        if (receiving)
            throw new JMSException("Another thread is already in receive.");
        if (listening)
            throw new JMSException("A message listener is already registered");
        receiving = true;
        if (trace)
            log.trace("receiveNoWait() " + this);
    }
    try
    {
        synchronized (messages)
        {
            Message mes = getMessage();
            if (mes != null)
            {
                synchronized (stateLock)
                {
                    receiving = false;
                    if (trace)
                        log.trace("receiveNoWait() message in list " + mes.getJMSMessageID() + " " + this);
                }
                return mes;
            }
        }
        while (true)
        {
            SpyMessage msg = session.connection.receive(subscription, -1);
            if (msg == null)
            {
                synchronized (stateLock)
                {
                    receiving = false;
                }
                if (trace)
                    log.trace("receiveNoWait() no message " + this);
                return null;
            }
            Message mes = preProcessMessage(msg);
            if (mes != null)
            {
                synchronized (stateLock)
                {
                    receiving = false;
                    if (trace)
                        log.trace("receiveNoWait() message from server " + mes.getJMSMessageID() + " " + this);
                }
                return mes;
            }
        }
    }
    finally
    {
        // Safety net: always leave the "in receive" state, whatever path got us out.
        synchronized (stateLock)
        {
            receiving = false;
        }
    }
}
/**
 * Closes this consumer. Calling it on an already closed consumer is a no-op.
 * <p>
 * Marks the consumer closed, wakes all threads waiting on the message queue,
 * asks the session to interrupt delivery-lock waiters, waits for the listener
 * thread (unless called from that thread itself) and, for non-session consumers,
 * removes the consumer from the session.
 * <p>
 * If the calling thread is interrupted while joining the listener thread, the
 * remaining cleanup still runs and the interrupt status is restored before
 * returning, so callers can still observe the interruption.
 *
 * @throws JMSException if removing the consumer from the session fails
 */
public void close() throws JMSException
{
    synchronized (messages)
    {
        if (closed)
            return;
        if (trace)
            log.trace("Message consumer closing. " + this);
        closed = true;
        messages.notifyAll();
    }
    session.interruptDeliveryLockWaiters();

    // Snapshot the field: run() sets listenerThread to null when it exits, so
    // reading it twice (null check, then join) could hit a NullPointerException.
    Thread listener = listenerThread;
    boolean interrupted = false;
    if (listener != null && !Thread.currentThread().equals(listener))
    {
        try
        {
            if (trace)
                log.trace("Joining listener thread. " + this);
            listener.join();
        }
        catch (InterruptedException e)
        {
            // Remember the interruption; it is re-asserted once cleanup is done.
            interrupted = true;
        }
    }
    try
    {
        if (!sessionConsumer)
        {
            session.removeConsumer(this);
        }
        if (trace)
            log.trace("Closed. " + this);
    }
    finally
    {
        if (interrupted)
            Thread.currentThread().interrupt();
    }
}
/**
 * Body of the listener thread started by setMessageListener() for non-session consumers.
 * <p>
 * Each iteration of the outer loop:
 * <ol>
 * <li>obtains a message - from the connection if the local queue is empty, otherwise
 *     by waiting on / taking from the local queue - skipping (and acknowledging)
 *     expired messages;</li>
 * <li>exits, NACKing the message, if the listener has been removed meanwhile;</li>
 * <li>in a transacted session, registers the acknowledgement with the XA resource manager;</li>
 * <li>acquires the session delivery lock and invokes the listener; in AUTO/DUPS_OK
 *     mode the message is acknowledged afterwards unless it is found back in the
 *     local queue.</li>
 * </ol>
 * The loop ends when the consumer is closed or the listener is removed. Any
 * Throwable escaping the loop closes the consumer and is reported to the session
 * through asynchFailure().
 */
public void run()
{
    SpyMessage mes = null;
    try
    {
        outer : while (true)
        {
            // ---- Step 1: obtain a non-expired message ----
            while (mes == null)
            {
                synchronized (messages)
                {
                    if (closed)
                    {
                        waitingForMessage = false;
                        if (trace)
                            log.trace("Consumer closed in run() " + this);
                        break outer;
                    }
                    // Only ask the connection when nothing is queued locally.
                    if (messages.isEmpty())
                        mes = session.connection.receive(subscription, 0);
                    if (mes == null)
                    {
                        waitingForMessage = true;
                        if (trace)
                            log.trace("waiting in run() " + this);
                        // Wait until a message is queued or the consumer is closed, and
                        // additionally for as long as the session is not running.
                        // NOTE(review): if closed becomes true while session.running is
                        // false this loop keeps waiting - confirm close() cannot hang on join().
                        while ((messages.isEmpty() && !closed) || (!session.running))
                        {
                            try
                            {
                                messages.wait();
                            }
                            catch (InterruptedException e)
                            {
                                log.trace("Ignored interruption waiting for messages");
                            }
                        }
                        if (closed)
                        {
                            waitingForMessage = false;
                            if (trace)
                                log.trace("Consumer closed while waiting in run() " + this);
                            break outer;
                        }
                        // Not closed, so the wait loop can only have ended with a queued message.
                        mes = (SpyMessage) messages.removeFirst();
                        waitingForMessage = false;
                    }
                    else
                    {
                        if (trace)
                            log.trace("run() message from server mes=" + mes.getJMSMessageID() + " " + this);
                    }
                }
                mes.session = session;
                if (mes.isOutdated())
                {
                    if (trace)
                        log.trace("run() acking expired message mes=" + mes.getJMSMessageID() + " " + this);
                    mes.doAcknowledge();
                    // Drop it and go round the fetch loop again.
                    mes = null;
                }
            }

            // ---- Step 2: make sure somebody is still listening ----
            MessageListener thisListener;
            synchronized (stateLock)
            {
                if (!isListening())
                {
                    if (mes != null)
                    {
                        if (trace)
                            log.trace("run() nacking not listening message mes=" + mes.getJMSMessageID() + " " + this);
                        session.connection.send(mes.getAcknowledgementRequest(false));
                    }
                    // Clearing the field lets setMessageListener() start a fresh thread later.
                    listenerThread = null;
                    mes = null;
                    break;
                }
                thisListener = messageListener;
            }

            // The listener receives the wrapped message for encapsulated messages.
            Message message = mes;
            if (mes instanceof SpyEncapsulatedMessage)
                message = ((SpyEncapsulatedMessage) mes).getMessage();

            // ---- Step 3: transactional acknowledgement bookkeeping ----
            if (session.transacted)
            {
                if (trace)
                    log.trace("run() acknowledging message in tx mes=" + mes.getJMSMessageID() + " " + this);
                session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), mes);
            }

            // ---- Step 4: deliver under the session delivery lock ----
            // Keep trying for the lock until it is obtained or the consumer is closed.
            boolean gotDeliveryLock = false;
            while (gotDeliveryLock == false)
            {
                gotDeliveryLock = session.tryDeliveryLock();
                if (gotDeliveryLock == false)
                {
                    synchronized (messages)
                    {
                        if (closed)
                            break;
                    }
                }
            }
            if (gotDeliveryLock == false)
            {
                // Only reachable when the consumer was closed while waiting for the lock.
                if (trace)
                    log.trace("run() nacking didn't get delivery lock mes=" + mes.getJMSMessageID() + " " + this);
                session.connection.send(mes.getAcknowledgementRequest(false));
                // The message has been handed back; forget it so the next iteration goes
                // through the fetch loop, sees that the consumer is closed and exits,
                // instead of re-processing (and re-NACKing or delivering) the same message.
                mes = null;
            }
            else
            {
                try
                {
                    session.addUnacknowlegedMessage((SpyMessage) message);
                    thisListener.onMessage(message);
                }
                catch (Throwable t)
                {
                    // A misbehaving listener must not kill the delivery thread.
                    log.warn("Message listener " + thisListener + " threw a throwable.", t);
                }
                finally
                {
                    session.releaseDeliveryLock();
                }
                if (!session.transacted
                      && (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE))
                {
                    // A message that reappears in the local queue is treated as recovered
                    // and is not acknowledged here.
                    boolean recovered;
                    synchronized (messages)
                    {
                        recovered = messages.contains(message);
                    }
                    if (recovered == false)
                        mes.doAcknowledge();
                }
                mes = null;
            }
        }
    }
    catch (Throwable t)
    {
        log.warn("Message consumer closing due to error in listening thread.", t);
        try
        {
            close();
        }
        catch (Throwable ignore)
        {
            // Best effort: the original failure is what gets reported below.
        }
        session.asynchFailure("Message consumer closing due to error in listening thread.", t);
    }
}
/**
 * Builds a diagnostic description of this consumer: identity hash, subscription,
 * state flags, queue size and, when present, the listener thread and listener.
 *
 * @return the description used in trace logging
 */
public String toString()
{
    StringBuffer text = new StringBuffer(100);

    text.append("SpyMessageConsumer@");
    text.append(System.identityHashCode(this));
    text.append("[sub=");
    text.append(subscription);
    if (closed)
    {
        text.append(" CLOSED");
    }
    text.append(" listening=");
    text.append(listening);
    text.append(" receiving=");
    text.append(receiving);
    text.append(" sessionConsumer=");
    text.append(sessionConsumer);
    text.append(" waitingForMessage=");
    text.append(waitingForMessage);
    text.append(" messages=");
    text.append(messages.size());
    if (listenerThread != null)
    {
        text.append(" thread=");
        text.append(listenerThread);
    }
    if (messageListener != null)
    {
        text.append(" listener=");
        text.append(messageListener);
    }
    text.append(" session=");
    text.append(session);
    text.append(']');

    return text.toString();
}
/**
 * Takes the next deliverable message from the local queue.
 * <p>
 * Messages are removed from the head of the queue and passed through
 * preProcessMessage(); those for which it returns null are skipped. An error
 * while handling one message is logged and the next message is tried.
 *
 * @return the next message, or null when the queue is exhausted
 */
Message getMessage()
{
    synchronized (messages)
    {
        if (trace)
        {
            log.trace("Getting message from list " + this);
        }
        while (!messages.isEmpty())
        {
            try
            {
                SpyMessage head = (SpyMessage) messages.removeFirst();
                Message processed = preProcessMessage(head);
                if (processed != null)
                {
                    return processed;
                }
            }
            catch (Throwable t)
            {
                log.error("Ignoring error", t);
            }
        }
        return null;
    }
}
/**
 * Prepares a message for hand-off to the application.
 * <p>
 * The message is bound to the session and recorded as unacknowledged. Expired
 * messages are acknowledged and dropped. When no listener is registered the
 * acknowledgement is handled here (through the XA resource manager for
 * transacted sessions, directly for AUTO/DUPS_OK modes) and encapsulated
 * messages are unwrapped; when a listener is registered the message is returned
 * untouched.
 *
 * @param message the raw message
 * @return the message to deliver, or null if it was expired
 * @throws JMSException if acknowledging the message fails
 */
Message preProcessMessage(SpyMessage message) throws JMSException
{
    message.session = session;
    session.addUnacknowlegedMessage(message);

    if (message.isOutdated())
    {
        if (trace)
        {
            log.trace("preprocess() acking expired message=" + message.getJMSMessageID() + " " + this);
        }
        message.doAcknowledge();
        return null;
    }

    // With a listener registered, acknowledgement and unwrapping are not done here.
    if (isListening())
    {
        return message;
    }

    if (session.transacted)
    {
        if (trace)
        {
            log.trace("preprocess() acking message in tx message=" + message.getJMSMessageID() + " " + this);
        }
        session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
    }
    else
    {
        final boolean autoAck = session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
              || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE;
        if (autoAck)
        {
            message.doAcknowledge();
        }
    }

    if (message instanceof SpyEncapsulatedMessage)
    {
        SpyEncapsulatedMessage wrapper = (SpyEncapsulatedMessage) message;
        return wrapper.getMessage();
    }
    return message;
}
/**
 * Reads the listening flag under the state lock.
 *
 * @return true if a message listener is currently registered
 */
protected boolean isListening()
{
    final boolean current;
    synchronized (stateLock)
    {
        current = listening;
    }
    return current;
}
/**
 * Processes a message synchronously on behalf of a session consumer
 * (called from addMessage() when <code>sessionConsumer</code> is true).
 * <p>
 * Expired messages are acknowledged and dropped. For transacted sessions the
 * acknowledgement is registered with the XA resource manager; if the session has
 * no current transaction an anonymous one is started for the duration of the
 * call and committed afterwards. The registered listener, if any, is then
 * invoked. Non-transacted sessions in AUTO/DUPS_OK mode acknowledge the message
 * after the listener returns.
 *
 * @param message the message to process
 * @throws JMSException if acknowledging or transaction bookkeeping fails
 */
protected void sessionConsumerProcessMessage(SpyMessage message) throws JMSException
{
    message.session = session;
    if (message.isOutdated())
    {
        // Log text corrected: this branch handles expiry, not a transactional ack.
        if (trace)
            log.trace("consumer() acking expired message=" + message.getJMSMessageID() + " " + this);
        message.doAcknowledge();
        return;
    }

    // Take a stable reference to the listener; it may be replaced concurrently.
    MessageListener thisListener;
    synchronized (stateLock)
    {
        thisListener = messageListener;
    }

    Object anonymousTXID = null;
    if (session.transacted)
    {
        // No enlisted transaction: run this delivery in a private, anonymous one.
        if (session.getCurrentTransactionId() == null)
        {
            anonymousTXID = session.connection.spyXAResourceManager.startTx();
            session.setCurrentTransactionId(anonymousTXID);
        }
        if (trace)
            log.trace("consumer() acking message in tx message=" + message.getJMSMessageID() + " " + this);
        session.connection.spyXAResourceManager.ackMessage(session.getCurrentTransactionId(), message);
    }

    if (thisListener != null)
    {
        Message mes = message;
        if (message instanceof SpyEncapsulatedMessage)
        {
            mes = ((SpyEncapsulatedMessage) message).getMessage();
        }
        session.addUnacknowlegedMessage((SpyMessage) mes);
        if (trace)
            log.trace("consumer() before onMessage=" + message.getJMSMessageID() + " " + this);
        // NOTE(review): if onMessage throws, an anonymous transaction started above is
        // neither ended nor unset on the session - confirm whether that is intended.
        thisListener.onMessage(mes);
        if (trace)
            log.trace("consumer() after onMessage=" + message.getJMSMessageID() + " " + this);
    }

    if (session.transacted)
    {
        // Commit only if the anonymous transaction is still the session's current one.
        if (anonymousTXID != null && session.getCurrentTransactionId() == anonymousTXID)
        {
            try
            {
                if (trace)
                    log.trace("XASession was not enlisted - Committing work using anonymous xid: " + anonymousTXID);
                session.connection.spyXAResourceManager.endTx(anonymousTXID, true);
                session.connection.spyXAResourceManager.commit(anonymousTXID, true);
            }
            catch (Throwable t)
            {
                log.error("Could not commit", t);
            }
            finally
            {
                session.unsetCurrentTransactionId(anonymousTXID);
            }
        }
    }
    else if (session.acknowledgeMode == Session.AUTO_ACKNOWLEDGE
          || session.acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
    {
        message.doAcknowledge();
    }
}
}