package org.jboss.mq.server;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyQueue;
import org.jboss.mq.SpyTemporaryQueue;
import org.jboss.mq.SpyTemporaryTopic;
import org.jboss.mq.SpyTopic;
import org.jboss.mq.SpyTransactionRolledBackException;
import org.jboss.mq.Subscription;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.pm.PersistenceManager;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.sm.StateManager;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
/**
 * Server-side interceptor that keeps track of the known destinations and of
 * the {@link ClientConsumer} associated with each connection, and that routes
 * messages, acknowledgements and transaction requests to them.
 *
 * <p>Destinations are looked up by their {@link SpyDestination} key in
 * {@link #destinations}; destinations that are in the middle of
 * {@link #closeDestination(SpyDestination)} are temporarily visible through
 * {@link #closingDestinations}.
 */
public class JMSDestinationManager extends JMSServerInterceptorSupport
{
   static Logger log = Logger.getLogger(JMSDestinationManager.class);

   /**
    * Version string returned by {@link #toString()}. The misspelled name is
    * part of the public interface and is kept for compatibility.
    */
   public final static String JBOSS_VESION = "JBossMQ Version 3.2";

   /** Open destinations: SpyDestination -> JMSDestination. */
   public Map destinations = new ConcurrentReaderHashMap();

   /** Destinations currently being closed: SpyDestination -> JMSDestination. */
   public Map closingDestinations = new ConcurrentReaderHashMap();

   /** Thread group handed out by {@link #getThreadGroup()}. */
   public ThreadGroup threadGroup = new ThreadGroup("JBossMQ Server Threads");

   /** ConnectionToken -> ClientConsumer. */
   Map clientConsumers = new ConcurrentReaderHashMap();

   /** Counter used to build generated client ids; guarded by {@link #idLock}. */
   private int lastID = 1;

   /** Counter used to name temporary topics; guarded by {@link #lastTemporaryTopicLock}. */
   private int lastTemporaryTopic = 1;
   private final Object lastTemporaryTopicLock = new Object();

   /** Counter used to name temporary queues; guarded by {@link #lastTemporaryQueueLock}. */
   private int lastTemporaryQueue = 1;
   private final Object lastTemporaryQueueLock = new Object();

   private StateManager stateManager;
   private PersistenceManager persistenceManager;
   private MessageCache messageCache;

   /** Guards {@link #stopped}. */
   private final Object stateLock = new Object();
   private final Object idLock = new Object();

   /** The manager starts out stopped until {@link #startServer()} is called. */
   private boolean stopped = true;

   /** Parameters passed on to every temporary queue/topic created here. */
   BasicQueueParameters parameters;

   /**
    * @param parameters parameters handed to the temporary destinations
    *        created by this manager
    */
   public JMSDestinationManager(BasicQueueParameters parameters)
   {
      this.parameters = parameters;
   }

   /**
    * Enables or disables the client consumer of the given connection.
    *
    * @param dc      the connection
    * @param enabled the new enabled state
    * @throws JMSException if the client consumer cannot be obtained or updated
    */
   public void setEnabled(ConnectionToken dc, boolean enabled) throws JMSException
   {
      ClientConsumer clientConsumer = getClientConsumer(dc);
      clientConsumer.setEnabled(enabled);
   }

   public void setStateManager(StateManager newStateManager)
   {
      stateManager = newStateManager;
   }

   public void setPersistenceManager(org.jboss.mq.pm.PersistenceManager newPersistenceManager)
   {
      persistenceManager = newPersistenceManager;
   }

   /**
    * @return {@code true} until {@link #startServer()} has been called and
    *         again after {@link #stopServer()}
    */
   public boolean isStopped()
   {
      synchronized (stateLock)
      {
         return this.stopped;
      }
   }

   /** @return the number of connections that currently have a client consumer */
   public int getClientCount()
   {
      return clientConsumers.size();
   }

