package org.jboss.mq.server;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.DestinationFullException;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.selectors.Selector;
import org.jboss.util.timeout.Timeout;
import org.jboss.util.timeout.TimeoutFactory;
import org.jboss.util.timeout.TimeoutTarget;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
/**
 * Server-side queue that holds pending messages, hands them to waiting
 * receivers and tracks deliveries that have not been acknowledged yet.
 * <p>
 * Locking convention used by the methods of this class: the waiting
 * receivers, the subscriber sets and the <code>stopped</code> flag are
 * accessed while holding the <code>receivers</code> monitor; the message set
 * and the unacknowledged-message maps are accessed while holding the
 * <code>messages</code> monitor (the plain size getters are the exception).
 * Where both monitors are nested, <code>receivers</code> is taken first.
 */
public class BasicQueue
{
/** Logger shared by all queue instances and their inner task classes. */
static final Logger log = Logger.getLogger(BasicQueue.class);
/** Sorted set of {@link MessageReference}s waiting to be delivered. */
SortedSet messages = new TreeSet();
/** Pending timers: MessageReference -> Timeout (scheduled delivery or expiration). */
ConcurrentHashMap events = new ConcurrentHashMap();
/** Number of messages currently parked until their scheduled delivery time. */
SynchronizedInt scheduledMessageCount = new SynchronizedInt(0);
/** Owning destination manager; gives access to the persistence manager and message cache. */
JMSDestinationManager server;
/** Subscriptions that are waiting for a message to arrive. */
Receivers receivers;
/** Human readable identification, used in log and error messages. */
String description;
/** Optional statistics counter; null until createMessageCounter is called. */
MessageCounter counter;
/** AcknowledgementRequest -> UnackedMessageInfo for every delivered but unacknowledged message. */
HashMap unacknowledgedMessages = new HashMap();
/** MessageReference -> AcknowledgementRequest for every unacknowledged message. */
HashMap unackedByMessageRef = new HashMap();
/** Subscription -> HashMap (MessageReference -> AcknowledgementRequest) of its unacknowledged messages. */
HashMap unackedBySubscription = new HashMap();
/** Subscriptions registered with this queue. */
HashSet subscribers = new HashSet();
/** Subscriptions whose removal is deferred because they still have unacknowledged messages. */
HashSet removedSubscribers = new HashSet();
/** Configuration of this queue (receivers implementation class, maximum depth). */
BasicQueueParameters parameters;
/** Set by stop(); once true most operations fail with IllegalStateException. */
boolean stopped = false;
/**
 * Creates a queue bound to the given destination manager.
 *
 * @param server      the destination manager this queue belongs to
 * @param description text identifying the queue in logs and errors
 * @param parameters  queue configuration; its <code>receiversImpl</code>
 *                    selects the receivers implementation, falling back to
 *                    {@link ReceiversImpl} when it is null
 * @throws JMSException if the receivers implementation cannot be instantiated
 */
public BasicQueue(JMSDestinationManager server, String description, BasicQueueParameters parameters)
   throws JMSException
{
   this.server = server;
   this.description = description;
   this.parameters = parameters;

   Class configured = parameters.receiversImpl;
   Class implClass = (configured == null) ? ReceiversImpl.class : configured;
   try
   {
      this.receivers = (Receivers) implClass.newInstance();
   }
   catch (Throwable cause)
   {
      throw new SpyJMSException("Error instantiating receivers implementation: " + implClass, cause);
   }
}
/**
 * @return the description given to this queue at construction time
 */
public String getDescription()
{
   return this.description;
}
/**
 * @return the number of subscriptions currently waiting for a message
 */
public int getReceiversCount()
{
   final int waiting = receivers.size();
   return waiting;
}
/**
 * Lists the waiting receivers, obtained while holding the receivers lock.
 *
 * @return the list produced by the receivers implementation
 */
public ArrayList getReceivers()
{
   ArrayList listing;
   synchronized (receivers)
   {
      listing = receivers.listReceivers();
   }
   return listing;
}
/**
 * @return true when at least one subscriber is registered with this queue
 */
public boolean isInUse()
{
   synchronized (receivers)
   {
      return !subscribers.isEmpty();
   }
}
/**
 * Signals that a subscription is ready to receive a message.
 * <p>
 * The queued messages are scanned in order: expired messages are removed
 * and dropped along the way, and the first message the subscription accepts
 * is removed from the queue and handed to the subscription. If nothing
 * matches, the subscription is added to the waiting receivers.
 *
 * @param sub the subscription that wants a message
 * @throws JMSException if the subscription has to be parked as a waiting
 *                      receiver and the destination is stopped
 */
public void addReceiver(Subscription sub) throws JMSException
{
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("addReceiver " + sub + " " + this);

   MessageReference found = null;
   synchronized (messages)
   {
      // An empty set simply yields an iterator with no elements, so no
      // separate size check is needed.
      for (Iterator it = messages.iterator(); it.hasNext();)
      {
         MessageReference message = (MessageReference) it.next();
         try
         {
            if (message.isExpired())
            {
               it.remove();
               if (trace)
                  log.trace("message expired: " + message);
               dropMessage(message);
            }
            else if (sub.accepts(message.getHeaders()))
            {
               it.remove();
               found = message;
               break;
            }
         }
         catch (JMSException e)
         {
            // A message that cannot be examined is skipped; the scan goes on.
            log.info("Caught unusual exception in addReceiver.", e);
         }
      }
   }

   // Delivery / registration happens outside the messages lock.
   if (found != null)
      queueMessageForSending(sub, found);
   else
      addToReceivers(sub);
}
/**
 * Registers a subscription with this queue.
 *
 * @param sub the subscription to register
 * @throws JMSException (IllegalStateException) if the destination is stopped
 */
public void addSubscriber(Subscription sub) throws JMSException
{
   if (log.isTraceEnabled())
      log.trace("addSubscriber " + sub + " " + this);

   synchronized (receivers)
   {
      if (!stopped)
      {
         subscribers.add(sub);
         return;
      }
      throw new IllegalStateException("The destination is stopped " + getDescription());
   }
}
/**
 * Unregisters a subscription.
 * <p>
 * The subscription stops being a waiting receiver immediately. If it still
 * has unacknowledged messages, its removal is deferred by recording it in
 * the removed-subscribers set; otherwise it is removed from the subscribers
 * and its client consumer is told the subscription is gone.
 *
 * @param sub the subscription to remove
 */
public void removeSubscriber(Subscription sub)
{
   final boolean tracing = log.isTraceEnabled();
   if (tracing)
      log.trace("removeSubscriber " + sub + " " + this);

   synchronized (receivers)
   {
      removeReceiver(sub);

      synchronized (messages)
      {
         if (!hasUnackedMessages(sub))
         {
            if (tracing)
               log.trace("Removing subscriber " + sub);
            subscribers.remove(sub);
            ClientConsumer consumer = (ClientConsumer) sub.clientConsumer;
            consumer.removeRemovedSubscription(sub.subscriptionId);
            return;
         }

         // Outstanding deliveries: finish the removal once they are acknowledged.
         if (tracing)
            log.trace("Delaying removal of subscriber is has unacked messages " + sub);
         removedSubscribers.add(sub);
      }
   }
}
/**
 * @return the number of messages currently held in the queue
 */
public int getQueueDepth()
{
   final int depth = messages.size();
   return depth;
}
/**
 * @return the current value of the scheduled-message counter
 */
public int getScheduledMessageCount()
{
   final int scheduled = scheduledMessageCount.get();
   return scheduled;
}
/**
 * Accepts a message for this queue as part of a transaction.
 * <p>
 * The message is not enqueued here: a post-commit task performs the real
 * enqueue and a post-rollback task removes the message from the message
 * cache. Both are registered with the transaction manager.
 * <p>
 * If the destination is stopped or the configured maximum depth is
 * reached, the message is dropped and the exception is rethrown. The
 * message is dropped exactly once on every failure path.
 *
 * @param mes  the message to add
 * @param txId the transaction the add belongs to
 * @throws JMSException IllegalStateException when the destination is
 *                      stopped, DestinationFullException when the maximum
 *                      depth is reached, or any failure registering the tasks
 */
public void addMessage(MessageReference mes, Tx txId) throws JMSException
{
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("addMessage " + mes + " " + txId + " " + this);

   // Set once the queue-full path has discarded the message, so the
   // generic failure handler below does not drop it a second time.
   boolean dropped = false;
   try
   {
      synchronized (receivers)
      {
         if (stopped)
            throw new IllegalStateException("The destination is stopped " + getDescription());
      }

      // maxDepth <= 0 means the depth is not limited.
      if (parameters.maxDepth > 0)
      {
         synchronized (messages)
         {
            if (messages.size() >= parameters.maxDepth)
            {
               dropMessage(mes);
               dropped = true;
               String message = "Maximum size " + parameters.maxDepth +
                  " exceeded for " + description;
               log.warn(message);
               throw new DestinationFullException(message);
            }
         }
      }

      Runnable task = new AddMessagePostRollBackTask(mes);
      server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, task);
      task = new AddMessagePostCommitTask(mes);
      server.getPersistenceManager().getTxManager().addPostCommitTask(txId, task);
   }
   catch (JMSException e)
   {
      if (!dropped)
         dropMessage(mes, txId);
      throw e;
   }
}
/**
 * Puts a message (back) into the queue without any transaction handling,
 * by running it through the normal internal add path.
 *
 * @param mes the message to restore
 */
