package org.jboss.cache;
import org.jboss.cache.eviction.LRUPolicy;
import org.jboss.cache.interceptors.Interceptor;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockStrategyFactory;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.w3c.dom.Attr;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.NodeList;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;
public class TreeCache extends ServiceMBeanSupport implements TreeCacheMBean, Cloneable, MembershipListener {
// Root of the cache tree; its name and Fqn are both derived from SEPARATOR.
protected Node root=new Node(SEPARATOR, Fqn.fromString(SEPARATOR), null, null, this);
// Registered TreeCacheListener instances (see addTreeCacheListener / removeTreeCacheListener).
protected final Vector listeners=new Vector();
// JGroups channel; created in startService() for the replicated modes, or supplied via the JChannel constructor.
protected JChannel channel=null;
// Result of determineCoordinator(), refreshed at the end of startService().
protected boolean coordinator=false;
// Group name passed to channel.connect().
protected String cluster_name="TreeCache-Group";
// JGroups stack configuration string; startService() falls back to getDefaultProperties() when null.
protected String cluster_props=null;
// Member list; used as the default destination set by callRemoteMethods() and cleared in stopService().
protected final Vector members=new Vector();
// RPC dispatcher built on top of the channel in startService().
protected RpcDispatcher disp=null;
// Message listener handed to the RpcDispatcher; also used for getStateBytes()/setStateBytes().
protected MessageListener ml=new MessageListenerAdaptor(this, log);
// Timeout passed to channel.getState() (presumably milliseconds -- confirm against JGroups).
protected long state_fetch_timeout=5000;
// Timeout for synchronous replication (only the accessor pair is visible here; unit presumably milliseconds).
protected long sync_repl_timeout=15000;
// Whether a ReplicationQueue is in use (see setUseReplQueue).
protected boolean use_repl_queue=false;
// Maximum element count handed to the ReplicationQueue.
protected int repl_queue_max_elements=1000;
// Interval handed to the ReplicationQueue; the queue is only started when this is >= 0.
protected long repl_queue_interval=5000;
// Transaction bookkeeping table, exposed through getTransactionTable().
private final TransactionTable tx_table=new TransactionTable();
// Lock bookkeeping map, exposed through getLockTable().
private final HashMap lock_table=new HashMap();
// When true, startService() calls fetchStateOnStartup() after connecting the channel.
protected boolean fetch_state_on_startup=true;
// Lock acquisition timeout (only the accessor pair is visible here; unit presumably milliseconds).
protected long lock_acquisition_timeout=10000;
// Class name of the eviction policy, set by setEvictionPolicyClass().
protected String eviction_policy_class=null;
// Eviction policy instance; it is also registered as a TreeCacheListener.
protected TreeCacheListener eviction_policy_provider = null;
// One of LOCAL, REPL_ASYNC or REPL_SYNC.
protected int cache_mode=LOCAL;
// Reflective handles to the methods of this class; all are resolved in the static initializer.
public static Method putDataMethodLocal=null;
public static Method putDataEraseMethodLocal=null;
public static Method putKeyValMethodLocal=null;
public static Method putFailFastKeyValueMethodLocal=null;
public static Method removeNodeMethodLocal=null;
public static Method removeKeyMethodLocal=null;
public static Method removeDataMethodLocal=null;
public static Method evictNodeMethodLocal=null;
public static Method prepareMethod=null;
public static Method commitMethod=null;
public static Method rollbackMethod=null;
public static Method replicateMethod=null;
public static Method replicateAllMethod=null;
public static Method addChildMethodLocal=null;
public static Method getKeyValueMethodLocal=null;
public static Method getNodeMethodLocal=null;
public static Method getKeysMethodLocal=null;
public static Method getChildrenNamesMethodLocal=null;
public static Method releaseAllLocksMethodLocal=null;
public static Method printMethodLocal=null;
public static Method lockMethodLocal=null;
// The put/remove handles above; consulted by isCrudMethod().
static LinkedList crud_methods=new LinkedList();
// Flag that fetchStateOnStartup() waits on; presumably set by the state-transfer callback (not visible here).
protected boolean isStateSet=false;
// Monitor guarding isStateSet.
private final Object stateLock=new Object();
// Current isolation level; setIsolationLevel() also pushes it into LockStrategyFactory.
protected IsolationLevel isolationLevel=IsolationLevel.REPEATABLE_READ;
// Raw XML configuration of the eviction policy.
protected Element evictConfig_ = null;
/**
 * Returns the message listener used by this cache.
 *
 * @return the listener instance held in {@code ml}
 */
public MessageListener getMessageListener() {
    return this.ml;
}
// Head of the interceptor chain, assembled by createInterceptorChain().
protected Interceptor interceptor_chain=null;
// Optional replication handler (only the accessor pair is visible here).
protected Replicatable replication_handler=null;
// Transaction manager lookup; instantiated from tm_lookup_class in startService() when not set directly.
protected TransactionManagerLookup tm_lookup=null;
protected String tm_lookup_class=null;
// Transaction manager obtained from tm_lookup in startService(); null when no lookup is configured.
protected TransactionManager tm=null;
// Cache loader configuration; the loader is instantiated from cache_loader_class in createCacheLoader().
protected String cache_loader_class=null;
protected CacheLoader cache_loader=null;
protected Properties cache_loader_config=null;
// Influences where the cache store interceptor is placed in the chain (see createInterceptorChain()).
protected boolean cache_loader_shared=true;
// Fqns to preload at startup (see setCacheLoaderPreload / cacheLoaderPreload).
protected List cache_loader_preload=null;
protected boolean cache_loader_fetch_transient_state=true;
protected boolean cache_loader_fetch_persistent_state=true;
// Commit/rollback phase flags (only the accessor pairs are visible here).
protected boolean sync_commit_phase=false;
protected boolean sync_rollback_phase=false;
// Forwarded to RpcDispatcher.setDeadlockDetection().
protected boolean deadlockDetection=false;
// Replication queue, created and started by setUseReplQueue(true).
protected ReplicationQueue repl_queue=null;
// Separator used for the root node's name and Fqn.
public static final String SEPARATOR="/";
// Supported cache modes.
public static final int LOCAL=1;
public static final int REPL_ASYNC=2;
public static final int REPL_SYNC=3;
// Marker and locator constants (their usage is not visible in this part of the file).
static public final String UNINITIALIZED="jboss:internal:uninitialized"; static final String JNDI_LOCATOR_URI="socket://localhost:6789";
// Resolves the reflective Method handles used to build MethodCall objects and
// registers the put/remove handles as "CRUD" methods.
static {
    try {
        putDataMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                new Class[]{GlobalTransaction.class, Fqn.class, Map.class, boolean.class});
        putDataEraseMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                new Class[]{GlobalTransaction.class, Fqn.class, Map.class, boolean.class, boolean.class});
        putKeyValMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                new Class[]{GlobalTransaction.class, Fqn.class, Object.class, Object.class, boolean.class});
        putFailFastKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                new Class[]{GlobalTransaction.class, Fqn.class, Object.class, Object.class,
                            boolean.class, long.class});
        removeNodeMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
                new Class[]{GlobalTransaction.class, Fqn.class, boolean.class});
        removeKeyMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
                new Class[]{GlobalTransaction.class, Fqn.class, Object.class, boolean.class});
        removeDataMethodLocal=TreeCache.class.getDeclaredMethod("_removeData",
                new Class[]{GlobalTransaction.class, Fqn.class, boolean.class});
        evictNodeMethodLocal=TreeCache.class.getDeclaredMethod("_evict", new Class[]{Fqn.class});
        prepareMethod=TreeCache.class.getDeclaredMethod("prepare",
                new Class[]{GlobalTransaction.class, List.class, Address.class, boolean.class});
        commitMethod=TreeCache.class.getDeclaredMethod("commit",
                new Class[]{GlobalTransaction.class});
        rollbackMethod=TreeCache.class.getDeclaredMethod("rollback",
                new Class[]{GlobalTransaction.class});
        addChildMethodLocal=TreeCache.class.getDeclaredMethod("_addChild",
                new Class[]{GlobalTransaction.class, Fqn.class, Object.class, Node.class});
        getKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_get",
                new Class[]{Fqn.class, Object.class, boolean.class});
        getNodeMethodLocal=TreeCache.class.getDeclaredMethod("_get", new Class[]{Fqn.class});
        getKeysMethodLocal=TreeCache.class.getDeclaredMethod("_getKeys", new Class[]{Fqn.class});
        getChildrenNamesMethodLocal=TreeCache.class.getDeclaredMethod("_getChildrenNames",
                new Class[]{Fqn.class});
        replicateMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{MethodCall.class});
        replicateAllMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{List.class});
        releaseAllLocksMethodLocal=TreeCache.class.getDeclaredMethod("_releaseAllLocks",
                new Class[]{Fqn.class});
        printMethodLocal=TreeCache.class.getDeclaredMethod("_print", new Class[]{Fqn.class});
        lockMethodLocal=TreeCache.class.getDeclaredMethod("_lock",
                new Class[]{Fqn.class, int.class, boolean.class});
    }
    catch(NoSuchMethodException ex) {
        // Keep the original exception as the cause so the missing signature
        // and its stack trace are not lost.
        throw new ExceptionInInitializerError(ex);
    }
    // Only the mutating put/remove handles count as CRUD methods.
    crud_methods.add(putDataMethodLocal);
    crud_methods.add(putDataEraseMethodLocal);
    crud_methods.add(putKeyValMethodLocal);
    crud_methods.add(putFailFastKeyValueMethodLocal);
    crud_methods.add(removeNodeMethodLocal);
    crud_methods.add(removeKeyMethodLocal);
    crud_methods.add(removeDataMethodLocal);
}
/**
 * Tells whether the given method is one of the registered put/remove handles.
 *
 * @param m the method to test; may be null
 * @return true if {@code m} is non-null and contained in {@code crud_methods}
 */
public static boolean isCrudMethod(Method m) {
    return m != null && crud_methods.contains(m);
}
/**
 * Creates a cache with the given cluster settings.
 *
 * @param cluster_name        group name; the default is kept when null
 * @param props               cluster properties; the current value is kept when null
 * @param state_fetch_timeout timeout used when fetching the initial state
 * @throws Exception declared for subclasses/callers; nothing is thrown by the visible body
 */
public TreeCache(String cluster_name, String props, long state_fetch_timeout) throws Exception {
    super();
    if(cluster_name != null) {
        this.cluster_name=cluster_name;
    }
    if(props != null) {
        this.cluster_props=props;
    }
    this.state_fetch_timeout=state_fetch_timeout;
}
/**
 * Creates a cache with all default settings.
 *
 * @throws Exception declared only; the visible body does not throw
 */
public TreeCache() throws Exception {
super();
}
/**
 * Creates a cache that uses an externally supplied channel.
 * Note that startService() treats a non-null channel in a replicated mode as
 * "already running" and returns early.
 *
 * @param channel the channel to use
 * @throws Exception declared only; the visible body does not throw
 */
public TreeCache(JChannel channel) throws Exception {
super();
this.channel=channel;
}
/**
 * Returns the root node of the cache tree.
 *
 * @return the root node
 */
public Node getRoot() {
    return this.root;
}
/**
 * Returns the local address reported by the channel.
 *
 * @return the channel's local address, or null when there is no channel
 */
public Object getLocalAddress() {
    if(channel == null) {
        return null;
    }
    return channel.getLocalAddress();
}
/**
 * Returns the member list.
 *
 * @return the internal (live, not copied) member vector
 */
public Vector getMembers() {
    return this.members;
}
/**
 * Returns the cached coordinator flag.
 *
 * @return the value last stored in {@code coordinator}
 */
public boolean isCoordinator() {
    return this.coordinator;
}
/**
 * Returns the group name used when connecting the channel.
 *
 * @return the cluster name
 */
public String getClusterName() {
    return this.cluster_name;
}
/**
 * Sets the group name used when connecting the channel.
 *
 * @param name the new cluster name (stored as-is, no validation)
 */
public void setClusterName(String name) {
    this.cluster_name=name;
}
/**
 * Returns the channel configuration string.
 *
 * @return the cluster properties, or null when none have been set
 */
public String getClusterProperties() {
    return this.cluster_props;
}
/**
 * Sets the channel configuration string.
 *
 * @param cluster_props the new properties (stored as-is)
 */
public void setClusterProperties(String cluster_props) {
this.cluster_props=cluster_props;
}
/**
 * Returns the transaction table.
 *
 * @return the internal transaction table instance
 */
public TransactionTable getTransactionTable() {
    return this.tx_table;
}
/**
 * Returns the lock table.
 *
 * @return the internal (live, not copied) lock map
 */
public HashMap getLockTable() {
    return this.lock_table;
}
/**
 * Renders the transaction table as a string.
 *
 * @return the result of {@code tx_table.toString(true)}
 */
public String dumpTransactionTable() {
    final String dump=tx_table.toString(true);
    return dump;
}
/**
 * Returns the deadlock detection flag.
 *
 * @return the current flag value
 */
