package org.jboss.mq;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import org.jboss.logging.Logger;
/**
 * JMS {@link Session} / {@link XASession} implementation bound to a single
 * {@link Connection}.
 *
 * <p>The session tracks the consumers and producers created from it, the
 * messages handed to it for delivery to a session-level message listener, and
 * the messages that have been delivered but not yet acknowledged. For a local
 * transacted session (no XA resource) it also owns the id of the current
 * transaction obtained from the connection's resource manager.
 */
public class SpySession implements Session, XASession
{
   /** The logger for this class. */
   static Logger log = Logger.getLogger(SpySession.class);

   /** Cached trace flag; re-read from the logger in the constructor, commit() and rollback(). */
   static boolean trace = log.isTraceEnabled();

   /** The connection this session was created from. */
   public Connection connection;

   /** Set to true on construction; cleared while recover() has delivery stopped. */
   public boolean running;

   /** Whether the session was created transacted. */
   protected boolean transacted;

   /** The acknowledge mode passed at construction. */
   protected int acknowledgeMode;

   /** Consumers created from this session; mutated only while synchronized on the set itself. */
   protected HashSet consumers;

   /** Producers created from this session; mutated only while synchronized on the set itself. */
   protected HashSet producers;

   /** Monitor guarding {@link #inDelivery}. */
   protected Object deliveryLock = new Object();

   /** True while some caller holds the delivery lock (see tryDeliveryLock/releaseDeliveryLock). */
   protected boolean inDelivery = false;

   /** The consumer created by setMessageListener(); receives the messages drained by run(). */
   SpyMessageConsumer sessionConsumer;

   /** Set once close() has started. */
   boolean closed;

   /** Monitor serialising close-flagging, commit, rollback and recover. */
   Object runLock = new Object();

   /** The id of the transaction currently associated with this session, if any. */
   private Object currentTransactionId;

   /** The XA resource; non-null only when the session was created as an XA session. */
   SpyXAResource spyXAResource;

   /** Messages queued by addMessage() and drained by run(); guarded by its own monitor. */
   LinkedList messages = new LinkedList();

   /** Messages delivered on a non-transacted session that have not been acked or nacked yet. */
   ArrayList unacknowledgedMessages = new ArrayList();

   /**
    * Create a new session.
    *
    * @param conn the owning connection
    * @param trans whether the session is transacted
    * @param acknowledge the acknowledge mode
    * @param xaSession true to create an XA resource for this session
    */
   SpySession(Connection conn, boolean trans, int acknowledge, boolean xaSession)
   {
      trace = log.isTraceEnabled();
      connection = conn;
      transacted = trans;
      acknowledgeMode = acknowledge;
      if (xaSession)
         spyXAResource = new SpyXAResource(this);
      running = true;
      closed = false;
      consumers = new HashSet();
      producers = new HashSet();
      // A local (non-XA) transacted session starts its first transaction immediately.
      if (spyXAResource == null && transacted)
         currentTransactionId = connection.spyXAResourceManager.startTx();
      if (trace)
         log.trace("New session " + this);
   }

   /**
    * Acknowledge or negatively acknowledge a message.
    *
    * <p>When {@code ack.isAck} is set, the given message and every other
    * message still recorded as unacknowledged on this session are
    * acknowledged. Otherwise only the given message is nacked by sending
    * {@code ack} as supplied.
    *
    * @param message the message being acked/nacked
    * @param ack the acknowledgement request
    * @throws JMSException for any error sending the request
    */
   public void doAcknowledge(Message message, AcknowledgementRequest ack) throws JMSException
   {
      if (ack.isAck)
      {
         synchronized (unacknowledgedMessages)
         {
            if (trace)
               log.trace("Acknowledging message " + ack);
            // Ack the specific message first
            connection.send(((SpyMessage) message).getAcknowledgementRequest(true));
            unacknowledgedMessages.remove(message);
            // Then ack everything else still outstanding
            Iterator i = unacknowledgedMessages.iterator();
            while (i.hasNext())
            {
               Message mess = (Message) i.next();
               i.remove();
               connection.send(((SpyMessage) mess).getAcknowledgementRequest(true));
            }
         }
      }
      else
      {
         if (trace)
            log.trace("Nacking message " + message.getJMSMessageID());
         // The list is a plain ArrayList that other methods mutate under its
         // monitor, so the removal takes the same monitor. The send stays
         // outside the lock.
         synchronized (unacknowledgedMessages)
         {
            unacknowledgedMessages.remove(message);
         }
         connection.send(ack);
      }
   }