   /** @return a copy of the ConnectionToken -> ClientConsumer map */
   public HashMap getClients()
   {
      return new HashMap(clientConsumers);
   }

   public ThreadGroup getThreadGroup()
   {
      return threadGroup;
   }

   /**
    * Generates a client id of the form {@code "ID:<n>"} and registers it with
    * the state manager. When registration fails with a checked exception the
    * next counter value is tried.
    *
    * <p>Unchecked exceptions (for example a missing state manager) are
    * propagated instead of being retried, because retrying them would spin
    * forever without ever succeeding.
    *
    * @return the registered client id
    */
   public String getID()
   {
      String ID = null;
      while (true)
      {
         try
         {
            synchronized (idLock)
            {
               ID = "ID:" + (lastID++);
            }
            stateManager.addLoggedOnClientId(ID);
            break;
         }
         catch (RuntimeException e)
         {
            // A programming/configuration error will not go away on retry.
            throw e;
         }
         catch (Exception e)
         {
            // Registration was rejected; fall through and try the next id.
            log.debug("Could not register generated client id " + ID + ", trying the next one", e);
         }
      }
      return ID;
   }

   /**
    * Creates a temporary topic named {@code "JMS_TT<n>"} owned by the given
    * connection and registers it in {@link #destinations}.
    *
    * @param dc the owning connection
    * @return the new temporary topic
    * @throws JMSException if the destination cannot be created
    */
   public TemporaryTopic getTemporaryTopic(ConnectionToken dc) throws JMSException
   {
      String topicName;
      synchronized (lastTemporaryTopicLock)
      {
         topicName = "JMS_TT" + (lastTemporaryTopic++);
      }
      SpyTemporaryTopic topic = new SpyTemporaryTopic(topicName, dc);

      ClientConsumer clientConsumer = getClientConsumer(dc);
      JMSDestination jmsTopic = new JMSTopic(topic, clientConsumer, this, parameters);
      destinations.put(topic, jmsTopic);

      return topic;
   }

   /**
    * Creates a temporary queue named {@code "JMS_TQ<n>"} owned by the given
    * connection and registers it in {@link #destinations}.
    *
    * @param dc the owning connection
    * @return the new temporary queue
    * @throws JMSException if the destination cannot be created
    */
   public TemporaryQueue getTemporaryQueue(ConnectionToken dc) throws JMSException
   {
      String queueName;
      synchronized (lastTemporaryQueueLock)
      {
         queueName = "JMS_TQ" + (lastTemporaryQueue++);
      }
      SpyTemporaryQueue newQueue = new SpyTemporaryQueue(queueName, dc);

      ClientConsumer clientConsumer = getClientConsumer(dc);
      JMSDestination jmsQueue = new JMSQueue(newQueue, clientConsumer, this, parameters);
      destinations.put(newQueue, jmsQueue);

      return newQueue;
   }

   /**
    * Returns the client consumer for the connection, creating and
    * registering one if none exists yet.
    *
    * @param dc the connection
    * @return the (possibly new) client consumer
    * @throws JMSException if a new client consumer cannot be created
    */
   public ClientConsumer getClientConsumer(ConnectionToken dc) throws JMSException
   {
      ClientConsumer cq = (ClientConsumer) clientConsumers.get(dc);
      if (cq == null)
      {
         cq = new ClientConsumer(this, dc);
         clientConsumers.put(dc, cq);
      }
      return cq;
   }

   /** @return the open destination for {@code dest}, or {@code null} if there is none */
   public JMSDestination getJMSDestination(SpyDestination dest)
   {
      return (JMSDestination) destinations.get(dest);
   }

   /**
    * Like {@link #getJMSDestination(SpyDestination)} but also considers
    * destinations that are currently being closed.
    *
    * @return the destination, or {@code null} if it is neither open nor closing
    */
   protected JMSDestination getPossiblyClosingJMSDestination(SpyDestination dest)
   {
      JMSDestination result = (JMSDestination) destinations.get(dest);
      if (result == null)
         result = (JMSDestination) closingDestinations.get(dest);
      return result;
   }

