package org.jboss.mq.server;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.ReceiveRequest;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.Subscription;
import org.jboss.mq.threadpool.ThreadPool;
import org.jboss.mq.threadpool.Work;
public class ClientConsumer implements Work
{
private static Logger log = Logger.getLogger(ClientConsumer.class);
JMSDestinationManager server;
ConnectionToken connectionToken;
boolean enabled;
boolean closed = false;
HashMap subscriptions = new HashMap();
HashMap removedSubscriptions = new HashMap();
LinkedList blockedSubscriptions = new LinkedList();
private LinkedList messages = new LinkedList();
private boolean enqueued = false;
private static ThreadPool threadPool = null;
public ClientConsumer(JMSDestinationManager server, ConnectionToken connectionToken) throws JMSException
{
this.server = server;
this.connectionToken = connectionToken;
synchronized (ClientConsumer.class)
{
if (threadPool == null)
threadPool = new ThreadPool("Message Pushers", server.threadGroup, 10, true);
}
}
public void setEnabled(boolean enabled) throws JMSException
{
if (log.isTraceEnabled())
log.trace("" + this +"->setEnabled(enabled=" + enabled + ")");
synchronized (blockedSubscriptions)
{
this.enabled = enabled;
if (enabled)
{
for (Iterator it = blockedSubscriptions.iterator(); it.hasNext();)
{
Subscription sub = (Subscription) it.next();
JMSDestination dest = server.getJMSDestination(sub.destination);
if (dest != null)
dest.addReceiver(sub);
}
blockedSubscriptions.clear();
}
}
}
public void queueMessageForSending(RoutedMessage r)
{
synchronized (messages)
{
if (closed)
return;
messages.add(r);
if (!enqueued)
{
threadPool.enqueueWork(this);
enqueued = true;
}
}
}
public void addSubscription(Subscription req) throws JMSException
{
if (log.isTraceEnabled())
log.trace("Adding subscription for: " + req);
req.connectionToken = connectionToken;
req.clientConsumer = this;
JMSDestination jmsdest = server.getJMSDestination(req.destination);
if (jmsdest == null)
throw new InvalidDestinationException("The destination " + req.destination + " does not exist !");
jmsdest.addSubscriber(req);
synchronized (subscriptions)
{
subscriptions.put(new Integer(req.subscriptionId), req);
}
}
public void close()
{
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("" + this +"->close()");
synchronized (messages)
{
closed = true;
if (enqueued)
{
if (trace)
log.trace("" + this +"->close(): Cancelling work in progress.");
threadPool.cancelWork(this);
enqueued = false;
}
messages.clear();
}
HashMap subscriptionsClone = null;
synchronized (subscriptions)
{
subscriptionsClone = (HashMap) subscriptions.clone();
}
Iterator i = subscriptionsClone.keySet().iterator();
while (i.hasNext())
{
Integer subscriptionId = (Integer) i.next();
try
{
removeSubscription(subscriptionId.intValue());
}
catch (JMSException ignore)
{
}
}
HashMap removedSubsClone = null;
synchronized (subscriptions)
{
removedSubsClone = (HashMap) removedSubscriptions.clone();
}
i = removedSubsClone.values().iterator();
while (i.hasNext())
{
Subscription removed = (Subscription) i.next();
JMSDestination queue = server.getJMSDestination(removed.destination);
if (queue == null)
log.warn("The subscription was registered with a destination that does not exist: " + removed);
else
{
try
{
queue.nackMessages(removed);
}
catch (JMSException e)
{
log.warn("Unable to nack removed subscription: " + removed, e);
}
}
removeRemovedSubscription(removed.subscriptionId);
}
}
public SpyMessage receive(int subscriberId, long wait) throws JMSException
{
Subscription req = getSubscription(subscriberId);
if (req == null)
{
throw new JMSException("The provided subscription does not exist");
}
JMSDestination queue = server.getJMSDestination(req.destination);
if (queue == null)
throw new InvalidDestinationException("The subscription's destination " + req.destination + " does not exist");
if (addBlockedSubscription(req, wait))
return queue.receive(req, (wait != -1));
return null;
}
public void removeSubscription(int subscriptionId) throws JMSException
{
if (log.isTraceEnabled())
log.trace("" + this +"->removeSubscription(subscriberId=" + subscriptionId + ")");
Integer subId = new Integer(subscriptionId);
Subscription req;
synchronized (subscriptions)
{
req = (Subscription) subscriptions.remove(subId);
if (req != null)
removedSubscriptions.put(subId, req);
}
if (req == null)
throw new JMSException("The subscription had not been previously registered");
JMSDestination queue = server.getPossiblyClosingJMSDestination(req.destination);
if (queue == null)
throw new InvalidDestinationException("The subscription was registered with a destination that does not exist !");
queue.removeSubscriber(req);
}
public void doWork()
{
try
{
ReceiveRequest[] job;
synchronized (messages)
{
if (closed)
return;
job = new ReceiveRequest[messages.size()];
Iterator iter = messages.iterator();
for (int i = 0; iter.hasNext(); i++)
{
RoutedMessage rm = (RoutedMessage) iter.next();
job[i] = rm.toReceiveRequest();
iter.remove();
}
enqueued = false;
}
connectionToken.clientIL.receive(job);
}
catch (Throwable t)
{
synchronized (messages)
{
if (closed)
log.warn("Could not send messages to a receiver.", t);
else
log.trace("Could not send messages to a receiver. It is closed.", t);
}
try
{
server.connectionFailure(connectionToken);
}
catch (Throwable ignore)
{
log.warn("Could not close the client connection..", ignore);
}
}
}
public String toString()
{
return "ClientConsumer:" + connectionToken.getClientID();
}
public void acknowledge(AcknowledgementRequest request, org.jboss.mq.pm.Tx txId) throws JMSException
{
Subscription sub = retrieveSubscription(request.subscriberId);
if (sub == null)
{
synchronized (subscriptions)
{
sub = (Subscription) removedSubscriptions.get(new Integer(request.subscriberId));
}
}
if (sub == null)
{
throw new JMSException("The provided subscription does not exist");
}
JMSDestination queue = server.getJMSDestination(sub.destination);
if (queue == null)
throw new InvalidDestinationException("The subscription's destination " + sub.destination + " does not exist");
queue.acknowledge(request, sub, txId);
}
boolean addBlockedSubscription(Subscription sub, long wait)
{
synchronized (blockedSubscriptions)
{
if (enabled == false && wait != -1)
blockedSubscriptions.add(sub);
return enabled;
}
}
void removeRemovedSubscription(int subId)
{
Subscription sub = null;
synchronized (subscriptions)
{
sub = (Subscription) removedSubscriptions.remove(new Integer(subId));
}
if (sub != null)
{
JMSDestination topic = server.getPossiblyClosingJMSDestination(sub.destination);
if (topic != null && topic instanceof JMSTopic)
((JMSTopic) topic).cleanupSubscription(sub);
}
}
public Subscription getSubscription(int subscriberId) throws JMSException
{
Subscription req = retrieveSubscription(subscriberId);
if (req == null)
throw new JMSException("The provided subscription does not exist");
return req;
}
private Subscription retrieveSubscription(int subscriberId) throws JMSException
{
Integer id = new Integer(subscriberId);
synchronized (subscriptions)
{
return (Subscription) subscriptions.get(id);
}
}
}