public boolean getDeadlockDetection() {
    return this.deadlockDetection;
}
/**
 * Stores the deadlock detection flag and forwards it to the dispatcher if one exists.
 *
 * @param dt the new flag value
 */
public void setDeadlockDetection(boolean dt) {
    this.deadlockDetection=dt;
    if(disp == null) {
        return;
    }
    disp.setDeadlockDetection(dt);
}
/**
 * Returns a printable form of the interceptor chain.
 *
 * @return the output of printInterceptorChain(), or {@code "<empty>"} when that is null or empty
 */
public String getInterceptorChain() {
    String chain=printInterceptorChain(interceptor_chain);
    boolean nothing=chain == null || chain.length() == 0;
    return nothing ? "<empty>" : chain;
}
/**
 * Returns the interceptors in chain order, starting at {@code interceptor_chain}.
 *
 * @return a new list with one entry per interceptor, or null when no chain exists
 */
public List getInterceptors() {
    if(interceptor_chain == null) {
        return null;
    }
    // First pass: count the links so the list can be sized exactly.
    int size=0;
    for(Interceptor cur=interceptor_chain; cur != null; cur=cur.getNext()) {
        size++;
    }
    // Second pass: copy the links in order.
    List result=new ArrayList(size);
    for(Interceptor cur=interceptor_chain; cur != null; cur=cur.getNext()) {
        result.add(cur);
    }
    return result;
}
/**
 * Returns the configured cache loader class name.
 *
 * @return the class name, or null when none is configured
 */
public String getCacheLoaderClass() {
    return this.cache_loader_class;
}
/**
 * Sets the class name from which createCacheLoader() instantiates the loader.
 *
 * @param cache_loader_class the loader class name
 */
public void setCacheLoaderClass(String cache_loader_class) {
this.cache_loader_class=cache_loader_class;
}
/**
 * Returns the properties passed to the cache loader.
 *
 * @return the loader configuration, or null when none is set
 */
public Properties getCacheLoaderConfig() {
    return this.cache_loader_config;
}
/**
 * Sets the properties that createCacheLoader() hands to the loader via setConfig().
 *
 * @param cache_loader_config the loader configuration
 */
public void setCacheLoaderConfig(Properties cache_loader_config) {
this.cache_loader_config=cache_loader_config;
}
/**
 * Returns the cache loader.
 *
 * @return the loader instance, or null when none exists
 */
public CacheLoader getCacheLoader() {
    return this.cache_loader;
}
/**
 * Installs a ready-made cache loader. createCacheLoader() only instantiates
 * a loader from the class name when this field is still null.
 *
 * @param cache_loader the loader to use
 */
public void setCacheLoader(CacheLoader cache_loader) {
this.cache_loader=cache_loader;
}
/**
 * Returns the "shared cache loader" flag.
 *
 * @return the current flag value
 */
public boolean getCacheLoaderShared() {
    return this.cache_loader_shared;
}
/**
 * Sets the "shared cache loader" flag, which createInterceptorChain() consults
 * when placing the cache store interceptor.
 *
 * @param shared the new flag value
 */
public void setCacheLoaderShared(boolean shared) {
this.cache_loader_shared=shared;
}
/**
 * Parses a comma-separated list of Fqn strings to preload at startup.
 * The current preload list is kept when {@code list} is null or yields no tokens.
 *
 * @param list comma-separated Fqns; each token is trimmed before parsing
 */
public void setCacheLoaderPreload(String list) {
    if(list == null) {
        return;
    }
    ArrayList parsed=new ArrayList();
    for(StringTokenizer tokens=new StringTokenizer(list, ","); tokens.hasMoreTokens();) {
        String token=tokens.nextToken();
        parsed.add(Fqn.fromString(token.trim()));
    }
    if(!parsed.isEmpty()) {
        this.cache_loader_preload=parsed;
    }
}
/**
 * Returns the preload list in its {@code toString()} form.
 *
 * @return the string form of the list, or null when no list is set
 */
public String getCacheLoaderPreload() {
    if(cache_loader_preload == null) {
        return null;
    }
    return cache_loader_preload.toString();
}
/**
 * Sets the "fetch persistent state" flag.
 *
 * @param flag the new flag value
 */
public void setCacheLoaderFetchPersistentState(boolean flag) {
    this.cache_loader_fetch_persistent_state=flag;
}
/**
 * Returns the "fetch persistent state" flag.
 *
 * @return the current flag value
 */
public boolean getCacheLoaderFetchPersistentState() {
    return this.cache_loader_fetch_persistent_state;
}
/**
 * Sets the "fetch transient state" flag.
 *
 * @param flag the new flag value
 */
public void setCacheLoaderFetchTransientState(boolean flag) {
    this.cache_loader_fetch_transient_state=flag;
}
/**
 * Returns the "fetch transient state" flag.
 *
 * @return the current flag value
 */
public boolean getCacheLoaderFetchTransientState() {
    return this.cache_loader_fetch_transient_state;
}
/**
 * Returns the synchronous-commit-phase flag.
 *
 * @return the current flag value
 */
public boolean getSyncCommitPhase() {
    return this.sync_commit_phase;
}
/**
 * Sets the synchronous-commit-phase flag.
 *
 * @param sync_commit_phase the new flag value
 */
public void setSyncCommitPhase(boolean sync_commit_phase) {
this.sync_commit_phase=sync_commit_phase;
}
/**
 * Returns the synchronous-rollback-phase flag.
 *
 * @return the current flag value
 */
public boolean getSyncRollbackPhase() {
    return this.sync_rollback_phase;
}
/**
 * Sets the synchronous-rollback-phase flag.
 *
 * @param sync_rollback_phase the new flag value
 */
public void setSyncRollbackPhase(boolean sync_rollback_phase) {
this.sync_rollback_phase=sync_rollback_phase;
}
/**
 * Stores the XML configuration of the eviction policy and logs it at info level.
 *
 * @param config the configuration element
 */
public void setEvictionPolicyConfig(Element config) {
    this.evictConfig_=config;
    String message="setEvictionPolicyConfig(): " +config;
    log.info(message);
}
/**
 * Returns the XML configuration of the eviction policy.
 *
 * @return the stored element, or null when none has been set
 */
public Element getEvictionPolicyConfig() {
    return this.evictConfig_;
}
/**
 * Converts an XML protocol stack description into a cluster properties string
 * and stores it via setClusterProperties().
 * <p>
 * Each child element becomes one protocol entry of the form
 * {@code NAME(attr1=val1;attr2=val2)}; entries are joined with ':'.
 * Non-element children (text, comments) are ignored.
 * <p>
 * If {@code config} is null or has no child elements, a warning is logged and
 * the current cluster properties are left untouched. Previously an element
 * without child elements caused a StringIndexOutOfBoundsException.
 *
 * @param config the element whose child elements describe the protocols
 */
public void setClusterConfig(Element config) {
    if(config == null) {
        log.warn("setClusterConfig(): configuration element is null, cluster properties left unchanged");
        return;
    }
    StringBuffer buffer=new StringBuffer();
    NodeList stack=config.getChildNodes();
    int length=stack.getLength();
    for(int s=0; s < length; s++) {
        org.w3c.dom.Node node=stack.item(s);
        if(node.getNodeType() != org.w3c.dom.Node.ELEMENT_NODE)
            continue;
        // Separate protocols with ':'; emitting it before every entry but the
        // first avoids having to strip a trailing separator afterwards.
        if(buffer.length() > 0)
            buffer.append(':');
        Element tag=(Element)node;
        buffer.append(tag.getTagName());
        NamedNodeMap attrs=tag.getAttributes();
        int attrLength=attrs.getLength();
        if(attrLength > 0)
            buffer.append('(');
        for(int a=0; a < attrLength; a++) {
            Attr attr=(Attr)attrs.item(a);
            buffer.append(attr.getName());
            buffer.append('=');
            buffer.append(attr.getValue());
            if(a < attrLength - 1)
                buffer.append(';');
        }
        if(attrLength > 0)
            buffer.append(')');
    }
    if(buffer.length() == 0) {
        log.warn("setClusterConfig(): no protocol elements found, cluster properties left unchanged");
        return;
    }
    setClusterProperties(buffer.toString());
    log.info("setting cluster properties from xml to: " + cluster_props);
}
/**
 * Returns the timeout used when fetching the initial state.
 *
 * @return the value of {@code state_fetch_timeout}
 */
public long getInitialStateRetrievalTimeout() {
    return this.state_fetch_timeout;
}
/**
 * Sets the timeout used when fetching the initial state.
 *
 * @param timeout the new timeout (stored as-is)
 */
public void setInitialStateRetrievalTimeout(long timeout) {
    this.state_fetch_timeout=timeout;
}
/**
 * Returns the cache mode as text.
 *
 * @return "LOCAL", "REPL_ASYNC" or "REPL_SYNC"
 */
public String getCacheMode() {
    return mode2String(this.cache_mode);
}
/**
 * Returns the cache mode as its integer constant.
 *
 * @return one of LOCAL, REPL_ASYNC or REPL_SYNC
 */
public int getCacheModeInternal() {
    return this.cache_mode;
}
/**
 * Maps a cache mode constant to its name.
 *
 * @param mode one of LOCAL, REPL_ASYNC or REPL_SYNC
 * @return the constant's name
 * @throws RuntimeException if {@code mode} is none of the three constants
 */
private String mode2String(int mode) {
    if(mode == LOCAL) {
        return "LOCAL";
    }
    if(mode == REPL_ASYNC) {
        return "REPL_ASYNC";
    }
    if(mode == REPL_SYNC) {
        return "REPL_SYNC";
    }
    throw new RuntimeException("setCacheMode(): caching mode " + mode + " is invalid");
}
/**
 * Sets the cache mode from its textual name.
 *
 * @param mode the mode name, converted with string2Mode()
 * @throws Exception whatever string2Mode() throws for an unknown name
 */
public void setCacheMode(String mode) throws Exception {
    setCacheMode(string2Mode(mode));
}
/**
 * Sets the cache mode.
 *
 * @param mode one of LOCAL, REPL_ASYNC or REPL_SYNC
 * @throws IllegalArgumentException if {@code mode} is none of the three constants
 */
public void setCacheMode(int mode) {
    boolean valid=mode == LOCAL || mode == REPL_ASYNC || mode == REPL_SYNC;
    if(!valid) {
        throw new IllegalArgumentException("setCacheMode(): caching mode " + mode + " is invalid");
    }
    this.cache_mode=mode;
}
/**
 * Returns the synchronous replication timeout.
 *
 * @return the value of {@code sync_repl_timeout}
 */
public long getSyncReplTimeout() {
    return this.sync_repl_timeout;
}
/**
 * Sets the synchronous replication timeout.
 *
 * @param timeout the new timeout (stored as-is)
 */
public void setSyncReplTimeout(long timeout) {
    this.sync_repl_timeout=timeout;
}
/**
 * Returns whether the replication queue is enabled.
 *
 * @return the current flag value
 */
public boolean getUseReplQueue() {
    return this.use_repl_queue;
}
/**
 * Enables or disables the replication queue.
 * Enabling creates the queue if none exists and starts it when the configured
 * interval is non-negative; disabling stops and discards an existing queue.
 *
 * @param flag true to enable, false to disable
 */
public void setUseReplQueue(boolean flag) {
    use_repl_queue=flag;
    if(!flag) {
        if(repl_queue != null) {
            repl_queue.stop();
            repl_queue=null;
        }
        return;
    }
    if(repl_queue != null) {
        // Already have a queue; leave it alone.
        return;
    }
    repl_queue=new ReplicationQueue(this, repl_queue_interval, repl_queue_max_elements);
    if(repl_queue_interval >= 0) {
        repl_queue.start();
    }
}
/**
 * Returns the replication queue interval.
 *
 * @return the value of {@code repl_queue_interval}
 */
public long getReplQueueInterval() {
    return this.repl_queue_interval;
}
/**
 * Sets the replication queue interval and forwards it to an existing queue.
 *
 * @param interval the new interval
 */
public void setReplQueueInterval(long interval) {
this.repl_queue_interval=interval;
// Keep a live queue in sync with the new setting.
if(repl_queue != null)
repl_queue.setInterval(interval);
}
/**
 * Returns the maximum element count of the replication queue.
 *
 * @return the value of {@code repl_queue_max_elements}
 */
public int getReplQueueMaxElements() {
    return this.repl_queue_max_elements;
}
/**
 * Sets the maximum element count and forwards it to an existing queue.
 *
 * @param max_elements the new maximum
 */
public void setReplQueueMaxElements(int max_elements) {
this.repl_queue_max_elements=max_elements;
// Keep a live queue in sync with the new setting.
if(repl_queue != null)
repl_queue.setMax_elements(max_elements);
}
/**
 * Returns the replication queue.
 *
 * @return the queue, or null when none exists
 */
public ReplicationQueue getReplQueue() {
    return this.repl_queue;
}
/**
 * Returns the isolation level as text.
 *
 * @return {@code isolationLevel.toString()}
 */
