/*
 * JBoss, the OpenSource WebOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.web.tomcat.tc5.session;

import org.apache.catalina.Session;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.Context;
import org.apache.catalina.deploy.FilterDef;
import org.apache.catalina.deploy.FilterMap;
import org.jboss.metadata.WebMetaData;
import org.jboss.util.NestedRuntimeException;

import javax.transaction.TransactionManager;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionRequiredException;
import javax.transaction.TransactionRolledbackException;

import javax.transaction.UserTransaction;

import javax.naming.NamingException;
import javax.naming.InitialContext;
import java.util.List;

/**
 * Implementation of a clustered session manager for
 * catalina using JBossCache replication.
 *
 * @author Ben Wang
 * @version $Revision: 1.3.2.11 $
 */
public class JBossCacheManager
   extends JBossManager
{

   /**
    * Informational name for this Catalina component
    */
   private static final String info_ = "JBossCacheManager/1.0";

   // -- Class attributes ---------------------------------
   // For JNDI lookup
   private InitialContext initialContext_;

   /**
    * Proxy-object for the JBossCacheService
    */
   private JBossCacheService proxy_;

   /**
    * If set to true, will add a JvmRouteFilter to the request.
    */
   protected boolean useJK_ = false;

   /**
    * The transaction manager.
    */
   protected TransactionManager tm;

   public JBossCacheManager()
   {
   }

   public void init(String name, WebMetaData webMetaData, boolean useJK, boolean useLocalCache)
      throws ClusteringNotSupportedException
   {
      super.init(name, webMetaData, useJK, useLocalCache);
      this.useJK_ = useJK;
      try
      {
         proxy_ = (JBossCacheService) new JBossCacheService();
      }
      catch (Throwable t)
      {
         String str = "JBossCacheService to Tomcat clustering not found";
         log_.error(str);
         throw new ClusteringNotSupportedException(str);
      }
   }

   public JBossCacheService getCacheService()
   {
      return proxy_;
   }

   // Manager-methods -------------------------------------
   /**
    * Start this Manager
    *
    * @throws org.apache.catalina.LifecycleException
    *
    */
   public void start() throws LifecycleException
   {
      super.start();

      // Obtain the transaction manager
      try
      {
         tm =(TransactionManager)new InitialContext().lookup("java:/TransactionManager");
      }
      catch (Exception e)
      {
         log_.error("Cannot get a reference to the transaction manager", e);
         throw new LifecycleException(e);
      }

      // Adding JvmRouteFilter if needed
      if (useJK_)
      {
         boolean hasFilterCreated = false;
         Context context = (Context) container_;
         // Need to set this so filter instance can get hold of me later.
         context.getServletContext().setAttribute("AbstractJBossManager", this);

         String filterName = "JvmRouteFilter";
         if (log_.isDebugEnabled())
         {
            log_.debug("start(): we are using mod_jk(2) for load-balancing. Will add " + filterName);
         }

         FilterDef def = new FilterDef();
         def.setFilterName(filterName);
         def.setDescription("Filter to re-package the session id with jvmroute if failing-over under mod_jk(2)");
         def.setFilterClass(org.jboss.web.tomcat.tc5.JvmRouteFilter.class.getName());
         // Just to make sure we don't create duplicate
         FilterDef[] defs = context.findFilterDefs();
         for (int i = 0; i < defs.length; i++)
         {
            FilterDef d = defs[i];
            if (d.getFilterName().equals(filterName))
            {
               hasFilterCreated = true;
               break;
            }
         }
         if (!hasFilterCreated)
            context.addFilterDef(def);

         FilterMap map = new FilterMap();
         map.setFilterName(filterName);
         map.setURLPattern("/*");
         context.addFilterMap(map);
      }
      // Find JBossCacheService
      // Will need to pass the classloader that is associated with this web app so de-serialization will work correctly.
      ClassLoader tcl = super.getContainer().getLoader().getClassLoader();
      proxy_.start(tcl, this);
      if (log_.isDebugEnabled())
      {
         log_.debug("start(): JBossCacheService started");
      }
   }

   public void stop() throws LifecycleException
   {
      super.stop();
      proxy_.stop();
      tm = null;
   }

   /**
    * Create a new session. Note this does not mean the session is active yet.
    */
   public Session createSession()
   {
       return createSession(null);
   }

       
   /**
    * Create a new session. Note this does not mean the session is active yet.
    */
   public Session createSession(String sessionId)
   {
      ClusteredSession session = null;
      if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_ATTRIBUTE)
      {
         session = (ClusteredSession) new AttributeBasedClusteredSession(this);
      }
      else if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_SESSION)
      {
         session = (ClusteredSession) new SessionBasedClusteredSession(this);
      }

      session.setNew(true);
      session.setCreationTime(System.currentTimeMillis());
      session.setMaxInactiveInterval(this.maxInactiveInterval_);

      if (sessionId == null)
      {
          sessionId = this.getNextId();
          
          // We are using mod_jk for load balancing. Append the JvmRoute.
          if (useJK_)
          {
              if (log_.isDebugEnabled())
              {
                  log_.debug("createSession(): useJK is true. Will append JvmRoute: " + this.getJvmRoute());
              }
              sessionId += "." + this.getJvmRoute();
          }
      }

      session.setValid(true);
      session.setId(sessionId);
      if (log_.isDebugEnabled())
      {
         log_.debug("Creating an ClusteredSession with id: " + session.getId());
      }

      createdCounter_++;
      return session;
   }

   public boolean storeSession(Session baseSession)
   {
      // is this session of the right type?
      if (!(baseSession instanceof ClusteredSession))
      {
         throw new IllegalArgumentException("You can only add ClusteredSessions to this Manager");
      }

      ClusteredSession session = (ClusteredSession) baseSession;
      if (session == null)
      {
         return false;
      }

      if (session.isValid())
      {
         // put it in the local store as well.
         sessions_.put(getRealId(session.getId()), session);

         String id = session.getId();
         long beginPassivate = System.currentTimeMillis();
         // Notify all session attributes that they get serialized (SRV 7.7.2)
         session.passivate();
         long endPassivate = System.currentTimeMillis();
         stats_.updatePassivationStats(id, (endPassivate - beginPassivate));

         if (log_.isDebugEnabled())
         {
            log_.debug("check to see if needs to store and replicate session with id " + id);
         }

         long beginReplication = System.currentTimeMillis();
         processSessionRepl(session);
         long endReplication = System.currentTimeMillis();
         stats_.updateReplicationStats(id, (endReplication - beginReplication));
         return true;
      }
      else
      {
         return false;
      }
   }

   public void add(Session session)
   {
      if (session == null)
         return;

      if (!session.isValid())
      {
         log_.error("Cannot add session with id=" + session.getId() + " because it is invalid");
         return;
      }

      // maxActive_ -1 is unlimited
      if (maxActive_ != -1 && activeCounter_ >= maxActive_)
      {
         // Exceeds limit. We will need to reject it.
         rejectedCounter_++;
         // Catalina api does not specify what heppens but we will throw a runtime exception for now.
         throw new IllegalStateException("JBossCacheManager.add(): number of active sessions exceeds the maximum limit: " +
            maxActive_ + " when trying to add session id " + session.getId());
      }

      if (storeSession((ClusteredSession) session))
      {
         activeCounter_++;
         if (log_.isDebugEnabled())
         {
            log_.debug("Session with id=" + session.getId() + " added. Current active sessions " + activeCounter_);
         }
      }
   }

   public Session createEmptySession()
   {
      // We will simply return new ClusteredSession instanc enow.
      ClusteredSession session = null;
      if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_ATTRIBUTE)
      {
         session = (ClusteredSession) new AttributeBasedClusteredSession(this);
      }
      else if (replicationGranularity_ == WebMetaData.REPLICATION_GRANULARITY_SESSION)
      {
         session = (ClusteredSession) new SessionBasedClusteredSession(this);
      }

      if (log_.isDebugEnabled())
      {
         log_.debug("Creating an empty ClusteredSession: " + session);
      }

      createdCounter_++;
      return session;
   }

   public Session findSession(String id)
   {
      String realId = getRealId(id);
      ClusteredSession session = findLocalSession(realId);
      // Find it from the local store first.
      if (session != null && !session.isOutdated() )
      {
         return session;
      }
      else
      {
         return loadSession(realId);
      }
   }

   /**
    * Return the sessions. We will find it from the in-memory local ones first. If not found, we search for the
    * underlying store as well just to be sure.
    * @return
    */
   public Session[] findSessions()
   {
      Session[] sessions;

      // Will need to find from the underlying store
      List ids = proxy_.getNewSessionsInStore();
      if(ids.size() ==0)
      {
         Session[] sess = new Session[0];
         sess = (Session[]) sessions_.values().toArray(sess);
         return sess;
      }

      if(log_.isDebugEnabled()) {
         log_.debug("findSessions: find ids from cache store: " + ids);
      }


      // Is there a better way to do this?
      for(int i=0; i < ids.size(); i++) {
         Session session = loadSession((String)ids.get(i));
         if( session == null )
         {
            // This can happen now if the node has been removed before it has been called.

            // Something is wrong with session. Should not have id but null session!
//            log_.warn("Has session id: " +ids.get(i) + " but session is null. Will remove this from map");
//            sessions_.remove(ids.get(i));
//            proxy_.removeSession((String)ids.get(i));
            continue;
         }
         sessions_.put(ids.get(i), session);   // Populate local copy as well.
      }

      Session[] sess = new Session[0];
      sess = (Session[]) sessions_.values().toArray(sess);
      return sess;
   }

   public ClusteredSession[] findLocalSessions()
   {
      ClusteredSession[] sess = new ClusteredSession[0];

      sess = (ClusteredSession[]) sessions_.values().toArray(sess);
      return sess;
   }

   public ClusteredSession findLocalSession(String realId)
   {
      ClusteredSession session = (ClusteredSession) sessions_.get(realId);
      return session;
   }

   public void remove(Session session)
   {
      String id = session.getId();
      if (id == null) return;
      // Let's do it in brute force.
      if (log_.isDebugEnabled())
      {
         log_.debug("Removing session from store with id: " + id);
      }
      ((ClusteredSession) session).removeMyself();
      sessions_.remove(getRealId(session.getId()));
      activeCounter_--;
   }

   public void removeLocal(Session session)
   {
      String id = session.getId();
      if (id == null) return;
      // Let's do it in brute force.
      if (log_.isDebugEnabled())
      {
         log_.debug("Removing session from local store with id: " + id);
      }
      ((ClusteredSession) session).removeMyselfLocal();
      sessions_.remove(getRealId(session.getId()));
      // It's a bit ad-hoc to do it here. But since we currently call this when session expires ...
      expiredCounter_++;
      activeCounter_--;
   }

   /**
    * Load a session from the distributed store
    *
    * @param realId The session-id for the session to load without JvmRoute
    * @return the session or null if the session cannot be found in the distributed store
    */
   protected Session loadSession(String realId)
   {
      ClusteredSession session = null;

      if (realId == null)
      {
         return null;
      }

      // TODO We will need to determine if we want stats for loading since there will be a lot of them
//      long begin = System.currentTimeMillis();
      session = (ClusteredSession) proxy_.getSession(realId);
      // Will need to initialize.
      if (session != null)
      {
         session.initAfterLoad(this);
         /* Put the session into the local map or else other calls to find
         the session after the request completes will wipe out the dirty state
         due to any mods made by the web app.
         */
         sessions_.put(getRealId(session.getId()), session);
//      long end = System.currentTimeMillis();
//      stats_.updateLoadStats(id, (end - begin));
      }

      if (log_.isDebugEnabled())
      {
         log_.debug("loadSession(): id= " + realId + ", session=" + session);
      }

      return session;
   }

   protected void processSessionRepl(ClusteredSession session)
   {
      boolean doTx = false;
      try
      {
         // We need transaction so all the replication are sent in batch.
         // Don't do anything if there is already transaction context associated with this thread.
         if(tm.getTransaction() == null)
            doTx = true;

         if(doTx)
            tm.begin();

         session.processSessionRepl();
      }
      catch (Exception ex)
      {
         log_.error("processSessionRepl: failed with exception: " + ex);
         try
         {
//            if(doTx)
               // Let's set it no matter what.
               tm.setRollbackOnly();
         }
         catch (Exception exn)
         {
            exn.printStackTrace();
         }
         // We will need to alert Tomcat of this exception.
         throw new NestedRuntimeException("JBossCacheManager.processSessionRepl(): failed to replicate session.", ex);
      }
      finally
      {
         if(doTx)
            endTransaction();
      }
   }

   protected void endTransaction()
   {
      try {
         if(tm.getTransaction().getStatus() != Status.STATUS_MARKED_ROLLBACK)
         {
            tm.commit();
         } else
         {
            tm.rollback();
         }
      } catch (Exception e) {
         e.printStackTrace();
         throw new NestedRuntimeException("TreeCacheAop.endTransaction(): ", e);
      }
   }

   /**
    * Go through all sessions and look if they have expired. Note this overrides the method in JBossManager.
    */
   protected void processExpires()
   {
      if (log_.isTraceEnabled())
      {
         log_.trace("Looking for sessions that have expired ...");
      }

      // Get all sessions
      // Does not really need tx. But just to comform with the cache usage.
      try
      {
         Session sessions[] = findSessions();
         for (int i = 0; i < sessions.length; ++i)
         {
            ClusteredSession session = (ClusteredSession) sessions[i];
            if(session == null)
            {
               log_.warn("processExpires(): processing null session at index " +i);
               continue;
            }

            // We only look at valid sessions. This will remove session if not valid already.
            boolean doTx = false;
            try
            {
               // We need transaction so all the replication are sent in batch.
               // Don't do anything if there is already transaction context associated with this thread.
               if(tm.getTransaction() == null)
                  doTx = true;

               if(doTx)
                  tm.begin();

               if (!session.isValid()) continue;
            }
            catch (Exception ex)
            {
               log_.error("processSessionExpire: failed with exception: " + ex);
               try
               {
//            if(doTx)
                     // Let's set it no matter what.
                     tm.setRollbackOnly();
               }
               catch (Exception exn)
               {
                  exn.printStackTrace();
               }
               // We will need to alert Tomcat of this exception.
               throw new NestedRuntimeException("JBossCacheManager.processSessionExpire(): failed to expire session.", ex);
            }
            finally
            {
               if(doTx)
                  endTransaction();
            }
         }
      }
      catch (Exception ex)
      {
         log_.error("processExpires: failed with exception: " + ex);
         ex.printStackTrace();
      }
   }

   // Get the id without JvmRoute.
   private String getRealId(String id)
   {
      // TODO May need optimization
      if (!useJK_) return id;

      int index = id.indexOf(".");
      if (index > 0)
      {
         return id.substring(0, id.indexOf("."));
      }
      else
      {
         return id;
      }
   }
}