package org.jboss.ha.framework.server;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NameNotFoundException;
import javax.naming.Reference;
import javax.naming.StringRefAddr;
import javax.management.MBeanServer;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import org.jgroups.JChannel;
import org.jgroups.MergeView;
import org.jgroups.View;
import org.jgroups.Message;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.jboss.invocation.MarshalledValueInputStream;
import org.jboss.invocation.MarshalledValueOutputStream;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
import org.jboss.ha.framework.interfaces.HAPartition.HAMembershipListener;
import org.jboss.ha.framework.interfaces.ClusterNode;
import org.jboss.naming.NonSerializableFactory;
import org.jboss.logging.Logger;
public class HAPartitionImpl
extends org.jgroups.blocks.RpcDispatcher
implements org.jgroups.MessageListener, org.jgroups.MembershipListener,
HAPartition
{
/**
 * Marker value returned by handle() when no RPC handler is registered under
 * the requested handler name. callMethodOnCluster() drops instances of this
 * class from the list of responses it returns.
 */
private static class NoHandlerForRPC implements Serializable
{
/** Explicit serialization version for this marker class. */
static final long serialVersionUID = -1263095408483622838L;
}
/** RPC handler objects keyed by the name given to registerRPCHandler(); looked up in handle(). */
protected HashMap rpcHandlers = new HashMap();
/** HAPartitionStateTransfer subscribers keyed by object name; consulted by getState()/setState(). */
protected HashMap stateHandlers = new HashMap();
/** Membership listeners notified directly from viewAccepted(); guarded by synchronizing on the list. */
protected ArrayList listeners = new ArrayList();
/** Membership listeners notified from the AsynchViewChangeHandler thread. */
protected ArrayList asynchListeners = new ArrayList();
/** Queue of ViewChangeEvent objects put by viewAccepted() and taken by AsynchViewChangeHandler. */
protected LinkedQueue asynchViewChanges = new LinkedQueue();
/** Thread running AsynchViewChangeHandler; created in startPartition(), interrupted in closePartition(). */
protected Thread asynchNotifyThread;
/** Current view as ClusterNode objects; null until startPartition() or the first viewAccepted(). */
protected Vector members = null;
/** Current view as the addresses reported by the JGroups View. */
protected Vector jgmembers = null;
/** Timestamped messages appended by logHistory(); allocated in the constructor. */
public Vector history = null;
/** Current view without the local node, as ClusterNode objects. */
protected Vector otherMembers = null;
/** Current view without the local node, as JGroups addresses; destination list for excludeSelf calls. */
protected Vector jgotherMembers = null;
/** Partition name given to the constructor; also used in the JNDI name and logger names. */
protected String partitionName;
/** Local address of the channel, read in startPartition(). */
protected org.jgroups.stack.IpAddress localJGAddress = null;
/** Name of the local ClusterNode, read from {@link #me} in startPartition(). */
protected String nodeName;
/** ClusterNode built from {@link #localJGAddress} in startPartition(). */
protected ClusterNode me = null;
/** Default timeout handed to callRemoteMethods() (presumably milliseconds - confirm against JGroups). */
protected long timeout = 60000;
/** The JGroups channel given to the constructor. */
protected JChannel channel;
/** Replicant manager created and initialised in init(), started in startPartition(). */
protected DistributedReplicantManagerImpl replicantManager;
/** Distributed state service created and initialised in init(), started in startPartition(). */
protected DistributedStateImpl dsManager;
/** Logger named after HAPartition plus the partition name. */
protected Logger log;
/** Logger used instead of {@link #log} for view/suspect messages when this node is the coordinator. */
protected Logger clusterLifeCycleLog;
/** Id of the most recent view seen; -1 until a view has been read. */
protected long currentViewId = -1;
/** MBeanServer passed on to the replicant manager and state service; stays null with the 3-argument constructor. */
protected MBeanServer server;
/** Time in milliseconds to wait for channel.getState() in startPartition(). */
protected long state_transfer_timeout=60000;
/**
 * Reads one object from a byte array through a MarshalledValueInputStream.
 *
 * @param buffer the serialized form; may be null
 * @return the object read from the buffer, or null when buffer is null
 * @throws Exception if the stream cannot be created or the object cannot be read
 */
public static Object objectFromByteBuffer (byte[] buffer) throws Exception
{
    if (buffer != null)
    {
        MarshalledValueInputStream in =
            new MarshalledValueInputStream(new ByteArrayInputStream(buffer));
        return in.readObject();
    }
    return null;
}
/**
 * Writes one object to a byte array through a MarshalledValueOutputStream.
 *
 * @param obj the object to serialize
 * @return the bytes produced after the stream has been flushed
 * @throws Exception if the stream cannot be created or the object cannot be written
 */
public static byte[] objectToByteBuffer (Object obj) throws Exception
{
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    MarshalledValueOutputStream out = new MarshalledValueOutputStream(bytes);
    out.writeObject(obj);
    // Flush so that everything written reaches the backing byte array.
    out.flush();
    byte[] serialized = bytes.toByteArray();
    return serialized;
}
/**
 * @return the time, in milliseconds, that startPartition() waits for the
 *         initial state transfer
 */
public long getStateTransferTimeout()
{
    return this.state_transfer_timeout;
}
/**
 * Sets how long startPartition() waits for the initial state transfer.
 *
 * @param state_transfer_timeout the wait time in milliseconds
 */
public void setStateTransferTimeout(long state_transfer_timeout)
{
    this.state_transfer_timeout = state_transfer_timeout;
}
/**
 * @return the default timeout used for cluster method calls that do not
 *         supply their own
 */
public long getMethodCallTimeout()
{
    return this.timeout;
}
/**
 * Sets the default timeout used for cluster method calls that do not
 * supply their own.
 *
 * @param timeout the new default timeout
 */
public void setMethodCallTimeout(long timeout)
{
    this.timeout = timeout;
}
/**
 * Creates a partition exactly as the three-argument constructor does and
 * additionally records the MBeanServer that init() hands to the replicant
 * manager and the distributed state service.
 *
 * @param partitionName name of the partition
 * @param channel the JGroups channel to use
 * @param deadlock_detection passed through to the RpcDispatcher constructor
 * @param server the MBeanServer to remember
 * @throws Exception if the delegated constructor fails
 */
public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection, MBeanServer server) throws Exception
{
    this(partitionName, channel, deadlock_detection);
    this.server = server;
}
public HAPartitionImpl(String partitionName, org.jgroups.JChannel channel, boolean deadlock_detection) throws Exception
{
super(channel, null, null, new Object(), deadlock_detection); this.log = Logger.getLogger(HAPartition.class.getName() + "." + partitionName);
this.clusterLifeCycleLog = Logger.getLogger(HAPartition.class.getName() + ".lifecycle." + partitionName);
this.channel = channel;
this.partitionName = partitionName;
this.history = new Vector();
logHistory ("Partition object created");
}
/**
 * Registers this object as membership and message listener, creates and
 * initialises the replicant manager and the distributed state service, and
 * binds the partition in JNDI under "/HAPartition/" + partitionName.
 *
 * @throws Exception if a sub-service fails to initialise or the JNDI bind fails
 */