public String getIsolationLevel() {
    IsolationLevel current=this.isolationLevel;
    return current.toString();
}
/**
 * Sets the isolation level from its textual name.
 *
 * @param level the level name, resolved with IsolationLevel.stringToIsolationLevel()
 * @throws IllegalArgumentException if the name does not resolve to a level
 */
public void setIsolationLevel(String level) {
    IsolationLevel parsed=IsolationLevel.stringToIsolationLevel(level);
    if(parsed != null) {
        setIsolationLevel(parsed);
        return;
    }
    throw new IllegalArgumentException("TreeCache.setIsolationLevel(): level \"" + level + "\" is invalid");
}
/**
 * Sets the isolation level on this cache and also on the static
 * LockStrategyFactory.
 *
 * @param level the new isolation level
 */
public void setIsolationLevel(IsolationLevel level) {
    this.isolationLevel=level;
    LockStrategyFactory.setIsolationLevel(level);
}
/**
 * Returns the isolation level object.
 *
 * @return the current isolation level
 */
public IsolationLevel getIsolationLevelClass() {
    return this.isolationLevel;
}
/**
 * Returns whether the state is fetched during startService().
 *
 * @return the current flag value
 */
public boolean getFetchStateOnStartup() {
    return this.fetch_state_on_startup;
}
/**
 * Sets whether the state is fetched during startService().
 *
 * @param flag the new flag value
 */
public void setFetchStateOnStartup(boolean flag) {
    this.fetch_state_on_startup=flag;
}
/**
 * Returns the lock acquisition timeout.
 *
 * @return the value of {@code lock_acquisition_timeout}
 */
public long getLockAcquisitionTimeout() {
    return this.lock_acquisition_timeout;
}
/**
 * Sets the lock acquisition timeout.
 *
 * @param timeout the new timeout (stored as-is)
 */
public void setLockAcquisitionTimeout(long timeout) {
this.lock_acquisition_timeout=timeout;
}
/**
 * Returns the eviction policy class name.
 *
 * @return the class name, or null when none has been set
 */
public String getEvictionPolicyClass() {
    return this.eviction_policy_class;
}
/**
 * Instantiates the named eviction policy and registers it as a cache listener.
 * A null or empty name is ignored. Any failure while loading or instantiating
 * the class is logged and not propagated.
 *
 * @param eviction_policy_class name of a class implementing TreeCacheListener
 */
public void setEvictionPolicyClass(String eviction_policy_class) {
    boolean blank=eviction_policy_class == null || eviction_policy_class.length() ==0;
    if(blank) {
        return;
    }
    try {
        // The name is recorded before loading, so it is kept even if loading fails.
        this.eviction_policy_class=eviction_policy_class;
        ClassLoader loader=getClass().getClassLoader();
        Object instance=loader.loadClass(eviction_policy_class).newInstance();
        eviction_policy_provider=(TreeCacheListener)instance;
        this.addTreeCacheListener(eviction_policy_provider);
    }
    catch(Throwable t) {
        log.error("setEvictionPolicyClass(): failed creating instance of " + eviction_policy_class, t);
    }
}
/**
 * Returns the wake-up interval of the eviction policy.
 * <p>
 * The interval is only available from an LRUPolicy. When no policy is
 * installed, or the installed listener is not an LRUPolicy, -1 is returned.
 * Previously a non-LRUPolicy listener caused a ClassCastException.
 *
 * @return the interval reported by the LRUPolicy, or -1 when unavailable
 */
public int getEvictionThreadWakeupIntervalSeconds() {
    // instanceof is false for null, so this also covers "no policy installed".
    if(!(eviction_policy_provider instanceof LRUPolicy)) {
        return -1;
    }
    return ((LRUPolicy)eviction_policy_provider).getWakeupIntervalSeconds();
}
/**
 * Installs the lookup that startService() uses to obtain the transaction manager.
 *
 * @param l the lookup instance
 */
public void setTransactionManagerLookup(TransactionManagerLookup l) {
this.tm_lookup=l;
}
/**
 * Returns the class name of the transaction manager lookup.
 *
 * @return the class name, or null when none is configured
 */
public String getTransactionManagerLookupClass() {
    return this.tm_lookup_class;
}
/**
 * Sets the class name of the transaction manager lookup. The class is only
 * loaded later, in startService(), and only if no lookup instance was set.
 *
 * @param cl the lookup class name
 * @throws Exception declared only; the visible body does not throw
 */
public void setTransactionManagerLookupClass(String cl) throws Exception {
this.tm_lookup_class=cl;
}
/**
 * Returns the transaction manager obtained in startService().
 *
 * @return the transaction manager, or null when none was obtained
 */
public TransactionManager getTransactionManager() {
    return this.tm;
}
/**
 * Returns this cache instance.
 *
 * @return {@code this}
 */
public TreeCache getInstance() {
return this;
}
/**
 * Installs the replication handler.
 *
 * @param handler the handler to use (stored as-is)
 */
public void setReplicationHandler(Replicatable handler) {
    this.replication_handler=handler;
}
/**
 * Returns the replication handler.
 *
 * @return the handler, or null when none is installed
 */
public Replicatable getReplicationHandler() {
    return this.replication_handler;
}
/**
 * Requests the state through the channel and logs the outcome.
 *
 * @param timeout timeout passed to {@code channel.getState()}
 * @throws ChannelNotConnectedException if there is no channel (or from the channel itself)
 * @throws ChannelClosedException       propagated from the channel
 */
public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException {
    if(channel == null) {
        throw new ChannelNotConnectedException();
    }
    boolean retrieved=channel.getState(null, timeout);
    String outcome=retrieved
            ? "fetchState(): state was retrieved successfully"
            : "fetchState(): state could not be retrieved (first member)";
    log.info(outcome);
}
/**
 * Registers a listener unless it is already registered.
 *
 * @param listener the listener to add
 */
public void addTreeCacheListener(TreeCacheListener listener) {
    if(listeners.contains(listener)) {
        return;
    }
    listeners.addElement(listener);
}
/**
 * Unregisters a listener; does nothing if it was not registered.
 *
 * @param listener the listener to remove
 */
public void removeTreeCacheListener(TreeCacheListener listener) {
    this.listeners.removeElement(listener);
}
/**
 * Lifecycle hook; intentionally empty in this class.
 *
 * @throws Exception declared only; the body is empty
 */
public void createService() throws Exception {
}
/**
 * Lifecycle hook; intentionally empty in this class.
 */
public void destroyService() {
}
/**
 * Starts the cache.
 * <p>
 * Steps, in order: resolve the transaction manager, create the cache loader,
 * build the interceptor chain, configure the eviction policy, then (for the
 * replicated modes) create and connect the channel and optionally fetch the
 * initial state. Finally the preload list is loaded, the coordinator flag is
 * computed and listeners are notified.
 * <p>
 * NOTE: in a replicated mode, if a channel already exists the method returns
 * early, so preloading, coordinator detection and the started notification
 * are skipped on that path.
 *
 * @throws Exception if the lookup class cannot be loaded/instantiated, or if
 *                   any of the creation or channel steps fails
 * @throws IllegalArgumentException if the cache mode is not one of the known constants
 */
public void startService() throws Exception {
// Instantiate the lookup from its class name only when no instance was injected.
if(this.tm_lookup == null && this.tm_lookup_class != null) {
Class clazz=Thread.currentThread().getContextClassLoader().loadClass(this.tm_lookup_class);
this.tm_lookup=(TransactionManagerLookup)clazz.newInstance();
}
try {
if(tm_lookup != null)
tm=tm_lookup.getTransactionManager();
else
log.warn("No transaction manager lookup class has been defined. Transactions cannot be used");
}
catch(Exception e) {
// A failed lookup is not fatal: the cache runs without transactions.
log.debug("failed looking up TransactionManager, will not use transactions", e);
}
createCacheLoader();
createInterceptorChain();
createEvictionPolicy();
switch(cache_mode) {
case LOCAL:
log.info("cache mode is local, will not create the channel");
break;
case REPL_SYNC:
case REPL_ASYNC:
log.info("cache mode is " + mode2String(cache_mode));
// An existing channel is treated as "already started"; nothing below runs.
if(channel != null) { log.info("channel is already running");
return;
}
if(cluster_props == null) {
cluster_props=getDefaultProperties();
log.debug("setting cluster properties to default value");
}
channel=new JChannel(cluster_props);
channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
if(log.isTraceEnabled())
log.trace("cache properties: " + cluster_props);
channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
// The dispatcher is created before connecting the channel.
disp=new RpcDispatcher(channel, ml, this, this);
disp.setDeadlockDetection(deadlockDetection);
channel.connect(cluster_name);
if(fetch_state_on_startup) {
fetchStateOnStartup();
}
break;
default:
throw new IllegalArgumentException("cache mode " + cache_mode + " is invalid");
}
cacheLoaderPreload();
coordinator=determineCoordinator();
notifyCacheStarted();
}
/**
 * Configures the eviction policy with this cache, if a policy is installed.
 * The provider is cast to LRUPolicy without a type check.
 */
private void createEvictionPolicy() {
    if(eviction_policy_provider == null) {
        return;
    }
    LRUPolicy policy=(LRUPolicy)eviction_policy_provider;
    policy.configure(this);
}
/**
 * Builds the interceptor chain and stores its head in {@code interceptor_chain}.
 * <p>
 * Resulting order (head first); bracketed entries are conditional:
 * <ol>
 * <li>[CacheStoreInterceptor - when a cache loader is configured and shared]</li>
 * <li>[ReplicationInterceptor - when the cache mode is not LOCAL]</li>
 * <li>UnlockInterceptor</li>
 * <li>[CacheLoaderInterceptor - when a cache loader is configured]</li>
 * <li>[CacheStoreInterceptor - when a cache loader is configured and NOT shared]</li>
 * <li>LockInterceptor</li>
 * <li>CallInterceptor</li>
 * </ol>
 *
 * @throws IllegalAccessException if an interceptor class cannot be accessed
 * @throws InstantiationException if an interceptor cannot be instantiated
 * @throws ClassNotFoundException if an interceptor class cannot be found
 */
protected void createInterceptorChain() throws IllegalAccessException, InstantiationException, ClassNotFoundException {
Interceptor call_interceptor=null;
Interceptor lock_interceptor=null;
Interceptor repl_interceptor=null;
Interceptor cache_loader_interceptor=null;
Interceptor cache_store_interceptor=null;
Interceptor unlock_interceptor=null;
Interceptor first=null;
// These three interceptors are always created.
call_interceptor=createInterceptor("org.jboss.cache.interceptors.CallInterceptor");
call_interceptor.setCache(this);
lock_interceptor=createInterceptor("org.jboss.cache.interceptors.LockInterceptor");
lock_interceptor.setCache(this);
unlock_interceptor=createInterceptor("org.jboss.cache.interceptors.UnlockInterceptor");
unlock_interceptor.setCache(this);
// Replication is only needed for the non-local modes.
if(cache_mode != LOCAL) {
repl_interceptor=createInterceptor("org.jboss.cache.interceptors.ReplicationInterceptor");
repl_interceptor.setCache(this);
}
// Loader and store interceptors are created together when a loader is configured.
if(cache_loader_class != null || cache_loader != null) {
cache_loader_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheLoaderInterceptor");
cache_loader_interceptor.setCache(this);
cache_store_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheStoreInterceptor");
cache_store_interceptor.setCache(this);
}
// Shared loader: the store interceptor goes to the very front of the chain.
if(cache_loader_interceptor != null) {
if(cache_loader_shared == true) {
if(first == null)
first=cache_store_interceptor;
else
addInterceptor(first, cache_store_interceptor);
}
}
if(repl_interceptor != null) {
if(first == null)
first=repl_interceptor;
else
addInterceptor(first, repl_interceptor);
}
// unlock_interceptor is always non-null, so 'first' is guaranteed to be set
// after this point and the later 'first == null' branches cannot be taken.
if(unlock_interceptor != null) {
if(first == null)
first=unlock_interceptor;
else
addInterceptor(first, unlock_interceptor);
}
if(cache_loader_interceptor != null) {
if(cache_loader_shared == true) {
if(first == null)
first=cache_loader_interceptor;
else
addInterceptor(first, cache_loader_interceptor);
}
else {
// Non-shared loader: the store interceptor follows the loader interceptor.
if(first == null)
first=cache_loader_interceptor;
else
addInterceptor(first, cache_loader_interceptor);
if(first == null)
first=cache_store_interceptor;
else
addInterceptor(first, cache_store_interceptor);
}
}
if(first == null)
first=lock_interceptor;
else
addInterceptor(first, lock_interceptor);
if(first == null)
first=call_interceptor;
else
addInterceptor(first, call_interceptor);
interceptor_chain=first;
if(log.isInfoEnabled())
log.info("interceptor chain is:\n" + printInterceptorChain(first));
}
/**
 * Renders the chain starting at {@code i}, one class per line.
 * The successors are printed first, so the given interceptor appears last.
 *
 * @param i head of the (sub)chain; may be null
 * @return the rendered chain, or an empty string for null
 */