public void restoreMessage(MessageReference mes)
{
   if (log.isTraceEnabled())
      log.trace("restoreMessage " + mes + " " + this);

   this.internalAddMessage(mes);
}
/**
 * Returns the queued messages without removing them.
 *
 * @param selector a selector expression restricting the result, or null to
 *                 return every queued message
 * @return the matching messages in queue order (possibly empty)
 * @throws JMSException if the selector cannot be built or a message cannot
 *                      be examined or loaded
 */
public SpyMessage[] browse(String selector) throws JMSException
{
   // No selector means every message qualifies.
   Selector filter = (selector == null) ? null : new Selector(selector);

   ArrayList matched = new ArrayList();
   synchronized (messages)
   {
      for (Iterator it = messages.iterator(); it.hasNext();)
      {
         MessageReference ref = (MessageReference) it.next();
         if (filter == null || filter.test(ref.getHeaders()))
            matched.add(ref.getMessageForDelivery());
      }
   }

   SpyMessage[] result = new SpyMessage[matched.size()];
   return (SpyMessage[]) matched.toArray(result);
}
/**
 * Synchronously takes one message for a subscription.
 * <p>
 * A subscription without selector and without noLocal takes the first
 * unexpired message; otherwise the queue is scanned for the first
 * unexpired message the subscription accepts. Expired messages met on the
 * way are removed and dropped. A message that is handed out is registered
 * as unacknowledged for the subscription.
 *
 * @param sub  the receiving subscription
 * @param wait when no message is available, whether to register the
 *             subscription as a waiting receiver
 * @return the message for delivery, or null if none was available
 * @throws JMSException IllegalStateException when the destination is
 *                      stopped, or any failure while examining messages
 */