public void init() throws Exception
{
    log.info("Initializing");
    logHistory ("Initializing partition");

    // Receive view changes and messages on this object.
    log.debug("setMembershipListener");
    setMembershipListener(this);
    log.debug("setMessageListener");
    setMessageListener(this);

    // Replicant manager sub-service.
    log.debug("create replicant manager");
    replicantManager = new DistributedReplicantManagerImpl(this, server);
    log.debug("init replicant manager");
    replicantManager.init();
    log.debug("bind replicant manager");

    // Distributed state sub-service.
    log.debug("create distributed state");
    dsManager = new DistributedStateImpl(this, server);
    log.debug("init distributed state service");
    dsManager.init();
    log.debug("bind distributed state service");

    // Make the partition itself reachable through JNDI.
    Context initialContext = new InitialContext();
    final String jndiName = "/HAPartition/" + partitionName;
    bind(jndiName, this, HAPartitionImpl.class, initialContext);

    log.debug("done initing.");
}
/**
 * Starts the partition: reads the local address and the current view from
 * the channel, checks that the local node name is unique, fetches the
 * initial state, starts the replicant manager and the distributed state
 * service, and launches the asynchronous membership notification thread.
 *
 * @throws Exception if the uniqueness check fails or a sub-service cannot start
 */
public void startPartition() throws Exception
{
    logHistory ("Starting partition");

    log.debug("get nodeName");
    this.localJGAddress = (IpAddress)channel.getLocalAddress();
    this.me = new ClusterNode(this.localJGAddress);
    this.nodeName = this.me.getName();

    log.debug("Get current members");
    View view = channel.getView();
    this.jgmembers = (Vector)view.getMembers().clone();
    this.members = translateAddresses(this.jgmembers);
    log.info("Number of cluster members: " + members.size());
    // Log every member. The loop condition used to be "m > size", which
    // never held, so nothing was ever logged here.
    for (int m = 0; m < members.size(); m++)
    {
        Object node = members.get(m);
        log.debug(node);
    }

    // Same view without the local node.
    this.jgotherMembers = (Vector)view.getMembers().clone();
    this.jgotherMembers.remove (channel.getLocalAddress());
    this.otherMembers = translateAddresses(this.jgotherMembers);
    log.info ("Other members: " + this.otherMembers.size ());

    verifyNodeIsUnique (view.getMembers());

    this.currentViewId = view.getVid().getId();

    log.info("Fetching state (will wait for " + this.state_transfer_timeout + " milliseconds):");
    boolean rc = channel.getState(null, this.state_transfer_timeout);
    if (rc)
        log.debug("State was retrieved successfully");
    else
        log.debug("State could not be retrieved, (must be first member of group)");

    this.replicantManager.start();
    this.dsManager.start();

    // Background thread that delivers view changes to the asynchronous listeners.
    AsynchViewChangeHandler asynchHandler = new AsynchViewChangeHandler();
    asynchNotifyThread = new Thread(asynchHandler, "AsynchHAMembershipListener Thread");
    asynchNotifyThread.start();
}
/**
 * Shuts the partition down: interrupts the asynchronous notification
 * thread, stops the replicant manager and the distributed state service,
 * closes the channel and removes the JNDI binding created by init().
 * Failures of the individual stop/close steps are logged and do not prevent
 * the following steps from running.
 *
 * @throws Exception if the JNDI unbind or the NonSerializableFactory unbind fails
 */