   public StateManager getStateManager()
   {
      return stateManager;
   }

   public org.jboss.mq.pm.PersistenceManager getPersistenceManager()
   {
      return persistenceManager;
   }

   /** Marks the manager as running. */
   public void startServer()
   {
      synchronized (stateLock)
      {
         this.stopped = false;
      }
   }

   /** Marks the manager as stopped. */
   public void stopServer()
   {
      synchronized (stateLock)
      {
         this.stopped = true;
      }
   }

   /**
    * Registers a client-supplied id with the state manager.
    *
    * @param ID the client id to register
    * @throws JMSException if the state manager rejects the id
    */
   public void checkID(String ID) throws JMSException
   {
      stateManager.addLoggedOnClientId(ID);
   }

   /** Adds a message outside of any transaction. */
   public void addMessage(ConnectionToken dc, SpyMessage val) throws JMSException
   {
      addMessage(dc, val, null);
   }

   /**
    * Hands a message to its destination.
    *
    * <p>Before delivery the redelivered flag and the redelivery count
    * property are cleared and the message is switched to read-only mode.
    *
    * @param dc   the sending connection
    * @param val  the message; its JMS destination selects the target
    * @param txId the transaction to add the message under, or {@code null}
    * @throws InvalidDestinationException if the message's destination is not open
    * @throws JMSException if the destination rejects the message
    */
   public void addMessage(ConnectionToken dc, SpyMessage val, Tx txId) throws JMSException
   {
      JMSDestination queue = (JMSDestination) destinations.get(val.getJMSDestination());
      if (queue == null)
         throw new InvalidDestinationException("This destination does not exist! " + val.getJMSDestination());

      val.setJMSRedelivered(false);
      val.header.jmsProperties.remove(SpyMessage.PROPERTY_REDELIVERY_COUNT);
      val.setReadOnlyMode();

      queue.addMessage(val, txId);
   }

