package org.jboss.jms.client.p2p;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.transaction.xa.XAResource;
import org.jboss.jms.BytesMessageImpl;
import org.jboss.jms.MapMessageImpl;
import org.jboss.jms.MessageImpl;
import org.jboss.jms.ObjectMessageImpl;
import org.jboss.jms.StreamMessageImpl;
import org.jboss.jms.TextMessageImpl;
import org.jboss.jms.client.BrowserDelegate;
import org.jboss.jms.client.ConsumerDelegate;
import org.jboss.jms.client.ProducerDelegate;
import org.jboss.jms.client.SessionDelegate;
/**
 * Session delegate of the peer-to-peer JMS client.
 *
 * <p>The session creates messages, consumers, producers and browsers, buffers
 * outgoing messages while transacted, and tracks delivered-but-unacknowledged
 * messages so that they can be redelivered by {@link #recover()} or
 * {@link #rollback()}.</p>
 *
 * <p>Locking used by this class:</p>
 * <ul>
 * <li>{@code unacknowledgedMessageMap} is its own monitor; it also guards
 *     {@code nextDeliveryId}.</li>
 * <li>{@code recoveryLock} guards the {@code recovering} flag. Ordinary
 *     deliveries block on it until a running commit/recover/rollback has
 *     finished.</li>
 * <li>The consumer, producer and browser lists are plain {@code ArrayList}s and
 *     are not synchronized by this class.</li>
 * </ul>
 */
