package org.jboss.mq.server;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.TreeMap;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import org.jboss.mq.DestinationFullException;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyTopic;
import org.jboss.mq.Subscription;
import org.jboss.mq.pm.NewPersistenceManager;
import org.jboss.mq.pm.PersistenceManager;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
public class JMSTopic extends JMSDestination
{
/** Queues of the durable subscriptions, keyed by their DurableSubscriptionID. */
ConcurrentReaderHashMap durQueues = new ConcurrentReaderHashMap();
/** Queues of the non-durable subscriptions, keyed by their Subscription. */
ConcurrentReaderHashMap tempQueues = new ConcurrentReaderHashMap();
/**
 * Creates the topic and records on the supplied parameters whether late
 * cloning is used.
 *
 * Late cloning is switched on exactly when the server's persistence manager
 * is a NewPersistenceManager. Note that this writes to the passed-in
 * parameters object.
 *
 * @param dest the destination this topic represents
 * @param temporary passed through to the superclass constructor
 * @param server the destination manager owning this topic
 * @param parameters the queue parameters; its lateClone flag is overwritten
 * @throws JMSException if the superclass constructor fails
 */
public JMSTopic(SpyDestination dest, ClientConsumer temporary, JMSDestinationManager server, BasicQueueParameters parameters) throws JMSException
{
   super(dest, temporary, server, parameters);
   parameters.lateClone = server.getPersistenceManager() instanceof NewPersistenceManager;
}
/**
 * Attaches a subscriber to this topic.
 *
 * A subscription without a durable id gets a fresh ExclusiveQueue that is
 * registered in tempQueues. A durable subscription reuses the queue found in
 * durQueues; when there is none, or its destination or selector differs from
 * the request, the subscription is (re)registered through the state manager
 * and the queue is looked up again.
 *
 * @param sub the subscription to add
 * @throws IllegalStateException (javax.jms) if the durable subscription is
 *         already in use
 * @throws JMSException if the durable queue is still missing after
 *         registration, or if a delegate call fails
 */
public void addSubscriber(Subscription sub) throws JMSException
{
   SpyTopic topic = (SpyTopic) sub.destination;
   DurableSubscriptionID id = topic.getDurableSubscriptionID();

   if (id == null)
   {
      // Non-durable: one exclusive queue per subscription.
      ExclusiveQueue q = new ExclusiveQueue(server, destination, sub, parameters);
      q.createMessageCounter(destination.getName(), q.getDescription(), true, false, parameters.messageCounterHistoryDayLimit);
      tempQueues.put(sub, q);
      q.addSubscriber(sub);
      return;
   }

   PersistentQueue q = (PersistentQueue) durQueues.get(id);
   if (q != null && q.isInUse())
      throw new IllegalStateException("The durable subscription is already in use. " + id);

   boolean selectorChanged = false;
   if (q != null)
   {
      String newSelector = sub.messageSelector;
      String oldSelector = null;
      if (q instanceof SelectorPersistentQueue)
         oldSelector = ((SelectorPersistentQueue) q).selectorString;
      // Changed when exactly one side is null or the two strings differ.
      if ((newSelector == null && oldSelector != null)
         || (newSelector != null && !newSelector.equals(oldSelector)))
         selectorChanged = true;
   }

   if (q == null || !q.destination.equals(topic) || selectorChanged)
   {
      server.getStateManager().setDurableSubscription(server, id, topic);
      // Pick up whatever queue is registered for the id after the state manager call.
      synchronized (durQueues)
      {
         q = (PersistentQueue) durQueues.get(id);
      }
      // Fail with a meaningful JMS error instead of a bare NullPointerException.
      if (q == null)
         throw new JMSException("No queue found for durable subscription after registration: " + id);
   }
   q.addSubscriber(sub);
}
/**
 * Detaches a subscriber from its queue.
 *
 * The queue is looked up in durQueues when the subscription carries a durable
 * id, otherwise in tempQueues. When no queue is found, the client consumer is
 * asked to drop the subscription id from its removed subscriptions instead.
 *
 * @param sub the subscription to remove
 * @throws JMSException if the queue fails to remove the subscriber
 */
public void removeSubscriber(Subscription sub) throws JMSException
{
   DurableSubscriptionID id = ((SpyTopic) sub.destination).getDurableSubscriptionID();
   BasicQueue target = (BasicQueue) (id != null ? durQueues.get(id) : tempQueues.get(sub));

   if (target != null)
   {
      target.removeSubscriber(sub);
      return;
   }
   ((ClientConsumer) sub.clientConsumer).removeRemovedSubscription(sub.subscriptionId);
}
/**
 * Forwards a nack request to the queue backing the subscription.
 *
 * The queue is looked up in durQueues for a durable subscription and in
 * tempQueues otherwise. Nothing happens when no queue is found.
 *
 * @param sub the subscription whose messages are nacked
 * @throws JMSException if the queue fails to nack the messages
 */
public void nackMessages(Subscription sub) throws JMSException
{
   DurableSubscriptionID id = ((SpyTopic) sub.destination).getDurableSubscriptionID();
   BasicQueue target = (BasicQueue) (id != null ? durQueues.get(id) : tempQueues.get(sub));

   if (target == null)
      return;
   target.nackMessages(sub);
}
/**
 * Unregisters the non-durable queue of a subscription and empties it.
 *
 * A JMSException raised while emptying the queue is logged at debug level and
 * not propagated.
 *
 * @param sub the subscription whose temporary queue is discarded
 */
void cleanupSubscription(Subscription sub)
{
   BasicQueue removed = (BasicQueue) tempQueues.remove(sub);
   if (removed == null)
      return;

   try
   {
      removed.removeAllMessages();
   }
   catch (JMSException e)
   {
      cat.debug("Error removing messages for subscription " + sub, e);
   }
}
/**
 * Registers the subscription as a receiver on its queue.
 *
 * @param sub the receiving subscription
 * @throws JMSException if the queue rejects the receiver
 */
public void addReceiver(Subscription sub) throws JMSException
{
   BasicQueue target = getQueue(sub);
   target.addReceiver(sub);
}
/**
 * Unregisters the subscription as a receiver from its queue.
 *
 * @param sub the subscription that stops receiving
 */
public void removeReceiver(Subscription sub)
{
   BasicQueue target = getQueue(sub);
   target.removeReceiver(sub);
}
/**
 * Restores a message reference into the durable queue named by the message's
 * durableSubscriberID header.
 *
 * This method is best effort: a message without a durable subscriber id, a
 * message whose durable subscription has no queue, and any JMSException are
 * logged and not propagated.
 *
 * @param messageRef the reference to restore; its queue field is set to the
 *        target queue when one is found
 */
public void restoreMessage(MessageReference messageRef)
{
   try
   {
      SpyMessage spyMessage = messageRef.getMessage();
      updateNextMessageId(spyMessage);

      if (spyMessage.header.durableSubscriberID == null)
      {
         cat.debug("Trying to restore message with null durableSubscriberID");
         return;
      }

      BasicQueue queue = (BasicQueue) durQueues.get(spyMessage.header.durableSubscriberID);
      if (queue == null)
      {
         // Without this guard a NullPointerException would escape the JMSException-only catch below.
         cat.error("Could not restore message: no queue for durable subscription " + spyMessage.header.durableSubscriberID);
         return;
      }

      messageRef.queue = queue;
      queue.restoreMessage(messageRef);
   }
   catch (JMSException e)
   {
      cat.error("Could not restore message:", e);
   }
}
/**
 * Restores a message into the durable queue named by its durableSubscriberID
 * header, adding it to the server's message cache as STORED.
 *
 * This method is best effort: a message without a durable subscriber id, a
 * message whose durable subscription has no queue, and any JMSException are
 * logged and not propagated.
 *
 * @param message the message to restore
 */
public void restoreMessage(SpyMessage message)
{
   try
   {
      updateNextMessageId(message);

      if (message.header.durableSubscriberID == null)
      {
         cat.debug("Trying to restore message with null durableSubscriberID");
         return;
      }

      BasicQueue queue = (BasicQueue) durQueues.get(message.header.durableSubscriberID);
      if (queue == null)
      {
         // Checked before touching the cache so no reference is created against a missing queue.
         cat.error("Could not restore message: no queue for durable subscription " + message.header.durableSubscriberID);
         return;
      }

      MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED);
      queue.restoreMessage(messageRef);
   }
   catch (JMSException e)
   {
      cat.error("Could not restore message:", e);
   }
}
/**
 * Restores a message into the durable queue of the given subscription id,
 * adding it to the server's message cache as STORED for that id.
 *
 * This method is best effort: a null id, an id without a registered queue,
 * and any JMSException are logged and not propagated.
 *
 * @param message the message to restore
 * @param id the durable subscription the message belongs to
 */
