/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq.server;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;

import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyQueue;
import org.jboss.mq.SpyTemporaryQueue;
import org.jboss.mq.SpyTemporaryTopic;
import org.jboss.mq.SpyTopic;
import org.jboss.mq.SpyTransactionRolledBackException;
import org.jboss.mq.Subscription;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.pm.PersistenceManager;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.sm.StateManager;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;

/**
 * This class implements the JMS provider
 *
 * @author    Norbert Lataille (Norbert.Lataille@m4x.org)
 * @author    Hiram Chirino (Cojonudo14@hotmail.com)
 * @author    David Maplesden (David.Maplesden@orion.co.nz)
 * @author <a href="mailto:pra@tim.se">Peter Antman</a>
 * @version   $Revision: 1.17.6.3 $
 */
public class JMSDestinationManager extends JMSServerInterceptorSupport
{
   /** Logger used by this class. */
   static Logger log = Logger.getLogger(JMSDestinationManager.class);

   /**
    * Version string of the server; this is the value returned by
    * {@link #toString()}.
    */
   public final static String JBOSS_VESION = "JBossMQ Version 3.2";

   /////////////////////////////////////////////////////////////////////
   // Attributes
   /////////////////////////////////////////////////////////////////////

   /** Open destinations: SpyDestination -> JMSDestination */
   public Map destinations = new ConcurrentReaderHashMap();
   /** Destinations that are in the middle of being closed: SpyDestination -> JMSDestination */
   public Map closingDestinations = new ConcurrentReaderHashMap();
   /** Thread group for server side threads. */
   public ThreadGroup threadGroup = new ThreadGroup("JBossMQ Server Threads");

   /** The ClientConsumers, hashed by ConnectionToken: ConnectionToken -> ClientConsumer */
   Map clientConsumers = new ConcurrentReaderHashMap();

   /** Counter used to build generated client ids; read and incremented under idLock */
   private int lastID = 1;
   /** Counter used to build temporary topic names; read and incremented under lastTemporaryTopicLock */
   private int lastTemporaryTopic = 1;
   /** Guards lastTemporaryTopic */
   private Object lastTemporaryTopicLock = new Object();
   /** Counter used to build temporary queue names; read and incremented under lastTemporaryQueueLock */
   private int lastTemporaryQueue = 1;
   /** Guards lastTemporaryQueue */
   private Object lastTemporaryQueueLock = new Object();
   /** The state manager (client ids, user checks, durable subscriptions) */
   private StateManager stateManager;
   /** The persistence manager; also the source of the transaction manager */
   private PersistenceManager persistenceManager;
   /** The cache used to hold messages */
   private MessageCache messageCache;

   /** Guards the stopped flag */
   private Object stateLock = new Object();

   /** Guards lastID */
   private Object idLock = new Object();

   /**
    * Because there can be a delay between killing the JMS service and the
    * service actually dying, this field is used to tell external classes
    * that the server has actually stopped. It starts out <code>true</code>
    * and is only changed by startServer() and stopServer().
    */
   private boolean stopped = true;

   /** Parameters handed to every temporary queue/topic created by this manager */
   BasicQueueParameters parameters;

   /////////////////////////////////////////////////////////////////////
   // Constructors
   /////////////////////////////////////////////////////////////////////
   /**
    * Creates a destination manager.
    *
    * @param parameters  the parameters that are passed on to every temporary
    *      queue and topic created by this manager
    */
   public JMSDestinationManager(BasicQueueParameters parameters)
   {
      super();
      this.parameters = parameters;
   }

   /**
    * Enables or disables the ClientConsumer that belongs to a connection.
    * The consumer is obtained through getClientConsumer(), so one is
    * created when the connection does not have one yet.
    *
    * @param dc                the connection whose consumer is switched
    * @param enabled           the new enabled state
    * @exception JMSException  if the consumer cannot be obtained or updated
    */
   public void setEnabled(ConnectionToken dc, boolean enabled) throws JMSException
   {
      getClientConsumer(dc).setEnabled(enabled);
   }

