package org.jboss.cache.loader;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
import org.jboss.cache.TreeCache;
import org.jboss.cache.lock.TimeoutException;
import org.jgroups.Address;
import org.jgroups.blocks.MethodCall;
import java.lang.reflect.Method;
import java.util.*;
/**
 * Cache loader that forwards every operation to the cluster coordinator by
 * invoking the corresponding {@link TreeCache} method remotely through
 * {@code TreeCache.callRemoteMethods}.
 *
 * <p>When the local cache is itself the coordinator there is nobody to
 * delegate to; in that case every remote call is skipped and yields
 * {@code null} (see {@link #doMethodCall(Method, Object[])}).
 */
public class RpcDelegatingCacheLoader extends DelegatingCacheLoader {

    /** Timeout used when the "timeout" property is absent or blank. */
    private static final int DEFAULT_TIMEOUT = 5000;

    /** Timeout handed to {@code callRemoteMethods} (presumably milliseconds -- confirm against TreeCache). */
    private int timeout;

    /** Cache used to locate the coordinator and to perform the remote calls. */
    private TreeCache cache;

    /** Local address of {@link #cache}, looked up lazily on the first remote call. */
    private Address localAddress;

    public static Method METHOD_GET_STATE;
    public static Method METHOD_SET_STATE;
    public static Method METHOD_GET_CHILDREN_NAMES;
    public static Method METHOD_GET_WITH_2_PARAMS;
    public static Method METHOD_GET_WITH_1_PARAM;
    public static Method METHOD_EXISTS;
    public static Method METHOD_PUT_WITH_3_PARAMS;
    public static Method METHOD_PUT_WITH_2_PARAMS;
    public static Method METHOD_REMOVE_WITH_2_PARAMS;
    public static Method METHOD_REMOVE_WITH_1_PARAM;
    public static Method METHOD_REMOVE_DATA;

    // Resolve the TreeCache methods that are invoked remotely, once per class load.
    static {
        try {
            METHOD_GET_STATE = TreeCache.class.getDeclaredMethod("getStateBytes", new Class[] {});
            METHOD_SET_STATE = TreeCache.class.getDeclaredMethod("setStateBytes", new Class[] { byte[].class });
            METHOD_GET_CHILDREN_NAMES = TreeCache.class.getDeclaredMethod("getChildrenNames", new Class[] { Fqn.class });
            METHOD_GET_WITH_2_PARAMS = TreeCache.class.getDeclaredMethod("get", new Class[] { Fqn.class, Object.class });
            METHOD_GET_WITH_1_PARAM = TreeCache.class.getDeclaredMethod("get", new Class[] { Fqn.class });
            METHOD_EXISTS = TreeCache.class.getDeclaredMethod("exists", new Class[] { Fqn.class });
            METHOD_PUT_WITH_3_PARAMS = TreeCache.class.getDeclaredMethod("put", new Class[] { Fqn.class, Object.class, Object.class });
            METHOD_PUT_WITH_2_PARAMS = TreeCache.class.getDeclaredMethod("put", new Class[] { Fqn.class, Map.class });
            METHOD_REMOVE_WITH_2_PARAMS = TreeCache.class.getDeclaredMethod("remove", new Class[] { Fqn.class, Object.class });
            METHOD_REMOVE_WITH_1_PARAM = TreeCache.class.getDeclaredMethod("remove", new Class[] { Fqn.class });
            METHOD_REMOVE_DATA = TreeCache.class.getDeclaredMethod("removeData", new Class[] { Fqn.class });
        } catch (NoSuchMethodException ex) {
            // Hand over the exception itself so its stack trace is kept as the cause
            // instead of being flattened into a message string.
            throw new ExceptionInInitializerError(ex);
        }
    }

    /**
     * Creates a loader without cache or timeout; they are supplied later
     * through {@link #setCache(TreeCache)} and {@link #setConfig(Properties)}.
     */
    public RpcDelegatingCacheLoader() {
    }

    /**
     * Creates a fully configured loader.
     *
     * @param cache   cache used to reach the coordinator
     * @param timeout timeout passed to each remote call
     */
    public RpcDelegatingCacheLoader(TreeCache cache, int timeout) {
        this.cache = cache;
        this.timeout = timeout;
    }

    /**
     * Reads the "timeout" property. A missing or blank value selects the
     * default of 5000; surrounding whitespace is ignored.
     *
     * @param props configuration properties; {@code null} leaves the current timeout untouched
     * @throws NumberFormatException if the value is present but not an integer
     */
    public void setConfig(Properties props) {
        if (props == null) {
            return;
        }
        String raw = props.getProperty("timeout");
        // Trim first: values read from configuration files frequently carry stray whitespace.
        String value = (raw == null ? "" : raw.trim());
        this.timeout = (value.length() == 0 ? DEFAULT_TIMEOUT : Integer.parseInt(value));
    }

    /**
     * Sets the cache used to reach the coordinator.
     *
     * @param cache the cache to delegate through
     */
    public void setCache(TreeCache cache) {
        this.cache = cache;
    }

    /** Remotely invokes {@code TreeCache.getChildrenNames(Fqn)} on the coordinator. */
    protected Set delegateGetChildrenNames(Fqn name) throws Exception {
        return (Set) this.doMethodCall(METHOD_GET_CHILDREN_NAMES, new Object[] { name });
    }