public void closePartition() throws Exception
{
    logHistory ("Closing partition");
    log.info("Closing partition " + partitionName);

    // The notifier thread only exists once startPartition() has run; skip
    // the interrupt instead of logging a NullPointerException otherwise.
    if (asynchNotifyThread != null)
    {
        try
        {
            asynchNotifyThread.interrupt();
        }
        catch (Exception e)
        {
            log.warn("Failed to interrupt asynchNotifyThread", e);
        }
    }

    try
    {
        this.replicantManager.stop();
    }
    catch (Exception e)
    {
        log.error("operation failed", e);
    }

    try
    {
        this.dsManager.stop();
    }
    catch (Exception e)
    {
        log.error("operation failed", e);
    }

    try
    {
        channel.close();
    }
    catch (Exception e)
    {
        log.error("operation failed", e);
    }

    String boundName = "/HAPartition/" + partitionName;
    InitialContext ctx = new InitialContext();
    try
    {
        ctx.unbind(boundName);
    }
    finally
    {
        ctx.close();
    }
    NonSerializableFactory.unbind (boundName);

    log.info("Partition " + partitionName + " closed.");
}
/**
 * Collects the current state of every registered state-transfer subscriber
 * into a map keyed by subscriber name and serializes that map.
 *
 * @return the serialized map, or null if collecting or serializing failed
 *         (the failure is logged)
 */
public byte[] getState()
{
    logHistory ("getState called on partition");
    final boolean debug = log.isDebugEnabled();
    log.debug("getState called.");

    byte[] serialized = null;
    try
    {
        HashMap state = new HashMap();
        for (Iterator entries = stateHandlers.entrySet().iterator(); entries.hasNext(); )
        {
            java.util.Map.Entry entry = (java.util.Map.Entry) entries.next();
            String key = (String) entry.getKey();
            HAPartition.HAPartitionStateTransfer subscriber =
                (HAPartition.HAPartitionStateTransfer) entry.getValue();
            if (debug)
            {
                log.debug("getState for " + key);
            }
            state.put(key, subscriber.getCurrentState());
        }
        serialized = objectToByteBuffer(state);
    }
    catch (Exception ex)
    {
        log.error("getState failed", ex);
    }
    return serialized;
}
/**
 * Deserializes a state map produced by getState() and hands each entry to
 * the state-transfer subscriber registered under the same key. Entries
 * without a subscriber are skipped. Any failure is logged, not thrown.
 *
 * @param obj the serialized state map; null is logged and ignored
 */
public void setState(byte[] obj)
{
    logHistory ("setState called on partition");
    try
    {
        log.debug("setState called");
        if (obj == null)
        {
            log.debug("state is null");
            return;
        }

        // Remember size and memory usage so the cost of the transfer can be logged.
        final int state_size = obj.length;
        final Runtime rt = Runtime.getRuntime();
        final long used_mem_before = rt.totalMemory() - rt.freeMemory();

        HashMap state = (HashMap)objectFromByteBuffer(obj);
        for (Iterator entries = state.entrySet().iterator(); entries.hasNext(); )
        {
            java.util.Map.Entry entry = (java.util.Map.Entry) entries.next();
            String key = (String) entry.getKey();
            log.debug("setState for " + key);
            Object someState = entry.getValue();
            HAPartition.HAPartitionStateTransfer subscriber =
                (HAPartition.HAPartitionStateTransfer) stateHandlers.get(key);
            if (subscriber == null)
            {
                log.debug("There is no stateHandler for: " + key);
                continue;
            }
            subscriber.setCurrentState((java.io.Serializable) someState);
        }

        final long used_mem_after = rt.totalMemory() - rt.freeMemory();
        log.debug("received a state of " + state_size + " bytes; expanded memory by " +
            (used_mem_after - used_mem_before) + " bytes (used memory before: " + used_mem_before +
            ", used memory after: " + used_mem_after + ")");
    }
    catch (Exception ex)
    {
        log.error("setState failed", ex);
    }
}
/**
 * Callback taking a raw JGroups message (presumably the
 * org.jgroups.MessageListener contract). Intentionally does nothing.
 *
 * @param msg the received message; ignored
 */