   /**
    * Installs the state manager used by this server.
    *
    * @param newStateManager  the state manager to use from now on
    */
   public void setStateManager(StateManager newStateManager)
   {
      this.stateManager = newStateManager;
   }

   /**
    * Installs the persistence manager used by this server.
    *
    * @param newPersistenceManager  the persistence manager to use from now on
    */
   public void setPersistenceManager(PersistenceManager newPersistenceManager)
   {
      this.persistenceManager = newPersistenceManager;
   }

   /////////////////////////////////////////////////////////////////////
   // Public Methods
   /////////////////////////////////////////////////////////////////////

   /**
    * Tells whether the server is stopped. The flag is read while holding
    * the state lock.
    *
    * @return   <code>true</code> if the server is stopped (the initial state
    *      and the state after stopServer()), <code>false</code> after
    *      startServer() has been called
    */
   public boolean isStopped()
   {
      boolean current;
      synchronized (stateLock)
      {
         current = stopped;
      }
      return current;
   }

   /**
    * Counts the connections that currently have a ClientConsumer.
    *
    * @return the number of entries in the client consumer map
    */
   public int getClientCount()
   {
      final int count = clientConsumers.size();
      return count;
   }
   /**
    * Obtains a copy of the current clients. Changes made to the returned
    * map do not affect the server.
    *
    * @return a new HashMap (ConnectionToken -> ClientConsumer) holding the
    *      current clients
    */
   public HashMap getClients()
   {
      HashMap snapshot = new HashMap();
      snapshot.putAll(clientConsumers);
      return snapshot;
   }

   /**
    * Gets the thread group for server side threads.
    *
    * @return   the thread group held by this manager
    */
   public ThreadGroup getThreadGroup()
   {
      return this.threadGroup;
   }

   /**
    * Generates a new client id for a connection and registers it with the
    * state manager.
    * <p>
    * Ids have the form <code>ID:&lt;n&gt;</code>, where n comes from a
    * counter incremented under the id lock. When the state manager rejects
    * a candidate with a checked exception, the next number is tried.
    * Runtime exceptions (for example when no state manager has been set)
    * are propagated to the caller instead of being retried forever.
    *
    * @return   the id that was registered with the state manager
    */
   public String getID()
   {
      while (true)
      {
         String candidate;
         synchronized (idLock)
         {
            candidate = "ID:" + Integer.toString(lastID++);
         }

         try
         {
            stateManager.addLoggedOnClientId(candidate);
            return candidate;
         }
         catch (RuntimeException e)
         {
            // A programming/configuration error will not go away by retrying.
            throw e;
         }
         catch (Exception e)
         {
            // The candidate was refused; fall through and try the next number.
            log.debug("Generated client id " + candidate + " was rejected, trying the next one", e);
         }
      }
   }

   /**
    * Creates a temporary topic for a connection and registers it as a
    * destination of this server. The topic is named <code>JMS_TT</code>
    * followed by a number taken from a counter that is incremented under
    * its own lock.
    *
    * @param dc                the connection that owns the temporary topic
    * @return                  the newly created temporary topic
    * @exception JMSException  if the client consumer or the server side
    *      topic cannot be created
    */
   public TemporaryTopic getTemporaryTopic(ConnectionToken dc) throws JMSException
   {
      int sequence;
      synchronized (lastTemporaryTopicLock)
      {
         sequence = lastTemporaryTopic++;
      }

      SpyTemporaryTopic tempTopic = new SpyTemporaryTopic("JMS_TT" + sequence, dc);
      JMSDestination serverSide = new JMSTopic(tempTopic, getClientConsumer(dc), this, parameters);
      destinations.put(tempTopic, serverSide);
      return tempTopic;
   }