    /** Remotely invokes {@code TreeCache.get(Fqn, Object)} on the coordinator. */
    protected Object delegateGet(Fqn name, Object key) throws Exception {
        return this.doMethodCall(METHOD_GET_WITH_2_PARAMS, new Object[] { name, key });
    }

    /** Remotely invokes {@code TreeCache.get(Fqn)} on the coordinator. */
    protected Node delegateGet(Fqn name) throws Exception {
        return (Node) this.doMethodCall(METHOD_GET_WITH_1_PARAM, new Object[] { name });
    }

    /**
     * Remotely invokes {@code TreeCache.exists(Fqn)} on the coordinator.
     *
     * @return the remote answer, or {@code false} when the call produced {@code null}
     */
    protected boolean delegateExists(Fqn name) throws Exception {
        Boolean exists = (Boolean) this.doMethodCall(METHOD_EXISTS, new Object[] { name });
        return (exists != null ? exists.booleanValue() : false);
    }

    /** Remotely invokes {@code TreeCache.put(Fqn, Object, Object)} on the coordinator. */
    protected Object delegatePut(Fqn name, Object key, Object value) throws Exception {
        return this.doMethodCall(METHOD_PUT_WITH_3_PARAMS, new Object[] { name, key, value });
    }

    /** Remotely invokes {@code TreeCache.put(Fqn, Map)} on the coordinator. */
    protected void delegatePut(Fqn name, Map attributes) throws Exception {
        this.doMethodCall(METHOD_PUT_WITH_2_PARAMS, new Object[] { name, attributes });
    }

    /** Remotely invokes {@code TreeCache.remove(Fqn, Object)} on the coordinator. */
    protected Object delegateRemove(Fqn name, Object key) throws Exception {
        return this.doMethodCall(METHOD_REMOVE_WITH_2_PARAMS, new Object[] { name, key });
    }

    /** Remotely invokes {@code TreeCache.remove(Fqn)} on the coordinator. */
    protected void delegateRemove(Fqn name) throws Exception {
        this.doMethodCall(METHOD_REMOVE_WITH_1_PARAM, new Object[] { name });
    }

    /** Remotely invokes {@code TreeCache.removeData(Fqn)} on the coordinator. */
    protected void delegateRemoveData(Fqn name) throws Exception {
        this.doMethodCall(METHOD_REMOVE_DATA, new Object[] { name });
    }

    /** Remotely invokes {@code TreeCache.getStateBytes()} on the coordinator. */
    public byte[] delegateLoadEntireState() throws Exception {
        return (byte[]) this.doMethodCall(METHOD_GET_STATE, new Object[0]);
    }

    /** Remotely invokes {@code TreeCache.setStateBytes(byte[])} on the coordinator. */
    public void delegateStoreEntireState(byte[] state) throws Exception {
        this.doMethodCall(METHOD_SET_STATE, new Object[] { state });
    }

    /**
     * Synchronously invokes {@code method} on the coordinator and returns its response.
     *
     * @param method the {@link TreeCache} method to invoke remotely
     * @param args   arguments for the remote invocation
     * @return the coordinator's response, or {@code null} when the local cache is
     *         itself the coordinator (nothing is invoked in that case)
     * @throws Exception if no cache is set, the cache has no local address or no
     *                   coordinator, the call produced no response, or the
     *                   response is a timeout or any other {@link Throwable}
     */
    private Object doMethodCall(Method method, Object[] args) throws Exception {
        if (this.cache == null) {
            // Reachable via the no-arg constructor when setCache() was never called.
            throw new Exception("Cannot delegate to the remote coordinator because no cache has been set.");
        }
        if (this.cache.isCoordinator()) {
            if (log.isTraceEnabled()) {
                log.trace("Cannot delegate to the remote coordinator because the cache is itself the coordinator.");
            }
            return null;
        }

        if (this.localAddress == null) {
            this.localAddress = (Address) this.cache.getLocalAddress();
        }
        if (this.localAddress == null) {
            throw new Exception("Cannot delegate to the remote coordinator because the cache has no local address.");
        }

        Address coordinator = cache.getCoordinator();
        if (coordinator == null) {
            throw new Exception("Cannot delegate to the remote coordinator because the cache has no coordinator.");
        }

        // The coordinator is the only destination of the call.
        Vector members = new Vector();
        members.add(coordinator);
        MethodCall methodCall = new MethodCall(method, args);
        boolean synchronous = true;
        boolean excludeSelf = true;
        List responses = cache.callRemoteMethods(members, methodCall, synchronous, excludeSelf, this.timeout);

        if (responses == null) {
            throw new Exception(describeCall(coordinator, methodCall) + " was discarded!");
        }
        if (responses.isEmpty()) {
            // Without this guard get(0) would fail with a bare IndexOutOfBoundsException.
            throw new Exception(describeCall(coordinator, methodCall) + " returned no response!");
        }

        Object response = responses.get(0);
        if (response instanceof TimeoutException) {
            throw new Exception(describeCall(coordinator, methodCall) + " timed out: " + response);
        }
        else if (response instanceof Throwable) {
            throw new Exception(describeCall(coordinator, methodCall) + " failed!", (Throwable) response);
        }
        return response;
    }

    /** Builds the "Remote method call [local]->[coordinator].name()" prefix shared by all error messages. */
    private String describeCall(Address coordinator, MethodCall methodCall) {
        return "Remote method call [" + cache.getLocalAddress() + "]->[" + coordinator + "]."
                + methodCall.getMethod().getName() + "()";
    }
}