public void receive(org.jgroups.Message msg)
{ }
/**
 * Records that a member is suspected: adds an entry to the history and logs
 * the suspicion, on the cluster life-cycle logger when this node is the
 * coordinator and on the regular logger otherwise.
 *
 * @param suspected_mbr address of the suspected member; may be null
 */
public void suspect(org.jgroups.Address suspected_mbr)
{
    logHistory ("Node suspected: " + String.valueOf(suspected_mbr));

    Logger target = isCurrentNodeCoordinator () ? clusterLifeCycleLog : log;
    target.info("Suspected member: " + suspected_mbr);
}
/** Callback with no arguments (presumably the org.jgroups.MembershipListener contract); intentionally does nothing. */
public void block() {}
/**
 * Handles a new cluster view: updates the cached member lists and view id,
 * records the change in the history, logs it, queues a ViewChangeEvent for
 * the asynchronous listeners and notifies the synchronous listeners.
 * The very first view only initialises the member lists; no listener is
 * notified for it. Any failure is logged, not thrown.
 *
 * @param newView the view delivered by the channel
 */
public void viewAccepted(View newView)
{
    try
    {
        this.currentViewId = newView.getVid().getId();

        // View without the local node, in JGroups and in ClusterNode form.
        this.jgotherMembers = (Vector)newView.getMembers().clone();
        this.jgotherMembers.remove (channel.getLocalAddress());
        this.otherMembers = translateAddresses (this.jgotherMembers);

        // Separate copy of the full view that is handed to the listeners.
        Vector translatedNewView = translateAddresses ((Vector)newView.getMembers().clone());
        logHistory ("New view: " + translatedNewView + " with viewId: " + this.currentViewId +
            " (old view: " + this.members + " )");

        Vector oldMembers = this.members;
        Vector newjgMembers = (Vector)newView.getMembers().clone();
        Vector newMembers = translateAddresses(newjgMembers);
        this.members = newMembers;
        this.jgmembers = newjgMembers;

        if (oldMembers == null)
        {
            // First view ever seen: nothing to compare against.
            log.debug("ViewAccepted: initial members set");
            return;
        }

        int difference = newMembers.size () - oldMembers.size ();

        if (isCurrentNodeCoordinator ())
            clusterLifeCycleLog.info ("New cluster view for partition " + this.partitionName + " (id: " +
                this.currentViewId + ", delta: " + difference + ") : " + this.members);
        else
            log.info("New cluster view for partition " + this.partitionName + ": " +
                this.currentViewId + " (" + this.members + " delta: " + difference + ")");

        ViewChangeEvent event = new ViewChangeEvent();
        event.viewId = currentViewId;
        event.allMembers = translatedNewView;
        event.deadMembers = getDeadMembers(oldMembers, event.allMembers);
        event.newMembers = getNewMembers(oldMembers, event.allMembers);
        event.originatingGroups = null;
        if(newView instanceof MergeView)
        {
            MergeView mergeView = (MergeView) newView;
            event.originatingGroups = mergeView.getSubgroups();
        }

        // Report the size of the previous view; this.members already holds
        // the new one at this point.
        log.debug("membership changed from " + oldMembers.size() + " to "
            + event.allMembers.size());

        // Asynchronous listeners are served by the notifier thread,
        // synchronous ones right here.
        this.asynchViewChanges.put(event);
        this.notifyListeners(listeners, event.viewId, event.allMembers,
            event.deadMembers, event.newMembers, event.originatingGroups);
    }
    catch (Exception ex)
    {
        log.error("ViewAccepted failed", ex);
    }
}
/**
 * @return the name of the local node as set by startPartition(); null before that
 */
public String getNodeName()
{
    return this.nodeName;
}
/**
 * @return the partition name given to the constructor
 */
public String getPartitionName()
{
    return this.partitionName;
}
/**
 * @return the replicant manager created by init(); null before init() has run
 */
public DistributedReplicantManager getDistributedReplicantManager()
{
    return this.replicantManager;
}
/**
 * @return the distributed state service created by init(); null before init() has run
 */
public DistributedState getDistributedStateService()
{
    return dsManager;
}
/**
 * @return the id of the most recent view seen, or -1 if none has been seen yet
 */
public long getCurrentViewId()
{
    return currentViewId;
}
/**
 * Builds a list with the names of all nodes in the current view.
 *
 * @return a new Vector holding one node name per member, in member order
 */
public Vector getCurrentView()
{
    Vector names = new Vector (this.members.size());
    for (Iterator nodes = this.members.iterator(); nodes.hasNext(); )
    {
        ClusterNode node = (ClusterNode) nodes.next();
        names.add(node.getName());
    }
    return names;
}
/**
 * @return a new array containing the ClusterNode objects of the current view
 */