   /**
    * Get the resource manager of the owning connection.
    *
    * @return the connection's XA resource manager
    */
   public SpyXAResourceManager getXAResourceManager()
   {
      return connection.spyXAResourceManager;
   }

   /**
    * Install a session-level message listener. A new session consumer is
    * created on every call and the listener is set on it.
    *
    * @param listener the listener
    * @throws JMSException if the session is closed or the consumer rejects the listener
    */
   public void setMessageListener(MessageListener listener) throws JMSException
   {
      checkClosed();
      if (trace)
         log.trace("Set message listener " + listener + " " + this);
      sessionConsumer = new SpyMessageConsumer(this, true);
      sessionConsumer.setMessageListener(listener);
   }

   /**
    * @return whether the session is transacted
    * @throws JMSException if the session is closed
    */
   public boolean getTransacted() throws JMSException
   {
      checkClosed();
      return transacted;
   }

   /**
    * @return the listener of the session consumer, or null when no session consumer exists
    * @throws JMSException if the session is closed
    */
   public MessageListener getMessageListener() throws JMSException
   {
      checkClosed();
      if (sessionConsumer == null)
         return null;
      return sessionConsumer.getMessageListener();
   }

   /**
    * Create a bytes message stamped with the connection's client id.
    *
    * @return the message
    * @throws JMSException if the session is closed
    */
   public BytesMessage createBytesMessage() throws JMSException
   {
      checkClosed();
      SpyBytesMessage message = MessagePool.getBytesMessage();
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * Create a map message stamped with the connection's client id.
    *
    * @return the message
    * @throws JMSException if the session is closed
    */
   public MapMessage createMapMessage() throws JMSException
   {
      checkClosed();
      SpyMapMessage message = MessagePool.getMapMessage();
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * Create a plain message stamped with the connection's client id.
    *
    * @return the message
    * @throws JMSException if the session is closed
    */
   public Message createMessage() throws JMSException
   {
      checkClosed();
      SpyMessage message = MessagePool.getMessage();
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * Create an empty object message stamped with the connection's client id.
    *
    * @return the message
    * @throws JMSException if the session is closed
    */
   public ObjectMessage createObjectMessage() throws JMSException
   {
      checkClosed();
      SpyObjectMessage message = MessagePool.getObjectMessage();
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * Create an object message carrying the given payload.
    *
    * @param object the payload
    * @return the message
    * @throws JMSException if the session is closed or the payload cannot be set
    */
   public ObjectMessage createObjectMessage(Serializable object) throws JMSException
   {
      checkClosed();
      SpyObjectMessage message = MessagePool.getObjectMessage();
      message.setObject(object);
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * Create a stream message stamped with the connection's client id.
    *
    * @return the message
    * @throws JMSException if the session is closed
    */
   public StreamMessage createStreamMessage() throws JMSException
   {
      checkClosed();
      SpyStreamMessage message = MessagePool.getStreamMessage();
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * Create an empty text message stamped with the connection's client id.
    *
    * @return the message
    * @throws JMSException if the session is closed
    */
   public TextMessage createTextMessage() throws JMSException
   {
      checkClosed();
      SpyTextMessage message = MessagePool.getTextMessage();
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * Drain the messages queued through addMessage(), handing each one to the
    * session consumer. When no session consumer is set, each message is
    * nacked back through the connection instead. Errors for individual
    * messages are traced and otherwise ignored so the remaining messages are
    * still processed.
    */
   public void run()
   {
      synchronized (messages)
      {
         if (trace)
            log.trace("Run messages=" + messages.size() + " " + this);
         while (messages.size() > 0)
         {
            SpyMessage message = (SpyMessage) messages.removeFirst();
            try
            {
               if (sessionConsumer == null)
               {
                  log.warn("Session has no message listener set, cannot process message. " + this);
                  connection.send(message.getAcknowledgementRequest(false));
               }
               else
               {
                  sessionConsumer.addMessage(message);
               }
            }
            catch (Throwable ignore)
            {
               if (trace)
                  log.trace("Ignored error from session consumer", ignore);
            }
         }
      }
   }

   /**
    * Close the session. Closing an already closed session is a no-op.
    *
    * <p>Closes the session consumer, all consumers and all producers, then
    * rolls back (local transacted session) or nacks the outstanding
    * unacknowledged messages (non-transacted, non-XA session), and finally
    * informs the connection. Every step is attempted even if an earlier one
    * failed; the first recorded failure is thrown at the end.
    *
    * @throws JMSException the first error encountered while closing
    */
   public void close() throws JMSException
   {
      synchronized (runLock)
      {
         if (closed)
            return;
         closed = true;
      }
      if (trace)
         log.trace("Session closing " + this);
      JMSException exception = null;
      if (trace)
         log.trace("Closing consumers " + this);
      Iterator i;
      synchronized (consumers)
      {
         if (sessionConsumer != null)
         {
            try
            {
               sessionConsumer.close();
            }
            catch (InvalidDestinationException ignored)
            {
               log.warn(ignored.getMessage(), ignored);
            }
            catch (Throwable t)
            {
               exception = SpyJMSException.getAsJMSException("Error closing session consumer", t);
            }
         }
         // Iterate over a snapshot: closing a consumer may remove it from the set
         i = new ArrayList(consumers).iterator();
      }
      while (i.hasNext())
      {
         SpyMessageConsumer messageConsumer = (SpyMessageConsumer) i.next();
         try
         {
            messageConsumer.close();
         }
         catch (InvalidDestinationException ignored)
         {
            log.warn(ignored.getMessage(), ignored);
         }
         catch (Throwable t)
         {
            if (exception == null)
               exception = SpyJMSException.getAsJMSException("Error closing message consumer", t);
         }
      }
      synchronized (producers)
      {
         // Snapshot for the same reason as the consumers above
         i = new ArrayList(producers).iterator();
      }
      while (i.hasNext())
      {
         SpyMessageProducer messageProducer = (SpyMessageProducer) i.next();
         try
         {
            messageProducer.close();
         }
         catch (InvalidDestinationException ignored)
         {
            log.warn(ignored.getMessage(), ignored);
         }
         catch (Throwable t)
         {
            if (exception == null)
               exception = SpyJMSException.getAsJMSException("Error closing message producer", t);
         }
      }
      if (trace)
         log.trace("Close handling unacknowledged messages " + this);
      try
      {
         // XA sessions are left alone here; only local sessions are cleaned up.
         if (spyXAResource == null)
         {
            if (transacted)
               internalRollback();
            else
            {
               // NOTE(review): this walks unacknowledgedMessages without holding its
               // monitor, unlike doAcknowledge()/addUnacknowlegedMessage() - confirm no
               // delivery can still be in progress at this point.
               i = unacknowledgedMessages.iterator();
               while (i.hasNext())
               {
                  SpyMessage message = (SpyMessage) i.next();
                  connection.send(message.getAcknowledgementRequest(false));
                  i.remove();
               }
            }
         }
      }
      catch (Throwable t)
      {
         if (exception == null)
            exception = SpyJMSException.getAsJMSException("Error nacking message", t);
      }
      if (trace)
         log.trace("Informing connection of close " + this);
      connection.sessionClosing(this);
      if (exception != null)
         throw exception;
   }

   /**
    * Commit the current local transaction and start a new one.
    *
    * <p>A new transaction is started whether or not the commit succeeded; a
    * failure to start it is only traced.
    *
    * @throws JMSException if this is an XA session, the session is closed or
    *         not transacted, or the commit fails
    */
   public void commit() throws JMSException
   {
      trace = log.isTraceEnabled();
      synchronized (runLock)
      {
         if (spyXAResource != null)
            throw new javax.jms.TransactionInProgressException("Should not be call from a XASession");
         checkClosed();
         if (!transacted)
            throw new IllegalStateException("The session is not transacted");
         if (trace)
            log.trace("Committing transaction " + this);
         try
         {
            connection.spyXAResourceManager.endTx(currentTransactionId, true);
            connection.spyXAResourceManager.commit(currentTransactionId, true);
         }
         catch (Throwable t)
         {
            SpyJMSException.rethrowAsJMSException("Could not commit", t);
         }
         finally
         {
            unacknowledgedMessages.clear();
            try
            {
               currentTransactionId = connection.spyXAResourceManager.startTx();
               if (trace)
                  log.trace("Current transaction id: " + currentTransactionId + " " + this);
            }
            catch (Throwable ignore)
            {
               if (trace)
                  log.trace("Failed to start tx " + this, ignore);
            }
         }
      }
   }

   /**
    * Roll back the current local transaction and start a new one.
    *
    * @throws JMSException if the session is closed, is an XA session, is not
    *         transacted, or the rollback fails
    */
   public void rollback() throws JMSException
   {
      trace = log.isTraceEnabled();
      synchronized (runLock)
      {
         checkClosed();
         internalRollback();
      }
   }

   /**
    * Stop delivery, redeliver or nack the unacknowledged messages, and resume
    * delivery if the connection was not stopped when the method was entered.
    *
    * <p>If the connection was running on entry, each unacknowledged message
    * accepted by a consumer's subscription is flagged redelivered and
    * appended to that consumer's queue. Whatever remains unclaimed (or
    * everything, if the connection was already stopped) is nacked back
    * through the connection.
    *
    * @throws JMSException if the session is closed, a transaction id is
    *         currently set, or any step fails
    */
   public void recover() throws JMSException
   {
      // Remember the connection state on entry so delivery is only resumed if it was running
      boolean stopped = connection.modeStop;
      synchronized (runLock)
      {
         checkClosed();
         if (currentTransactionId != null)
            throw new IllegalStateException("The session is transacted");
         if (trace)
            log.trace("Session recovery stopping delivery " + this);
         try
         {
            connection.stop();
            running = false;
         }
         catch (Throwable t)
         {
            SpyJMSException.rethrowAsJMSException("Could not stop message delivery", t);
         }
         // NOTE(review): if the block below throws, this method exits without
         // restarting the connection or resetting 'running' - confirm that is intended.
         try
         {
            synchronized (messages)
            {
               if (stopped == false)
               {
                  if (trace)
                     log.trace("Recovering: unacknowledged messages=" + unacknowledgedMessages + " " + this);
                  Iterator i = consumers.iterator();
                  while (i.hasNext())
                  {
                     SpyMessageConsumer consumer = (SpyMessageConsumer) i.next();
                     Iterator ii = unacknowledgedMessages.iterator();
                     while (ii.hasNext())
                     {
                        SpyMessage message = (SpyMessage) ii.next();
                        if (consumer.getSubscription().accepts(message.header))
                        {
                           message.setJMSRedelivered(true);
                           consumer.messages.addLast(message);
                           ii.remove();
                           if (trace)
                              log.trace("Recovered: message=" + message + " consumer=" + consumer);
                        }
                     }
                  }
               }
               // Anything not claimed by a consumer is handed back via a nack
               Iterator i = unacknowledgedMessages.iterator();
               while (i.hasNext())
               {
                  SpyMessage message = (SpyMessage) i.next();
                  connection.send(message.getAcknowledgementRequest(false));
                  i.remove();
                  if (trace)
                     log.trace("Recovered: nacked with no consumer message=" + message + " " + this);
               }
            }
         }
         catch (Throwable t)
         {
            SpyJMSException.rethrowAsJMSException("Unable to recover session ", t);
         }
         // NOTE(review): when the connection was already stopped on entry, 'running'
         // stays false after this method - verify something else resets it.
         if (stopped == false)
         {
            if (trace)
               log.trace("Recovery restarting message delivery " + this);
            try
            {
               running = true;
               connection.start();
               Iterator i = consumers.iterator();
               while (i.hasNext())
                  ((SpyMessageConsumer) i.next()).restartProcessing();
            }
            catch (Throwable t)
            {
               SpyJMSException.rethrowAsJMSException("Could not resume message delivery", t);
            }
         }
      }
   }

   /**
    * Create a text message with the given text, stamped with the connection's client id.
    *
    * @param string the text
    * @return the message
    * @throws JMSException if the session is closed or the text cannot be set
    */
   public TextMessage createTextMessage(String string) throws JMSException
   {
      checkClosed();
      SpyTextMessage message = new SpyTextMessage();
      message.setText(string);
      message.header.producerClientId = connection.getClientID();
      return message;
   }

   /**
    * @return the acknowledge mode given at construction (no closed check is made)
    * @throws JMSException never thrown by this implementation
    */
   public int getAcknowledgeMode() throws JMSException
   {
      return acknowledgeMode;
   }

   /**
    * Create a consumer with no selector and noLocal=false.
    *
    * @param destination the destination to consume from
    * @return the consumer
    * @throws JMSException for any error
    */
   public MessageConsumer createConsumer(Destination destination) throws JMSException
   {
      return createConsumer(destination, null, false);
   }

   /**
    * Create a consumer with the given selector and noLocal=false.
    *
    * @param destination the destination to consume from
    * @param messageSelector the selector, may be null
    * @return the consumer
    * @throws JMSException for any error
    */
   public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
   {
      return createConsumer(destination, messageSelector, false);
   }

   /**
    * Create a consumer: a subscriber for a {@link Topic}, otherwise a
    * receiver for a {@link Queue}. A null destination is passed to
    * createReceiver(), which rejects it.
    *
    * @param destination the destination to consume from
    * @param messageSelector the selector, may be null
    * @param noLocal passed to the subscriber; not used for queues
    * @return the consumer
    * @throws InvalidDestinationException if the destination is neither a topic nor a queue
    * @throws JMSException for any other error
    */
   public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
         throws JMSException
   {
      if (destination instanceof Topic)
         return createSubscriber((Topic) destination, messageSelector, noLocal);
      // null keeps going to createReceiver(), which reports it as an invalid destination
      if (destination == null || destination instanceof Queue)
         return createReceiver((Queue) destination, messageSelector);
      // Report an unsupported type as a JMS error instead of a ClassCastException
      throw new InvalidDestinationException("Unsupported destination type: " + destination);
   }

   /**
    * Create a producer: a publisher for a {@link Topic}, otherwise a sender
    * for a {@link Queue}. A null destination is passed to createSender().
    *
    * @param destination the destination to produce to
    * @return the producer
    * @throws InvalidDestinationException if the destination is neither a topic nor a queue
    * @throws JMSException for any other error
    */
   public MessageProducer createProducer(Destination destination) throws JMSException
   {
      if (destination instanceof Topic)
         return createPublisher((Topic) destination);
      // null keeps going to createSender(), as before
      if (destination == null || destination instanceof Queue)
         return createSender((Queue) destination);
      // Report an unsupported type as a JMS error instead of a ClassCastException
      throw new InvalidDestinationException("Unsupported destination type: " + destination);
   }

   /**
    * Create a browser with no selector.
    *
    * @param queue the queue to browse
    * @return the browser
    * @throws JMSException for any error
    */
   public QueueBrowser createBrowser(Queue queue) throws JMSException
   {
      return createBrowser(queue, null);
   }

   /**
    * Create a queue browser.
    *
    * @param queue the queue to browse
    * @param messageSelector the selector, may be null
    * @return the browser
    * @throws JMSException if the session is closed, is a topic session, or the queue is null
    */
   public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
   {
      checkClosed();
      if (this instanceof SpyTopicSession)
         throw new IllegalStateException("Not allowed for a TopicSession");
      if (queue == null)
         throw new InvalidDestinationException("Cannot browse a null queue.");
      return new SpyQueueBrowser(this, queue, messageSelector);
   }

   /**
    * Create a receiver with no selector.
    *
    * @param queue the queue to receive from
    * @return the receiver
    * @throws JMSException for any error
    */
   public QueueReceiver createReceiver(Queue queue) throws JMSException
   {
      return createReceiver(queue, null);
   }

   /**
    * Create a queue receiver and register it with this session.
    *
    * @param queue the queue to receive from
    * @param messageSelector the selector, may be null
    * @return the receiver
    * @throws JMSException if the session is closed, the queue is null, or registration fails
    */
   public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
   {
      checkClosed();
      if (queue == null)
         throw new InvalidDestinationException("Queue cannot be null.");
      connection.checkTemporary(queue);
      SpyQueueReceiver receiver = new SpyQueueReceiver(this, queue, messageSelector);
      addConsumer(receiver);
      return receiver;
   }

   /**
    * Create a queue sender and register it with this session.
    *
    * @param queue the target queue
    * @return the sender
    * @throws JMSException if the session is closed
    */
   public QueueSender createSender(Queue queue) throws JMSException
   {
      checkClosed();
      SpyQueueSender producer = new SpyQueueSender(this, queue);
      addProducer(producer);
      return producer;
   }

   /**
    * Create a durable subscriber with no selector and noLocal=false.
    *
    * @param topic the topic
    * @param name the subscription name
    * @return the subscriber
    * @throws JMSException for any error
    */
   public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
   {
      return createDurableSubscriber(topic, name, null, false);
   }

   /**
    * Create a durable subscriber and register it with this session.
    *
    * @param topic the topic; must not be null or temporary
    * @param name the subscription name; must not be null or blank
    * @param messageSelector the selector, may be null
    * @param noLocal the noLocal flag passed to the subscriber
    * @return the subscriber
    * @throws JMSException if the session is closed, is a queue session, the
    *         arguments are invalid, or registration fails
    */
   public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
         throws JMSException
   {
      checkClosed();
      if (this instanceof SpyQueueSession)
         throw new IllegalStateException("Not allowed for a QueueSession");
      if (topic == null)
         throw new InvalidDestinationException("Topic cannot be null");
      if (topic instanceof TemporaryTopic)
         throw new InvalidDestinationException("Attempt to create a durable subscription for a temporary topic");
      if (name == null || name.trim().length() == 0)
         throw new JMSException("Null or empty subscription");
      // Wrap the topic together with the client id, subscription name and selector
      SpyTopic t = new SpyTopic((SpyTopic) topic, connection.getClientID(), name, messageSelector);
      SpyTopicSubscriber sub = new SpyTopicSubscriber(this, t, noLocal, messageSelector);
      addConsumer(sub);
      return sub;
   }

   /**
    * Create a subscriber with no selector and noLocal=false.
    *
    * @param topic the topic
    * @return the subscriber
    * @throws JMSException for any error
    */
   public TopicSubscriber createSubscriber(Topic topic) throws JMSException
   {
      return createSubscriber(topic, null, false);
   }

   /**
    * Create a non-durable subscriber and register it with this session.
    *
    * @param topic the topic; must not be null
    * @param messageSelector the selector, may be null
    * @param noLocal the noLocal flag passed to the subscriber
    * @return the subscriber
    * @throws JMSException if the session is closed, the topic is null, or registration fails
    */
   public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
   {
      checkClosed();
      if (topic == null)
         throw new InvalidDestinationException("Topic cannot be null");
      connection.checkTemporary(topic);
      SpyTopicSubscriber sub = new SpyTopicSubscriber(this, (SpyTopic) topic, noLocal, messageSelector);
      addConsumer(sub);
      return sub;
   }

   /**
    * Create a topic publisher and register it with this session.
    *
    * @param topic the target topic
    * @return the publisher
    * @throws JMSException if the session is closed
    */
   public TopicPublisher createPublisher(Topic topic) throws JMSException
   {
      checkClosed();
      SpyTopicPublisher producer = new SpyTopicPublisher(this, topic);
      addProducer(producer);
      return producer;
   }

   /**
    * Obtain a queue by name from the connection.
    *
    * @param queueName the queue name; must not be null
    * @return the queue
    * @throws JMSException if the session is closed, is a topic session, or the name is null
    */
   public Queue createQueue(String queueName) throws JMSException
   {
      checkClosed();
      if (this instanceof SpyTopicSession)
         throw new IllegalStateException("Not allowed for a TopicSession");
      if (queueName == null)
         throw new InvalidDestinationException("Queue name cannot be null.");
      return ((SpyConnection) connection).createQueue(queueName);
   }

   /**
    * Obtain a topic by name from the connection.
    *
    * @param topicName the topic name; must not be null
    * @return the topic
    * @throws JMSException if the session is closed, is a queue session, or the name is null
    */
   public Topic createTopic(String topicName) throws JMSException
   {
      checkClosed();
      if (this instanceof SpyQueueSession)
         throw new IllegalStateException("Not allowed for a QueueSession");
      if (topicName == null)
         throw new InvalidDestinationException("The topic name cannot be null");
      return ((SpyConnection) connection).createTopic(topicName);
   }

   /**
    * Obtain a temporary queue from the connection.
    *
    * @return the temporary queue
    * @throws JMSException if the session is closed or is a topic session
    */
   public TemporaryQueue createTemporaryQueue() throws JMSException
   {
      checkClosed();
      if (this instanceof SpyTopicSession)
         throw new IllegalStateException("Not allowed for a TopicSession");
      return ((SpyConnection) connection).getTemporaryQueue();
   }

   /**
    * Obtain a temporary topic from the connection.
    *
    * @return the temporary topic
    * @throws JMSException if the session is closed or is a queue session
    */
   public TemporaryTopic createTemporaryTopic() throws JMSException
   {
      checkClosed();
      if (this instanceof SpyQueueSession)
         throw new IllegalStateException("Not allowed for a QueueSession");
      return ((SpyConnection) connection).getTemporaryTopic();
   }

   /**
    * Remove the durable subscription with the given name for this connection's client id.
    *
    * @param name the subscription name
    * @throws JMSException if the session is closed, is a queue session, or the connection reports an error
    */
   public void unsubscribe(String name) throws JMSException
   {
      checkClosed();
      if (this instanceof SpyQueueSession)
         throw new IllegalStateException("Not allowed for a QueueSession");
      DurableSubscriptionID id = new DurableSubscriptionID(connection.getClientID(), name, null);
      connection.unsubscribe(id);
   }

   /**
    * @return the XA resource, or null if this session was not created as an XA session
    */
   public XAResource getXAResource()
   {
      return spyXAResource;
   }

   /**
    * @return this session
    * @throws JMSException if the session is closed
    */
   public Session getSession() throws JMSException
   {
      checkClosed();
      return this;
   }

   /**
    * @return a one-line description of the session state for logging
    */
   public String toString()
   {
      StringBuffer buffer = new StringBuffer(100);
      buffer.append("SpySession@").append(System.identityHashCode(this));
      buffer.append('[');
      buffer.append("tx=").append(transacted);
      // The acknowledge mode is only reported for non-transacted sessions
      if (transacted == false)
      {
         if (acknowledgeMode == AUTO_ACKNOWLEDGE)
            buffer.append(" ack=").append("AUTO");
         else if (acknowledgeMode == CLIENT_ACKNOWLEDGE)
            buffer.append(" ack=").append("CLIENT");
         else if (acknowledgeMode == DUPS_OK_ACKNOWLEDGE)
            buffer.append(" ack=").append("DUPSOK");
      }
      buffer.append(" txid=").append(currentTransactionId);
      if (spyXAResource != null)
         buffer.append(" XA");
      if (running)
         buffer.append(" RUNNING");
      if (closed)
         buffer.append(" CLOSED");
      buffer.append(" connection=").append(connection);
      buffer.append(']');
      return buffer.toString();
   }

   /**
    * Associate a transaction id with this session.
    *
    * @param xid the transaction id; must not be null
    */
   void setCurrentTransactionId(final Object xid)
   {
      if (xid == null)
         throw new org.jboss.util.NullArgumentException("xid");
      if (trace)
         log.trace("Setting current tx xid=" + xid + " previous: " + currentTransactionId + " " + this);
      this.currentTransactionId = xid;
   }

   /**
    * Clear the current transaction id, but only if it equals the given one.
    *
    * @param xid the transaction id being ended; must not be null
    */
   void unsetCurrentTransactionId(final Object xid)
   {
      if (xid == null)
         throw new org.jboss.util.NullArgumentException("xid");
      if (trace)
         log.trace("Unsetting current tx xid=" + xid + " previous: " + currentTransactionId + " " + this);
      if (xid.equals(currentTransactionId))
         this.currentTransactionId = null;
   }

   /**
    * @return the current transaction id, possibly null
    */
   Object getCurrentTransactionId()
   {
      return currentTransactionId;
   }

   /**
    * Obtain a new message id from the connection.
    *
    * @return the message id
    * @throws JMSException if the session is closed or the connection reports an error
    */
   String getNewMessageID() throws JMSException
   {
      checkClosed();
      return connection.getNewMessageID();
   }

   /**
    * Queue a message for later processing by run().
    *
    * @param message the message
    */
   void addMessage(SpyMessage message)
   {
      synchronized (messages)
      {
         if (trace)
            log.trace("Add message msgid=" + message.header.jmsMessageID + " " + this);
         messages.addLast(message);
      }
   }

   /**
    * Record a delivered message as unacknowledged. Nothing is recorded for a
    * transacted session.
    *
    * @param message the message
    */
   void addUnacknowlegedMessage(SpyMessage message)
   {
      if (!transacted)
      {
         synchronized (unacknowledgedMessages)
         {
            if (trace)
               log.trace("Add unacked message msgid=" + message.header.jmsMessageID + " " + this);
            unacknowledgedMessages.add(message);
         }
      }
   }

   /**
    * Send a message on behalf of a producer. On a transacted session a clone
    * of the message is added to the current transaction; otherwise the
    * message is sent to the server immediately.
    *
    * @param m the message
    * @throws JMSException if the session is closed or the send fails
    */
   void sendMessage(SpyMessage m) throws JMSException
   {
      checkClosed();
      m.header.producerClientId = connection.getClientID();
      if (transacted)
      {
         if (trace)
            log.trace("Adding message to transaction " + m.header.jmsMessageID + " " + this);
         connection.spyXAResourceManager.addMessage(currentTransactionId, m.myClone());
      }
      else
      {
         if (trace)
            log.trace("Sending message to server " + m.header.jmsMessageID + " " + this);
         connection.sendToServer(m);
      }
   }

   /**
    * Register a consumer with this session and with the connection. The
    * consumer is removed from the session again only when the connection
    * rejects it with a security exception.
    *
    * @param who the consumer
    * @throws JMSException if the session is closed or the connection rejects the consumer
    */
   void addConsumer(SpyMessageConsumer who) throws JMSException
   {
      checkClosed();
      synchronized (consumers)
      {
         if (trace)
            log.trace("Adding consumer " + who);
         consumers.add(who);
      }
      try
      {
         connection.addConsumer(who);
      }
      catch (JMSSecurityException ex)
      {
         removeConsumerInternal(who);
         throw ex;
      }
      catch (Throwable t)
      {
         SpyJMSException.rethrowAsJMSException("Error adding consumer", t);
      }
   }

   /**
    * Remove a consumer from the connection and then from this session. If the
    * connection throws, the consumer stays in this session's set.
    *
    * @param who the consumer
    * @throws JMSException for any error reported by the connection
    */
   void removeConsumer(SpyMessageConsumer who) throws JMSException
   {
      connection.removeConsumer(who);
      removeConsumerInternal(who);
   }

   /**
    * Register a producer with this session.
    *
    * @param who the producer
    * @throws JMSException if the session is closed
    */
   void addProducer(SpyMessageProducer who) throws JMSException
   {
      checkClosed();
      synchronized (producers)
      {
         if (trace)
            log.trace("Adding producer " + who);
         producers.add(who);
      }
   }

   /**
    * Remove a producer from this session.
    *
    * @param who the producer
    * @throws JMSException declared for callers; not thrown by this implementation
    */
   void removeProducer(SpyMessageProducer who) throws JMSException
   {
      removeProducerInternal(who);
   }

   /**
    * Try to obtain the delivery lock. If a delivery is in progress the caller
    * waits once on the lock; the wait is not repeated, so after any wake-up
    * (release, interruptDeliveryLockWaiters() or thread interruption) the
    * lock is either taken or the method gives up.
    *
    * @return true if the caller now holds the delivery lock, false otherwise
    */
   boolean tryDeliveryLock()
   {
      synchronized (deliveryLock)
      {
         if (inDelivery)
         {
            try
            {
               deliveryLock.wait();
            }
            catch (InterruptedException e)
            {
               // The interruption is ignored; the interrupt flag is not restored.
               if (trace)
                  log.trace("Ignored interruption waiting for delivery lock");
            }
         }
         // Re-check after waking: someone else may still be (or again be) delivering
         if (inDelivery == false)
         {
            inDelivery = true;
            return true;
         }
      }
      return false;
   }

   /**
    * Release the delivery lock and wake every thread waiting for it.
    */
   void releaseDeliveryLock()
   {
      synchronized (deliveryLock)
      {
         inDelivery = false;
         deliveryLock.notifyAll();
      }
   }

   /**
    * Wake every thread waiting in tryDeliveryLock() without releasing the lock.
    */
   void interruptDeliveryLockWaiters()
   {
      synchronized (deliveryLock)
      {
         deliveryLock.notifyAll();
      }
   }

   /**
    * Report an asynchronous failure to the connection.
    *
    * @param message a description of the failure
    * @param t the cause
    */
   void asynchFailure(String message, Throwable t)
   {
      connection.asynchFailure(message, t);
   }

   /**
    * Throw if the session has been closed.
    *
    * @throws IllegalStateException if close() has been called
    */
   private void checkClosed() throws IllegalStateException
   {
      if (closed)
         throw new IllegalStateException("The session is closed");
   }

   /**
    * Roll back the current local transaction and start a new one. Unlike
    * rollback(), no closed check is made, so close() can use it.
    *
    * <p>A new transaction is started whether or not the rollback succeeded; a
    * failure to start it is only traced.
    *
    * @throws JMSException if this is an XA session, the session is not
    *         transacted, or the rollback fails
    */
   private void internalRollback() throws JMSException
   {
      synchronized (runLock)
      {
         if (spyXAResource != null)
            throw new javax.jms.TransactionInProgressException("Should not be call from a XASession");
         if (!transacted)
            throw new IllegalStateException("The session is not transacted");
         if (trace)
            log.trace("Rollback transaction " + this);
         try
         {
            connection.spyXAResourceManager.endTx(currentTransactionId, true);
            connection.spyXAResourceManager.rollback(currentTransactionId);
         }
         catch (Throwable t)
         {
            SpyJMSException.rethrowAsJMSException("Could not rollback", t);
         }
         finally
         {
            unacknowledgedMessages.clear();
            try
            {
               currentTransactionId = connection.spyXAResourceManager.startTx();
               if (trace)
                  log.trace("Current transaction id: " + currentTransactionId + " " + this);
            }
            catch (Throwable ignore)
            {
               if (trace)
                  log.trace("Failed to start tx " + this, ignore);
            }
         }
      }
   }

   /**
    * Remove a consumer from this session's set only; the connection is not told.
    *
    * @param who the consumer
    */
   private void removeConsumerInternal(SpyMessageConsumer who)
   {
      synchronized (consumers)
      {
         if (trace)
            log.trace("Remove consumer " + who);
         consumers.remove(who);
      }
   }

   /**
    * Remove a producer from this session's set.
    *
    * @param who the producer
    */
   private void removeProducerInternal(SpyMessageProducer who)
   {
      synchronized (producers)
      {
         if (trace)
            log.trace("Remove producer " + who);
         producers.remove(who);
      }
   }
}