   /**
    * Creates a temporary queue for a connection and registers it as a
    * destination of this server. The queue is named <code>JMS_TQ</code>
    * followed by a number taken from a counter that is incremented under
    * its own lock.
    *
    * @param dc                the connection that owns the temporary queue
    * @return                  the newly created temporary queue
    * @exception JMSException  if the client consumer or the server side
    *      queue cannot be created
    */
   public TemporaryQueue getTemporaryQueue(ConnectionToken dc) throws JMSException
   {
      int sequence;
      synchronized (lastTemporaryQueueLock)
      {
         sequence = lastTemporaryQueue++;
      }

      SpyTemporaryQueue tempQueue = new SpyTemporaryQueue("JMS_TQ" + sequence, dc);
      JMSDestination serverSide = new JMSQueue(tempQueue, getClientConsumer(dc), this, parameters);
      destinations.put(tempQueue, serverSide);
      return tempQueue;
   }

   /**
    * Gets the ClientConsumer mapped to a connection. If the connection is
    * not mapped yet, a new ClientConsumer is created and stored for it.
    * <p>
    * NOTE(review): the lookup and the store are two separate map
    * operations, so concurrent first calls for the same token could each
    * create a consumer - confirm whether callers can race here.
    *
    * @param dc                the connection to look up
    * @return                  the existing or newly created ClientConsumer
    * @exception JMSException  if a new ClientConsumer cannot be created
    */
   public ClientConsumer getClientConsumer(ConnectionToken dc) throws JMSException
   {
      Object existing = clientConsumers.get(dc);
      if (existing != null)
      {
         return (ClientConsumer) existing;
      }

      ClientConsumer created = new ClientConsumer(this, dc);
      clientConsumers.put(dc, created);
      return created;
   }

   /**
    * Looks up the server side destination registered for a SpyDestination.
    * Destinations that are being closed are not considered.
    *
    * @param dest  the destination to look up
    * @return      the matching JMSDestination, or <code>null</code> if none
    *      is registered
    */
   public JMSDestination getJMSDestination(SpyDestination dest)
   {
      Object found = destinations.get(dest);
      return (JMSDestination) found;
   }

   /**
    * Looks up the server side destination for a SpyDestination, also
    * considering destinations that are in the middle of being closed.
    * Open destinations take precedence.
    *
    * @param dest  the destination to look up
    * @return      the matching JMSDestination, or <code>null</code> if it is
    *      neither open nor closing
    */
   protected JMSDestination getPossiblyClosingJMSDestination(SpyDestination dest)
   {
      Object open = destinations.get(dest);
      if (open != null)
      {
         return (JMSDestination) open;
      }
      return (JMSDestination) closingDestinations.get(dest);
   }

   /**
    * Gets the state manager used by this server.
    *
    * @return   the state manager, or <code>null</code> if none has been set
    */
   public StateManager getStateManager()
   {
      return this.stateManager;
   }

   /**
    * Gets the persistence manager used by this server.
    *
    * @return   the persistence manager, or <code>null</code> if none has
    *      been set
    */
   public PersistenceManager getPersistenceManager()
   {
      return this.persistenceManager;
   }

   /**
    * Marks the server as running: clears the stopped flag while holding
    * the state lock, so that isStopped() returns <code>false</code>.
    */
   public void startServer()
   {
      synchronized (stateLock)
      {
         stopped = false;
      }
   }

   /**
    * Marks the server as stopped: sets the stopped flag while holding the
    * state lock, so that isStopped() returns <code>true</code>.
    */
   public void stopServer()
   {
      synchronized (stateLock)
      {
         stopped = true;
      }
   }

   /**
    * Checks a client id set by the client by registering it with the state
    * manager as a logged on client id.
    *
    * @param ID                the client id chosen by the client
    * @exception JMSException  if the state manager refuses the id
    */
   public void checkID(String ID) throws JMSException
   {
      final String requestedId = ID;
      stateManager.addLoggedOnClientId(requestedId);
   }

   /**
    * Handles a message sent by a connection outside of any transaction.
    * Equivalent to calling the three argument variant with a
    * <code>null</code> transaction.
    *
    * @param dc                the connection that sent the message
    * @param val               the message to add
    * @exception JMSException  if the message cannot be added to its
    *      destination
    */
   public void addMessage(ConnectionToken dc, SpyMessage val) throws JMSException
   {
      final Tx noTransaction = null;
      addMessage(dc, val, noTransaction);
   }