public ClusterNode[] getClusterNodes ()
{
    int count = this.members.size();
    ClusterNode[] snapshot = new ClusterNode[count];
    // The array is exactly as large as the member list, so toArray fills it in place.
    this.members.toArray(snapshot);
    return snapshot;
}
/**
 * Tells whether the local node is the first member of the current view.
 *
 * @return true if the first member equals the local node; false if it does
 *         not, or if the member list or the local node is not known yet
 */
public boolean isCurrentNodeCoordinator ()
{
    boolean viewKnown = this.members != null
        && this.members.size() != 0
        && this.me != null;
    return viewKnown && this.members.elementAt (0).equals (this.me);
}
/**
 * Registers the object that handle() invokes for method calls addressed to
 * objName, replacing any handler previously stored under that name.
 *
 * @param objName the handler name used as prefix in cluster method calls
 * @param subscriber the object on which the methods are invoked
 */
public void registerRPCHandler(String objName, Object subscriber)
{
    this.rpcHandlers.put(objName, subscriber);
}
/**
 * Removes whatever handler is registered under objName.
 *
 * @param objName the handler name to remove
 * @param subscriber not used; the entry is removed by name only
 */
public void unregisterRPCHandler(String objName, Object subscriber)
{
    this.rpcHandlers.remove(objName);
}
/**
 * Calls a method on the cluster without explicit parameter types, using the
 * default method call timeout.
 *
 * @param objName name of the RPC handler to address
 * @param methodName name of the method to invoke on the handler
 * @param args arguments of the call
 * @param excludeSelf true to send the call only to the other members
 * @return the responses collected by the fully-specified overload
 * @throws Exception if the remote call fails
 */
public ArrayList callMethodOnCluster(String objName, String methodName,
    Object[] args, boolean excludeSelf) throws Exception
{
    final Class[] noTypes = null;
    return callMethodOnCluster(objName, methodName, args, noTypes, excludeSelf);
}
/**
 * Calls a method on the cluster using the default method call timeout.
 *
 * @param objName name of the RPC handler to address
 * @param methodName name of the method to invoke on the handler
 * @param args arguments of the call
 * @param types parameter types of the method, or null
 * @param excludeSelf true to send the call only to the other members
 * @return the responses collected by the fully-specified overload
 * @throws Exception if the remote call fails
 */
public ArrayList callMethodOnCluster(String objName, String methodName,
    Object[] args, Class[] types, boolean excludeSelf) throws Exception
{
    long defaultTimeout = this.timeout;
    return callMethodOnCluster(objName, methodName, args, types, excludeSelf, defaultTimeout);
}
/**
 * Calls objName.methodName on the cluster with GroupRequest.GET_ALL and
 * collects the answers. Responses that were not received and
 * NoHandlerForRPC markers are left out of the result.
 *
 * @param objName name of the RPC handler to address
 * @param methodName name of the method to invoke on the handler
 * @param args arguments of the call
 * @param types parameter types of the method, or null
 * @param excludeSelf true to send the call only to the other members
 * @param methodTimeout timeout handed to callRemoteMethods
 * @return the usable response values; empty if there were none
 * @throws Exception if the remote call fails
 */
public ArrayList callMethodOnCluster(String objName, String methodName,
    Object[] args, Class[] types, boolean excludeSelf, long methodTimeout) throws Exception
{
    ArrayList results = new ArrayList();
    final boolean trace = log.isTraceEnabled();

    final String qualifiedName = objName + "." + methodName;
    MethodCall call = (types == null)
        ? new MethodCall(qualifiedName, args)
        : new MethodCall(qualifiedName, args, types);

    RspList responses;
    if (!excludeSelf)
    {
        if( trace )
        {
            log.trace("callMethodOnCluster(false), objName="+objName
                +", methodName="+methodName+", members="+members);
        }
        // A null destination list addresses every member of the group.
        responses = this.callRemoteMethods(null, call, GroupRequest.GET_ALL, methodTimeout);
    }
    else
    {
        if( trace )
        {
            log.trace("callMethodOnCluster(true), objName="+objName
                +", methodName="+methodName+", members="+jgotherMembers);
        }
        responses = this.callRemoteMethods(this.jgotherMembers, call, GroupRequest.GET_ALL, methodTimeout);
    }

    if (responses == null)
    {
        return results;
    }

    for (int i = 0; i < responses.size(); i++)
    {
        Object item = responses.elementAt(i);
        if (item instanceof Rsp)
        {
            Rsp response = (Rsp) item;
            if (!response.wasReceived())
            {
                if( trace )
                    log.trace("Ignoring non-received response: "+response);
                continue;
            }
            // Unwrap the value; members without a handler answer with a marker.
            item = response.getValue();
            if (!(item instanceof NoHandlerForRPC))
                results.add(item);
        }
        else if (item instanceof NoHandlerForRPC)
        {
            if( trace )
                log.trace("Ignoring NoHandlerForRPC");
        }
        else
        {
            results.add(item);
        }
    }
    return results;
}
/**
 * Sends a method call to the cluster without waiting for responses and
 * without explicit parameter types.
 *
 * @param objName name of the RPC handler to address
 * @param methodName name of the method to invoke on the handler
 * @param args arguments of the call
 * @param excludeSelf true to send the call only to the other members
 * @throws Exception if the remote call fails
 */