private String printInterceptorChain(Interceptor i) {
    if(i == null) {
        return "";
    }
    StringBuffer out=new StringBuffer();
    Interceptor next=i.getNext();
    if(next != null) {
        out.append(printInterceptorChain(next)).append("\n");
    }
    out.append(i.getClass());
    return out.toString();
}
/**
 * Appends an interceptor to the end of a chain.
 *
 * @param first head of the chain; nothing happens when null
 * @param i     the interceptor to link after the current last element
 */
private void addInterceptor(Interceptor first, Interceptor i) {
    if(first == null) {
        return;
    }
    // Walk to the last link, i.e. the one without a successor.
    Interceptor tail=first;
    while(tail.getNext() != null) {
        tail=tail.getNext();
    }
    tail.setNext(i);
}
/**
 * Instantiates an interceptor by class name using this class's class loader.
 *
 * @param classname fully qualified interceptor class name
 * @return a new instance created through the no-arg constructor
 * @throws ClassNotFoundException if the class cannot be found
 * @throws IllegalAccessException if the class or constructor is not accessible
 * @throws InstantiationException if the class cannot be instantiated
 */
private Interceptor createInterceptor(String classname) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
    ClassLoader loader=getClass().getClassLoader();
    Object instance=loader.loadClass(classname).newInstance();
    return (Interceptor)instance;
}
/**
 * Instantiates, configures, creates and starts the cache loader from
 * {@code cache_loader_class}, using the thread context class loader.
 * Does nothing when a loader already exists or no class name is configured.
 *
 * @throws Exception if loading, instantiating or starting the loader fails
 */
protected void createCacheLoader() throws Exception {
    if(cache_loader != null || cache_loader_class == null) {
        return;
    }
    ClassLoader context=Thread.currentThread().getContextClassLoader();
    Class loaderClass=context.loadClass(cache_loader_class);
    cache_loader=(CacheLoader)loaderClass.newInstance();
    cache_loader.setConfig(cache_loader_config);
    cache_loader.setCache(this);
    cache_loader.create();
    cache_loader.start();
}
/**
 * Preloads every Fqn of the preload list, including parents and children.
 * Does nothing when no cache loader exists.
 *
 * @throws Exception propagated from preload()
 */
protected void cacheLoaderPreload() throws Exception {
    if(cache_loader == null) {
        return;
    }
    if(log.isTraceEnabled()) {
        log.trace("preloading " + cache_loader_preload);
    }
    if(cache_loader_preload == null) {
        return;
    }
    Iterator it=cache_loader_preload.iterator();
    while(it.hasNext()) {
        preload((Fqn)it.next(), true, true);
    }
}
/**
 * Preloads the given Fqn with its parents and children.
 * Does nothing when no cache loader exists.
 *
 * @param fqn the Fqn in string form
 * @throws Exception propagated from preload()
 */
public void load(String fqn) throws Exception {
    if(cache_loader == null) {
        return;
    }
    preload(Fqn.fromString(fqn), true, true);
}
/**
 * Touches a node (and optionally its ancestors and descendants) by issuing
 * get() calls with a dummy key; presumably this makes the interceptor chain
 * load the nodes from the cache loader -- confirm against the loader interceptor.
 *
 * @param fqn              the node to preload
 * @param preload_parents  when true, every proper ancestor prefix of {@code fqn} is touched too
 * @param preload_children when true, the children reported by the cache loader are preloaded recursively
 * @throws Exception propagated from get() or the cache loader
 */
void preload(Fqn fqn, boolean preload_parents, boolean preload_children) throws Exception {
    this.get(fqn, "bla");
    if(preload_parents) {
        // Build each ancestor prefix step by step, excluding fqn itself.
        Fqn ancestor=new Fqn();
        for(int idx=0; idx < fqn.size()-1; idx++) {
            ancestor=new Fqn(ancestor, fqn.get(idx));
            this.get(ancestor, "bla");
        }
    }
    if(!preload_children) {
        return;
    }
    Set names=cache_loader.getChildrenNames(fqn);
    if(names == null) {
        return;
    }
    Iterator it=names.iterator();
    while(it.hasNext()) {
        String name=(String)it.next();
        // Parents were handled above, so only descend from here.
        preload(new Fqn(fqn, name), false, true);
    }
}
/**
 * Stops, destroys and forgets the cache loader, if one exists.
 */
void destroyCacheLoader() {
    if(cache_loader == null) {
        return;
    }
    cache_loader.stop();
    cache_loader.destroy();
    cache_loader=null;
}
/**
 * Checks whether the local address equals the coordinator address of the
 * channel's current view.
 *
 * @return true only if channel, local address, view, view id and coordinator
 *         address are all available and the addresses are equal
 */
protected boolean determineCoordinator() {
    if(channel == null) {
        return false;
    }
    Object self=getLocalAddress();
    if(self == null) {
        return false;
    }
    View current=channel.getView();
    ViewId id=current != null ? current.getVid() : null;
    Object coord=id != null ? id.getCoordAddress() : null;
    return coord != null && self.equals(coord);
}
/**
 * Returns the coordinator address of the channel's current view.
 *
 * @return the coordinator address, or null when the channel, view or view id is missing
 */
public Address getCoordinator() {
    if(channel == null) {
        return null;
    }
    View current=channel.getView();
    ViewId id=current == null ? null : current.getVid();
    return id == null ? null : id.getCoordAddress();
}
/**
 * Returns the state as produced by the message listener.
 *
 * @return the result of {@code getMessageListener().getState()}
 */
public byte[] getStateBytes() {
    MessageListener listener=this.getMessageListener();
    return listener.getState();
}
/**
 * Hands the given state to the message listener.
 *
 * @param state the state passed to {@code getMessageListener().setState()}
 */
public void setStateBytes(byte[] state) {
    MessageListener listener=this.getMessageListener();
    listener.setState(state);
}
/**
 * Requests the initial state through the channel and, if the request was
 * accepted, blocks until {@code isStateSet} becomes true.
 * <p>
 * The flag is presumably set (with a notify on {@code stateLock}) by the
 * state-transfer callback, which is not visible here -- confirm.
 * <p>
 * An interrupt received while waiting does not abort the wait, but the
 * thread's interrupt status is restored before returning. Previously it was
 * silently discarded.
 *
 * @throws Exception propagated from {@code channel.getState()}
 */
protected void fetchStateOnStartup() throws Exception {
    boolean interrupted=false;
    try {
        synchronized(stateLock) {
            isStateSet=false;
            long start=System.currentTimeMillis();
            boolean rc=channel.getState(null, state_fetch_timeout);
            if(rc) {
                while(!isStateSet) {
                    try {
                        stateLock.wait();
                    }
                    catch(InterruptedException iex) {
                        // Remember the interrupt and keep waiting. Re-interrupting
                        // here would make the next wait() throw immediately.
                        interrupted=true;
                    }
                }
                long stop=System.currentTimeMillis();
                log.info("state was retrieved successfully (in " + (stop-start) + " milliseconds)");
            }
            else
                log.info("state could not be retrieved (must be first member in group)");
        }
    }
    finally {
        if(interrupted)
            Thread.currentThread().interrupt();
    }
}
/**
 * Stops the cache: closes the channel, stops the dispatcher, clears the
 * member list, stops the replication queue, destroys the cache loader,
 * notifies listeners and finally removes all listeners.
 */
public void stopService() {
    if(channel != null) {
        log.info("stopService(): closing the channel");
        channel.close();
        channel=null;
    }
    if(disp != null) {
        log.info("stopService(): stopping the dispatcher");
        disp.stop();
        disp=null;
    }
    if(members != null && !members.isEmpty()) {
        members.clear();
    }
    if(repl_queue != null) {
        repl_queue.stop();
    }
    destroyCacheLoader();
    // Listeners are notified before the listener list is emptied.
    notifyCacheStopped();
    listeners.clear();
}
/**
 * Looks up a node by its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @return the result of {@link #get(Fqn)}
 * @throws CacheException propagated from {@link #get(Fqn)}
 */
public Node get(String fqn) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return get(parsed);
}
/**
 * Looks up a node by dispatching the local {@code _get(Fqn)} through invokeMethod().
 *
 * @param fqn the node's Fqn
 * @return the value returned by the invocation, cast to Node
 * @throws CacheException propagated from invokeMethod()
 */
public Node get(Fqn fqn) throws CacheException {
    Object[] args=new Object[]{fqn};
    return (Node)invokeMethod(new MethodCall(getNodeMethodLocal, args));
}
/**
 * Local implementation of the node lookup; delegates to findNode().
 *
 * @param fqn the node's Fqn
 * @return whatever findNode() returns for {@code fqn}
 * @throws CacheException propagated from findNode()
 */
public Node _get(Fqn fqn) throws CacheException {
    Node found=findNode(fqn);
    return found;
}
/**
 * Returns the keys of a node given its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @return the result of {@link #getKeys(Fqn)}
 * @throws CacheException propagated from {@link #getKeys(Fqn)}
 */
public Set getKeys(String fqn) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return getKeys(parsed);
}
/**
 * Returns the keys of a node by dispatching the local {@code _getKeys} through invokeMethod().
 *
 * @param fqn the node's Fqn
 * @return the value returned by the invocation, cast to Set
 * @throws CacheException propagated from invokeMethod()
 */
public Set getKeys(Fqn fqn) throws CacheException {
    Object[] args=new Object[]{fqn};
    return (Set)invokeMethod(new MethodCall(getKeysMethodLocal, args));
}
/**
 * Local implementation of the key lookup.
 *
 * @param fqn the node's Fqn
 * @return a new LinkedHashSet copy of the node's data keys, or null when the
 *         node does not exist or reports no key set
 * @throws CacheException propagated from findNode()
 */
public Set _getKeys(Fqn fqn) throws CacheException {
    Node n=findNode(fqn);
    if(n == null) {
        return null;
    }
    Set keys=n.getDataKeys();
    if(keys == null) {
        return null;
    }
    // Copy so callers do not see the node's own key set.
    return new LinkedHashSet(keys);
}
/**
 * Returns the value stored under a key, given the Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @param key the attribute key
 * @return the result of {@link #get(Fqn, Object)}
 * @throws CacheException propagated from {@link #get(Fqn, Object)}
 */
public Object get(String fqn, Object key) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return get(parsed, key);
}
/**
 * Returns the value stored under a key, with the node event enabled.
 *
 * @param fqn the node's Fqn
 * @param key the attribute key
 * @return the result of {@code get(fqn, key, true)}
 * @throws CacheException propagated from the three-argument get()
 */
public Object get(Fqn fqn, Object key) throws CacheException {
    final boolean sendNodeEvent=true;
    return get(fqn, key, sendNodeEvent);
}
/**
 * Local implementation of the key/value lookup.
 *
 * @param fqn           the node's Fqn
 * @param key           the attribute key
 * @param sendNodeEvent when true, listeners are told the node was visited
 * @return the node's value for {@code key}, or null when the node does not exist
 * @throws CacheException propagated from findNode()
 */
public Object _get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
    if(log.isTraceEnabled()) {
        log.trace("_get(" + ", \"" + fqn + "\", " + key + ", \"" +sendNodeEvent +"\")");
    }
    Node node=findNode(fqn);
    if(node == null) {
        return null;
    }
    if(sendNodeEvent) {
        notifyNodeVisisted(fqn);
    }
    return node.get(key);
}
/**
 * Dispatches the local three-argument {@code _get} through invokeMethod().
 *
 * @param fqn           the node's Fqn
 * @param key           the attribute key
 * @param sendNodeEvent forwarded to {@code _get}
 * @return the value returned by the invocation
 * @throws CacheException propagated from invokeMethod()
 */
protected Object get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
    Object[] args=new Object[]{fqn, key, new Boolean(sendNodeEvent)};
    MethodCall call=new MethodCall(getKeyValueMethodLocal, args);
    return invokeMethod(call);
}
/**
 * Like {@link #get(Fqn, Object)} but with the node event disabled.
 *
 * @param fqn the node's Fqn
 * @param key the attribute key
 * @return the result of {@code get(fqn, key, false)}
 * @throws CacheException propagated from the three-argument get()
 */
public Object peek(Fqn fqn, Object key) throws CacheException {
    final boolean sendNodeEvent=false;
    return get(fqn, key, sendNodeEvent);
}
/**
 * Tells whether a node exists, given its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @return the result of {@link #exists(Fqn)}
 */
public boolean exists(String fqn) {
    Fqn parsed=Fqn.fromString(fqn);
    return exists(parsed);
}
/**
 * Tells whether a node exists. The tree is walked directly via
 * findInternal(), without going through invokeMethod().
 *
 * @param fqn the node's Fqn
 * @return true if findInternal() finds a node
 */
public boolean exists(Fqn fqn) {
    return findInternal(fqn) != null;
}
/**
 * Walks the tree from the root along the elements of {@code fqn}.
 *
 * @param fqn the path to follow; null or empty yields the root
 * @return the node at the end of the path, or null if any element is missing
 */