public SpyMessage receive(Subscription sub, boolean wait) throws JMSException
{
   final boolean tracing = log.isTraceEnabled();
   if (tracing)
      log.trace("receive " + sub + " wait=" + wait + " " + this);

   MessageReference picked = null;
   synchronized (receivers)
   {
      if (stopped)
         throw new IllegalStateException("The destination is stopped " + getDescription());

      // Without selector and noLocal no per-message acceptance test is needed.
      final boolean takeAny = sub.getSelector() == null && sub.noLocal == false;

      synchronized (messages)
      {
         if (takeAny)
         {
            while (picked == null && !messages.isEmpty())
            {
               MessageReference head = (MessageReference) messages.first();
               messages.remove(head);
               if (head.isExpired())
               {
                  if (tracing)
                     log.trace("message expired: " + head);
                  dropMessage(head);
               }
               else
               {
                  picked = head;
               }
            }
         }
         else
         {
            for (Iterator it = messages.iterator(); picked == null && it.hasNext();)
            {
               MessageReference candidate = (MessageReference) it.next();
               if (candidate.isExpired())
               {
                  it.remove();
                  if (tracing)
                     log.trace("message expired: " + candidate);
                  dropMessage(candidate);
               }
               else if (sub.accepts(candidate.getHeaders()))
               {
                  it.remove();
                  picked = candidate;
               }
            }
         }
      }

      if (picked != null)
         setupMessageAcknowledgement(sub, picked);
      else if (wait)
         addToReceivers(sub);
   }

   return (picked == null) ? null : picked.getMessageForDelivery();
}
/**
 * Processes an acknowledgement (or negative acknowledgement) for a
 * previously delivered message.
 * <p>
 * The delivery is removed from the unacknowledged bookkeeping; an unknown
 * request is ignored. For a negative acknowledgement the message is
 * restored to the queue after commit. For a positive one a persistent
 * message is removed from the persistent store in the transaction, the
 * message is restored on rollback and removed from the cache on commit.
 * Finally a deferred removal of the subscription is completed if possible.
 *
 * @param item the acknowledgement request
 * @param txId the transaction the acknowledgement belongs to
 * @throws javax.jms.JMSException on persistence or transaction manager failures
 */
