package org.jboss.cache;
import org.jboss.logging.Logger;
import org.jgroups.blocks.MethodCall;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
public class ReplicationQueue {
TreeCache cache=null;
long interval=5000;
long max_elements=500;
LinkedList elements=new LinkedList();
Timer timer=null;
MyTask task=null;
static Logger log=Logger.getLogger(ReplicationQueue.class);
public ReplicationQueue() {
}
public ReplicationQueue(TreeCache cache, long interval, long max_elements) {
this.cache=cache;
this.interval=interval;
this.max_elements=max_elements;
}
public long getInterval() {
return interval;
}
public void setInterval(long interval) {
this.interval=interval;
stop();
start();
}
public long getMax_elements() {
return max_elements;
}
public void setMax_elements(long max_elements) {
this.max_elements=max_elements;
}
public void start() {
if(interval > 0) {
if(task == null)
task=new MyTask();
if(timer == null) {
timer=new Timer(true);
timer.schedule(task,
500, interval); }
}
}
public void stop() {
if(task != null) {
task.cancel();
task=null;
}
if(timer != null) {
timer.cancel();
timer=null;
}
}
public void add(MethodCall job) {
synchronized(elements) {
if(job != null && !elements.contains(job))
elements.add(job);
if(elements.size() >= max_elements)
flush();
}
}
public void flush() {
List l;
synchronized(elements) {
if(log.isTraceEnabled())
log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
l=(List)elements.clone();
elements.clear();
}
try {
cache.callRemoteMethods(null, TreeCache.replicateAllMethod, new Object[]{l}, false, true, 5000);
}
catch(Throwable t) {
log.error("failed replicating " + l.size() + " elements in replication queue", t);
}
}
public class MyTask extends TimerTask {
public void run() {
flush();
}
}
}