private Node findInternal(Fqn fqn) {
    if(fqn == null || fqn.size() == 0) {
        return root;
    }
    Node current=root;
    for(int idx=0; idx < fqn.size(); idx++) {
        current=current.getChild(fqn.get(idx));
        if(current == null) {
            return null;
        }
    }
    return current;
}
/**
 * Tells whether a node has a given key, given the Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @param key the attribute key
 * @return the result of {@link #exists(Fqn, Object)}
 */
public boolean exists(String fqn, Object key) {
    Fqn parsed=Fqn.fromString(fqn);
    return exists(parsed, key);
}
/**
 * Tells whether a node exists and contains the given key.
 *
 * @param fqn the node's Fqn
 * @param key the attribute key
 * @return true if the node is found by findInternal() and reports the key
 */
public boolean exists(Fqn fqn, Object key) {
    Node node=findInternal(fqn);
    return node != null && node.containsKey(key);
}
/**
 * Stores a map of attributes in a node, given the Fqn in string form.
 *
 * @param fqn  the Fqn in string form
 * @param data the attributes to store
 * @throws CacheException propagated from {@link #put(Fqn, Map)}
 */
public void put(String fqn, Map data) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    put(parsed, data);
}
/**
 * Stores a map of attributes by dispatching the local map-based {@code _put}
 * with the current transaction and {@code Boolean.TRUE} as the trailing flag.
 *
 * @param fqn  the node's Fqn
 * @param data the attributes to store
 * @throws CacheException propagated from invokeMethod()
 */
public void put(Fqn fqn, Map data) throws CacheException {
    GlobalTransaction gtx=getCurrentTransaction();
    Object[] args=new Object[]{gtx, fqn, data, Boolean.TRUE};
    invokeMethod(new MethodCall(putDataMethodLocal, args));
}
/**
 * Stores a single attribute, given the Fqn in string form.
 *
 * @param fqn   the Fqn in string form
 * @param key   the attribute key
 * @param value the attribute value
 * @return the result of {@link #put(Fqn, Object, Object)}
 * @throws CacheException propagated from {@link #put(Fqn, Object, Object)}
 */
public Object put(String fqn, Object key, Object value) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return put(parsed, key, value);
}
/**
 * Stores a single attribute by dispatching the {@code _put} variant that
 * takes an additional timeout.
 *
 * @param fqn     the node's Fqn
 * @param key     the attribute key
 * @param value   the attribute value
 * @param timeout the timeout forwarded to the local {@code _put}
 * @return the value returned by the invocation
 * @throws CacheException propagated from invokeMethod()
 */
public Object putFailFast(Fqn fqn, Object key, Object value, long timeout) throws CacheException {
    GlobalTransaction gtx=getCurrentTransaction();
    Object[] args=new Object[]{gtx, fqn, key, value, Boolean.TRUE, new Long(timeout)};
    MethodCall call=new MethodCall(putFailFastKeyValueMethodLocal, args);
    return invokeMethod(call);
}
/**
 * String-Fqn variant of the fail-fast put. The current transaction is
 * fetched before the Fqn string is parsed.
 *
 * @param fqn     the Fqn in string form
 * @param key     the attribute key
 * @param value   the attribute value
 * @param timeout the timeout forwarded to the local {@code _put}
 * @return the value returned by the invocation
 * @throws CacheException propagated from invokeMethod()
 */
public Object putFailFast(String fqn, Object key, Object value, long timeout) throws CacheException {
    GlobalTransaction gtx=getCurrentTransaction();
    Fqn parsed=Fqn.fromString(fqn);
    Object[] args=new Object[]{gtx, parsed, key, value, Boolean.TRUE, new Long(timeout)};
    MethodCall call=new MethodCall(putFailFastKeyValueMethodLocal, args);
    return invokeMethod(call);
}
/**
 * Stores a single attribute by dispatching the local key/value {@code _put}
 * with the current transaction and {@code Boolean.TRUE} as the trailing flag.
 *
 * @param fqn   the node's Fqn
 * @param key   the attribute key
 * @param value the attribute value
 * @return the value returned by the invocation
 * @throws CacheException propagated from invokeMethod()
 */
public Object put(Fqn fqn, Object key, Object value) throws CacheException {
    GlobalTransaction gtx=getCurrentTransaction();
    Object[] args=new Object[]{gtx, fqn, key, value, Boolean.TRUE};
    return invokeMethod(new MethodCall(putKeyValMethodLocal, args));
}
/**
 * Removes a node, given its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @throws CacheException propagated from {@link #remove(Fqn)}
 */
public void remove(String fqn) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    remove(parsed);
}
/**
 * Removes a node by dispatching the local node-level {@code _remove} with
 * the current transaction and {@code Boolean.TRUE} as the trailing flag.
 *
 * @param fqn the node's Fqn
 * @throws CacheException propagated from invokeMethod()
 */
public void remove(Fqn fqn) throws CacheException {
    GlobalTransaction gtx=getCurrentTransaction();
    Object[] args=new Object[]{gtx, fqn, Boolean.TRUE};
    invokeMethod(new MethodCall(removeNodeMethodLocal, args));
}
/**
 * Evicts a node by dispatching the local {@code _evict} through invokeMethod().
 *
 * @param fqn the node's Fqn
 * @throws CacheException propagated from invokeMethod()
 */
public void evict(Fqn fqn) throws CacheException {
    Object[] args=new Object[]{fqn};
    invokeMethod(new MethodCall(evictNodeMethodLocal, args));
}
/**
 * Removes a single attribute, given the Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @param key the attribute key
 * @return the result of {@link #remove(Fqn, Object)}
 * @throws CacheException propagated from {@link #remove(Fqn, Object)}
 */
public Object remove(String fqn, Object key) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return remove(parsed, key);
}
/**
 * Removes a single attribute by dispatching the local key-level
 * {@code _remove} with the current transaction and {@code Boolean.TRUE} as
 * the trailing flag.
 *
 * @param fqn the node's Fqn
 * @param key the attribute key
 * @return the value returned by the invocation
 * @throws CacheException propagated from invokeMethod()
 */
public Object remove(Fqn fqn, Object key) throws CacheException {
    GlobalTransaction gtx=getCurrentTransaction();
    Object[] args=new Object[]{gtx, fqn, key, Boolean.TRUE};
    return invokeMethod(new MethodCall(removeKeyMethodLocal, args));
}
/**
 * Removes a node's data, given its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @throws CacheException propagated from {@link #removeData(Fqn)}
 */
public void removeData(String fqn) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    removeData(parsed);
}
/**
 * Removes a node's data by dispatching the local {@code _removeData} with
 * the current transaction and {@code Boolean.TRUE} as the trailing flag.
 *
 * @param fqn the node's Fqn
 * @throws CacheException propagated from invokeMethod()
 */
public void removeData(Fqn fqn) throws CacheException {
    GlobalTransaction gtx=getCurrentTransaction();
    Object[] args=new Object[]{gtx, fqn, Boolean.TRUE};
    invokeMethod(new MethodCall(removeDataMethodLocal, args));
}
/**
 * Releases all locks for a node, given its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 */
public void releaseAllLocks(String fqn) {
    Fqn parsed=Fqn.fromString(fqn);
    releaseAllLocks(parsed);
}
/**
 * Releases all locks by dispatching the local {@code _releaseAllLocks}.
 * A CacheException is logged and not propagated.
 *
 * @param fqn the node's Fqn
 */
public void releaseAllLocks(Fqn fqn) {
    Object[] args=new Object[]{fqn};
    MethodCall call=new MethodCall(releaseAllLocksMethodLocal, args);
    try {
        invokeMethod(call);
    }
    catch(CacheException e) {
        log.error("failed releasing all locks for " + fqn, e);
    }
}
/**
 * Prints a node, given its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @return the result of {@link #print(Fqn)}
 */
public String print(String fqn) {
    Fqn parsed=Fqn.fromString(fqn);
    return print(parsed);
}
/**
 * Prints a node by dispatching the local {@code _print}.
 * If the invocation throws, the throwable's string form is returned instead.
 *
 * @param fqn the node's Fqn
 * @return the string form of the invocation result (or of the throwable);
 *         an empty string when the result is null
 */
public String print(Fqn fqn) {
    MethodCall call=new MethodCall(printMethodLocal, new Object[]{fqn});
    Object result;
    try {
        result=invokeMethod(call);
    }
    catch(Throwable e) {
        result=e;
    }
    return result != null ? result.toString() : "";
}
/**
 * Returns the child names of a node, given its Fqn in string form.
 *
 * @param fqn the Fqn in string form
 * @return the result of {@link #getChildrenNames(Fqn)}
 * @throws CacheException propagated from {@link #getChildrenNames(Fqn)}
 */
public Set getChildrenNames(String fqn) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return getChildrenNames(parsed);
}
/**
 * Returns the child names by dispatching the local {@code _getChildrenNames}.
 *
 * @param fqn the node's Fqn
 * @return the value returned by the invocation, cast to Set
 * @throws CacheException propagated from invokeMethod()
 */
public Set getChildrenNames(Fqn fqn) throws CacheException {
    Object[] args=new Object[]{fqn};
    return (Set)invokeMethod(new MethodCall(getChildrenNamesMethodLocal, args));
}
/**
 * Local implementation of the child-name lookup.
 *
 * @param fqn the node's Fqn
 * @return a new LinkedHashSet copy of the keys of the node's child map, or
 *         null when the node does not exist or has no child map
 * @throws CacheException propagated from findNode()
 */
public Set _getChildrenNames(Fqn fqn) throws CacheException {
    Node node=findNode(fqn);
    if(node == null) {
        return null;
    }
    Map kids=node.getChildren();
    return kids == null ? null : new LinkedHashSet(kids.keySet());
}
/**
 * Tells whether the node at {@code fqn} has children. The tree is walked
 * directly from the root.
 *
 * @param fqn the node's Fqn; null yields false
 * @return false if any path element is missing, otherwise the node's hasChildren()
 */
public boolean hasChild(Fqn fqn) {
    if(fqn == null) {
        return false;
    }
    Node current=root;
    for(int idx=0; idx < fqn.size(); idx++) {
        current=current.getChild(fqn.get(idx));
        if(current == null) {
            return false;
        }
    }
    return current.hasChildren();
}
/**
 * Returns the short summary form.
 *
 * @return the result of {@code toString(false)}
 */
public String toString() {
    final boolean details=false;
    return toString(details);
}
/**
 * Renders the cache as text.
 *
 * @param details when false, a one-line summary with class name, node count
 *                and lock count; when true, each child of the root printed
 *                via Node.print() followed by a newline, or just SEPARATOR
 *                when the root has no children
 * @return the rendered text
 */
public String toString(boolean details) {
    StringBuffer out=new StringBuffer();
    if(!details) {
        out.append(getClass().getName()).append(" [").append(getNumberOfNodes()).append(" nodes, ");
        out.append(getNumberOfLocksHeld()).append(" locks]");
        return out.toString();
    }
    Map kids=root.getChildren();
    if(kids == null || kids.size() == 0) {
        out.append(SEPARATOR);
        return out.toString();
    }
    Iterator it=kids.values().iterator();
    while(it.hasNext()) {
        ((Node)it.next()).print(out, 0);
        out.append("\n");
    }
    return out.toString();
}
/**
 * Renders every child of the root via Node.printDetails(), one per line.
 *
 * @return the rendered text, or just SEPARATOR when the root has no children
 */
public String printDetails() {
    StringBuffer out=new StringBuffer();
    Map kids=root.getChildren();
    if(kids == null || kids.size() == 0) {
        out.append(SEPARATOR);
        return out.toString();
    }
    Iterator it=kids.values().iterator();
    while(it.hasNext()) {
        ((Node)it.next()).printDetails(out, 0);
        out.append("\n");
    }
    return out.toString();
}
/**
 * Renders lock information for every child of the root via
 * Node.printLockInfo(), one per line. The output starts with a newline.
 *
 * @return the rendered text; a newline followed by SEPARATOR when the root has no children
 */
public String printLockInfo() {
    StringBuffer out=new StringBuffer("\n");
    Map kids=root.getChildren();
    if(kids == null || kids.size() == 0) {
        out.append(SEPARATOR);
        return out.toString();
    }
    Iterator it=kids.values().iterator();
    while(it.hasNext()) {
        ((Node)it.next()).printLockInfo(out, 0);
        out.append("\n");
    }
    return out.toString();
}
/**
 * Counts the locked nodes in the whole tree, starting at the root.
 *
 * @return the result of {@code numLocks(root)}
 */
public int getNumberOfLocksHeld() {
    int held=numLocks(root);
    return held;
}
/**
 * Recursively counts the nodes in the subtree rooted at {@code n} for which
 * {@code isLocked()} is true.
 *
 * @param n subtree root; must not be null
 * @return number of locked nodes, including {@code n} itself
 */