public void restoreMessage(SpyMessage message, DurableSubscriptionID id)
{
   try
   {
      updateNextMessageId(message);

      if (id == null)
      {
         cat.debug("Trying to restore message with null durableSubscriberID");
         return;
      }

      BasicQueue queue = (BasicQueue) durQueues.get(id);
      if (queue == null)
      {
         // Checked before touching the cache so no reference is created against a missing queue.
         cat.error("Could not restore message: no queue for durable subscription " + id);
         return;
      }

      MessageReference messageRef = server.getMessageCache().add(message, queue, MessageReference.STORED, id);
      queue.restoreMessage(messageRef);
   }
   catch (JMSException e)
   {
      cat.error("Could not restore message:", e);
   }
}
/**
 * Creates and registers the queue for a durable subscription, then asks the
 * persistence manager to restore it.
 *
 * A SelectorPersistentQueue is used when the id carries a selector, otherwise
 * a plain PersistentQueue.
 *
 * @param id the durable subscription to create
 * @throws JMSException if this is a temporary topic or a delegate call fails
 */
public void createDurableSubscription(DurableSubscriptionID id) throws JMSException
{
   if (temporaryDestination != null)
      throw new JMSException("Not a valid operation on a temporary topic");

   SpyTopic durableTopic = new SpyTopic((SpyTopic) destination, id);

   BasicQueue created;
   if (id.getSelector() != null)
      created = new SelectorPersistentQueue(server, durableTopic, id.getSelector(), parameters);
   else
      created = new PersistentQueue(server, durableTopic, parameters);

   created.createMessageCounter(destination.getName(), id.toString(), true, true, parameters.messageCounterHistoryDayLimit);

   // Register before restoring so the restoreMessage overloads can find the queue by id.
   durQueues.put(id, created);
   server.getPersistenceManager().restoreQueue(this, durableTopic);
}
/**
 * Stops every queue of this topic.
 *
 * Non-durable queues are stopped first. Each durable queue is stopped and
 * then closed in the persistence manager.
 *
 * @throws JMSException if this is a temporary topic or a delegate call fails
 */