public void acknowledge(AcknowledgementRequest item, Tx txId) throws javax.jms.JMSException
{
   if (log.isTraceEnabled())
      log.trace("acknowledge " + item + " " + txId + " " + this);

   UnackedMessageInfo info;
   synchronized (messages)
   {
      info = (UnackedMessageInfo) unacknowledgedMessages.remove(item);
      if (info == null)
         return;

      unackedByMessageRef.remove(info.messageRef);
      HashMap perSubscription = (HashMap) unackedBySubscription.get(info.sub);
      if (perSubscription != null)
         perSubscription.remove(info.messageRef);
      // Forget the subscription entry once nothing is outstanding for it.
      if (perSubscription == null || perSubscription.isEmpty())
         unackedBySubscription.remove(info.sub);
   }

   MessageReference ref = info.messageRef;
   if (item.isAck)
   {
      if (ref.isPersistent())
         server.getPersistenceManager().remove(ref, txId);

      Runnable onRollback = new RestoreMessageTask(ref);
      server.getPersistenceManager().getTxManager().addPostRollbackTask(txId, onRollback);
      Runnable onCommit = new RemoveMessageTask(ref);
      server.getPersistenceManager().getTxManager().addPostCommitTask(txId, onCommit);
   }
   else
   {
      Runnable onCommit = new RestoreMessageTask(ref);
      server.getPersistenceManager().getTxManager().addPostCommitTask(txId, onCommit);
   }

   synchronized (receivers)
   {
      synchronized (messages)
      {
         checkRemovedSubscribers(info.sub);
      }
   }
}
/**
 * Negatively acknowledges every message still outstanding for a
 * subscription by passing each recorded request to
 * {@link #acknowledge(AcknowledgementRequest, Tx)} without a transaction.
 * Failures for individual messages are logged and do not stop the loop.
 *
 * @param sub the subscription whose deliveries are given back
 */