   /**
    * Executes a transaction request.
    *
    * <ul>
    * <li>One-phase commit: the messages and acks are applied under a new
    *     transaction which is then committed.</li>
    * <li>Two-phase prepare: the messages and acks are applied under a new
    *     transaction created for the request's xid; nothing is committed.</li>
    * <li>Two-phase rollback / commit: the prepared transaction for the xid is
    *     looked up and rolled back / committed.</li>
    * </ul>
    * Any other request type is ignored.
    *
    * @param dc the connection issuing the request
    * @param t  the request
    * @throws SpyTransactionRolledBackException if applying (or, for one-phase,
    *         committing) the work failed and the transaction was rolled back
    * @throws JMSException on other failures reported by the transaction manager
    */
   public void transact(ConnectionToken dc, TransactionRequest t) throws JMSException
   {
      org.jboss.mq.pm.TxManager txManager = persistenceManager.getTxManager();

      if (t.requestType == TransactionRequest.ONE_PHASE_COMMIT_REQUEST)
      {
         Tx txId = txManager.createTx();
         try
         {
            applyTransactionWork(dc, t, txId);
            txManager.commitTx(txId);
         }
         catch (JMSException e)
         {
            log.debug("Exception occured, rolling back transaction: ", e);
            txManager.rollbackTx(txId);
            throw new SpyTransactionRolledBackException("Transaction was rolled back.", e);
         }
      }
      else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST)
      {
         Tx txId = txManager.createTx(dc, t.xid);
         try
         {
            applyTransactionWork(dc, t, txId);
         }
         catch (JMSException e)
         {
            log.debug("Exception occured, rolling back transaction: ", e);
            txManager.rollbackTx(txId);
            throw new SpyTransactionRolledBackException("Transaction was rolled back.", e);
         }
      }
      else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST)
      {
         Tx txId = txManager.getPrepared(dc, t.xid);
         txManager.rollbackTx(txId);
      }
      else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST)
      {
         Tx txId = txManager.getPrepared(dc, t.xid);
         txManager.commitTx(txId);
      }
   }

   /**
    * Applies the messages and then the acknowledgements of a transaction
    * request under the given transaction; either array may be {@code null}.
    */
   private void applyTransactionWork(ConnectionToken dc, TransactionRequest t, Tx txId) throws JMSException
   {
      if (t.messages != null)
      {
         for (int i = 0; i < t.messages.length; i++)
         {
            addMessage(dc, t.messages[i], txId);
         }
      }
      if (t.acks != null)
      {
         for (int i = 0; i < t.acks.length; i++)
         {
            acknowledge(dc, t.acks[i], txId);
         }
      }
   }

   /** Acknowledges outside of any transaction. */
   public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) throws JMSException
   {
      acknowledge(dc, item, null);
   }

   /**
    * Forwards an acknowledgement to the connection's client consumer.
    *
    * @param dc   the acknowledging connection
    * @param item the acknowledgement
    * @param txId the transaction to acknowledge under, or {@code null}
    * @throws JMSException if the client consumer rejects the acknowledgement
    */
   public void acknowledge(ConnectionToken dc, AcknowledgementRequest item, Tx txId) throws JMSException
   {
      ClientConsumer cc = getClientConsumer(dc);
      cc.acknowledge(item, txId);
   }

   /**
    * Releases everything held on behalf of a connection: its client
    * consumer, its logged-on client id, the temporary destinations it owns
    * and finally its client IL. A {@code null} token is ignored.
    *
    * @param dc the connection being closed, may be {@code null}
    * @throws JMSException if closing the client consumer or removing a
    *         temporary destination fails
    */
   public void connectionClosing(ConnectionToken dc) throws JMSException
   {
      if (dc == null)
         return;

      ClientConsumer cq = (ClientConsumer) clientConsumers.remove(dc);
      if (cq != null)
         cq.close();

      if (dc.getClientID() != null)
         stateManager.removeLoggedOnClientId(dc.getClientID());

      // Drop every temporary destination that belongs to this connection.
      Iterator i = destinations.entrySet().iterator();
      while (i.hasNext())
      {
         Map.Entry entry = (Map.Entry) i.next();
         JMSDestination sq = (JMSDestination) entry.getValue();
         ClientConsumer owner = sq.temporaryDestination;
         if (owner != null && dc.equals(owner.connectionToken))
         {
            i.remove();
            deleteTemporaryDestination(dc, sq);
         }
      }

      try
      {
         if (dc.clientIL != null)
            dc.clientIL.close();
      }
      catch (Exception ex)
      {
         // Best effort: the connection is going away regardless.
         log.debug("Ignored exception while closing the client IL of a closing connection", ex);
      }
   }

   /**
    * Logs the failure and then cleans up exactly like
    * {@link #connectionClosing(ConnectionToken)}.
    *
    * @param dc the failed connection, may be {@code null}
    * @throws JMSException if the cleanup fails
    */
   public void connectionFailure(ConnectionToken dc) throws JMSException
   {
      // connectionClosing() tolerates a null token, so the log line must too.
      log.error("The connection to client " + (dc == null ? null : dc.getClientID()) + " failed.");
      connectionClosing(dc);
   }

   /** Adds a subscription to the connection's client consumer. */
   public void subscribe(ConnectionToken dc, Subscription sub) throws JMSException
   {
      ClientConsumer clientConsumer = getClientConsumer(dc);
      clientConsumer.addSubscription(sub);
   }

   /** Removes a subscription from the connection's client consumer. */
   public void unsubscribe(ConnectionToken dc, int subscriptionId) throws JMSException
   {
      ClientConsumer clientConsumer = getClientConsumer(dc);
      clientConsumer.removeSubscription(subscriptionId);
   }

   /**
    * Destroys a durable subscription by setting its topic to {@code null}
    * in the state manager.
    */
   public void destroySubscription(ConnectionToken dc, DurableSubscriptionID id) throws JMSException
   {
      getStateManager().setDurableSubscription(this, id, null);
   }

   /**
    * Browses a queue.
    *
    * @param dc       the browsing connection
    * @param dest     the queue to browse
    * @param selector the message selector passed to the queue
    * @return the messages returned by the queue's browse operation
    * @throws InvalidDestinationException if the destination is not open
    * @throws JMSException if the destination is not a queue or browsing fails
    */
   public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector) throws JMSException
   {
      JMSDestination queue = (JMSDestination) destinations.get(dest);
      if (queue == null)
         throw new InvalidDestinationException("That destination does not exist! " + dest);
      if (!(queue instanceof JMSQueue))
         throw new JMSException("That destination is not a queue");

      return ((JMSQueue) queue).browse(selector);
   }

   /**
    * Receives a message for one of the connection's subscriptions.
    *
    * @param dc           the receiving connection
    * @param subscriberId the subscription to receive for
    * @param wait         wait argument forwarded to the client consumer
    * @return whatever the client consumer's receive returns
    * @throws JMSException if the receive fails
    */
   public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) throws JMSException
   {
      ClientConsumer clientConsumer = getClientConsumer(dc);
      return clientConsumer.receive(subscriberId, wait);
   }

   /**
    * Returns a handle for an existing queue; this does not create one.
    *
    * @throws JMSException if no destination with that queue name is open
    */
   public Queue createQueue(ConnectionToken dc, String name) throws JMSException
   {
      SpyQueue newQueue = new SpyQueue(name);
      if (!destinations.containsKey(newQueue))
      {
         throw new JMSException("This destination does not exist !");
      }
      return newQueue;
   }

   /**
    * Returns a handle for an existing topic; this does not create one.
    *
    * @throws JMSException if no destination with that topic name is open
    */
   public Topic createTopic(ConnectionToken dc, String name) throws JMSException
   {
      SpyTopic newTopic = new SpyTopic(name);
      if (!destinations.containsKey(newTopic))
      {
         throw new JMSException("This destination does not exist !");
      }
      return newTopic;
   }

   /**
    * Removes a temporary destination that is no longer in use.
    *
    * @param dc   the requesting connection
    * @param dest the temporary destination to delete
    * @throws InvalidDestinationException if the destination is not open
    * @throws JMSException if the destination is still in use
    */
   public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest)
      throws JMSException
   {
      JMSDestination destination = (JMSDestination) destinations.get(dest);
      if (destination == null)
         // Report the requested key; the looked-up value is null here.
         throw new InvalidDestinationException("That destination does not exist! " + dest);

      if (destination.isInUse())
         throw new JMSException("Cannot delete temporary queue, it is in use.");

      destinations.remove(dest);
      deleteTemporaryDestination(dc, destination);
   }

   /**
    * Removes all messages from an already unregistered temporary
    * destination. Failures are logged and not propagated.
    */
   protected void deleteTemporaryDestination(ConnectionToken dc, JMSDestination destination)
      throws JMSException
   {
      try
      {
         destination.removeAllMessages();
      }
      catch (Exception e)
      {
         log.error("An exception happened while removing all messages from temporary destination "
            + destination.getSpyDestination().getName(), e);
      }
   }

   /** Delegates the user check to the state manager. */
   public String checkUser(String userName, String password) throws JMSException
   {
      return stateManager.checkUser(userName, password);
   }

   /** Not implemented at this level: always returns {@code null}. */
   public String authenticate(String id, String password) throws JMSException
   {
      return null;
   }

   /**
    * Registers a destination. For topics, the durable subscriptions that the
    * state manager reports for the topic are created on it.
    *
    * @param destination the destination to register
    * @throws JMSException if the destination is already registered or a
    *         durable subscription cannot be created
    */
   public void addDestination(JMSDestination destination) throws JMSException
   {
      if (destinations.containsKey(destination.getSpyDestination()))
         throw new JMSException("This destination has already been added to the server!");

      destinations.put(destination.getSpyDestination(), destination);

      if (destination instanceof JMSTopic)
      {
         Collection durableSubs =
            getStateManager().getDurableSubscriptionIdsForTopic((SpyTopic) destination.getSpyDestination());
         for (Iterator i = durableSubs.iterator(); i.hasNext();)
         {
            DurableSubscriptionID sub = (DurableSubscriptionID) i.next();
            log.debug("creating the durable subscription for :" + sub);
            ((JMSTopic) destination).createDurableSubscription(sub);
         }
      }
   }

   /**
    * Closes an open destination. While {@code close()} runs, the destination
    * is reachable through {@link #closingDestinations}.
    *
    * @param dest the destination to close
    * @throws InvalidDestinationException if the destination is not open
    * @throws JMSException if closing fails
    */
   public void closeDestination(SpyDestination dest) throws JMSException
   {
      JMSDestination destination = (JMSDestination) destinations.remove(dest);
      if (destination == null)
         throw new InvalidDestinationException("This destination is not open! " + dest);

      log.debug("Closing destination " + dest);

      closingDestinations.put(dest, destination);
      try
      {
         destination.close();
      }
      finally
      {
         closingDestinations.remove(dest);
      }
   }

   /** @return the version string {@link #JBOSS_VESION} */
   public String toString()
   {
      return JBOSS_VESION;
   }

   /**
    * Answers a ping by sending a pong carrying the current server time.
    *
    * @param dc         the pinging connection
    * @param clientTime the client's timestamp (not used here)
    * @throws JMSException if the pong cannot be sent
    */
   public void ping(ConnectionToken dc, long clientTime) throws JMSException
   {
      try
      {
         dc.clientIL.pong(System.currentTimeMillis());
      }
      catch (Exception e)
      {
         throw new SpyJMSException("Could not pong", e);
      }
   }

   public MessageCache getMessageCache()
   {
      return messageCache;
   }

   public void setMessageCache(MessageCache messageCache)
   {
      this.messageCache = messageCache;
   }

   /** Delegates to the state manager to find the topic of a durable subscription. */
   public SpyTopic getDurableTopic(DurableSubscriptionID sub) throws JMSException
   {
      return getStateManager().getDurableTopic(sub);
   }

   /** Looks up a subscription on the connection's client consumer. */
   public Subscription getSubscription(ConnectionToken dc, int subscriberId) throws JMSException
   {
      ClientConsumer clientConsumer = getClientConsumer(dc);
      return clientConsumer.getSubscription(subscriberId);
   }

   /**
    * Collects the message counters of all open destinations.
    *
    * @return the counters ordered by the key
    *         {@code <destination name>-<subscription>-<Topic|Queue>};
    *         counters that produce the same key replace one another
    */
   public MessageCounter[] getMessageCounter()
   {
      TreeMap map = new TreeMap();

      Iterator i = destinations.values().iterator();
      while (i.hasNext())
      {
         JMSDestination dest = (JMSDestination) i.next();
         MessageCounter[] counter = dest.getMessageCounter();

         for (int j = 0; j < counter.length; j++)
         {
            String key =
               counter[j].getDestinationName()
                  + "-"
                  + counter[j].getDestinationSubscription()
                  + "-"
                  + (counter[j].getDestinationTopic() ? "Topic" : "Queue");
            map.put(key, counter[j]);
         }
      }

      return (MessageCounter[]) map.values().toArray(new MessageCounter[0]);
   }

   /** Resets every message counter of every open destination. */
   public void resetMessageCounter()
   {
      Iterator i = destinations.values().iterator();
      while (i.hasNext())
      {
         JMSDestination dest = (JMSDestination) i.next();
         MessageCounter[] counter = dest.getMessageCounter();

         for (int j = 0; j < counter.length; j++)
         {
            counter[j].resetCounter();
         }
      }
   }
}