/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.cache;


import org.jboss.logging.Logger;
import org.jgroups.blocks.MethodCall;

import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;


/**
 * Periodically (or when certain size is exceeded) takes elements and replicates them.
 *
 * @author <a href="mailto:bela@jboss.org">Bela Ban</a> May 24, 2003
 * @version $Revision: 1.7.4.2 $
 */
public class ReplicationQueue {

   TreeCache cache=null;

   /**
    * We flush every 5 seconds. Inactive if -1 or 0
    */
   long interval=5000;

   /**
    * Max elements before we flush
    */
   long max_elements=500;

   /**
    * Holds the replication jobs: LinkedList<MethodCall>
    */
   LinkedList elements=new LinkedList();

   /**
    * For periodical replication
    */
   Timer timer=null;

   /**
    * The timer task, only calls flush() when executed by Timer
    */
   MyTask task=null;

   static Logger log=Logger.getLogger(ReplicationQueue.class);


   public ReplicationQueue() {

   }

   public ReplicationQueue(TreeCache cache, long interval, long max_elements) {
      this.cache=cache;
      this.interval=interval;
      this.max_elements=max_elements;
   }


   public long getInterval() {
      return interval;
   }

   public void setInterval(long interval) {
      this.interval=interval;
      stop();
      start();
   }

   public long getMax_elements() {
      return max_elements;
   }

   public void setMax_elements(long max_elements) {
      this.max_elements=max_elements;
   }

   public void start() {
      if(interval > 0) {
         if(task == null)
            task=new MyTask();
         if(timer == null) {
            timer=new Timer(true);
            timer.schedule(task,
                    500, // delay before initial flush
                    interval); // interval between flushes
         }
      }
   }


   public void stop() {
      if(task != null) {
         task.cancel();
         task=null;
      }
      if(timer != null) {
         timer.cancel();
         timer=null;
      }
   }


   public void add(MethodCall job) {
      synchronized(elements) {
         if(job != null && !elements.contains(job))
            elements.add(job);
         if(elements.size() >= max_elements)
            flush();
      }
   }


   public void flush() {
      List l;
      synchronized(elements) {
         if(log.isTraceEnabled())
            log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
         l=(List)elements.clone();
         elements.clear();
      }

      try {
         cache.callRemoteMethods(null, // send to all live nodes in the cluster
                                 TreeCache.replicateAllMethod, new Object[]{l}, false, true, 5000);
      }
      catch(Throwable t) {
         log.error("failed replicating " + l.size() + " elements in replication queue", t);
      }
   }


   public class MyTask extends TimerTask {
      public void run() {
         flush();
      }
   }


}