/*
 * JBoss, the OpenSource J2EE webOS
 * 
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.resource.adapter.jms.inflow;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkEvent;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;

import org.jboss.logging.Logger;

/**
 * A generic jms server session.
 * <p>
 * Wraps a jms session created from the activation's connection, forwards
 * the messages it receives to a message endpoint and runs the session as
 * a unit of work scheduled on the activation's work manager. When the work
 * completes or is rejected the server session is handed back to its pool.
 * 
 * @author <a href="adrian@jboss.com">Adrian Brock</a>
 * @version $Revision: 1.2.2.1 $
 */
public class JmsServerSession implements ServerSession, MessageListener, Work, WorkListener
{
   // Constants -----------------------------------------------------

   /** The log */
   private static final Logger log = Logger.getLogger(JmsServerSession.class);
   
   // Attributes ----------------------------------------------------
   
   /** The session pool that owns this server session */
   JmsServerSessionPool pool;
   
   /** The transacted flag, only read from the spec for non XA connections */
   boolean transacted;
   
   /** The acknowledge mode, only read from the spec for non XA connections */
   int acknowledge;
   
   /** The session, for XA connections this is the session of the XA session */
   Session session;
   
   /** Any XA session, null when the connection is not an XA connection */
   XASession xaSession;
   
   /** The endpoint that messages are delivered to */
   MessageEndpoint endpoint;
   
   /** Any DLQ handler, may be null */
   DLQHandler dlqHandler;
   
   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

   /**
    * Create a new JmsServerSession
    * 
    * @param pool the server session pool
    */
   public JmsServerSession(JmsServerSessionPool pool)
   {
      this.pool = pool;
   }
   
   // Public --------------------------------------------------------
   
   // Lifecycle -----------------------------------------------------
   
   /**
    * Setup the session.
    * <p>
    * Creates the jms session (an XA session when the activation's connection
    * is an XA connection), creates the message endpoint and registers this
    * object as the session's message listener.
    * 
    * @throws Exception for any error raised while creating the session or the endpoint
    */
   public void setup() throws Exception
   {
      JmsActivation activation = pool.getActivation();
      JmsActivationSpec spec = activation.getActivationSpec();

      dlqHandler = activation.getDLQHandler();
      
      Connection connection = activation.getConnection();

      // Create the session
      if (connection instanceof XAConnection)
      {
         xaSession = ((XAConnection) connection).createXASession();
         session = xaSession.getSession();
      }
      else
      {
         transacted = spec.isSessionTransacted();
         acknowledge = spec.getAcknowledgeModeInt();
         session = connection.createSession(transacted, acknowledge);
      }
      
      // Get the endpoint, the XA resource is only passed for transacted
      // delivery over an XA session, otherwise the endpoint gets null
      MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
      XAResource xaResource = null;
      if (activation.isDeliveryTransacted() && xaSession != null)
         xaResource = xaSession.getXAResource();
      endpoint = endpointFactory.createEndpoint(xaResource);
      
      // Set the message listener
      session.setMessageListener(this);
   }
   
   /**
    * Stop the session.
    * <p>
    * Releases the endpoint and closes the XA session and the session. Each
    * step is attempted independently, a failure is logged at debug level and
    * does not prevent the remaining steps.
    */
   public void teardown()
   {
      try
      {
         if (endpoint != null)
            endpoint.release();
      }
      catch (Throwable t)
      {
         log.debug("Error releasing endpoint " + endpoint, t);
      }

      try
      {
         if (xaSession != null)
            xaSession.close();
      }
      catch (Throwable t)
      {
         log.debug("Error releasing xaSession " + xaSession, t);
      }

      try
      {
         if (session != null)
            session.close();
      }
      catch (Throwable t)
      {
         log.debug("Error releasing session " + session, t);
      }
   }
   
   // MessageListener implementation --------------------------------
   
   /**
    * Deliver a message to the endpoint.
    * <p>
    * The delivery is bracketed by the endpoint's beforeDelivery and
    * afterDelivery calls. When a DLQ handler is configured and its
    * handleRedeliveredMessage returns true the endpoint's onMessage is
    * skipped. Any error is logged and never propagated to the caller.
    * 
    * @param message the message to deliver
    */
   public void onMessage(Message message)
   {
      try
      {
         endpoint.beforeDelivery(JmsActivation.ONMESSAGE);
         try
         {
            if (dlqHandler == null || dlqHandler.handleRedeliveredMessage(message) == false)
               ((MessageListener) endpoint).onMessage(message);
         }
         finally
         {
            // Runs whether or not the delivery above failed
            endpoint.afterDelivery();
            if (dlqHandler != null)
               dlqHandler.messageDelivered(message);
         }
      }
      catch (Throwable t)
      {
         log.error("Unexpected error delivering message " + message, t);
      }
   }
   
   // ServerSession implementation ----------------------------------

   /**
    * Get the jms session
    * 
    * @return the session created by setup, null if setup has not created one
    * @throws JMSException declared by the interface, not thrown here
    */
   public Session getSession() throws JMSException
   {
      return session;
   }

   /**
    * Schedule this server session on the activation's work manager, with
    * this object registered as the work listener.
    * 
    * @throws JMSException when the work cannot be scheduled, the
    *         WorkException is available as the linked exception
    */
   public void start() throws JMSException
   {
      JmsActivation activation = pool.getActivation(); 
      WorkManager workManager = activation.getWorkManager();
      try
      {
         workManager.scheduleWork(this, 0, null, this);
      }
      catch (WorkException e)
      {
         log.error("Unable to schedule work", e);
         // Keep the original failure reachable by the caller instead of
         // reducing it to its string form
         JMSException jmsException = new JMSException("Unable to schedule work: " + e.toString());
         jmsException.setLinkedException(e);
         throw jmsException;
      }
   }

   // Work implementation -------------------------------------------

   /**
    * Run the jms session
    */
   public void run()
   {
      session.run();
   }

   /**
    * Does nothing
    */
   public void release()
   {
   }

   // WorkListener implementation -----------------------------------
   
   /**
    * Does nothing
    * 
    * @param e the work event
    */
   public void workAccepted(WorkEvent e)
   {
   }

   /**
    * Return this server session to the pool
    * 
    * @param e the work event
    */
   public void workCompleted(WorkEvent e)
   {
      pool.returnServerSession(this);
   }

   /**
    * Return this server session to the pool
    * 
    * @param e the work event
    */
   public void workRejected(WorkEvent e)
   {
      pool.returnServerSession(this);
   }


   /**
    * Does nothing
    * 
    * @param e the work event
    */
   public void workStarted(WorkEvent e)
   {
   }

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

}