public void nackMessages(Subscription sub)
{
   if (log.isTraceEnabled())
      log.trace("nackMessages " + sub + " " + this);

   synchronized (receivers)
   {
      synchronized (messages)
      {
         HashMap outstanding = (HashMap) unackedBySubscription.get(sub);
         if (outstanding == null)
            return;

         int nacked = 0;
         // Walk a copy: acknowledge() modifies the live map.
         HashMap copy = (HashMap) outstanding.clone();
         for (Iterator it = copy.values().iterator(); it.hasNext();)
         {
            AcknowledgementRequest request = (AcknowledgementRequest) it.next();
            try
            {
               acknowledge(request, null);
               nacked++;
            }
            catch (JMSException ignore)
            {
               log.debug("Unable to nack message: " + request, ignore);
            }
         }

         if (log.isDebugEnabled())
            log.debug("Nacked " + nacked + " messages for removed subscription " + sub);
      }
   }
}
/**
 * Empties the queue.
 * <p>
 * All pending timers are cancelled, every outstanding delivery is passed to
 * {@link #acknowledge(AcknowledgementRequest, Tx)} without a transaction,
 * and then every message in the queue is removed and dropped.
 *
 * @throws JMSException declared for callers; failures on individual
 *                      outstanding deliveries are logged, not thrown
 */
public void removeAllMessages() throws JMSException
{
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("removeAllMessages " + this);

   clearEvents();

   synchronized (receivers)
   {
      synchronized (messages)
      {
         // Walk a copy: acknowledge() modifies the live map.
         Iterator i = ((HashMap) unacknowledgedMessages.clone()).keySet().iterator();
         while (i.hasNext())
         {
            AcknowledgementRequest item = (AcknowledgementRequest) i.next();
            try
            {
               acknowledge(item, null);
            }
            catch (JMSException ignore)
            {
               // Best effort: keep going, but leave a trace of the failure.
               log.debug("Unable to nack message: " + item, ignore);
            }
         }

         i = messages.iterator();
         while (i.hasNext())
         {
            MessageReference message = (MessageReference) i.next();
            i.remove();
            dropMessage(message);
         }
      }
   }
}
/**
 * Stops the queue.
 * <p>
 * Under the receivers lock the queue is flagged as stopped, the current
 * subscribers are copied and all timers are cancelled. Afterwards, outside
 * the lock, each copied subscription is removed from its client consumer
 * (failures are logged) and its outstanding messages are nacked.
 */
public void stop()
{
   HashSet snapshot;
   synchronized (receivers)
   {
      stopped = true;
      snapshot = new HashSet(subscribers);
      if (log.isTraceEnabled())
         log.trace("Stopping " + this + " with subscribers " + snapshot);
      clearEvents();
   }

   Iterator it = snapshot.iterator();
   while (it.hasNext())
   {
      Subscription sub = (Subscription) it.next();
      ClientConsumer consumer = (ClientConsumer) sub.clientConsumer;
      try
      {
         consumer.removeSubscription(sub.subscriptionId);
      }
      catch (Throwable failure)
      {
         log.warn("Error during stop - removing subscriber " + sub, failure);
      }
      nackMessages(sub);
   }
}
/**
 * Creates the message counter for this queue, replacing any previous one.
 * All arguments are passed through to the {@link MessageCounter}
 * constructor together with this queue.
 *
 * @param name         counter name
 * @param subscription subscription name
 * @param topic        topic flag
 * @param durable      durable flag
 * @param daycountmax  day count maximum
 */
public void createMessageCounter(String name, String subscription, boolean topic, boolean durable, int daycountmax)
{
   MessageCounter created = new MessageCounter(name, subscription, this, topic, durable, daycountmax);
   this.counter = created;
}
/**
 * @return the message counter, or null if none has been created
 */
public MessageCounter getMessageCounter()
{
   return this.counter;
}
/**
 * @return the default object string followed by <code>{id=description}</code>
 */
public String toString()
{
   StringBuffer text = new StringBuffer();
   text.append(super.toString());
   text.append("{id=").append(description).append('}');
   return text.toString();
}
/**
 * Resets the scheduled-message counter to zero, then cancels and forgets
 * every pending timer.
 */
protected void clearEvents()
{
   scheduledMessageCount.set(0);

   Iterator entries = events.entrySet().iterator();
   while (entries.hasNext())
   {
      Map.Entry pending = (Map.Entry) entries.next();
      ((Timeout) pending.getValue()).cancel();
      entries.remove();
   }
}
/**
 * Forgets the timer registered for a message, cancelling it if there was one.
 *
 * @param message the message whose timer is cleared
 */