public class P2PSessionDelegate
   implements SessionDelegate
{
   /** Connection used to put messages on the wire. */
   private P2PConnectionDelegate connection = null;

   /** Acknowledge mode handed to the constructor (compared against the {@code Session} constants). */
   private int acknowledgeMode;

   /** Set once {@link #close()} has completed. */
   private boolean closed = false;

   /** Listener stored by {@link #setMessageListener(MessageListener)}; not read anywhere in this class. */
   private MessageListener messageListener = null;

   /** Whether this session is transacted. */
   private boolean transacted = false;

   /** Consumers created through this session and not yet closed by it. */
   private final List messageConsumers = new ArrayList();

   /** Producers created through this session and not yet closed by it. */
   private final List messageProducers = new ArrayList();

   /**
    * Browsers to be closed together with the session.
    * Nothing in this class adds to the list (createBrowser does not register its result).
    */
   private final List queueBrowsers = new ArrayList();

   /**
    * Delivered messages awaiting acknowledgement, keyed by delivery id ({@code Long}).
    * Being a {@code TreeMap}, it iterates in ascending delivery-id order.
    */
   private final Map unacknowledgedMessageMap = new TreeMap();

   /** Last delivery id handed out; guarded by {@code unacknowledgedMessageMap}. */
   private long nextDeliveryId = 0;

   /** True while a commit, recover or rollback is in progress; guarded by {@code recoveryLock}. */
   private boolean recovering = false;

   /** Monitor on which ordinary deliveries wait for a recovery to finish. */
   private final Object recoveryLock = new Object();

   /** Copies of the messages sent inside the current transaction. */
   private final List uncommittedMessages = new ArrayList();

   /**
    * Creates a session bound to the given connection.
    *
    * @param connection      connection used to send messages
    * @param transaction     {@code true} for a transacted session
    * @param acknowledgeMode acknowledge mode of the session
    * @throws JMSException declared for interface compatibility; not thrown by this constructor
    */
   public P2PSessionDelegate(P2PConnectionDelegate connection, boolean transaction, int acknowledgeMode)
      throws JMSException
   {
      this.connection = connection;
      this.transacted = transaction;
      this.acknowledgeMode = acknowledgeMode;
   }

   /**
    * Closes the session. A transacted session is rolled back first; then every
    * registered consumer, producer and browser is closed and forgotten.
    * Calling this method on an already closed session does nothing.
    *
    * @throws JMSException if the rollback or one of the delegates fails to close
    */
   public void close() throws JMSException
   {
      if (this.closed)
      {
         return;
      }
      if (this.transacted)
      {
         // NOTE(review): rollback() starts redelivery on a background thread, which
         // may still be running while the consumers below are being closed.
         this.rollback();
      }
      Iterator iterator = this.messageConsumers.iterator();
      while (iterator.hasNext())
      {
         ((ConsumerDelegate) iterator.next()).close();
         iterator.remove();
      }
      iterator = this.messageProducers.iterator();
      while (iterator.hasNext())
      {
         ((ProducerDelegate) iterator.next()).close();
         // Drop the closed producer, as is done for consumers.
         iterator.remove();
      }
      iterator = this.queueBrowsers.iterator();
      while (iterator.hasNext())
      {
         ((BrowserDelegate) iterator.next()).close();
         // Drop the closed browser, as is done for consumers.
         iterator.remove();
      }
      this.closed = true;
   }

   /**
    * Hook invoked before closing; this implementation does nothing.
    *
    * @throws JMSException never thrown by this implementation
    */
   public void closing() throws JMSException
   {
   }

   /**
    * Commits the current transaction: sends a snapshot of all buffered messages
    * through the connection, then forgets the buffered and the unacknowledged
    * messages. Ordinary deliveries are held back while the commit runs.
    *
    * <p>If sending fails, the buffered messages are kept and the session leaves
    * the recovering state, so blocked deliveries are released.</p>
    *
    * @throws JMSException          if the connection fails to send
    * @throws IllegalStateException if the session is closed or not transacted
    */
   public void commit() throws JMSException
   {
      this.throwExceptionIfClosed();
      if (!this.transacted)
      {
         throw new IllegalStateException("Illegal Operation: This is not a transacted Session.");
      }
      this.beginRecovery();
      try
      {
         if (!this.uncommittedMessages.isEmpty())
         {
            // Send a copy so the connection never sees the list this session keeps mutating.
            Collection outgoing = new ArrayList(this.uncommittedMessages);
            this.connection.send(outgoing);
         }
         synchronized (this.unacknowledgedMessageMap)
         {
            this.unacknowledgedMessageMap.clear();
         }
         this.uncommittedMessages.clear();
      }
      finally
      {
         // Always release waiting deliveries, even when send() threw.
         this.endRecovery();
      }
   }

   /**
    * Creates a browser for the given queue.
    *
    * @param queue    queue to browse
    * @param selector message selector handed to the browser
    * @return a new browser delegate
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public BrowserDelegate createBrowser(Queue queue, String selector) throws JMSException
   {
      this.throwExceptionIfClosed();
      return new P2PBrowserDelegate(this, queue, selector);
   }

   /**
    * @return a new, empty bytes message
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public BytesMessage createBytesMessage() throws JMSException
   {
      this.throwExceptionIfClosed();
      return new BytesMessageImpl();
   }

   /**
    * Creates a consumer and registers it so that it receives deliveries and is
    * closed together with the session.
    *
    * @param destination  destination to consume from
    * @param subscription not used by this implementation
    * @param selector     message selector handed to the consumer
    * @param noLocal      no-local flag handed to the consumer
    * @return the new consumer delegate
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public ConsumerDelegate createConsumer(
      Destination destination,
      String subscription,
      String selector,
      boolean noLocal)
      throws JMSException
   {
      this.throwExceptionIfClosed();
      ConsumerDelegate messageConsumer = new P2PConsumerDelegate(this, destination, selector, noLocal);
      this.messageConsumers.add(messageConsumer);
      return messageConsumer;
   }

   /**
    * @return a new, empty map message
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public MapMessage createMapMessage() throws JMSException
   {
      this.throwExceptionIfClosed();
      return new MapMessageImpl();
   }

   /**
    * @return a new message without body
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public javax.jms.Message createMessage() throws JMSException
   {
      this.throwExceptionIfClosed();
      return new MessageImpl();
   }

   /**
    * @param object payload handed to the new message
    * @return a new object message
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public ObjectMessage createObjectMessage(Serializable object) throws JMSException
   {
      this.throwExceptionIfClosed();
      return new ObjectMessageImpl(object);
   }

   /**
    * Creates a producer and registers it so that it is closed together with the session.
    *
    * @param destination destination handed to the producer
    * @return the new producer delegate
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public ProducerDelegate createProducer(Destination destination) throws JMSException
   {
      this.throwExceptionIfClosed();
      ProducerDelegate messageProducer = new P2PProducerDelegate(this, destination);
      this.messageProducers.add(messageProducer);
      return messageProducer;
   }

   /**
    * @return a new, empty stream message
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public StreamMessage createStreamMessage() throws JMSException
   {
      this.throwExceptionIfClosed();
      return new StreamMessageImpl();
   }

   /**
    * Not implemented.
    *
    * @param type ignored
    * @return always {@code null}
    * @throws JMSException never thrown by this implementation
    */
   public Destination createTempDestination(int type) throws JMSException
   {
      return null;
   }

   /**
    * @param text text handed to the new message
    * @return a new text message
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public TextMessage createTextMessage(String text) throws JMSException
   {
      this.throwExceptionIfClosed();
      return new TextMessageImpl(text);
   }

   /**
    * Not implemented.
    *
    * @param name ignored
    * @return always {@code null}
    * @throws JMSException never thrown by this implementation
    */
   public Destination getDestination(String name) throws JMSException
   {
      return null;
   }

   /**
    * Not implemented.
    *
    * @return always {@code null}
    */
   public XAResource getXAResource()
   {
      return null;
   }

   /**
    * Redelivers, on a background thread, every message that has been delivered
    * but not yet acknowledged. Only valid on a non-transacted session.
    *
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed or transacted
    */
   public void recover() throws JMSException
   {
      this.throwExceptionIfClosed();
      if (this.transacted)
      {
         throw new IllegalStateException("Illegal Operation: This is a transacted Session. Use rollback instead.");
      }
      this.redeliverUnacknowledged();
   }

   /**
    * Rolls the current transaction back: redelivers, on a background thread,
    * every unacknowledged message and discards the buffered outgoing messages.
    *
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed or not transacted
    */
   public void rollback() throws JMSException
   {
      this.throwExceptionIfClosed();
      if (!this.transacted)
      {
         throw new IllegalStateException("Illegal Operation: This is not a transacted Session.");
      }
      this.redeliverUnacknowledged();
      this.uncommittedMessages.clear();
   }

   /**
    * Does nothing in this implementation.
    */
   public void run()
   {
   }

   /**
    * Stores the given listener.
    *
    * @param listener listener to remember
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public void setMessageListener(MessageListener listener) throws JMSException
   {
      this.throwExceptionIfClosed();
      this.messageListener = listener;
   }

   /**
    * Only checks that the session is open; no subscription is removed.
    *
    * @param name ignored
    * @throws JMSException          declared by the interface
    * @throws IllegalStateException if the session is closed
    */
   public void unsubscribe(String name) throws JMSException
   {
      this.throwExceptionIfClosed();
   }

   /**
    * Sends a message on behalf of a producer. In a transacted session a clone of
    * the message is buffered until {@link #commit()}; otherwise the message is
    * passed to the connection immediately.
    *
    * @param message message to send
    * @throws JMSException if the connection fails to send
    */
   synchronized void send(MessageImpl message) throws JMSException
   {
      if (this.transacted)
      {
         this.uncommittedMessages.add(message.clone());
      }
      else
      {
         this.connection.send(message);
      }
   }

   /**
    * Acknowledges the given message and every message delivered before it by
    * removing all entries with a delivery id up to and including the message's
    * id from the unacknowledged map. Does nothing on a transacted session.
    *
    * @param message     message being acknowledged; must be a {@code MessageImpl}
    * @param acknowledge currently ignored by this implementation
    */
   public void acknowledge(Message message, boolean acknowledge)
   {
      if (this.transacted)
      {
         return;
      }
      synchronized (this.unacknowledgedMessageMap)
      {
         Iterator keys = this.unacknowledgedMessageMap.keySet().iterator();
         while (keys.hasNext())
         {
            Long currentKey = (Long) keys.next();
            if (currentKey.longValue() > ((MessageImpl) message).deliveryId)
            {
               // Keys come in ascending order, so nothing further can match.
               break;
            }
            keys.remove();
         }
      }
   }

   /**
    * Delivers a message arriving from the connection to every consumer of this
    * session, waiting first for any running recovery to finish.
    *
    * @param message message to deliver
    */
   void deliver(MessageImpl message)
   {
      this.deliver(message, false);
   }

   /**
    * Stamps the message with this session and a fresh delivery id, records it as
    * unacknowledged unless the session is in auto-acknowledge mode, and hands it
    * to every registered consumer.
    *
    * @param message           message to deliver
    * @param recoveryOperation {@code true} when called by the redelivery thread,
    *                          which must not wait for the recovery it is itself performing
    */
   private void deliver(MessageImpl message, boolean recoveryOperation)
   {
      if (!recoveryOperation)
      {
         this.waitForRecovery();
      }
      message.setSession(this);
      synchronized (this.unacknowledgedMessageMap)
      {
         // Allocate the id once so that the id on the message and the map key cannot diverge.
         final long deliveryId = ++this.nextDeliveryId;
         message.setDeliveryId(deliveryId);
         if (this.acknowledgeMode != Session.AUTO_ACKNOWLEDGE)
         {
            this.unacknowledgedMessageMap.put(new Long(deliveryId), message);
         }
      }
      Iterator consumers = this.messageConsumers.iterator();
      while (consumers.hasNext())
      {
         ((P2PConsumerDelegate) consumers.next()).deliver(message);
      }
   }

   /**
    * @throws IllegalStateException if the session has been closed
    */
   private void throwExceptionIfClosed() throws IllegalStateException
   {
      if (this.closed)
      {
         throw new IllegalStateException("The session is closed.");
      }
   }

   /**
    * Redelivers the given messages on a new thread, flagging each as redelivered,
    * and ends the recovery once done.
    *
    * @param unacknowledgedMessage snapshot of the messages to redeliver, keyed by delivery id
    */
   private void restart(final Map unacknowledgedMessage)
   {
      Thread thread = new Thread(new Runnable()
      {
         public void run()
         {
            try
            {
               // values() of the sorted snapshot iterates in delivery-id order.
               Iterator messages = unacknowledgedMessage.values().iterator();
               while (messages.hasNext())
               {
                  MessageImpl message = (MessageImpl) messages.next();
                  message.setJMSRedelivered(true);
                  deliver(message, true);
               }
            }
            finally
            {
               // Release blocked deliveries even if redelivery failed part-way.
               endRecovery();
            }
         }
      });
      thread.start();
   }

   /**
    * Enters the recovering state, takes a snapshot of the unacknowledged
    * messages, empties the live map and starts redelivery of the snapshot.
    */
   private void redeliverUnacknowledged()
   {
      synchronized (this.unacknowledgedMessageMap)
      {
         this.beginRecovery();
         Map snapshot = (Map) ((TreeMap) this.unacknowledgedMessageMap).clone();
         this.unacknowledgedMessageMap.clear();
         this.restart(snapshot);
      }
   }

   /** Marks the session as recovering so that ordinary deliveries are held back. */
   private void beginRecovery()
   {
      synchronized (this.recoveryLock)
      {
         this.recovering = true;
      }
   }

   /** Leaves the recovering state and wakes every delivery waiting for it. */
   private void endRecovery()
   {
      synchronized (this.recoveryLock)
      {
         this.recovering = false;
         this.recoveryLock.notifyAll();
      }
   }

   /**
    * Blocks until the session is no longer recovering. The flag is re-checked
    * under the lock after every wake-up, so neither a notification issued just
    * before the wait nor a spurious wake-up can break the hand-off. An interrupt
    * received while waiting is remembered and restored on return.
    */
   private void waitForRecovery()
   {
      boolean interrupted = false;
      synchronized (this.recoveryLock)
      {
         while (this.recovering)
         {
            try
            {
               this.recoveryLock.wait();
            }
            catch (InterruptedException e)
            {
               interrupted = true;
            }
         }
      }
      if (interrupted)
      {
         Thread.currentThread().interrupt();
      }
   }
}