package org.jboss.ha.framework.server;
import java.util.Vector;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.io.Serializable;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import EDU.oswego.cs.dl.util.concurrent.Latch;
import org.jboss.logging.Logger;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.HAPartition;
public class DistributedReplicantManagerImpl
implements DistributedReplicantManagerImplMBean,
HAPartition.HAMembershipExtendedListener,
HAPartition.HAPartitionStateTransfer
{
// Name under which this service registers its RPC handler and state-transfer
// subscription with the partition; also used as the "service" part of the JMX name.
protected final static String SERVICE_NAME = "DistributedReplicantManager";
// Counter handed out by nextThreadID() to number the helper threads.
protected static int threadID;
// key -> replicant published by this node.
protected HashMap localReplicants = new HashMap();
// key -> HashMap(node name -> replicant) for replicants recorded for other nodes.
protected HashMap replicants = new HashMap();
// key -> ArrayList of DistributedReplicantManager.ReplicantListener.
protected HashMap keyListeners = new HashMap();
// key -> Integer id computed from the names of the nodes holding that key.
protected HashMap intraviewIdCache = new HashMap();
// Partition used for cluster RPC calls, membership events and state transfer.
protected HAPartition partition;
// Logger whose category is this class name suffixed with the partition name.
protected Logger log;
// MBean server this instance registers itself with in init().
protected MBeanServer mbeanserver;
// JMX object name built in init() and unregistered in stop().
protected ObjectName jmxName;
// Name of the local node; remains null until start() has run.
protected String nodeName = null;
// Latch released by start() once nodeName is set; add(), remove() and
// lookupLocalReplicants() acquire it before reading nodeName.
protected Latch partitionNameKnown = new Latch ();
// Value of log.isTraceEnabled() captured once in the constructor.
protected boolean trace;
// Parameter types passed along with the "_add" cluster call.
protected Class[] add_types=new Class[]{String.class, String.class, Serializable.class};
// Parameter types passed along with the "_remove" cluster call.
protected Class[] remove_types=new Class[]{String.class, String.class};
public DistributedReplicantManagerImpl(HAPartition partition, MBeanServer server)
{
this.partition = partition;
this.mbeanserver = server;
this.log = Logger.getLogger(DistributedReplicantManagerImpl.class.getName() + "." + partition.getPartitionName());
this.trace = log.isTraceEnabled();
}
/**
 * Hooks this instance into the partition (RPC handler, state transfer events,
 * membership listener) and then registers it as an MBean named
 * "jboss:service=DistributedReplicantManager,partitionName=&lt;partition name&gt;".
 *
 * @throws Exception if any of the registrations fails
 */
public void init() throws Exception
{
   log.debug("registerRPCHandler");
   this.partition.registerRPCHandler(SERVICE_NAME, this);

   log.debug("subscribeToStateTransferEvents");
   this.partition.subscribeToStateTransferEvents(SERVICE_NAME, this);

   log.debug("registerMembershipListener");
   this.partition.registerMembershipListener(this);

   // Build the JMX object name only after the partition registrations succeeded.
   StringBuffer objectName = new StringBuffer("jboss:service=");
   objectName.append(SERVICE_NAME);
   objectName.append(",partitionName=");
   objectName.append(this.partition.getPartitionName());

   this.jmxName = new ObjectName(objectName.toString());
   this.mbeanserver.registerMBean(this, this.jmxName);
}
/**
 * Records the local node name obtained from the partition and releases the
 * partitionNameKnown latch so that callers waiting on it can proceed.
 *
 * @throws Exception declared by the signature; propagated from the partition call
 */
public void start() throws Exception
{
   String localNode = this.partition.getNodeName();
   this.nodeName = localNode;
   this.partitionNameKnown.release();
}
/**
 * Removes every locally published replicant through {@link #remove(String)}
 * and then unregisters this instance from the MBean server.
 *
 * @throws Exception if a removal or the MBean unregistration fails
 */
public void stop() throws Exception
{
   if (localReplicants != null)
   {
      synchronized (localReplicants)
      {
         // remove() deletes the key from localReplicants, so taking the first
         // remaining key each time eventually drains the map.
         for (; !localReplicants.isEmpty(); )
         {
            Object firstKey = localReplicants.keySet().iterator().next();
            this.remove((String) firstKey);
         }
      }
   }

   this.mbeanserver.unregisterMBean(this.jmxName);
}
/**
 * Produces a human readable listing, wrapped in a &lt;pre&gt; element, of every
 * known service, whether it is available locally, and the names of the other
 * nodes recorded for it.
 *
 * @return the formatted listing
 * @throws Exception declared by the signature
 */
public String listContent () throws Exception
{
   Collection serviceNames = this.getAllServices();
   StringBuffer out = new StringBuffer();
   out.append("<pre>");

   for (Iterator services = serviceNames.iterator(); services.hasNext(); )
   {
      String service = (String) services.next();

      // A service that only exists locally has no entry in the replicants map.
      HashMap holders = (HashMap) this.replicants.get(service);
      if (holders == null)
      {
         holders = new HashMap();
      }

      out.append("-----------------------------------------------\n");
      out.append("Service : ").append(service).append("\n\n");

      boolean availableLocally = lookupLocalReplicant(service) != null;
      if (availableLocally)
      {
         out.append("\t- Service *is* also available locally\n");
      }
      else
      {
         out.append("\t- Service is *not* available locally\n");
      }

      for (Iterator nodes = holders.keySet().iterator(); nodes.hasNext(); )
      {
         String node = (String) nodes.next();
         out.append("\t- ").append(node).append("\n");
      }

      out.append("\n");
   }

   out.append("</pre>");
   return out.toString();
}
/**
 * Produces an XML description of every known service and the nodes holding it.
 * <p>
 * The document has a single &lt;ReplicantManager&gt; root containing one
 * &lt;Service&gt; element per service. Each service lists a &lt;Location&gt;
 * for the local node (attribute local="True") when a local replicant exists,
 * followed by one &lt;Location&gt; (local="False") per node recorded in the
 * replicants map.
 * <p>
 * NOTE(review): service and node names are appended as-is, without XML escaping.
 *
 * @return the XML text
 * @throws Exception declared by the signature
 */
public String listXmlContent () throws Exception
{
   Collection services = this.getAllServices ();
   StringBuffer result = new StringBuffer ();

   result.append ("<ReplicantManager>\n");

   Iterator catsIter = services.iterator ();
   while (catsIter.hasNext ())
   {
      String category = (String)catsIter.next ();

      // A service that only exists locally has no entry in the replicants map.
      HashMap content = (HashMap)this.replicants.get (category);
      if (content == null)
         content = new HashMap ();
      Iterator keysIter = content.keySet ().iterator ();

      result.append ("\t<Service>\n");
      result.append ("\t\t<ServiceName>").append (category).append ("</ServiceName>\n");

      Serializable local = lookupLocalReplicant(category);
      if (local != null)
      {
         result.append ("\t\t<Location>\n");
         result.append ("\t\t\t<Name local=\"True\">").append (this.nodeName).append ("</Name>\n");
         result.append ("\t\t</Location>\n");
      }

      while (keysIter.hasNext ())
      {
         String location = (String)keysIter.next ();
         result.append ("\t\t<Location>\n");
         result.append ("\t\t\t<Name local=\"False\">").append (location).append ("</Name>\n");
         result.append ("\t\t</Location>\n");
      }

      // Close the element; this was previously emitted as a second opening tag.
      result.append ("\t</Service>\n");
   }

   // Close the root element; this was previously emitted as a second opening tag.
   result.append ("</ReplicantManager>\n");

   return result.toString ();
}
/**
 * Builds the state handed over during state transfer.
 * <p>
 * The result is a two element Object[]: element 0 maps every service name to a
 * HashMap(node name -> replicant) that also contains this node's own replicant
 * (when present); element 1 is the intraviewIdCache map itself.
 *
 * @return the state array described above
 */
public Serializable getCurrentState ()
{
   HashMap snapshot = new HashMap();

   for (Iterator services = this.getAllServices().iterator(); services.hasNext(); )
   {
      String service = (String) services.next();

      // Work on a copy so that adding the local entry does not touch the live map.
      HashMap recorded = (HashMap) this.replicants.get(service);
      HashMap holders = (recorded == null) ? new HashMap() : (HashMap) recorded.clone();

      Serializable mine = lookupLocalReplicant(service);
      if (mine != null)
      {
         holders.put(this.nodeName, mine);
      }

      snapshot.put(service, holders);
   }

   return new Object[] {snapshot, intraviewIdCache};
}
/**
 * Installs state received through state transfer and starts a
 * MembersPublisher thread to republish the local replicants.
 *
 * @param newState an Object[] whose element 0 is the replicants map and whose
 *                 element 1 is the intraview id cache (the layout produced by
 *                 getCurrentState())
 */
public void setCurrentState(Serializable newState)
{
   Object[] parts = (Object[]) newState;
   this.replicants = (HashMap) parts[0];
   this.intraviewIdCache = (HashMap) parts[1];

   if (log.isDebugEnabled())
      log.debug(nodeName + ": received new state, will republish local replicants");

   // Republishing happens on its own thread rather than on the caller's.
   new MembersPublisher().start();
}
/**
 * Returns the names of all services known to this node, whether published
 * locally or recorded for other nodes.
 *
 * @return a new HashSet holding the union of both key sets
 */
public Collection getAllServices ()
{
   HashSet names = new HashSet(localReplicants.keySet());
   names.addAll(replicants.keySet());
   return names;
}
/**
 * Membership callback used when partitions merge: purges the dead members and,
 * if any new members joined, starts a MergeMembers thread.
 *
 * @param deadMembers       members that left
 * @param newMembers        members that joined
 * @param allMembers        not used by this method
 * @param originatingGroups only logged
 */
public void membershipChangedDuringMerge(Vector deadMembers, Vector newMembers, Vector allMembers, Vector originatingGroups)
{
   log.info("Merging partitions...");
   log.info("Dead members: " + deadMembers.size());
   log.info("Originating groups: " + originatingGroups);

   purgeDeadMembers(deadMembers);

   if (newMembers.isEmpty())
   {
      return;
   }

   MergeMembers merger = new MergeMembers();
   merger.start();
}
/**
 * Membership callback: logs the change when debug logging is enabled and
 * purges the replicants recorded for the dead members.
 * <p>
 * The diagnostic lines are emitted at debug level so that they match the
 * isDebugEnabled() guard around them.
 *
 * @param deadMembers members that left
 * @param newMembers  members that joined (only logged)
 * @param allMembers  current full membership (only logged)
 */
public void membershipChanged(Vector deadMembers, Vector newMembers, Vector allMembers)
{
   if( log.isDebugEnabled() )
   {
      log.debug("I am (" + nodeName + ") received membershipChanged event:");
      log.debug("Dead members: " + deadMembers.size() + " (" + deadMembers + ")");
      log.debug("New Members : " + newMembers.size() + " (" + newMembers + ")");
      log.debug("All Members : " + allMembers.size() + " (" + allMembers + ")");
   }

   purgeDeadMembers(deadMembers);
}
/**
 * Publishes a replicant for the given key: issues the "_add" call on the
 * cluster, then stores the replicant locally and notifies the key's listeners.
 * Waits on the partitionNameKnown latch first.
 *
 * @param key       key the replicant is published under
 * @param replicant value to publish
 * @throws Exception if waiting on the latch or the cluster call fails
 */
public void add(String key, Serializable replicant) throws Exception
{
   if (trace)
   {
      log.trace("add, key="+key+", value="+replicant);
   }

   partitionNameKnown.acquire();

   Object[] rpcArgs = new Object[] {key, this.nodeName, replicant};
   partition.callMethodOnCluster(SERVICE_NAME, "_add", rpcArgs, add_types, true);

   synchronized (localReplicants)
   {
      localReplicants.put(key, replicant);
      List current = lookupReplicants(key);
      notifyKeyListeners(key, current);
   }
}
/**
 * Withdraws the local replicant for the given key, if there is one: issues the
 * asynchronous "_remove" call on the cluster, drops the local entry and
 * notifies the key's listeners (with an empty list when nothing remains).
 * Waits on the partitionNameKnown latch first.
 *
 * @param key key whose local replicant is removed
 * @throws Exception if waiting on the latch or the cluster call fails
 */
public void remove(String key) throws Exception
{
   partitionNameKnown.acquire();

   if (!localReplicants.containsKey(key))
   {
      return;
   }

   Object[] rpcArgs = new Object[] {key, this.nodeName};
   partition.callAsynchMethodOnCluster(SERVICE_NAME, "_remove", rpcArgs, remove_types, true);

   synchronized (localReplicants)
   {
      localReplicants.remove(key);
      List remaining = lookupReplicants(key);
      // Listeners always receive a list here, never null.
      notifyKeyListeners(key, remaining != null ? remaining : new ArrayList());
   }
}
/**
 * Returns the replicant this node published for the given key.
 *
 * @param key key to look up
 * @return the local replicant, or null when none is stored for the key
 */
public Serializable lookupLocalReplicant(String key)
{
   Object found;
   synchronized (localReplicants)
   {
      found = localReplicants.get(key);
   }
   return (Serializable) found;
}
/**
 * Returns all replicants known for the given key, the local one (if any)
 * first, followed by those recorded for other nodes.
 *
 * @param key key to look up
 * @return a new list of replicants, or null when the key is known neither
 *         locally nor for other nodes
 */
public List lookupReplicants(String key)
{
   Serializable mine = lookupLocalReplicant(key);

   synchronized (replicants)
   {
      HashMap remote = (HashMap) replicants.get(key);

      List all = null;
      if (mine != null)
      {
         all = new ArrayList();
         all.add(mine);
      }
      if (remote != null)
      {
         if (all == null)
         {
            all = new ArrayList();
         }
         all.addAll(remote.values());
      }
      return all;
   }
}
/**
 * Returns the names of the nodes holding a replicant for the given key, this
 * node's name (if it holds one) first.
 *
 * @param key key to look up
 * @return a new list of node names, or null when no node holds the key
 */
public List lookupReplicantsNodeNames(String key)
{
   boolean heldLocally = localReplicants.containsKey(key);

   synchronized (replicants)
   {
      HashMap remote = (HashMap) replicants.get(key);
      if (!heldLocally && remote == null)
      {
         return null;
      }

      ArrayList names = new ArrayList();
      if (heldLocally)
      {
         names.add(this.nodeName);
      }
      if (remote != null)
      {
         names.addAll(remote.keySet());
      }
      return names;
   }
}
/**
 * Subscribes a listener to changes of the given key.
 *
 * @param key        key to observe
 * @param subscriber listener appended to the key's listener list
 */
public void registerListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
{
   synchronized (keyListeners)
   {
      ArrayList forKey = (ArrayList) keyListeners.get(key);
      if (forKey == null)
      {
         // First listener for this key: create its list lazily.
         forKey = new ArrayList();
         keyListeners.put(key, forKey);
      }
      forKey.add(subscriber);
   }
}
/**
 * Unsubscribes a listener from the given key and drops the key's listener
 * list once it becomes empty. Does nothing when the key has no listeners.
 *
 * @param key        key the listener was registered for
 * @param subscriber listener to remove
 */
public void unregisterListener(String key, DistributedReplicantManager.ReplicantListener subscriber)
{
   synchronized (keyListeners)
   {
      ArrayList forKey = (ArrayList) keyListeners.get(key);
      if (forKey != null)
      {
         forKey.remove(subscriber);
         if (forKey.isEmpty())
         {
            keyListeners.remove(key);
         }
      }
   }
}
/**
 * Returns the cached view id for the given key.
 *
 * @param key key to look up
 * @return the cached id, or 0 when nothing is cached for the key
 */
public int getReplicantsViewId(String key)
{
   Integer cached = (Integer) this.intraviewIdCache.get(key);
   return (cached == null) ? 0 : cached.intValue();
}
/**
 * Tells whether this node is the master replica for the given key.
 * <p>
 * The node must hold a local replicant for the key. If no other node is
 * recorded for the key it is the master. Otherwise the current view is walked
 * in order: the answer is false as soon as a member recorded for the key is
 * met, and true if this node's own name is met first. If neither is met the
 * answer is false.
 *
 * @param key key to check
 * @return true when this node is considered the master replica
 */
public boolean isMasterReplica (String key)
{
   if (trace)
   {
      log.trace("isMasterReplica, key="+key);
   }

   if (!localReplicants.containsKey(key))
   {
      if (trace)
      {
         log.trace("no localReplicants, key="+key+", isMasterReplica=false");
      }
      return false;
   }

   Vector view = this.partition.getCurrentView();
   HashMap remoteHolders = (HashMap) replicants.get(key);
   if (remoteHolders == null)
   {
      if (trace)
      {
         log.trace("no replicants, key="+key+", isMasterReplica=true");
      }
      return true;
   }

   // Copy of the node names recorded for the key at this point in time.
   Vector holderNames = new Vector(remoteHolders.keySet());

   int idx = 0;
   while (idx < view.size())
   {
      String member = (String) view.elementAt(idx);
      idx++;

      if (trace)
      {
         log.trace("Testing member: "+member);
      }

      if (holderNames.contains(member))
      {
         if (trace)
         {
            log.trace("Member found in replicaNodes, isMasterReplica=false");
         }
         return false;
      }

      if (member.equals(this.nodeName))
      {
         if (trace)
         {
            log.trace("Member == nodeName, isMasterReplica=true");
         }
         return true;
      }
   }

   return false;
}
/**
 * Target of the "_add" cluster call: records the replicant for the given node
 * and notifies the key's listeners. Failures are logged, not propagated.
 *
 * @param key       key the replicant belongs to
 * @param nodeName  node that published the replicant
 * @param replicant published value
 */
public void _add(String key, String nodeName, Serializable replicant)
{
   if (log.isDebugEnabled())
      log.debug("_add(" + key + ", " + nodeName);

   try
   {
      addReplicant(key, nodeName, replicant);
      List updated = lookupReplicants(key);
      notifyKeyListeners(key, updated);
   }
   catch (Exception ex)
   {
      log.error("_add failed", ex);
   }
}
/**
 * Target of the "_remove" cluster call: drops the replicant recorded for the
 * given node and, when something was actually removed, notifies the key's
 * listeners. Failures are logged, not propagated.
 *
 * @param key      key the replicant belongs to
 * @param nodeName node whose replicant is removed
 */
public void _remove(String key, String nodeName)
{
   try
   {
      boolean removed = removeReplicant(key, nodeName);
      if (removed)
      {
         List remaining = lookupReplicants(key);
         notifyKeyListeners(key, remaining);
      }
   }
   catch (Exception ex)
   {
      log.error("_remove failed", ex);
   }
}
/**
 * Removes the replicant recorded for a node under the given key and drops the
 * key's map altogether when it becomes empty.
 *
 * @param key      key the replicant belongs to
 * @param nodeName node whose entry is removed
 * @return true when a non-null entry was removed, false otherwise
 * @throws Exception declared by the signature
 */
protected boolean removeReplicant (String key, String nodeName) throws Exception
{
   synchronized (replicants)
   {
      HashMap holders = (HashMap) replicants.get(key);
      if (holders == null || holders.remove(nodeName) == null)
      {
         return false;
      }

      if (holders.isEmpty())
      {
         replicants.remove(key);
      }
      return true;
   }
}
/**
 * Returns this node's name together with its local replicants map; invoked on
 * cluster members by mergeMembers(). Waits on the partitionNameKnown latch first.
 *
 * @return a two element array: element 0 is the node name, element 1 is the
 *         localReplicants map itself (not a copy)
 * @throws Exception if waiting on the latch fails
 */
public Object[] lookupLocalReplicants() throws Exception
{
   partitionNameKnown.acquire();

   Object[] answer = new Object[2];
   answer[0] = this.nodeName;
   answer[1] = localReplicants;

   if (log.isDebugEnabled())
   {
      log.debug ("lookupLocalReplicants called ("+ answer[0] + "). Return: " + localReplicants.size ());
   }

   return answer;
}
/**
 * Sums the hash codes of the non-null elements of the list. Being a plain
 * sum, the result does not depend on the order of the elements.
 *
 * @param members elements to hash
 * @return the sum of the hash codes (0 for an empty list)
 */
protected int calculateReplicantsHash (List members)
{
   int sum = 0;
   for (Iterator it = members.iterator(); it.hasNext(); )
   {
      Object member = it.next();
      if (member != null)
      {
         sum += member.hashCode();
      }
   }
   return sum;
}
/**
 * Recomputes the cached id of the given key from the names of the nodes
 * currently holding it. When no node holds the key the cache entry is removed.
 *
 * @param key key whose id is refreshed
 * @return the new id, or 0 when no node holds the key
 */
protected int updateReplicantsHashId (String key)
{
   List holders = this.lookupReplicantsNodeNames(key);

   if (holders == null || holders.isEmpty())
   {
      this.intraviewIdCache.remove(key);
      return 0;
   }

   int id = this.calculateReplicantsHash(holders);
   this.intraviewIdCache.put(key, new Integer(id));
   return id;
}
/**
 * Records a replicant for a node in this instance's replicants map.
 *
 * @param key       key the replicant belongs to
 * @param nodeName  node that published the replicant
 * @param replicant published value
 */
protected void addReplicant(String key, String nodeName, Serializable replicant)
{
   this.addReplicant(this.replicants, key, nodeName, replicant);
}
/**
 * Records a replicant for a node in the given map of
 * key -> HashMap(node name -> replicant), creating the per-key map on demand.
 * The update is done while holding the monitor of the given map.
 *
 * @param map       map to update
 * @param key       key the replicant belongs to
 * @param nodeName  node that published the replicant
 * @param replicant published value
 */
protected void addReplicant(HashMap map, String key, String nodeName, Serializable replicant)
{
   synchronized (map)
   {
      HashMap holders = (HashMap) map.get(key);
      if (holders == null)
      {
         log.debug("_adding new HashMap");
         holders = new HashMap();
         map.put(key, holders);
      }
      holders.put(nodeName, replicant);
   }
}
/**
 * Collects every key for which the replicants map holds an entry for the
 * given node.
 *
 * @param nodeName node to look for
 * @return a new Vector of matching keys (possibly empty)
 */
protected Vector getKeysReplicatedByNode (String nodeName)
{
   Vector owned = new Vector();

   synchronized (replicants)
   {
      for (Iterator it = replicants.entrySet().iterator(); it.hasNext(); )
      {
         Map.Entry entry = (Map.Entry) it.next();
         String key = (String) entry.getKey();
         HashMap holders = (HashMap) entry.getValue();

         if (holders != null && holders.containsKey(nodeName))
         {
            owned.add(key);
         }
      }
   }

   return owned;
}
/**
 * Tells whether this instance's replicants map already holds an entry for the
 * given key and node.
 *
 * @param key      key to check
 * @param nodeName node to check
 * @return true when such an entry exists
 */
protected boolean replicantEntryAlreadyExists (String key, String nodeName)
{
   return this.replicantEntryAlreadyExists(this.replicants, key, nodeName);
}
/**
 * Tells whether the given map of key -> HashMap(node name -> replicant)
 * already holds an entry for the given key and node.
 *
 * @param map      map to inspect
 * @param key      key to check
 * @param nodeName node to check
 * @return true when the key has a per-node map containing the node name
 */
protected boolean replicantEntryAlreadyExists (HashMap map, String key, String nodeName)
{
   HashMap holders = (HashMap) map.get(key);
   return holders != null && holders.containsKey(nodeName);
}
/**
 * Refreshes the cached id of the key and then calls replicantsChanged on each
 * listener registered for it, all while holding the keyListeners monitor.
 *
 * @param key           key that changed
 * @param newReplicants list handed to the listeners as the new replicants
 */
protected void notifyKeyListeners(String key, List newReplicants)
{
   log.debug("notifyKeyListeners");

   synchronized (keyListeners)
   {
      // The id is refreshed even when nobody listens for the key.
      int viewId = updateReplicantsHashId(key);

      ArrayList subscribers = (ArrayList) keyListeners.get(key);
      if (subscribers != null)
      {
         if (log.isDebugEnabled())
         {
            log.debug("notifying " + subscribers.size() + " listeners for key change: " + key);
         }

         // Index based walk with the size re-read on every pass.
         int idx = 0;
         while (idx < subscribers.size())
         {
            DistributedReplicantManager.ReplicantListener target =
               (DistributedReplicantManager.ReplicantListener) subscribers.get(idx);
            target.replicantsChanged(key, newReplicants, viewId);
            idx++;
         }
      }
      else
      {
         log.debug("listeners is null");
      }
   }
}
/**
 * Re-announces every local replicant to the cluster through the asynchronous
 * "_add" call and notifies the listeners of each key. Works on a copy of the
 * local replicants taken under their monitor. Failures are logged, not
 * propagated, and stop the republishing at the failing entry.
 */
protected void republishLocalReplicants()
{
   try
   {
      log.debug("Start Re-Publish local replicants in DRM");

      HashMap snapshot;
      synchronized (this.localReplicants)
      {
         snapshot = new HashMap(this.localReplicants);
      }

      for (Iterator it = snapshot.entrySet().iterator(); it.hasNext(); )
      {
         Map.Entry published = (Map.Entry) it.next();
         String key = (String) published.getKey();
         Object replicant = published.getValue();

         if (log.isDebugEnabled())
         {
            log.debug("publishing, key=" + key + ", value=" + replicant);
         }

         Object[] rpcArgs = new Object[] {key, this.nodeName, replicant};
         partition.callAsynchMethodOnCluster(SERVICE_NAME, "_add", rpcArgs, add_types, true);

         List current = lookupReplicants(key);
         notifyKeyListeners(key, current);
      }

      log.debug("End Re-Publish local replicants");
   }
   catch (Exception e)
   {
      log.error("Re-Publish failed", e);
   }
}
/**
 * Reconciles the replicants map with what the cluster members report.
 * <p>
 * Calls "lookupLocalReplicants" on the cluster and, for each non-null answer
 * (node name plus that node's replicant map), adds the entries not yet
 * recorded for that node and removes recorded entries the node no longer
 * reports. Listeners are notified once per affected key after all answers
 * have been processed. Any exception is logged and aborts the merge.
 */
protected void mergeMembers()
{
try
{
log.debug("Start merging members in DRM service...");
// Keys whose replicant set changed during this merge.
java.util.HashSet notifies = new java.util.HashSet ();
// NOTE(review): the meaning of the trailing 'true' flag is defined by HAPartition
// and is not visible here - presumably it excludes the local node; confirm.
ArrayList rsp = partition.callMethodOnCluster(SERVICE_NAME,
"lookupLocalReplicants",
new Object[]{}, new Class[]{}, true);
if (rsp.size() == 0)
log.debug("No responses from other nodes during the DRM merge process.");
else
{
log.debug("The DRM merge process has received " + rsp.size() + " answers");
}
for (int i = 0; i < rsp.size(); i++)
{
// Each answer is expected to be the Object[] {nodeName, localReplicants}
// built by lookupLocalReplicants().
Object[] objs = (Object[])rsp.get(i);
if (objs == null)
{
log.warn("As part of the answers received during the DRM merge process, a NULL message was received!");
continue;
}
String node = (String)objs[0];
// Local variable: shadows the 'replicants' field for the rest of this loop body.
HashMap replicants = (HashMap)objs[1];
Iterator keys = replicants.keySet().iterator();
// Pass 1: record every reported entry that is not known yet.
while (keys.hasNext())
{
String key = (String)keys.next();
if (!replicantEntryAlreadyExists (key, node))
{
addReplicant(key, node, (Serializable)replicants.get(key));
notifies.add (key);
}
}
// Pass 2: after pass 1 every reported key is recorded, so a larger recorded
// count means some recorded keys are no longer reported by the node.
Vector currentStatus = getKeysReplicatedByNode (node);
if (currentStatus.size () > replicants.size ())
{
for (int currentKeysId=0, currentKeysMax=currentStatus.size (); currentKeysId<currentKeysMax; currentKeysId++)
{
String theKey = (String)currentStatus.elementAt (currentKeysId);
if (!replicants.containsKey (theKey))
{
removeReplicant (theKey, node);
notifies.add(theKey);
}
}
}
}
// Notify listeners only once per key, after all answers were merged.
Iterator notifIter = notifies.iterator ();
while (notifIter.hasNext ())
{
String key = (String)notifIter.next ();
notifyKeyListeners(key, lookupReplicants(key));
}
log.debug ("..Finished merging members in DRM service");
}
catch (Exception ex)
{
log.error("merge failed", ex);
}
}
/**
 * Removes the entries recorded for the given dead members from every key of
 * the replicants map and notifies the listeners of each key that lost at
 * least one entry. Returns immediately when the vector is empty. Any
 * exception is logged and ends the purge.
 *
 * @param deadMembers members that left; each element's toString() is used as
 *                    the node name
 */
protected void purgeDeadMembers(Vector deadMembers)
{
if (deadMembers.size() <= 0) return;
try
{
// The whole purge, including listener notification, runs under the replicants monitor.
synchronized(replicants)
{
Iterator keys = replicants.keySet().iterator();
while (keys.hasNext())
{
String key = (String)keys.next();
HashMap replicant = (HashMap)replicants.get(key);
boolean modified = false;
for (int i = 0; i < deadMembers.size(); i++)
{
String node = deadMembers.elementAt(i).toString();
log.debug("trying to remove deadMember " + node + " for key " + key);
Object removed = replicant.remove(node);
if (removed != null)
{
log.debug(node + " was removed");
modified = true;
}
else
{
log.debug(node + " was NOT removed!!!");
}
}
// Only the per-key map is modified; a per-key map that ends up empty
// stays in the replicants map.
if (modified)
{
notifyKeyListeners(key, lookupReplicants(key));
}
}
}
}
catch (Exception ex)
{
log.error("membershipChanged failed", ex);
}
}
/**
 * Currently has an empty body and is not called from within this class.
 */
protected void cleanupKeyListeners()
{
}
/**
 * Hands out the next thread number, starting from the initial value of
 * threadID and increasing by one per call.
 *
 * @return the value of threadID before it was incremented
 */
protected synchronized static int nextThreadID()
{
   int assigned = threadID;
   threadID = assigned + 1;
   return assigned;
}
/**
 * Thread that, after a 50 ms pause, runs mergeMembers() of the enclosing
 * instance. Threads are named "DRM Async Merger#" plus a running number.
 */
protected class MergeMembers extends Thread
{
   public MergeMembers()
   {
      super("DRM Async Merger#" + nextThreadID());
   }

   public void run()
   {
      log.debug("Sleeping for 50ms second just in case");
      try
      {
         Thread.sleep(50);
      }
      catch (Exception ignored)
      {
         // An interrupted or failed pause does not prevent the merge.
      }

      mergeMembers();
   }
}
/**
 * Thread that, after a 50 ms pause, runs republishLocalReplicants() of the
 * enclosing instance. Threads are named "DRM Async Publisher#" plus a running
 * number.
 */
protected class MembersPublisher extends Thread
{
   public MembersPublisher()
   {
      super("DRM Async Publisher#" + nextThreadID());
   }

   public void run()
   {
      log.debug("DRM: Sleeping before re-publishing for 50ms just in case");
      try
      {
         Thread.sleep(50);
      }
      catch (Exception ignored)
      {
         // An interrupted or failed pause does not prevent the republishing.
      }

      republishLocalReplicants();
   }
}
}