protected void clearEvent(MessageReference message)
{
   Object registered = events.remove(message);
   if (registered == null)
      return;
   ((Timeout) registered).cancel();
}
/**
 * Registers a subscription as a waiting receiver.
 *
 * @param sub the subscription to park until a message arrives
 * @throws JMSException (IllegalStateException) if the destination is stopped
 */
protected void addToReceivers(Subscription sub) throws JMSException
{
   boolean trace = log.isTraceEnabled();
   if (trace)
      // Uses this method's own name so the trace is distinguishable from addReceiver().
      log.trace("addToReceivers " + sub + " " + this);

   synchronized (receivers)
   {
      if (stopped)
         throw new IllegalStateException("The destination is stopped " + getDescription());
      receivers.add(sub);
   }
}
/**
 * Takes a subscription out of the waiting receivers.
 *
 * @param sub the subscription that no longer waits for a message
 */
protected void removeReceiver(Subscription sub)
{
   if (log.isTraceEnabled())
      log.trace("removeReceiver " + " " + sub + " " + this);

   synchronized (this.receivers)
   {
      this.receivers.remove(sub);
   }
}
/**
 * Performs the actual enqueue of a message.
 * <p>
 * A message whose scheduled delivery time lies in the future is parked on a
 * timer and re-submitted later. An expired message is dropped. Otherwise
 * the message is handed to the first waiting receiver that accepts it, or,
 * when there is none, stored in the queue (with an expiration timer if the
 * message has an expiration time).
 *
 * @param message the message to enqueue
 */
private void internalAddMessage(MessageReference message)
{
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("internalAddMessage " + " " + message + " " + this);
// Scheduled delivery: park the message until its delivery time is reached.
long ts = message.messageScheduledDelivery;
if (ts > 0 && ts > System.currentTimeMillis())
{
EnqueueMessageTask t = new EnqueueMessageTask(message);
Timeout timeout = TimeoutFactory.createTimeout(ts, t);
// Remember the timer so it can be cancelled via clearEvent/clearEvents.
events.put(message, timeout);
scheduledMessageCount.increment();
if (trace)
log.trace("scheduled message at " + new Date(ts) + ": " + message);
return;
}
// Already expired: never enters the queue.
if (message.isExpired())
{
if (trace)
log.trace("message expired: " + message);
dropMessage(message);
return;
}
try
{
Subscription found = null;
synchronized (receivers)
{
// Look for the first waiting receiver that accepts the message and
// take it out of the waiting set.
if (receivers.size() != 0)
{
for (Iterator it = receivers.iterator(); it.hasNext();)
{
Subscription sub = (Subscription) it.next();
if (sub.accepts(message.getHeaders()))
{
it.remove();
found = sub;
break;
}
}
}
// Nobody is waiting for it: store the message in the queue.
if (found == null)
{
synchronized (messages)
{
messages.add(message);
// Arrange for removal once the expiration time is reached.
if (message.messageExpiration > 0)
{
ExpireMessageTask t = new ExpireMessageTask(message);
Timeout timeout = TimeoutFactory.createTimeout(message.messageExpiration, t);
events.put(message, timeout);
}
}
}
}
// Delivery is done after the receivers lock has been released.
if (found != null)
queueMessageForSending(found, message);
}
catch (JMSException e)
{
// The message could not be matched against the receivers: discard it.
log.error("Caught unusual exception in internalAddMessage.", e);
dropMessage(message);
}
}
/**
 * Hands a message to the client consumer of a subscription.
 * <p>
 * The delivery is first recorded as unacknowledged, then wrapped in a
 * {@link RoutedMessage} and queued on the consumer. Any failure is logged
 * as a warning and not propagated.
 *
 * @param sub     the receiving subscription
 * @param message the message to send
 */