int numLocks(Node n) {
    int total=n.isLocked() ? 1 : 0;
    Map kids=n.getChildren();
    if(kids == null)
        return total;
    Iterator it=kids.values().iterator();
    while(it.hasNext()) {
        total+=numLocks((Node)it.next());
    }
    return total;
}
/**
 * Number of nodes in the tree, not counting the root itself.
 *
 * @return {@code numNodes(root)} minus one
 */
public int getNumberOfNodes() {
    int includingRoot=numNodes(root);
    return includingRoot - 1;
}
/**
 * Recursively counts the nodes in the subtree rooted at {@code n}.
 *
 * @param n subtree root; may be null
 * @return 0 for null, otherwise 1 plus the count of all descendants
 */
int numNodes(Node n) {
    if(n == null)
        return 0;
    int total=1;
    if(!n.hasChildren())
        return total;
    Map kids=n.getChildren();
    if(kids == null || kids.size() < 1)
        return total;
    Iterator it=kids.values().iterator();
    while(it.hasNext()) {
        total+=numNodes((Node)it.next());
    }
    return total;
}
/**
 * Attribute count for the tree, computed by {@code numAttributes(root)}.
 *
 * @return the value produced for the root node
 */
public int getNumberOfAttributes() {
    int attributes=numAttributes(root);
    return attributes;
}
/**
 * Counts the attributes held in the subtree rooted at {@code n}.
 * The node's own {@code numAttributes()} is added to the totals of all of its
 * descendants, recursing the same way {@code numNodes} and {@code numLocks}
 * do, so nodes deeper than the first level of children are included.
 * NOTE(review): this assumes {@code Node.numAttributes()} reports only the
 * node's own attributes; confirm against Node.
 *
 * @param n subtree root; may be null
 * @return 0 for a null node, otherwise the summed attribute count
 */
int numAttributes(Node n) {
    if(n == null)
        return 0;
    int count=n.numAttributes();
    if(n.hasChildren()) {
        Map children=n.getChildren();
        if(children != null && children.size() > 0) {
            for(Iterator it=children.values().iterator(); it.hasNext();) {
                // recurse so that grandchildren and deeper nodes are counted too
                count+=numAttributes((Node)it.next());
            }
        }
    }
    return count;
}
/**
 * Invokes {@code method_call} on a set of cluster members through the RPC
 * dispatcher and collects the responses.
 *
 * @param mbrs         destination members; when null a copy of the current
 *                     member list is used
 * @param method_call  the call to dispatch
 * @param synchronous  true selects GroupRequest.GET_ALL, false GET_NONE
 * @param exclude_self when true the local address is removed from the targets
 * @param timeout      passed through to the dispatcher
 * @return one entry per response: the response value, or a TimeoutException
 *         instance when the member was suspected or did not answer; null when
 *         there is no dispatcher, the target list is empty, or the dispatcher
 *         returned null
 * @throws Exception propagated from the dispatcher
 */
public List callRemoteMethods(Vector mbrs, MethodCall method_call,
                              boolean synchronous, boolean exclude_self, long timeout)
        throws Exception {
    final int mode;
    if(synchronous)
        mode=GroupRequest.GET_ALL;
    else
        mode=GroupRequest.GET_NONE;

    if(disp == null)
        return null;

    // always work on a private copy so the caller's vector is never modified
    Vector targets=new Vector(mbrs != null ? mbrs : this.members);
    if(exclude_self && targets.size() > 0) {
        Object self=getLocalAddress();
        if(self != null)
            targets.remove(self);
    }
    if(targets.size() == 0) {
        if(log.isTraceEnabled())
            log.trace("destination list is empty, discarding call");
        return null;
    }

    if(log.isTraceEnabled())
        log.trace("callRemoteMethods(): valid members are " + targets);
    RspList responses=disp.callRemoteMethods(targets, method_call, mode, timeout);
    if(log.isTraceEnabled())
        log.trace("(" + getLocalAddress() + "): responses for method " + method_call.getName() + ":\n" + responses);
    if(responses == null)
        return null;

    List results=new ArrayList(responses.size());
    int idx=0;
    while(idx < responses.size()) {
        Rsp response=(Rsp)responses.elementAt(idx);
        if(!response.wasSuspected() && response.wasReceived())
            results.add(response.getValue());
        else
            results.add(new TimeoutException("rsp=" + response));
        idx++;
    }
    return results;
}
/**
 * Overload taking a reflective Method: wraps {@code method} and {@code args}
 * in a MethodCall and delegates to the MethodCall overload.
 *
 * @return the delegate's result
 * @throws Exception propagated from the delegate
 */
public List callRemoteMethods(Vector members, Method method, Object[] args,
                              boolean synchronous, boolean exclude_self, long timeout)
        throws Exception {
    MethodCall call=new MethodCall(method, args);
    return callRemoteMethods(members, call, synchronous, exclude_self, timeout);
}
/**
 * Overload taking a method name: resolves the method with
 * {@code getClass().getDeclaredMethod(method_name, types)} and delegates to
 * the Method overload.
 *
 * @return the delegate's result
 * @throws Exception from the reflective lookup or from the delegate
 */
public List callRemoteMethods(Vector members, String method_name,
                              Class[] types, Object[] args,
                              boolean synchronous, boolean exclude_self, long timeout)
        throws Exception {
    Class owner=getClass();
    Method resolved=owner.getDeclaredMethod(method_name, types);
    return callRemoteMethods(members, resolved, args, synchronous, exclude_self, timeout);
}
/**
 * String convenience overload: parses {@code fqn} and delegates to the
 * Fqn-based map put.
 *
 * @throws CacheException propagated from the delegate
 */
public void _put(GlobalTransaction tx, String fqn, Map data, boolean create_undo_ops)
        throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    _put(tx, parsed, data, create_undo_ops);
}
/**
 * Map put that does not erase the node's existing contents; delegates with
 * {@code erase_contents} set to false.
 *
 * @throws CacheException propagated from the delegate
 */
public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops)
        throws CacheException {
    final boolean erase_contents=false;
    _put(tx, fqn, data, create_undo_ops, erase_contents);
}
/**
 * Puts a map of data into an existing node and, inside a transaction,
 * records the compensating operation.
 *
 * @param tx              transaction the change belongs to; may be null
 * @param fqn             target node
 * @param data            data handed to {@code Node.put(Map, boolean)}
 * @param create_undo_ops when true and {@code tx} is non-null an undo
 *                        operation is added to the transaction table
 * @param erase_contents  passed through to {@code Node.put}
 * @throws CacheException a NodeNotExistsException when the node is not found
 */
public void _put(GlobalTransaction tx, Fqn fqn, Map data, boolean create_undo_ops, boolean erase_contents)
        throws CacheException {
    if(log.isTraceEnabled())
        log.trace("_put(" + tx + ", \"" + fqn + "\", " + data + ")");

    Node target=findNode(fqn);
    if(target == null) {
        String errStr="node " + fqn + " not found (gtx=" + tx + ", caller=" + Thread.currentThread() + ")";
        if(log.isTraceEnabled())
            log.trace(errStr);
        throw new NodeNotExistsException(errStr);
    }

    final boolean recordUndo=tx != null && create_undo_ops;
    MethodCall undo=null;
    if(recordUndo) {
        // the undo must be built before the node is modified, from a copy of the old data
        Map previous=target.getData();
        if(previous != null) {
            undo=new MethodCall(putDataEraseMethodLocal,
                                new Object[]{tx, fqn, new HashMap(previous), Boolean.FALSE, Boolean.TRUE});
        }
        else {
            undo=new MethodCall(removeDataMethodLocal, new Object[]{tx, fqn, Boolean.FALSE});
        }
    }

    target.put(data, erase_contents);
    if(recordUndo)
        tx_table.addUndoOperation(tx, undo);
    notifyNodeModified(fqn);
}
/**
 * String convenience overload: parses {@code fqn} and delegates to the
 * Fqn-based key/value put.
 *
 * @return the delegate's result
 * @throws CacheException propagated from the delegate
 */
public Object _put(GlobalTransaction tx, String fqn, Object key, Object value, boolean create_undo_ops)
        throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return _put(tx, parsed, key, value, create_undo_ops);
}
/**
 * Stores a single key/value pair in an existing node and, inside a
 * transaction, records the compensating operation (restore the old value,
 * or remove the key when there was none).
 *
 * @param tx              transaction the change belongs to; may be null
 * @param fqn             target node
 * @param key             attribute key
 * @param value           attribute value
 * @param create_undo_ops when true and {@code tx} is non-null an undo
 *                        operation is added to the transaction table
 * @return the value {@code Node.get(key)} returned before the put
 * @throws CacheException a NodeNotExistsException when the node is not found
 */
public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value, boolean create_undo_ops)
        throws CacheException {
    if(log.isTraceEnabled())
        log.trace("_put(" + tx + ", \"" + fqn + "\", " + key + ", " + value + ")");

    Node target=findNode(fqn);
    if(target == null) {
        String errStr="node " + fqn + " not found (gtx=" + tx + ", caller=" + Thread.currentThread() + ")";
        if(log.isTraceEnabled())
            log.trace(errStr);
        throw new NodeNotExistsException(errStr);
    }

    Object previous=target.get(key);
    target.put(key, value);

    if(tx != null && create_undo_ops) {
        MethodCall undo;
        if(previous != null)
            undo=new MethodCall(putKeyValMethodLocal, new Object[]{tx, fqn, key, previous, Boolean.FALSE});
        else
            undo=new MethodCall(removeKeyMethodLocal, new Object[]{tx, fqn, key, Boolean.FALSE});
        tx_table.addUndoOperation(tx, undo);
    }

    notifyNodeModified(fqn);
    return previous;
}
/**
 * Key/value put variant carrying a timeout. The {@code timeout} argument is
 * not used by this body; the call is forwarded to the overload without it.
 *
 * @return the delegate's result
 * @throws CacheException propagated from the delegate
 */
public Object _put(GlobalTransaction tx, Fqn fqn, Object key, Object value, boolean create_undo_ops, long timeout)
        throws CacheException {
    Object previous=_put(tx, fqn, key, value, create_undo_ops);
    return previous;
}
/**
 * String convenience overload: parses {@code fqn} and delegates to the
 * Fqn-based node removal.
 *
 * @throws CacheException propagated from the delegate
 */
public void _remove(GlobalTransaction tx, String fqn, boolean create_undo_ops) throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    _remove(tx, parsed, create_undo_ops);
}
/**
 * Removes a node; delegates with {@code sendNodeEvent} set to true.
 *
 * @throws CacheException propagated from the delegate
 */
public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops) throws CacheException {
    final boolean sendNodeEvent=true;
    _remove(tx, fqn, create_undo_ops, sendNodeEvent);
}
/**
 * Removes a node as a regular (non-eviction) removal; delegates with
 * {@code eviction} set to false.
 *
 * @throws CacheException propagated from the delegate
 */
public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent)
        throws CacheException {
    final boolean eviction=false;
    _remove(tx, fqn, create_undo_ops, sendNodeEvent, eviction);
}
/**
 * Removes the node at {@code fqn} from its parent.
 * <p>
 * For the root (an empty Fqn) the root itself is kept and each of its
 * children is removed individually; a failure on one child is logged together
 * with its cause and does not stop the removal of the remaining children.
 *
 * @param tx              transaction the change belongs to; may be null
 * @param fqn             node to remove
 * @param create_undo_ops when true, {@code tx} is non-null and this is not an
 *                        eviction, an operation re-adding the node to its
 *                        parent is recorded in the transaction table
 * @param sendNodeEvent   true notifies listeners of a removal, false of an
 *                        eviction (children of the root are always removed
 *                        with a removal notification)
 * @param eviction        true when invoked as part of an eviction
 * @throws CacheException propagated from {@code getChildrenNames}
 */
public void _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction)
        throws CacheException {
    Node n, parent_node;
    MethodCall undo_op=null;
    if(log.isTraceEnabled())
        log.trace("_remove(" + tx + ", \"" + fqn + "\")");
    if(fqn.size() == 0) {
        Set children=getChildrenNames(fqn);
        if(children != null) {
            // iterate over a snapshot: the children map shrinks while we remove
            Object[] kids=children.toArray();
            for(int i=0; i < kids.length; i++) {
                Fqn tmp=new Fqn(fqn, kids[i]);
                try {
                    _remove(tx, tmp, create_undo_ops, true, eviction);
                }
                catch(Exception e) {
                    // keep going with the other children, but do not lose the cause
                    log.error("failure removing node " + tmp, e);
                }
            }
        }
        return;
    }
    n=findNode(fqn);
    if(n == null) {
        log.warn("node " + fqn + " not found");
        return;
    }
    parent_node=n.getParent();
    parent_node.removeChild(n.getName());
    // release the locks held by the transaction, or by this thread when there is none
    n.releaseAll(tx != null? tx : (Object)Thread.currentThread());
    if(tx != null && create_undo_ops && !eviction) {
        undo_op=new MethodCall(addChildMethodLocal, new Object[]{tx, parent_node.getFqn(), n.getName(), n});
        tx_table.addUndoOperation(tx, undo_op);
    }
    if(sendNodeEvent)
        notifyNodeRemoved(fqn);
    else
        notifyNodeEvicted(fqn);
}
/**
 * String convenience overload: parses {@code fqn} and delegates to the
 * Fqn-based key removal.
 *
 * @return the delegate's result
 * @throws CacheException propagated from the delegate
 */