public void callAsynchMethodOnCluster(String objName, String methodName,
    Object[] args, boolean excludeSelf) throws Exception
{
    final Class[] noTypes = null;
    callAsynchMethodOnCluster(objName, methodName, args, noTypes, excludeSelf);
}
/**
 * Sends objName.methodName to the cluster with GroupRequest.GET_NONE, i.e.
 * without collecting any response.
 *
 * @param objName name of the RPC handler to address
 * @param methodName name of the method to invoke on the handler
 * @param args arguments of the call
 * @param types parameter types of the method, or null
 * @param excludeSelf true to send the call only to the other members
 * @throws Exception if the remote call fails
 */
public void callAsynchMethodOnCluster(String objName, String methodName,
    Object[] args, Class[] types, boolean excludeSelf) throws Exception
{
    final boolean trace = log.isTraceEnabled();

    final String qualifiedName = objName + "." + methodName;
    MethodCall call = (types == null)
        ? new MethodCall(qualifiedName, args)
        : new MethodCall(qualifiedName, args, types);

    if (!excludeSelf)
    {
        if( trace )
        {
            log.trace("callAsynchMethodOnCluster(false), objName="+objName
                +", methodName="+methodName+", members="+members);
        }
        // A null destination list addresses every member of the group.
        this.callRemoteMethods(null, call, GroupRequest.GET_NONE, timeout);
        return;
    }

    if( trace )
    {
        log.trace("callAsynchMethodOnCluster(true), objName="+objName
            +", methodName="+methodName+", members="+jgotherMembers);
    }
    this.callRemoteMethods(this.jgotherMembers, call, GroupRequest.GET_NONE, timeout);
}
/**
 * Registers a subscriber whose state is included in getState() and restored
 * by setState() under the given name, replacing any previous subscriber
 * stored under that name.
 *
 * @param objectName key under which the subscriber's state travels
 * @param subscriber the state provider/consumer
 */
public void subscribeToStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
{
    this.stateHandlers.put(objectName, subscriber);
}
/**
 * Removes whatever state-transfer subscriber is registered under objectName.
 *
 * @param objectName key of the subscriber to remove
 * @param subscriber not used; the entry is removed by name only
 */
public void unsubscribeFromStateTransferEvents(String objectName, HAPartitionStateTransfer subscriber)
{
    this.stateHandlers.remove(objectName);
}
/**
 * Adds a listener to the list notified synchronously from viewAccepted().
 *
 * @param listener the listener to add
 */
public void registerMembershipListener(HAMembershipListener listener)
{
    final ArrayList registry = this.listeners;
    // Same monitor as notifyListeners(), so the list is not changed by another thread mid-notification.
    synchronized (registry)
    {
        registry.add(listener);
    }
}
/**
 * Removes a listener from the list notified synchronously from viewAccepted().
 *
 * @param listener the listener to remove
 */
public void unregisterMembershipListener(HAMembershipListener listener)
{
    final ArrayList registry = this.listeners;
    // Same monitor as notifyListeners(), so the list is not changed by another thread mid-notification.
    synchronized (registry)
    {
        registry.remove(listener);
    }
}
/**
 * Dispatches an incoming RPC message. The message body must be a MethodCall
 * whose name has the form "handlerName.methodName"; the method is invoked
 * on the object registered under handlerName.
 *
 * @param req the incoming message
 * @return the value returned by the invoked method; the Throwable it threw;
 *         a NoHandlerForRPC marker when no handler can be determined or
 *         none is registered; or null when the message is empty, cannot be
 *         deserialized or does not contain a MethodCall
 */