protected void queueMessageForSending(Subscription sub, MessageReference message)
{
   if (log.isTraceEnabled())
      log.trace("queueMessageForSending " + " " + sub + " " + message + " " + this);

   try
   {
      setupMessageAcknowledgement(sub, message);

      RoutedMessage routed = new RoutedMessage();
      routed.message = message;
      routed.subscriptionId = new Integer(sub.subscriptionId);

      ClientConsumer consumer = (ClientConsumer) sub.clientConsumer;
      consumer.queueMessageForSending(routed);
   }
   catch (Throwable failure)
   {
      log.warn("Caught unusual exception sending message to receiver.", failure);
   }
}
/**
 * Records a delivery as unacknowledged.
 * <p>
 * Builds the (negative) acknowledgement request identifying the delivery
 * and stores it in the three lookup maps: by request, by message and by
 * subscription.
 *
 * @param sub        the subscription the message is delivered to
 * @param messageRef the delivered message
 * @throws JMSException if the message or its headers cannot be read
 */
protected void setupMessageAcknowledgement(Subscription sub, MessageReference messageRef) throws JMSException
{
   SpyMessage delivered = messageRef.getMessage();

   AcknowledgementRequest request = new AcknowledgementRequest();
   request.destination = delivered.getJMSDestination();
   request.messageID = delivered.getJMSMessageID();
   request.subscriberId = sub.subscriptionId;
   // Until the client says otherwise the delivery counts as not acknowledged.
   request.isAck = false;

   synchronized (messages)
   {
      unacknowledgedMessages.put(request, new UnackedMessageInfo(messageRef, sub));
      unackedByMessageRef.put(messageRef, request);

      HashMap perSubscription = (HashMap) unackedBySubscription.get(sub);
      if (perSubscription == null)
      {
         perSubscription = new HashMap();
         unackedBySubscription.put(sub, perSubscription);
      }
      perSubscription.put(messageRef, request);
   }
}
/**
 * Drops a message outside of any transaction.
 *
 * @param message the message to discard
 */
protected void dropMessage(MessageReference message)
{
   final Tx noTransaction = null;
   dropMessage(message, noTransaction);
}
/**
 * Discards a message: cancels its timer, removes a persistent message from
 * the persistent store and removes the message from the message cache.
 * Failures are logged as warnings and never propagated.
 *
 * @param message the message to discard
 * @param txid    the transaction for the persistent removal, may be null
 */
protected void dropMessage(MessageReference message, Tx txid)
{
   if (log.isTraceEnabled())
      log.trace("dropMessage " + this + " txid=" + txid);

   clearEvent(message);
   try
   {
      if (message.isPersistent())
      {
         try
         {
            server.getPersistenceManager().remove(message, txid);
         }
         catch (JMSException storeFailure)
         {
            // Prefer showing the full message; fall back to the reference
            // when the message itself cannot be loaded.
            Object shown;
            try
            {
               shown = message.getMessage();
            }
            catch (JMSException unreadable)
            {
               shown = message;
            }
            log.warn("Message removed from queue, but not from the persistent store: " + shown, storeFailure);
         }
      }
      server.getMessageCache().remove(message);
   }
   catch (JMSException cacheFailure)
   {
      log.warn("Error dropping message " + message, cacheFailure);
   }
}
/**
 * Completes the deferred removal of a subscription once it has no
 * unacknowledged messages left. Does nothing for subscriptions that are
 * not awaiting removal or that still have outstanding deliveries.
 *
 * @param sub the subscription to check
 */
private void checkRemovedSubscribers(Subscription sub)
{
   if (!removedSubscribers.contains(sub) || hasUnackedMessages(sub))
      return;

   if (log.isTraceEnabled())
      log.trace("Removing subscriber " + sub);
   removedSubscribers.remove(sub);
   subscribers.remove(sub);
   ClientConsumer consumer = (ClientConsumer) sub.clientConsumer;
   consumer.removeRemovedSubscription(sub.subscriptionId);
}
/**
 * @param sub the subscription to look up
 * @return true if an unacknowledged-messages entry exists for the subscription
 */