public void close() throws JMSException
{
   if (temporaryDestination != null)
      throw new JMSException("Not a valid operation on a temporary topic");

   for (Iterator temps = tempQueues.values().iterator(); temps.hasNext();)
   {
      ExclusiveQueue temp = (ExclusiveQueue) temps.next();
      temp.stop();
   }

   for (Iterator durables = durQueues.values().iterator(); durables.hasNext();)
   {
      PersistentQueue durable = (PersistentQueue) durables.next();
      durable.stop();
      server.getPersistenceManager().closeQueue(this, durable.getSpyDestination());
   }
}
/**
 * Unregisters the queue of a durable subscription and removes all of its
 * messages.
 *
 * When no queue is registered for the id, the call is a no-op apart from a
 * debug log entry, mirroring the null check in cleanupSubscription.
 *
 * @param id the durable subscription to destroy
 * @throws JMSException if removing the queue's messages fails
 */
public void destroyDurableSubscription(DurableSubscriptionID id) throws JMSException
{
   BasicQueue queue = (BasicQueue) durQueues.remove(id);
   if (queue == null)
   {
      // Nothing registered for this id; previously this path ended in a NullPointerException.
      cat.debug("No durable subscription queue to destroy for " + id);
      return;
   }
   queue.removeAllMessages();
}
/**
 * Receives a message for the subscription from its queue.
 *
 * @param sub the receiving subscription
 * @param wait passed through unchanged to the queue's receive call
 * @return whatever the queue's receive call returns
 * @throws javax.jms.JMSException if the queue's receive call fails
 */
public SpyMessage receive(Subscription sub, boolean wait) throws javax.jms.JMSException
{
   BasicQueue source = getQueue(sub);
   return source.receive(sub, wait);
}
/**
 * Forwards an acknowledgement request to the subscription's queue.
 *
 * @param req the acknowledgement request
 * @param sub the subscription used to locate the queue
 * @param txId the transaction handed to the queue with the request
 * @throws JMSException if the queue's acknowledge call fails
 */
public void acknowledge(org.jboss.mq.AcknowledgementRequest req, Subscription sub, org.jboss.mq.pm.Tx txId)
throws JMSException
{
   BasicQueue target = getQueue(sub);
   target.acknowledge(req, txId);
}
/**
 * Publishes a message to every durable and every non-durable queue of this
 * topic.
 *
 * With late cloning enabled, the one message instance receives the new
 * message id and is shared by all references. It is handed to the
 * NewPersistenceManager at most once, while the durable queues are processed,
 * and only for a persistent reference. Without late cloning, every queue gets
 * its own clone carrying the message id; durable clones also get the
 * subscriber id and the queue's destination.
 *
 * A full queue does not stop delivery to the remaining queues. The texts of
 * all DestinationFullExceptions are joined with ", " and rethrown as one
 * exception at the end.
 *
 * @param message the message to publish
 * @param txId the transaction handed to each queue
 * @throws DestinationFullException if at least one queue was full
 * @throws JMSException if any other delegate call fails
 */