public Object handle(Message req)
{
    boolean trace = log.isTraceEnabled();

    if( trace )
        log.trace("Partition " + partitionName + " received msg");
    if(req == null || req.getBuffer() == null)
    {
        log.warn("RpcProtocol.Handle(): message or message buffer is null !");
        return null;
    }

    Object body = null;
    try
    {
        body = Util.objectFromByteBuffer(req.getBuffer());
    }
    catch(Exception e)
    {
        log.warn("RpcProtocol.Handle(): " + e);
        return null;
    }

    // instanceof is false for null, so this also rejects an empty body.
    if(!(body instanceof MethodCall))
    {
        log.warn("RpcProtocol.Handle(): message does not contain a MethodCall object !");
        return null;
    }

    MethodCall method_call = (MethodCall)body;
    String methodName = method_call.getName();

    if( trace )
        log.trace("pre methodName: " + methodName);

    // Split "handlerName.methodName". Without a separator there is no
    // handler to route to; answer like an unknown handler instead of
    // failing with a StringIndexOutOfBoundsException.
    int idx = (methodName == null) ? -1 : methodName.lastIndexOf('.');
    if (idx < 0)
    {
        log.warn("RpcProtocol.Handle(): method name '" + methodName
            + "' is not of the form handlerName.methodName");
        return new NoHandlerForRPC();
    }
    String handlerName = methodName.substring(0, idx);
    String newMethodName = methodName.substring(idx + 1);

    if( trace )
    {
        log.trace("handlerName: " + handlerName + " methodName: " + newMethodName);
        log.trace("Handle: " + methodName);
    }

    // The handler only knows the unqualified method name.
    method_call.setName(newMethodName);

    Object handler = rpcHandlers.get(handlerName);
    if (handler == null)
    {
        if( trace )
            log.trace("No rpc handler registered under: "+handlerName);
        return new NoHandlerForRPC();
    }

    Object retval = null;
    try
    {
        retval = method_call.invoke(handler);
        if( trace )
            log.trace("rpc call return value: "+retval);
    }
    catch (Throwable t)
    {
        // The failure itself becomes the response sent back to the caller.
        if( trace )
            log.trace("rpc call threw exception", t);
        retval = t;
    }
    return retval;
}
/**
 * Checks that no other member of the view publishes the same additional
 * data (the node's unique name) as the local address.
 *
 * If the local address carries no additional data the check cannot be
 * performed; a warning is logged and the method returns.
 *
 * @param javaGroupIpAddresses the IpAddress objects of the current view
 * @throws Exception if another member carries the same additional data
 */
protected void verifyNodeIsUnique (Vector javaGroupIpAddresses) throws Exception
{
    byte[] localUniqueName = this.localJGAddress.getAdditionalData();
    if (localUniqueName == null)
    {
        log.warn("No additional information has been found in the JavaGroup address: " +
            "make sure you are running with a correct version of JGroups and that the protocol " +
            " you are using supports the 'additionalData' behaviour");
        // Nothing to compare against.
        return;
    }

    for (int i = 0; i < javaGroupIpAddresses.size(); i++)
    {
        IpAddress address = (IpAddress) javaGroupIpAddresses.elementAt(i);
        if (address.equals(this.localJGAddress))
        {
            continue;
        }
        // Compare the byte contents; byte[].equals() only tests identity
        // and would never detect a duplicate.
        if (java.util.Arrays.equals(localUniqueName, address.getAdditionalData()))
        {
            throw new Exception ("Local node removed from cluster (" + this.localJGAddress + "): another node (" + address + ") publicizing the same name was already there");
        }
    }
}
/**
 * Registers an object with the NonSerializableFactory and binds a Reference
 * to it in JNDI, creating any missing intermediate subcontexts on the way.
 *
 * @param jndiName the full name to bind under
 * @param who the object to make available
 * @param classType class whose name is recorded in the Reference
 * @param ctx the context from which the name is resolved
 * @throws Exception if parsing the name, creating a subcontext or binding fails
 */
protected void bind(String jndiName, Object who, Class classType, Context ctx) throws Exception
{
    NonSerializableFactory.bind(jndiName, who);

    // Walk down the name one component at a time until only the last
    // component is left.
    Context current = ctx;
    Name remaining = ctx.getNameParser("").parse(jndiName);
    for (; remaining.size () > 1; remaining = remaining.getSuffix (1))
    {
        String component = remaining.get (0);
        try
        {
            current = (Context) current.lookup (component);
        }
        catch (NameNotFoundException e)
        {
            log.debug ("creating Subcontext" + component);
            current = current.createSubcontext (component);
        }
    }

    // The Reference points back at the NonSerializableFactory entry via the "nns" address.
    Reference ref = new Reference(
        classType.getName (),
        new StringRefAddr("nns", jndiName),
        NonSerializableFactory.class.getName (),
        null);
    current.rebind (remaining.get (0), ref);
}
/**
 * Computes the members that were in the old view but are not in the new one.
 *
 * @param oldMembers the previous members; null is treated as empty
 * @param newMembers the current members; null is treated as empty
 * @return a new Vector with the members that disappeared
 */
protected Vector getDeadMembers(Vector oldMembers, Vector newMembers)
{
    final boolean debug = log.isDebugEnabled();

    Vector before = (oldMembers != null) ? oldMembers : new Vector();
    Vector after = (newMembers != null) ? newMembers : new Vector();

    // Work on a copy so the caller's list is left untouched.
    Vector dead = (Vector) before.clone();
    dead.removeAll(after);

    if (debug && !dead.isEmpty())
    {
        log.debug("dead members: " + dead);
    }
    return dead;
}
/**
 * Computes the members that are in the current view but were not in the old one.
 *
 * @param oldMembers the previous members; null is treated as empty
 * @param allMembers the current members; null is treated as empty
 * @return a new Vector with the members that joined
 */