private boolean hasUnackedMessages(Subscription sub)
{
   final boolean outstanding = unackedBySubscription.containsKey(sub);
   return outstanding;
}
class AddMessagePostRollBackTask implements Runnable
{
MessageReference message;
AddMessagePostRollBackTask(MessageReference m)
{
message = m;
}
public void run()
{
try
{
server.getMessageCache().remove(message);
}
catch (JMSException e)
{
log.error("Could not remove message from the message cache after an add rollback: ", e);
}
}
}
/**
 * Task run after the commit of a transaction that added a message:
 * enqueues the message and, when a counter exists, counts it.
 */
class AddMessagePostCommitTask implements Runnable
{
   /** The message to enqueue on commit. */
   MessageReference message;

   AddMessagePostCommitTask(MessageReference added)
   {
      this.message = added;
   }

   public void run()
   {
      internalAddMessage(this.message);
      if (counter == null)
         return;
      counter.incrementCounter();
   }
}
class RestoreMessageTask implements Runnable
{
MessageReference message;
RestoreMessageTask(MessageReference m)
{
message = m;
}
public void run()
{
if (log.isTraceEnabled())
log.trace("Restoring message: " + message);
try
{
message.redelivered();
message.invalidate();
if (message.isPersistent())
server.getPersistenceManager().update(message, null);
}
catch (JMSException e)
{
log.error("Caught unusual exception in restoreMessageTask.", e);
}
internalAddMessage(message);
}
}
class RemoveMessageTask implements Runnable
{
MessageReference message;
RemoveMessageTask(MessageReference m)
{
message = m;
}
public void run()
{
try
{
clearEvent(message);
server.getMessageCache().remove(message);
}
catch (JMSException e)
{
log.error("Could not remove an acknowleged message from the message cache: ", e);
}
}
}
/**
 * Timer target for scheduled delivery: when the delivery time is reached
 * the message is submitted to the queue again.
 */
private class EnqueueMessageTask implements TimeoutTarget
{
   /** The message waiting for its scheduled delivery time. */
   private MessageReference messageRef;

   public EnqueueMessageTask(MessageReference messageRef)
   {
      this.messageRef = messageRef;
   }

   /**
    * Forgets the timer, enqueues the message and decrements the
    * scheduled-message counter.
    *
    * @param timeout the timer that fired
    */
   public void timedOut(Timeout timeout)
   {
      if (log.isTraceEnabled())
         log.trace("scheduled message delivery: " + messageRef);
      events.remove(messageRef);
      try
      {
         internalAddMessage(messageRef);
      }
      finally
      {
         // The message is no longer parked whatever the outcome of the
         // enqueue, so the counter must not stay inflated on an unchecked
         // failure.
         scheduledMessageCount.decrement();
      }
   }
}
/**
 * Timer target for message expiration: when the expiration time is reached
 * the message is taken out of the queue and dropped, provided it is still
 * queued.
 */
private class ExpireMessageTask implements TimeoutTarget
{
   /** The message to expire. */
   private MessageReference messageRef;

   public ExpireMessageTask(MessageReference messageRef)
   {
      this.messageRef = messageRef;
   }

   public void timedOut(Timeout timeout)
   {
      events.remove(messageRef);

      boolean wasQueued;
      synchronized (messages)
      {
         wasQueued = messages.remove(messageRef);
      }
      // Not in the queue any more: nothing to drop.
      if (!wasQueued)
         return;

      if (log.isTraceEnabled())
         log.trace("message expired: " + messageRef);
      dropMessage(messageRef);
   }
}
/**
 * Pairs a delivered message with the subscription it was delivered to.
 */
private static class UnackedMessageInfo
{
   /** The delivered message. */
   public MessageReference messageRef;
   /** The subscription that received the message. */
   public Subscription sub;

   public UnackedMessageInfo(MessageReference deliveredRef, Subscription receiver)
   {
      messageRef = deliveredRef;
      sub = receiver;
   }
}
}