public Object _remove(GlobalTransaction tx, String fqn, Object key, boolean create_undo_ops)
        throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    return _remove(tx, parsed, key, create_undo_ops);
}
/**
 * Removes a key from a node; delegates with {@code sendNodeEvent} set to
 * false.
 *
 * @return the delegate's result
 * @throws CacheException propagated from the delegate
 */
public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops)
        throws CacheException {
    final boolean sendNodeEvent=false;
    return _remove(tx, fqn, key, create_undo_ops, sendNodeEvent);
}
/**
 * Removes a single key from a node and, inside a transaction, records an
 * operation that puts the old value back.
 *
 * @param tx              transaction the change belongs to; may be null
 * @param fqn             node holding the key
 * @param key             key to remove
 * @param create_undo_ops when true, {@code tx} is non-null and a value was
 *                        actually removed, an undo operation is recorded
 * @param sendNodeEvent   when true listeners get a node-modified notification
 * @return the removed value, or null when the node does not exist or
 *         {@code Node.remove} returned null
 * @throws CacheException declared for the interface; not thrown by this body
 */
public Object _remove(GlobalTransaction tx, Fqn fqn, Object key, boolean create_undo_ops, boolean sendNodeEvent)
        throws CacheException {
    if(log.isTraceEnabled())
        log.trace("_remove(" + tx + ", \"" + fqn + "\", " + key + ")");

    Node target=findNode(fqn);
    if(target == null) {
        log.warn("node " + fqn + " not found");
        return null;
    }

    Object removed=target.remove(key);
    boolean recordUndo=tx != null && create_undo_ops && removed != null;
    if(recordUndo) {
        MethodCall undo=new MethodCall(putKeyValMethodLocal,
                                       new Object[]{tx, fqn, key, removed, Boolean.FALSE});
        tx_table.addUndoOperation(tx, undo);
    }

    if(sendNodeEvent) {
        notifyNodeModified(fqn);
    }
    return removed;
}
/**
 * String convenience overload: parses {@code fqn} and delegates to the
 * Fqn-based data removal.
 *
 * @throws CacheException propagated from the delegate
 */
public void _removeData(GlobalTransaction tx, String fqn, boolean create_undo_ops)
        throws CacheException {
    Fqn parsed=Fqn.fromString(fqn);
    _removeData(tx, parsed, create_undo_ops);
}
/**
 * Clears a node's data; delegates with {@code sendNodeEvent} set to true.
 *
 * @throws CacheException propagated from the delegate
 */
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops)
        throws CacheException {
    final boolean sendNodeEvent=true;
    _removeData(tx, fqn, create_undo_ops, sendNodeEvent);
}
/**
 * Clears a node's data as a regular (non-eviction) operation; delegates with
 * {@code eviction} set to false.
 *
 * @throws CacheException propagated from the delegate
 */
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent)
        throws CacheException {
    final boolean eviction=false;
    _removeData(tx, fqn, create_undo_ops, sendNodeEvent, eviction);
}
/**
 * Clears all data of the node at {@code fqn}.
 * <p>
 * An undo operation (restoring a copy of the old data) is created only when
 * running in a transaction with undo enabled, the node actually had a data
 * map, and the call is not an eviction. Only such a real undo operation is
 * registered with the transaction table; no null entry is added when there
 * is nothing to restore.
 *
 * @param tx              transaction the change belongs to; may be null
 * @param fqn             node whose data is cleared
 * @param create_undo_ops whether undo operations may be recorded
 * @param sendNodeEvent   true sends a node-visited notification; false sends
 *                        node-evicted (for evictions) or node-modified
 * @param eviction        true when invoked as part of an eviction; the node
 *                        is then marked with the UNINITIALIZED key
 * @throws CacheException declared for the interface; not thrown by this body
 */
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean sendNodeEvent, boolean eviction)
        throws CacheException {
    Node n=null;
    MethodCall undo_op=null;
    Map old_data=null;
    if(log.isTraceEnabled())
        log.trace("_removeData(" + tx + ", \"" + fqn + "\")");
    n=findNode(fqn);
    if(n == null) {
        log.warn("node " + fqn + " not found");
        return;
    }
    // snapshot the old data before clearing so a rollback can restore it
    if(tx != null && create_undo_ops && (old_data=n.getData()) != null && !eviction) {
        undo_op=new MethodCall(putDataMethodLocal, new Object[]{tx, fqn, new HashMap(old_data), Boolean.FALSE});
    }
    n.clear();
    if(eviction)
        n.put(UNINITIALIZED, null);
    if(sendNodeEvent) {
        notifyNodeVisisted(fqn);
    }
    else {
        if(eviction)
            notifyNodeEvicted(fqn);
        else
            notifyNodeModified(fqn);
    }
    // undo_op is non-null only when tx != null and create_undo_ops is set
    if(undo_op != null) {
        tx_table.addUndoOperation(tx, undo_op);
    }
}
/**
 * Evicts the node at {@code fqn}, if it exists. A node that still has
 * children only has its data cleared; a node without children is removed
 * from the tree. Both paths run without a transaction, without undo
 * operations and flagged as eviction.
 *
 * @param fqn node to evict
 * @throws CacheException propagated from the removal methods
 */
public void _evict(Fqn fqn) throws CacheException {
    if(exists(fqn)) {
        if(log.isTraceEnabled())
            log.trace("_evict(" + fqn + ")");
        final boolean noUndo=false;
        final boolean noNodeEvent=false;
        final boolean isEviction=true;
        if(!hasChild(fqn))
            _remove(null, fqn, noUndo, noNodeEvent, isEviction);
        else
            _removeData(null, fqn, noUndo, noNodeEvent, isEviction);
    }
}
/**
 * Attaches {@code old_node} under the node at {@code parent_fqn} using
 * {@code child_name}. Missing arguments or a missing parent are logged and
 * the call returns without changes.
 *
 * @param tx         only used in the trace message
 * @param parent_fqn node that receives the child
 * @param child_name name under which the child is added
 * @param old_node   node to attach
 * @throws CacheException declared for the interface; not thrown by this body
 */
public void _addChild(GlobalTransaction tx, Fqn parent_fqn, Object child_name, Node old_node)
        throws CacheException {
    if(log.isTraceEnabled())
        log.trace("_addChild(" + tx + ", \"" + parent_fqn + "\", \"" + child_name + "\")");

    boolean missingArgument=parent_fqn == null || child_name == null || old_node == null;
    if(missingArgument) {
        log.error("parent_fqn or child_name or node was null");
        return;
    }

    Node parent=findNode(parent_fqn);
    if(parent != null) {
        parent.addChild(child_name, old_node);
        return;
    }
    log.warn("node " + parent_fqn + " not found");
}
/**
 * Hands a single replicated call to the installed replication handler.
 *
 * @param method_call the call to replicate
 * @return the handler's result
 * @throws Throwable UnsupportedOperationException when no handler is
 *                   installed, otherwise whatever the handler throws
 */
public Object _replicate(MethodCall method_call) throws Throwable {
    if(replication_handler == null)
        throw new UnsupportedOperationException("no replication handler is installed");
    return replication_handler.replicate(method_call);
}
/**
 * Hands a list of replicated calls to the installed replication handler.
 *
 * @param method_calls the calls to replicate
 * @throws Throwable UnsupportedOperationException when no handler is
 *                   installed, otherwise whatever the handler throws
 */
public void _replicate(List method_calls) throws Throwable {
    if(replication_handler == null)
        throw new UnsupportedOperationException("no replication handler is installed");
    replication_handler.replicate(method_calls);
}
/**
 * Force-releases all locks on the node at {@code fqn} via
 * {@code Node.releaseAllForce()}. A missing node and any failure are only
 * logged; nothing is thrown.
 *
 * @param fqn node whose locks are released
 */
public void _releaseAllLocks(Fqn fqn) {
    try {
        Node target=findNode(fqn);
        if(target != null)
            target.releaseAllForce();
        else
            log.error("releaseAllLocks(): node " + fqn + " not found");
    }
    catch(Throwable t) {
        log.error("releaseAllLocks(): failed", t);
    }
}
/**
 * Local implementation of print: returns the {@code toString()} of the node
 * at {@code fqn}.
 *
 * @param fqn node to print
 * @return the node's string form, or null when the node does not exist or
 *         anything goes wrong while producing it
 */
public String _print(Fqn fqn) {
    try {
        Node node=findNode(fqn);
        return node != null ? node.toString() : null;
    }
    catch(Throwable t) {
        return null;
    }
}
/**
 * Placeholder: this body performs no locking and only logs a warning that
 * the method should not be invoked on TreeCache.
 *
 * @throws TimeoutException declared only; not thrown by this body
 * @throws LockingException declared only; not thrown by this body
 */
public void _lock(Fqn fqn, int lock_type, boolean recursive)
        throws TimeoutException, LockingException {
    final String warning="method _lock() should not be invoked on TreeCache";
    log.warn(warning);
}
/**
 * Not supported on TreeCache directly.
 *
 * @throws UnsupportedOperationException always
 */
public void prepare(GlobalTransaction global_tx, List modifications, Address coord, boolean commit) {
    final String reason="prepare() should not be called on TreeCache directly";
    throw new UnsupportedOperationException(reason);
}
/**
 * Not supported on TreeCache directly.
 *
 * @throws UnsupportedOperationException always
 */
public void commit(GlobalTransaction tx) {
    final String reason="commit() should not be called on TreeCache directly";
    throw new UnsupportedOperationException(reason);
}
/**
 * Not supported on TreeCache directly.
 *
 * @throws UnsupportedOperationException always
 */
public void rollback(GlobalTransaction tx) {
    final String reason="rollback() should not be called on TreeCache directly";
    throw new UnsupportedOperationException(reason);
}
/**
 * Registers {@code node} with the transaction table entry of {@code gtx}
 * by forwarding to {@code tx_table.addNode}.
 *
 * @param gtx  the global transaction
 * @param node the node's Fqn
 */
public void addNode(GlobalTransaction gtx, Fqn node) {
    this.tx_table.addNode(gtx, node);
}
/**
 * MessageListener implementation handling state transfer for the enclosing
 * TreeCache: {@link #getState()} serializes the local state and
 * {@link #setState(byte[])} installs state received from elsewhere.
 * Regular messages are ignored.
 * <p>
 * The transferred state is a serialized two-element {@code byte[][]}:
 * index 0 is the transient state (the serialized root node), index 1 the
 * persistent state (as produced by the cache loader). Either may be null.
 */
class MessageListenerAdaptor implements MessageListener {
final TreeCache cache;
final Logger log;
/**
 * @param cache the cache instance handed to deserialized / newly created nodes
 * @param log   logger used for all output of this listener
 */
MessageListenerAdaptor(TreeCache cache, Logger log) {
this.cache = cache;
this.log = log;
}
/** Incoming messages are ignored; the body is intentionally empty. */
public void receive(Message msg) {
}
/**
 * Produces the state to send to a requester.
 * Failures while collecting the transient or the persistent part are logged
 * and leave the corresponding slot null; the method still returns whatever
 * could be collected.
 *
 * @return the serialized {@code byte[][]} of transient and persistent state,
 *         or null when the final serialization fails
 */
public byte[] getState() {
Object owner=null;
// persistent state is only transferred for a non-shared cache loader that asks for it
boolean fetch_persistent_state=cache_loader != null &&
cache_loader_shared == false &&
cache_loader_fetch_persistent_state;
byte[] transient_state=null;
byte[] persistent_state=null;
byte[][] states=new byte[2][];
byte[] retval=null;
boolean locked=false;
// lock owner: the current transaction if there is one, else the calling thread
owner=getCurrentTransaction();
if(owner == null) owner=Thread.currentThread();
states[0]=states[1]=null;
try {
if(cache_loader_fetch_transient_state) {
log.info("locking the tree to obtain transient state");
// read-lock the tree while it is being serialized
root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_READ);
locked=true;
transient_state=Util.objectToByteBuffer(root);
states[0]=transient_state;
log.info("returning the transient state (" + transient_state.length + " bytes)");
}
}
catch(Throwable t) {
log.error("failed getting the transient state", t);
}
try {
if(fetch_persistent_state) {
// take the read locks only if the transient step has not already done so
if(!locked) {
log.info("locking the tree to obtain persistent state");
root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_READ);
locked=true;
}
log.info("getting the persistent state");
persistent_state=cache_loader.loadEntireState();
states[1]=persistent_state;
log.info("returning the persistent state (" + persistent_state.length + " bytes)");
}
}
catch(Throwable t) {
log.error("failed getting the persistent state", t);
}
try {
retval=Util.objectToByteBuffer(states);
return retval;
}
catch(Throwable t) {
log.error("failed serializing transient and persistent state", t);
return retval;
}
finally {
// runs on every path, whether or not the locks were actually acquired above
root.releaseAll(owner);
}
}
/**
 * Installs received state via {@link #_setState(byte[])}. Afterwards, in all
 * cases, threads waiting on {@code stateLock} are notified and all locks on
 * the (possibly new) root are force-released.
 *
 * @param new_state serialized state as produced by {@link #getState()}; may be null
 */