protected Vector getNewMembers(Vector oldMembers, Vector allMembers)
{
    Vector before = (oldMembers != null) ? oldMembers : new Vector();
    Vector current = (allMembers != null) ? allMembers : new Vector();

    // Work on a copy so the caller's list is left untouched.
    Vector joined = (Vector) current.clone();
    joined.removeAll(before);
    return joined;
}
/**
 * Delivers a membership change to every listener in the given list while
 * holding the list's monitor. When originating groups are supplied,
 * listeners implementing HAMembershipExtendedListener receive the merge
 * callback; all others receive the plain callback. A failure in one
 * listener is logged and does not stop delivery to the remaining ones.
 *
 * @param theListeners the listeners to notify
 * @param viewID id of the view, used for logging only
 * @param allMembers all members of the new view
 * @param deadMembers members that left
 * @param newMembers members that joined
 * @param originatingGroups subgroups of a merge, or null for an ordinary view change
 */
protected void notifyListeners(ArrayList theListeners, long viewID,
    Vector allMembers, Vector deadMembers, Vector newMembers,
    Vector originatingGroups)
{
    log.debug("Begin notifyListeners, viewID: "+viewID);
    final boolean merge = originatingGroups != null;
    synchronized(theListeners)
    {
        int index = 0;
        while (index < theListeners.size())
        {
            HAMembershipListener current = null;
            try
            {
                current = (HAMembershipListener) theListeners.get(index);
                if (merge && current instanceof HAMembershipExtendedListener)
                {
                    ((HAMembershipExtendedListener) current).membershipChangedDuringMerge (
                        deadMembers, newMembers, allMembers, originatingGroups);
                }
                else
                {
                    current.membershipChanged(deadMembers, newMembers, allMembers);
                }
            }
            catch (Throwable e)
            {
                // One misbehaving listener must not block the others.
                log.warn("HAMembershipListener callback failure: "+current, e);
            }
            index++;
        }
    }
    log.debug("End notifyListeners, viewID: "+viewID);
}
/**
 * Wraps each IpAddress of the given list in a ClusterNode.
 *
 * @param jgAddresses a list of IpAddress objects; may be null
 * @return a new Vector of ClusterNode objects in the same order, or null
 *         when jgAddresses is null
 */
protected Vector translateAddresses (Vector jgAddresses)
{
    if (jgAddresses != null)
    {
        Vector nodes = new Vector (jgAddresses.size());
        for (Iterator addresses = jgAddresses.iterator(); addresses.hasNext(); )
        {
            IpAddress address = (IpAddress) addresses.next();
            nodes.add(new ClusterNode (address));
        }
        return nodes;
    }
    return null;
}
/**
 * Appends a message, prefixed with the current date and time in the
 * default SimpleDateFormat, to the history list. Failures are ignored.
 *
 * @param message the text to record
 */
public void logHistory (String message)
{
    try
    {
        String timestamp = new SimpleDateFormat().format (new Date());
        history.add(timestamp + " : " + message);
    }
    catch (Exception ignored)
    {
        // Deliberately swallowed: a failure to record history is not reported.
    }
}
/**
 * Plain holder for the data of one view change. viewAccepted() fills it in
 * and queues it; AsynchViewChangeHandler passes its fields to notifyListeners().
 */
private static class ViewChangeEvent
{
/** Id of the view this event describes. */
long viewId;
/** Members of the previous view that are missing from this one. */
Vector deadMembers;
/** Members of this view that were not in the previous one. */
Vector newMembers;
/** All members of this view. */
Vector allMembers;
/** Subgroups reported by a MergeView; null when the view is not a merge. */
Vector originatingGroups;
}
/**
 * Runnable that takes ViewChangeEvents from the asynchViewChanges queue and
 * delivers each of them to the asynchronous listeners. It runs until the
 * thread executing it is interrupted while waiting on the queue.
 */
private class AsynchViewChangeHandler implements Runnable
{
    /** Delivers queued view changes until interrupted. */
    public void run()
    {
        log.debug("Begin AsynchViewChangeHandler");
        try
        {
            for (;;)
            {
                // Blocks until viewAccepted() has queued another event.
                ViewChangeEvent next = (ViewChangeEvent) asynchViewChanges.take();
                notifyListeners(asynchListeners, next.viewId, next.allMembers,
                    next.deadMembers, next.newMembers, next.originatingGroups);
            }
        }
        catch (InterruptedException e)
        {
            // An interrupt ends the delivery loop.
            log.debug("AsynchViewChangeHandler interrupted", e);
        }
        log.debug("End AsynchViewChangeHandler");
    }
}
}