public void addMessage(SpyMessage message, org.jboss.mq.pm.Tx txId) throws JMSException
{
   StringBuffer fullErrors = null;
   boolean persisted = false;

   long messageId = nextMessageId();
   if (parameters.lateClone)
      message.header.messageId = messageId;

   // Durable subscriptions first.
   for (Iterator ids = durQueues.keySet().iterator(); ids.hasNext();)
   {
      DurableSubscriptionID id = (DurableSubscriptionID) ids.next();
      PersistentQueue durable = (PersistentQueue) durQueues.get(id);

      MessageReference ref;
      if (parameters.lateClone)
      {
         ref = server.getMessageCache().add(message, durable, MessageReference.NOT_STORED, id);
      }
      else
      {
         SpyMessage copy = message.myClone();
         copy.header.durableSubscriberID = id;
         copy.header.messageId = messageId;
         copy.setJMSDestination(durable.getSpyDestination());
         ref = server.getMessageCache().add(copy, durable, MessageReference.NOT_STORED);
      }

      try
      {
         // Store the shared message once, before the first queue sees its reference.
         if (!persisted && parameters.lateClone && ref.isPersistent())
         {
            NewPersistenceManager pm = (NewPersistenceManager) server.getPersistenceManager();
            pm.addMessage(message);
            persisted = true;
         }
         durable.addMessage(ref, txId);
      }
      catch (DestinationFullException e)
      {
         fullErrors = appendFullError(fullErrors, e);
      }
   }

   // Then the non-durable subscriptions.
   for (Iterator temps = tempQueues.values().iterator(); temps.hasNext();)
   {
      BasicQueue temp = (BasicQueue) temps.next();

      MessageReference ref;
      if (parameters.lateClone)
      {
         ref = server.getMessageCache().add(message, temp, MessageReference.NOT_STORED);
      }
      else
      {
         SpyMessage copy = message.myClone();
         copy.header.messageId = messageId;
         ref = server.getMessageCache().add(copy, temp, MessageReference.NOT_STORED);
      }

      try
      {
         temp.addMessage(ref, txId);
      }
      catch (DestinationFullException e)
      {
         fullErrors = appendFullError(fullErrors, e);
      }
   }

   if (fullErrors != null)
      throw new DestinationFullException(fullErrors.toString());
}

/**
 * Adds the text of a DestinationFullException to the accumulated error text,
 * starting a new buffer for the first error and separating later ones with ", ".
 */
private static StringBuffer appendFullError(StringBuffer soFar, DestinationFullException e)
{
   if (soFar == null)
      return new StringBuffer(e.getText());
   return soFar.append(", ").append(e.getText());
}
/**
 * @return the summed queue depth of all durable and non-durable queues
 */
public int getAllMessageCount()
{
   ArrayList everyQueue = getAllQueues();
   return calculateMessageCount(everyQueue);
}
/**
 * @return the summed queue depth of the durable subscription queues
 */
public int getDurableMessageCount()
{
   ArrayList durableQueues = getPersistentQueues();
   return calculateMessageCount(durableQueues);
}
/**
 * @return the summed queue depth of the non-durable subscription queues
 */
public int getNonDurableMessageCount()
{
   ArrayList temporaryQueues = getTemporaryQueues();
   return calculateMessageCount(temporaryQueues);
}
/**
 * @return a new list holding the durable queues followed by the non-durable
 *         queues
 */
public ArrayList getAllQueues()
{
   ArrayList combined = new ArrayList(getAllSubscriptionsCount());
   ArrayList durable = getPersistentQueues();
   combined.addAll(durable);
   ArrayList temporary = getTemporaryQueues();
   combined.addAll(temporary);
   return combined;
}
/**
 * @return a new list copied from the values of tempQueues
 */
public ArrayList getTemporaryQueues()
{
   ArrayList snapshot = new ArrayList(tempQueues.values());
   return snapshot;
}
/**
 * @return a new list copied from the values of durQueues
 */
public ArrayList getPersistentQueues()
{
   ArrayList snapshot = new ArrayList(durQueues.values());
   return snapshot;
}
/**
 * @return the number of durable plus non-durable subscriptions
 */