   /**
    * Handles a message sent by a connection, optionally as part of a
    * transaction. The message is routed to the destination named in its
    * JMSDestination header; before it is handed over, its redelivery
    * information is cleared and it is switched to read only mode.
    *
    * @param dc                the connection that sent the message
    * @param val               the message to add
    * @param txId              the transaction the send belongs to, or
    *      <code>null</code> for none
    * @exception JMSException  InvalidDestinationException if the
    *      destination of the message is not registered here, or any failure
    *      reported while adding the message
    */
   public void addMessage(ConnectionToken dc, SpyMessage val, Tx txId) throws JMSException
   {
      JMSDestination target = (JMSDestination) destinations.get(val.getJMSDestination());
      if (target == null)
      {
         throw new InvalidDestinationException("This destination does not exist! " + val.getJMSDestination());
      }

      // A message that is being sent is never a redelivery: wipe both markers.
      val.setJMSRedelivered(false);
      val.header.jmsProperties.remove(SpyMessage.PROPERTY_REDELIVERY_COUNT);

      // Freeze the message, then hand it to the destination.
      val.setReadOnlyMode();
      target.addMessage(val, txId);
   }

   /**
    * Performs a unit of work on behalf of a connection. What happens
    * depends on the request type:
    * <ul>
    * <li>one phase commit: a new transaction is created, the messages and
    * acknowledgements of the request are applied to it and it is
    * committed;</li>
    * <li>two phase prepare: a transaction is created for the connection and
    * xid, and the messages and acknowledgements are applied to it without
    * committing;</li>
    * <li>two phase rollback / commit: the prepared transaction for the
    * connection and xid is fetched and rolled back / committed.</li>
    * </ul>
    * Any other request type is ignored.
    *
    * @param dc                the connection that issued the request
    * @param t                 the transaction request
    * @exception JMSException  SpyTransactionRolledBackException if applying
    *      (or, for one phase, committing) the work failed and the
    *      transaction was rolled back; any failure reported by the
    *      transaction manager otherwise
    */
   public void transact(ConnectionToken dc, TransactionRequest t) throws JMSException
   {
      org.jboss.mq.pm.TxManager txManager = persistenceManager.getTxManager();

      if (t.requestType == TransactionRequest.ONE_PHASE_COMMIT_REQUEST)
      {
         Tx txId = txManager.createTx();
         executeUnitOfWork(txManager, dc, t, txId, true);
      }
      else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_PREPARE_REQUEST)
      {
         Tx txId = txManager.createTx(dc, t.xid);
         executeUnitOfWork(txManager, dc, t, txId, false);
      }
      else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_ROLLBACK_REQUEST)
      {
         Tx txId = txManager.getPrepared(dc, t.xid);
         txManager.rollbackTx(txId);
      }
      else if (t.requestType == TransactionRequest.TWO_PHASE_COMMIT_COMMIT_REQUEST)
      {
         Tx txId = txManager.getPrepared(dc, t.xid);
         txManager.commitTx(txId);
      }
   }

   /**
    * Applies the messages and acknowledgements of a request to a
    * transaction and optionally commits it. If any step fails with a
    * JMSException the transaction is rolled back and the failure is
    * rethrown wrapped in a SpyTransactionRolledBackException.
    *
    * @param txManager         the transaction manager owning the transaction
    * @param dc                the connection that issued the request
    * @param t                 the request holding the messages and acks
    * @param txId              the transaction to apply the work to
    * @param commit            whether to commit after the work was applied
    * @exception JMSException  if the work failed and was rolled back, or if
    *      the rollback itself failed
    */
   private void executeUnitOfWork(
      org.jboss.mq.pm.TxManager txManager,
      ConnectionToken dc,
      TransactionRequest t,
      Tx txId,
      boolean commit)
      throws JMSException
   {
      try
      {
         if (t.messages != null)
         {
            for (int i = 0; i < t.messages.length; i++)
            {
               addMessage(dc, t.messages[i], txId);
            }
         }

         if (t.acks != null)
         {
            for (int i = 0; i < t.acks.length; i++)
            {
               acknowledge(dc, t.acks[i], txId);
            }
         }

         if (commit)
         {
            txManager.commitTx(txId);
         }
      }
      catch (JMSException e)
      {
         log.debug("Exception occured, rolling back transaction: ", e);
         txManager.rollbackTx(txId);
         throw new SpyTransactionRolledBackException("Transaction was rolled back.", e);
      }
   }

   /**
    * Handles an acknowledgement request (ack or nack of a message) sent by
    * a client outside of any transaction. Equivalent to calling the three
    * argument variant with a <code>null</code> transaction.
    *
    * @param dc                the connection that sent the request
    * @param item              the acknowledgement request
    * @exception JMSException  if the acknowledgement fails
    */
   public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) throws JMSException
   {
      final Tx noTransaction = null;
      acknowledge(dc, item, noTransaction);
   }

   /**
    * Handles an acknowledgement request (ack or nack of a message) sent by
    * a client, optionally as part of a transaction. The request is
    * forwarded to the ClientConsumer of the connection.
    *
    * @param dc                the connection that sent the request
    * @param item              the acknowledgement request
    * @param txId              the transaction the request belongs to, or
    *      <code>null</code> for none
    * @exception JMSException  if the acknowledgement fails
    */
   public void acknowledge(ConnectionToken dc, AcknowledgementRequest item, Tx txId) throws JMSException
   {
      getClientConsumer(dc).acknowledge(item, txId);
   }

   /**
    * Cleans up after a connection that is closing, either because of an
    * error or because the client said so. A <code>null</code> token is
    * ignored. Otherwise, in this order: the ClientConsumer of the
    * connection is removed and closed, its client id (if any) is
    * unregistered from the state manager, every destination whose
    * temporaryDestination consumer belongs to this connection is removed
    * and emptied, and finally the clientIL of the connection is closed.
    *
    * @param dc                the connection that is closing
    * @exception JMSException  if one of the clean up steps fails; a failure
    *      to close the clientIL is ignored
    */
   public void connectionClosing(ConnectionToken dc) throws JMSException
   {
      if (dc == null)
      {
         return;
      }

      // Drop and close the consumer that served this connection.
      ClientConsumer consumer = (ClientConsumer) clientConsumers.remove(dc);
      if (consumer != null)
      {
         consumer.close();
      }

      // Release the client id so that it can be used again.
      if (dc.getClientID() != null)
      {
         stateManager.removeLoggedOnClientId(dc.getClientID());
      }

      // Remove any temporary destinations the connection may have created.
      for (Iterator it = destinations.entrySet().iterator(); it.hasNext();)
      {
         Map.Entry mapping = (Map.Entry) it.next();
         JMSDestination candidate = (JMSDestination) mapping.getValue();
         ClientConsumer owner = candidate.temporaryDestination;
         if (owner == null || !dc.equals(owner.connectionToken))
         {
            continue;
         }
         it.remove();
         deleteTemporaryDestination(dc, candidate);
      }

      // Close the clientIL.
      try
      {
         if (dc.clientIL != null)
         {
            dc.clientIL.close();
         }
      }
      catch (Exception ignored)
      {
         // No warning on purpose: too often the client has already gone
         // by the time we get here.
      }
   }

   /**
    * Handles the failure of a connection: the failure is logged and the
    * connection is then cleaned up exactly as if it were closing. A
    * <code>null</code> token is logged and otherwise ignored, matching
    * connectionClosing(), which also ignores <code>null</code>.
    *
    * @param dc                the connection that failed, may be
    *      <code>null</code>
    * @exception JMSException  if cleaning up the connection fails
    */
   public void connectionFailure(ConnectionToken dc) throws JMSException
   {
      if (dc == null)
      {
         // Nothing to identify and nothing to clean up.
         log.error("The connection to an unknown client failed.");
         return;
      }

      //We should try again :) This behavior should be under control of a Failure-Plugin
      log.error("The connection to client " + dc.getClientID() + " failed.");
      connectionClosing(dc);
   }

   /**
    * Registers a subscription for a connection that wants to subscribe to
    * a destination. The subscription is added to the ClientConsumer of the
    * connection.
    *
    * @param dc                the subscribing connection
    * @param sub               the subscription to add
    * @exception JMSException  if the subscription cannot be added
    */
   public void subscribe(ConnectionToken dc, Subscription sub) throws JMSException
   {
      getClientConsumer(dc).addSubscription(sub);
   }

   /**
    * Removes a subscription of a connection. The removal is done by the
    * ClientConsumer of the connection.
    *
    * @param dc                the connection that owns the subscription
    * @param subscriptionId    the id of the subscription to remove
    * @exception JMSException  if the subscription cannot be removed
    */
   public void unsubscribe(ConnectionToken dc, int subscriptionId) throws JMSException
   {
      getClientConsumer(dc).removeSubscription(subscriptionId);
   }

   /**
    * Destroys a durable subscription by asking the state manager to set
    * the topic of the subscription to <code>null</code>.
    *
    * @param dc                the requesting connection (not used here)
    * @param id                the durable subscription to destroy
    * @exception JMSException  if the state manager reports a failure
    */
   public void destroySubscription(ConnectionToken dc, DurableSubscriptionID id) throws JMSException
   {
      StateManager manager = getStateManager();
      manager.setDurableSubscription(this, id, null);
   }

   /**
    * Browses the messages of a queue.
    *
    * @param dc                the requesting connection (not used here)
    * @param dest              the queue to browse
    * @param selector          the selector passed on to the queue
    * @return                  the messages returned by the queue for the
    *      selector
    * @exception JMSException  InvalidDestinationException if the
    *      destination is not registered here, JMSException if it is not a
    *      queue, or any failure reported by the queue
    */
   public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector) throws JMSException
   {
      JMSDestination target = (JMSDestination) destinations.get(dest);
      if (target == null)
      {
         throw new InvalidDestinationException("That destination does not exist! " + dest);
      }
      if (target instanceof JMSQueue)
      {
         JMSQueue queue = (JMSQueue) target;
         return queue.browse(selector);
      }
      throw new JMSException("That destination is not a queue");
   }

   /**
    * Receives a message for one subscriber of a connection. The call is
    * forwarded to the ClientConsumer of the connection.
    *
    * @param dc                the receiving connection
    * @param subscriberId      the id of the subscriber that wants a message
    * @param wait              the wait value passed on to the consumer
    * @return                  whatever the consumer returns for the request
    * @exception JMSException  if the receive fails
    */
   public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) throws JMSException
   {
      return getClientConsumer(dc).receive(subscriberId, wait);
   }

   /**
    * Creates the client side identity of an existing queue. No destination
    * is created on the server: the name must refer to a queue that is
    * already registered.
    *
    * @param dc                the requesting connection (not used here)
    * @param name              the name of the queue
    * @return                  a SpyQueue for the given name
    * @exception JMSException  if no such queue is registered
    */
   public Queue createQueue(ConnectionToken dc, String name) throws JMSException
   {
      SpyQueue requested = new SpyQueue(name);
      if (destinations.containsKey(requested))
      {
         return requested;
      }
      throw new JMSException("This destination does not exist !");
   }

   /**
    * Creates the client side identity of an existing topic. No destination
    * is created on the server: the name must refer to a topic that is
    * already registered.
    *
    * @param dc                the requesting connection (not used here)
    * @param name              the name of the topic
    * @return                  a SpyTopic for the given name
    * @exception JMSException  if no such topic is registered
    */
   public Topic createTopic(ConnectionToken dc, String name) throws JMSException
   {
      SpyTopic requested = new SpyTopic(name);
      if (destinations.containsKey(requested))
      {
         return requested;
      }
      throw new JMSException("This destination does not exist !");
   }

   /**
    * Deletes a temporary destination on request of a client. The
    * destination must be registered and must not be in use; it is then
    * unregistered and all of its messages are removed.
    *
    * @param dc                the requesting connection
    * @param dest              the temporary destination to delete
    * @exception JMSException  InvalidDestinationException if the
    *      destination is not registered here, JMSException if it is still
    *      in use
    */
   public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest)
      throws JMSException
   {
      JMSDestination destination = (JMSDestination) destinations.get(dest);
      if (destination == null)
      {
         // Report the destination that was asked for; the lookup result is null here.
         throw new InvalidDestinationException("That destination does not exist! " + dest);
      }

      if (destination.isInUse())
      {
         throw new JMSException("Cannot delete temporary queue, it is in use.");
      }

      destinations.remove(dest);
      deleteTemporaryDestination(dc, destination);
   }

   /**
    * Removes all messages from a temporary destination that has already
    * been unregistered. A failure to remove the messages is logged as an
    * error and not propagated.
    *
    * @param dc                the connection that owned the destination
    *      (not used here)
    * @param destination       the server side destination to empty
    * @exception JMSException  declared for subclasses and callers; this
    *      implementation catches every exception raised by the removal
    */
   protected void deleteTemporaryDestination(ConnectionToken dc, JMSDestination destination)
      throws JMSException
   {
      try
      {
         destination.removeAllMessages();
      }
      catch (Exception failure)
      {
         String name = destination.getSpyDestination().getName();
         log.error("An exception happened while removing all messages from temporary destination " + name, failure);
      }
   }

   /**
    * Checks a user name and password with the state manager.
    *
    * @param userName          the user name to check
    * @param password          the password to check
    * @return                  the value returned by the state manager for
    *      this user
    * @exception JMSException  if the state manager rejects the user
    */
   public String checkUser(String userName, String password) throws JMSException
   {
      String result = stateManager.checkUser(userName, password);
      return result;
   }
   /**
    * Authenticates a user. This implementation does nothing and always
    * returns <code>null</code>.
    *
    * @param id                the id to authenticate (ignored)
    * @param password          the password (ignored)
    * @return                  always <code>null</code>
    * @exception JMSException  never thrown by this implementation
    */
   public String authenticate(String id, String password) throws JMSException
   {
      // Nothing to do at this level.
      final String noSession = null;
      return noSession;
   }
   // Administration calls
   /**
    * Registers a destination with the server. For a topic, the durable
    * subscriptions the state manager knows for that topic are created on
    * it afterwards.
    *
    * @param destination       the server side destination to register
    * @exception JMSException  if a destination with the same
    *      SpyDestination is already registered, or if creating a durable
    *      subscription fails
    */
   public void addDestination(JMSDestination destination) throws JMSException
   {
      if (destinations.containsKey(destination.getSpyDestination()))
      {
         throw new JMSException("This destination has already been added to the server!");
      }

      // Make the destination known to the server.
      destinations.put(destination.getSpyDestination(), destination);

      if (!(destination instanceof JMSTopic))
      {
         return;
      }

      // Topics additionally get their durable subscriptions back.
      JMSTopic topic = (JMSTopic) destination;
      Collection durableSubs =
         getStateManager().getDurableSubscriptionIdsForTopic((SpyTopic) destination.getSpyDestination());
      Iterator subs = durableSubs.iterator();
      while (subs.hasNext())
      {
         DurableSubscriptionID sub = (DurableSubscriptionID) subs.next();
         log.debug("creating the durable subscription for :" + sub);
         topic.createDurableSubscription(sub);
      }
   }

   /**
    * Closes a destination that was opened previously. The destination is
    * unregistered first; while it is closing it is kept in the map of
    * closing destinations, from which it is removed again whether or not
    * the close succeeds.
    *
    * @param dest              the destination to close
    * @exception JMSException  InvalidDestinationException if the
    *      destination is not open, or any failure reported while closing
    */
   public void closeDestination(SpyDestination dest) throws JMSException
   {
      JMSDestination closing = (JMSDestination) destinations.remove(dest);
      if (closing == null)
      {
         throw new InvalidDestinationException("This destination is not open! " + dest);
      }

      log.debug("Closing destination " + dest);

      // Keep it reachable through the closing map for the duration of the close.
      closingDestinations.put(dest, closing);
      try
      {
         closing.close();
      }
      finally
      {
         closingDestinations.remove(dest);
      }
   }

   /**
    * Describes this server by its version string.
    *
    * @return   the value of the JBOSS_VESION constant
    */
   public String toString()
   {
      final String description = JBOSS_VESION;
      return description;
   }

   /**
    * Answers a ping of a client by calling pong on the clientIL of the
    * connection, passing the current time of the server.
    *
    * @param dc                the connection that sent the ping
    * @param clientTime        the time sent by the client (not used here)
    * @exception JMSException  SpyJMSException wrapping any exception raised
    *      while answering
    */
   public void ping(ConnectionToken dc, long clientTime) throws JMSException
   {
      try
      {
         long serverTime = System.currentTimeMillis();
         dc.clientIL.pong(serverTime);
      }
      catch (Exception failure)
      {
         throw new SpyJMSException("Could not pong", failure);
      }
   }

   /**
    * Gets the cache used to hold messages.
    *
    * @return the message cache, or <code>null</code> if none has been set
    */
   public MessageCache getMessageCache()
   {
      return this.messageCache;
   }

   /**
    * Installs the cache used to hold messages.
    *
    * @param messageCache the message cache to use from now on
    */
   public void setMessageCache(MessageCache messageCache)
   {
      MessageCache replacement = messageCache;
      this.messageCache = replacement;
   }

   /**
    * Asks the state manager for the topic of a durable subscription.
    *
    * @param sub               the durable subscription
    * @return                  the topic returned by the state manager
    * @exception JMSException  if the state manager reports a failure
    */
   public SpyTopic getDurableTopic(DurableSubscriptionID sub) throws JMSException
   {
      StateManager manager = getStateManager();
      return manager.getDurableTopic(sub);
   }

   /**
    * Gets a subscription of a connection from its ClientConsumer.
    *
    * @param dc                the connection that owns the subscription
    * @param subscriberId      the id of the subscription
    * @return                  the subscription returned by the consumer
    * @exception JMSException  if the subscription cannot be obtained
    */
   public Subscription getSubscription(ConnectionToken dc, int subscriberId) throws JMSException
   {
      return getClientConsumer(dc).getSubscription(subscriberId);
   }

   /**
    * Gets the message counters of all registered destinations.
    * <p>
    * The counters are ordered by a key made of destination name,
    * subscription and type ("Topic" or "Queue"), joined with "-". Counters
    * that produce the same key replace each other, so only the last one is
    * returned.
    *
    * @return MessageCounter[]      message counter array sorted by that key
    */
   public MessageCounter[] getMessageCounter()
   {
      // A TreeMap keeps the counters sorted by key.
      TreeMap sorted = new TreeMap();

      for (Iterator it = destinations.values().iterator(); it.hasNext();)
      {
         JMSDestination dest = (JMSDestination) it.next();
         MessageCounter[] counters = dest.getMessageCounter();

         for (int j = 0; j < counters.length; j++)
         {
            MessageCounter current = counters[j];

            // sorting order name + subscription + type
            String prefix = current.getDestinationName() + "-" + current.getDestinationSubscription() + "-";
            String key;
            if (current.getDestinationTopic())
            {
               key = prefix + "Topic";
            }
            else
            {
               key = prefix + "Queue";
            }

            sorted.put(key, current);
         }
      }

      Collection ordered = sorted.values();
      return (MessageCounter[]) ordered.toArray(new MessageCounter[0]);
   }

   /**
    * Resets the message counters of all registered destinations.
    */
   public void resetMessageCounter()
   {
      for (Iterator it = destinations.values().iterator(); it.hasNext();)
      {
         JMSDestination dest = (JMSDestination) it.next();
         MessageCounter[] counters = dest.getMessageCounter();

         int remaining = counters.length;
         for (int j = 0; j < remaining; j++)
         {
            counters[j].resetCounter();
         }
      }
   }
}