public void setState(byte[] new_state) {
try {
_setState(new_state);
}
finally {
synchronized(stateLock) {
stateLock.notifyAll();
if(root != null)
root.releaseAllForce();
}
}
}
/**
 * Deserializes {@code new_state} and applies its transient and persistent
 * parts. Each step logs its own failure and processing continues with the
 * next step. {@code isStateSet} is set to true at the end unless
 * {@code new_state} was null.
 *
 * @param new_state serialized {@code byte[][]} of transient and persistent state; may be null
 */
void _setState(byte[] new_state) {
Node new_root=null, old_root=null;
Object obj;
Object owner=null;
byte[][] states=null;
byte[] transient_state=null;
byte[] persistent_state=null;
boolean locked=false;
if(new_state == null) {
log.info("new cache is null (maybe first member in cluster)");
return;
}
try {
log.info("received the state (size=" + new_state.length + " bytes)");
states=(byte[][])Util.objectFromByteBuffer(new_state);
transient_state=states[0];
persistent_state=states[1];
if(transient_state != null)
log.info("transient state: " + transient_state.length + " bytes");
if(persistent_state != null)
log.info("persistent state: " + persistent_state.length + " bytes");
// lock owner: the current transaction if there is one, else the calling thread
owner=getCurrentTransaction();
if(owner == null) owner=Thread.currentThread();
}
catch(Throwable t) {
log.error("failed unserializing state", t);
}
if(transient_state != null) {
try {
log.info("setting transient state");
obj=Util.objectFromByteBuffer(transient_state);
new_root=(Node)obj;
// point the received tree at this cache, then write-lock the current tree before swapping
new_root.setRecursiveTreeCacheInstance(cache); log.info("locking the old tree");
root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_WRITE);
locked=true;
log.info("locking the old tree was successful");
// remember the old root so its locks can be released at the end
old_root=root;
root=new_root;
log.info("setting the transient state was successful");
notifyAllNodesCreated(root);
}
catch(Throwable t) {
log.error("failed setting transient state", t);
}
}
if(persistent_state != null) {
if(cache_loader == null) {
log.error("cache loader is null, cannot set persistent state");
}
else {
try {
// no transient state was installed: lock the current tree and replace it with an empty root
if(!locked) {
log.info("locking the old tree");
root.acquireAll(owner, state_fetch_timeout, Node.LOCK_TYPE_WRITE);
old_root=root; locked=true;
root=new Node(SEPARATOR, Fqn.fromString(SEPARATOR), null, null, cache);
log.info("locking the old tree was successful");
}
log.info("setting the persistent state");
cache_loader.storeEntireState(persistent_state);
log.info("setting the persistent state was successful");
}
catch(Throwable t) {
log.error("failed setting persistent state", t);
}
}
}
if(old_root != null) {
log.info("forcing release of all locks in old tree");
try {old_root.releaseAllForce();} catch(Throwable t) {log.error("failed releasing locks", t);}
}
isStateSet=true;
}
}
/**
 * MembershipListener callback for a new cluster view: replaces the content
 * of {@code members} with the view's members and notifies the listeners.
 * With a shared cache loader the coordinator flag is re-evaluated.
 *
 * @param new_view the newly installed view
 */
public void viewAccepted(View new_view) {
    Vector incoming=new_view.getMembers();
    log.info("viewAccepted(): new members: " + incoming);
    if(incoming != null) {
        members.clear();
        members.addAll(new_view.getMembers());
        notifyViewChange(new_view);
    }
    if(cache_loader_shared)
        coordinator=determineCoordinator();
}
/**
 * MembershipListener callback for a suspected member. The body is empty:
 * no action is taken here.
 *
 * @param suspected_mbr the suspected member's address (unused)
 */
public void suspect(Address suspected_mbr) {
}
/**
 * MembershipListener block callback. The body is empty: no action is taken
 * here.
 */
public void block() {
}
/**
 * Returns the transaction the transaction manager reports for the caller.
 *
 * @return {@code tm.getTransaction()}, or null when no transaction manager
 *         is set or the lookup throws
 */
protected Transaction getLocalTransaction() {
    if(tm != null) {
        try {
            return tm.getTransaction();
        }
        catch(Throwable ignored) {
            // a failing lookup is treated like "no transaction"
        }
    }
    return null;
}
/**
 * Tells whether {@code tx} is in a state in which it can still be used:
 * STATUS_ACTIVE or STATUS_PREPARING.
 *
 * @param tx transaction to check; may be null
 * @return true only for a non-null transaction in one of the two states;
 *         false when the status cannot be read (the failure is logged)
 */
boolean isValid(Transaction tx) {
    if(tx == null)
        return false;
    try {
        switch(tx.getStatus()) {
            case Status.STATUS_ACTIVE:
            case Status.STATUS_PREPARING:
                return true;
            default:
                return false;
        }
    }
    catch(SystemException e) {
        log.error("failed getting transaction status", e);
        return false;
    }
}
/**
 * Returns the GlobalTransaction associated with the caller's local
 * transaction, creating the association if needed.
 *
 * @return null when there is no local transaction or it is neither ACTIVE
 *         nor PREPARING (a warning with the status is logged in that case);
 *         otherwise the result of {@code getCurrentTransaction(Transaction)}
 */
public GlobalTransaction getCurrentTransaction() {
    Transaction local=getLocalTransaction();
    if(local == null)
        return null;
    if(isValid(local))
        return getCurrentTransaction(local);

    // status is read again only for the log message; -1 means it could not be read
    int status=-1;
    try {
        status=local.getStatus();
    }
    catch(SystemException e) {
    }
    log.warn("status is " + status + " (not ACTIVE or PREPARING); returning null)");
    return null;
}
/**
 * Looks up the GlobalTransaction mapped to {@code tx} in the transaction
 * table. When none exists, a new one is created for the local address and
 * registered together with a fresh TransactionEntry. The lookup and the
 * registration happen while holding the monitor of {@code tx_table}.
 *
 * @param tx the local transaction
 * @return the existing or newly created GlobalTransaction
 */
public GlobalTransaction getCurrentTransaction(Transaction tx) {
    synchronized(tx_table) {
        GlobalTransaction existing=tx_table.get(tx);
        if(existing != null)
            return existing;

        Address self=(Address)getLocalAddress();
        GlobalTransaction created=GlobalTransaction.create(self);
        tx_table.put(tx, created);
        TransactionEntry entry=new TransactionEntry();
        entry.setTransaction(tx);
        tx_table.put(created, entry);
        if(log.isTraceEnabled())
            log.trace("created new GTX: " + created + ", local TX=" + tx);
        return created;
    }
}
/**
 * Runs {@code m} through the interceptor chain.
 *
 * @param m the call to invoke
 * @return the chain's result
 * @throws CacheException rethrown unchanged when the chain throws one; any
 *                        other throwable is wrapped in a NestedRuntimeException
 */
protected Object invokeMethod(MethodCall m) throws CacheException {
    try {
        return interceptor_chain.invoke(m);
    }
    catch(CacheException cacheFailure) {
        throw cacheFailure;
    }
    catch(Throwable other) {
        throw new NestedRuntimeException(other);
    }
}
/**
 * Finds the node addressed by {@code fqn} by walking down from the root one
 * path element at a time. No intermediate Fqn objects are built during the
 * walk.
 *
 * @param fqn path of the node; may be null
 * @return the root for an empty Fqn, the addressed node when every path
 *         element exists, otherwise null (also for a null {@code fqn})
 */
private Node findNode(Fqn fqn) {
    if(fqn == null) return null;
    int treeNodeSize=fqn.size();
    if(treeNodeSize == 0)
        return root;
    Node n=root;
    for(int i=0; i < treeNodeSize; i++) {
        n=n.getChild(fqn.get(i));
        if(n == null)
            return null;
    }
    return n;
}
/**
 * Calls {@code nodeCreated(fqn)} on every registered listener, in
 * registration order.
 *
 * @param fqn the created node
 */
public void notifyNodeCreated(Fqn fqn) {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.nodeCreated(fqn);
        idx++;
    }
}
/**
 * Calls {@code nodeLoaded(fqn)} on every registered listener, in
 * registration order.
 *
 * @param fqn the loaded node
 */
public void notifyNodeLoaded(Fqn fqn) {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.nodeLoaded(fqn);
        idx++;
    }
}
/**
 * Calls {@code nodeRemoved(fqn)} on every registered listener, in
 * registration order.
 *
 * @param fqn the removed node
 */
protected void notifyNodeRemoved(Fqn fqn) {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.nodeRemoved(fqn);
        idx++;
    }
}
/**
 * Calls {@code nodeEvicted(fqn)} on every registered listener, in
 * registration order.
 *
 * @param fqn the evicted node
 */
protected void notifyNodeEvicted(Fqn fqn) {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.nodeEvicted(fqn);
        idx++;
    }
}
/**
 * Calls {@code nodeModified(fqn)} on every registered listener, in
 * registration order.
 *
 * @param fqn the modified node
 */
protected void notifyNodeModified(Fqn fqn) {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.nodeModified(fqn);
        idx++;
    }
}
/**
 * Calls {@code nodeVisited(fqn)} on every registered listener, in
 * registration order.
 *
 * @param fqn the visited node
 */
protected void notifyNodeVisisted(Fqn fqn) {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.nodeVisited(fqn);
        idx++;
    }
}
/**
 * Calls {@code cacheStarted(this)} on every registered listener, in
 * registration order.
 */
protected void notifyCacheStarted() {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.cacheStarted(this);
        idx++;
    }
}
/**
 * Calls {@code cacheStopped(this)} on every registered listener, in
 * registration order.
 */
protected void notifyCacheStopped() {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.cacheStopped(this);
        idx++;
    }
}
/**
 * Calls {@code viewChange(v)} on every registered listener, in
 * registration order.
 *
 * @param v the new view
 */
protected void notifyViewChange(View v) {
    int idx=0;
    while(idx < listeners.size()) {
        TreeCacheListener listener=(TreeCacheListener)listeners.elementAt(idx);
        listener.viewChange(v);
        idx++;
    }
}
/**
 * Sends a node-created notification for {@code curr} and then, recursively,
 * for every node below it (parent before children).
 *
 * @param curr subtree root; null is ignored
 */
protected void notifyAllNodesCreated(Node curr) {
    if(curr == null)
        return;
    notifyNodeCreated(curr.fqn);
    Map kids=curr.getChildren();
    if(kids == null)
        return;
    Iterator it=kids.values().iterator();
    while(it.hasNext()) {
        Node child=(Node)it.next();
        notifyAllNodesCreated(child);
    }
}
/**
 * Returns the built-in protocol stack specification as a single
 * colon-separated string, starting with UDP and ending with
 * pbcast.STATE_TRANSFER.
 *
 * @return the default stack properties
 */
protected String getDefaultProperties() {
    StringBuffer stack=new StringBuffer();
    stack.append("UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;");
    stack.append("mcast_send_buf_size=150000;mcast_recv_buf_size=80000):");
    stack.append("PING(timeout=1000;num_initial_members=2):");
    stack.append("MERGE2(min_interval=5000;max_interval=10000):");
    stack.append("FD_SOCK:");
    stack.append("VERIFY_SUSPECT(timeout=1500):");
    stack.append("pbcast.NAKACK(gc_lag=50;max_xmit_size=8192;retransmit_timeout=600,1200,2400,4800):");
    stack.append("UNICAST(timeout=600,1200,2400,4800):");
    stack.append("pbcast.STABLE(desired_avg_gossip=20000):");
    stack.append("FRAG(frag_size=8192;down_thread=false;up_thread=false):");
    stack.append("pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;");
    stack.append("shun=false;print_local_addr=true):");
    stack.append("pbcast.STATE_TRANSFER");
    return stack.toString();
}
/**
 * Translates a textual cache mode into its numeric constant. The input is
 * lower-cased and trimmed first; both the underscore and the hyphen
 * spellings of the replication modes are accepted.
 *
 * @param mode "local", "repl_async"/"repl-async" or "repl_sync"/"repl-sync"
 *             in any letter case; may be null
 * @return LOCAL, REPL_ASYNC or REPL_SYNC, or -1 for null or unknown input
 */
protected int string2Mode(String mode) {
    if(mode == null)
        return -1;
    String normalized=mode.toLowerCase().trim();
    if("local".equals(normalized))
        return LOCAL;
    if("repl_async".equals(normalized) || "repl-async".equals(normalized))
        return REPL_ASYNC;
    if("repl_sync".equals(normalized) || "repl-sync".equals(normalized))
        return REPL_SYNC;
    return -1;
}
}