public int getAllSubscriptionsCount()
{
   int durable = durQueues.size();
   int nonDurable = tempQueues.size();
   return durable + nonDurable;
}
/**
 * @return the number of entries in durQueues
 */
public int getDurableSubscriptionsCount()
{
   int durable = durQueues.size();
   return durable;
}
/**
 * @return the number of entries in tempQueues
 */
public int getNonDurableSubscriptionsCount()
{
   int nonDurable = tempQueues.size();
   return nonDurable;
}
/**
 * @return a new list holding the durable subscription keys followed by the
 *         non-durable subscription keys
 */
public ArrayList getAllSubscriptions()
{
   ArrayList combined = new ArrayList(getAllSubscriptionsCount());
   ArrayList durable = getDurableSubscriptions();
   combined.addAll(durable);
   ArrayList nonDurable = getNonDurableSubscriptions();
   combined.addAll(nonDurable);
   return combined;
}
/**
 * @return a new list copied from the keys of durQueues
 */
public ArrayList getDurableSubscriptions()
{
   ArrayList keys = new ArrayList(durQueues.keySet());
   return keys;
}
/**
 * @return a new list copied from the keys of tempQueues
 */
public ArrayList getNonDurableSubscriptions()
{
   ArrayList keys = new ArrayList(tempQueues.keySet());
   return keys;
}
/**
 * @param id the durable subscription to look up
 * @return the queue registered in durQueues for the id, or null when the map
 *         has no entry for it
 */
PersistentQueue getDurableSubscription(DurableSubscriptionID id)
{
   Object registered = durQueues.get(id);
   return (PersistentQueue) registered;
}
/**
 * Finds the queue backing a subscription: the durable queue when the
 * subscription's topic carries a durable id, otherwise the entry in
 * tempQueues keyed by the subscription.
 *
 * @param sub the subscription to resolve
 * @return the backing queue, or null when none is registered
 */
private BasicQueue getQueue(Subscription sub)
{
   DurableSubscriptionID id = ((SpyTopic) sub.destination).getDurableSubscriptionID();
   if (id == null)
      return (BasicQueue) tempQueues.get(sub);
   return getDurableSubscription(id);
}
/**
 * @return true when there is at least one non-durable subscription queue or
 *         at least one durable queue reports itself as in use
 */
public boolean isInUse()
{
   boolean hasTemporary = tempQueues.size() > 0;
   if (hasTemporary)
      return true;

   for (Iterator durables = durQueues.values().iterator(); durables.hasNext();)
   {
      PersistentQueue durable = (PersistentQueue) durables.next();
      if (durable.isInUse())
         return true;
   }
   return false;
}
/**
 * Removes all messages from every durable subscription queue. The queues in
 * tempQueues are not touched by this method.
 *
 * @throws JMSException if a queue fails to remove its messages
 */
public void removeAllMessages() throws JMSException
{
   for (Iterator durables = durQueues.values().iterator(); durables.hasNext();)
   {
      PersistentQueue durable = (PersistentQueue) durables.next();
      durable.removeAllMessages();
   }
}
/**
 * Sums the queue depth of the given queues.
 *
 * @param queues a list of BasicQueue instances
 * @return the total of getQueueDepth() over all entries
 */
private int calculateMessageCount(ArrayList queues)
{
   int total = 0;
   for (int k = 0; k < queues.size(); k++)
   {
      BasicQueue current = (BasicQueue) queues.get(k);
      total += current.getQueueDepth();
   }
   return total;
}
/**
 * Collects the message counters of all durable and non-durable queues.
 *
 * Counters are keyed by destination name concatenated with destination
 * subscription and returned in the sorted order of those keys. Queues without
 * a counter are skipped. Durable queues are visited before non-durable ones,
 * so a non-durable counter replaces a durable one with the same key.
 *
 * @return the collected counters; empty when no queue has a counter
 */
public MessageCounter[] getMessageCounter()
{
   TreeMap sorted = new TreeMap();
   ConcurrentReaderHashMap[] sources = {durQueues, tempQueues};

   for (int s = 0; s < sources.length; s++)
   {
      for (Iterator queues = sources[s].values().iterator(); queues.hasNext();)
      {
         MessageCounter counter = ((BasicQueue) queues.next()).getMessageCounter();
         if (counter == null)
            continue;
         sorted.put(counter.getDestinationName() + counter.getDestinationSubscription(), counter);
      }
   }
   return (MessageCounter[]) sorted.values().toArray(new MessageCounter[0]);
}
}