| AbstractDetector.java |
/***************************************
* *
* JBoss: The OpenSource J2EE WebOS *
* *
* Distributable under LGPL license. *
* See terms of license at gnu.org. *
* *
***************************************/
package org.jboss.remoting.detection;
import org.jboss.logging.Logger;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.InvokerRegistry;
import org.jboss.remoting.Subsystem;
import org.jboss.remoting.ident.Identity;
import org.jboss.remoting.network.NetworkRegistryFinder;
import org.jboss.remoting.transport.ClientInvoker;
import org.jboss.system.ConfigurationException;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
/**
 * Base implementation of a {@link Detector}.
 * <p>
 * A detector announces this server to its peers by invoking the subclass-specific
 * {@link #heartbeat()} on a timer. This happens only while at least one server invoker locator is
 * registered, because otherwise peers could not reach us back. The detector learns about peers
 * through {@link #detect(Detection)}, which subclasses call when a detection message arrives.
 * <p>
 * The detector keeps the NetworkRegistry MBean in step with its peers:
 * <ul>
 * <li>Newly seen peers are added ("addServer").</li>
 * <li>Peers whose locators change are updated ("updateServer").</li>
 * <li>A second timer pings peers that have not been heard from within
 * {@link #getDefaultTimeDelay()} milliseconds. It removes them ("removeServer") when the ping
 * does not succeed.</li>
 * </ul>
 * <p>
 * Only detections from other JVMs in an accepted domain are tracked; see
 * {@link #setConfiguration(Element)}. Without any configuration, only this server's own domain is
 * accepted.
 *
 * @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
 * @version $Revision: 1.15.8.2 $
 */
public abstract class AbstractDetector implements Detector, AbstractDetectorMBean
{
   /** Milliseconds without a detection before a peer is suspected dead and pinged. */
   private long defaultTimeDelay = 5000;
   /** Milliseconds between heartbeats sent by this detector. */
   private long heartbeatTimeDelay = 1000;

   protected final Logger log = Logger.getLogger(getClass());
   protected MBeanServer mbeanserver;
   protected ObjectName objectName;
   protected ObjectName registryObjectName;

   private Identity myself;
   private Timer heartbeatTimer;
   private Timer failureTimer;
   /**
    * Known remote peers, mapping Detection to Server. All access to the map and to the mutable
    * fields of its Server values is guarded by synchronizing on this map.
    */
   private final Map servers = new HashMap();
   private Element xml;
   /** Accepted domain names; an empty set means every domain is accepted. */
   private final Set domains = new HashSet();

   public AbstractDetector()
   {
   }

   /**
    * Sets the amount of time to wait between sending (and sometimes receiving) detection messages.
    *
    * @param heartbeatTimeDelay time in milliseconds. Must be positive and not greater than the
    *                           default time delay.
    * @throws ConfigurationException if the value is zero, negative or greater than the default
    *                                time delay
    */
   public void setHeartbeatTimeDelay(long heartbeatTimeDelay) throws ConfigurationException
   {
      // Allow equality so this check agrees with setDefaultTimeDelay (default >= heartbeat).
      if(heartbeatTimeDelay > 0 && heartbeatTimeDelay <= defaultTimeDelay)
      {
         this.heartbeatTimeDelay = heartbeatTimeDelay;
      }
      else
      {
         throw new ConfigurationException("Can not set heartbeat time delay (" + heartbeatTimeDelay + ") to zero, a negative number or " +
                                          "to a number greater than the default time delay (" + defaultTimeDelay + ").");
      }
   }

   /**
    * @return the amount of time, in milliseconds, to wait between sending (and sometimes
    *         receiving) detection messages
    */
   public long getHeartbeatTimeDelay()
   {
      return heartbeatTimeDelay;
   }

   /**
    * Sets the amount of time which can elapse without receiving a detection event before a server
    * is suspected of being dead. When it is exceeded, an explicit invocation is performed on the
    * server to verify that it is alive.
    *
    * @param defaultTimeDelay time in milliseconds. Must not be less than the heartbeat time delay.
    * @throws ConfigurationException if the value is less than the heartbeat time delay
    */
   public void setDefaultTimeDelay(long defaultTimeDelay) throws ConfigurationException
   {
      if(defaultTimeDelay >= heartbeatTimeDelay)
      {
         this.defaultTimeDelay = defaultTimeDelay;
      }
      else
      {
         throw new ConfigurationException("Can not set the default time delay (" + defaultTimeDelay + ") to be less" +
                                          " than that of the heartbeat time delay (" + heartbeatTimeDelay + ").");
      }
   }

   /**
    * @return the amount of time, in milliseconds, which can elapse without receiving a detection
    *         event before a server is suspected of being dead. When it is exceeded, an explicit
    *         invocation is performed on the server to verify that it is alive.
    */
   public long getDefaultTimeDelay()
   {
      return defaultTimeDelay;
   }

   /**
    * Called by the MBeanServer to start the mbean lifecycle.
    * <p>
    * Resolves this server's identity and locates the NetworkRegistry. If the registry is not
    * registered, a warning is logged and no timers are started. Otherwise the pinger
    * (failure-detection) timer and the heartbeat timer are started.
    *
    * @throws Exception if the identity cannot be obtained
    */
   public void start() throws Exception
   {
      // get our own identity
      myself = Identity.get(mbeanserver);
      // with no explicit configuration, accept only our own domain
      if(domains.isEmpty() && xml == null)
      {
         domains.add(myself.getDomain());
      }
      // find our NetworkRegistry
      registryObjectName = NetworkRegistryFinder.find(mbeanserver);
      if(registryObjectName == null)
      {
         log.warn("Detector: " + getClass().getName() + " could not be loaded because the NetworkRegistry isn't registered");
         return;
      }
      startPinger(getPingerDelay(), getPingerPeriod());
      startHeartbeat(getHeartbeatDelay(), getHeartbeatPeriod());
   }

   /**
    * Returns the delay in milliseconds between when the timer is created and when the first
    * pinger run happens. Defaults to <tt>5000</tt>.
    *
    * @return the initial pinger delay in milliseconds
    */
   protected long getPingerDelay()
   {
      return 5000;
   }

   /**
    * Returns the period in milliseconds between checks of known servers against their last
    * detection timestamp. Defaults to <tt>1500</tt>.
    *
    * @return the pinger period in milliseconds
    */
   protected long getPingerPeriod()
   {
      return 1500;
   }

   /**
    * Starts the pinger timer thread. Any previously started pinger is cancelled first so that
    * repeated calls do not leak timer threads.
    *
    * @param delay  milliseconds before the first check
    * @param period milliseconds between subsequent checks
    */
   protected void startPinger(long delay, long period)
   {
      stopPinger();
      failureTimer = new Timer(false);
      failureTimer.schedule(new FailureDetector(), delay, period);
   }

   /**
    * Stops the pinger timer thread, if running.
    */
   protected void stopPinger()
   {
      if(failureTimer != null)
      {
         failureTimer.cancel();
         failureTimer = null;
      }
   }

   /**
    * Called by the MBeanServer to stop the mbean lifecycle. Stops both the pinger and the
    * heartbeat timers.
    *
    * @throws Exception declared by the lifecycle contract
    */
   public void stop() throws Exception
   {
      stopPinger();
      stopHeartbeat();
   }

   public void postDeregister()
   {
   }

   public void postRegister(Boolean aBoolean)
   {
   }

   public void preDeregister() throws Exception
   {
   }

   /**
    * Captures the MBeanServer and the ObjectName this detector is registered under.
    *
    * @return the ObjectName passed in, unchanged
    */
   public ObjectName preRegister(MBeanServer mBeanServer, ObjectName objectName) throws Exception
   {
      this.mbeanserver = mBeanServer;
      this.objectName = objectName;
      return objectName;
   }

   /**
    * Sets the configuration for the domains to be recognized by the detector.
    * <p>
    * Each <code>&lt;domain&gt;</code> element's text, trimmed, becomes an accepted domain. If
    * the element contains no domain elements, all domains are accepted. A <tt>null</tt> element
    * is stored but leaves the current domain set unchanged. Empty domain elements are ignored.
    *
    * @param xml configuration element, possibly <tt>null</tt>
    *
    * @jmx.managed-attribute description="Configuration is an xml element indicating domains to be recognized by detector"
    * access="read-write"
    */
   public void setConfiguration(Element xml)
         throws Exception
   {
      this.xml = xml;
      if(xml == null)
      {
         return;
      }
      // clearing collection of domains since have new ones to set
      domains.clear();
      NodeList domainNodes = xml.getElementsByTagName("domain");
      if(domainNodes == null || domainNodes.getLength() <= 0)
      {
         // no domains specified, so will accept all domains
         if(log.isDebugEnabled())
         {
            log.debug("No domains specified. Will accept all domains.");
         }
         return;
      }
      int len = domainNodes.getLength();
      for(int c = 0; c < len; c++)
      {
         Node text = domainNodes.item(c).getFirstChild();
         String domain = (text == null) ? null : text.getNodeValue();
         // an empty <domain/> has no child (or only whitespace); skip it rather than fail
         if(domain == null || domain.trim().length() == 0)
         {
            log.warn("Ignoring empty domain element in detector configuration.");
            continue;
         }
         domain = domain.trim();
         domains.add(domain);
         if(log.isDebugEnabled())
         {
            log.debug("Added domain " + domain + " to detector list.");
         }
      }
   }

   /**
    * The <code>getConfiguration</code> method
    *
    * @return the configuration element last passed to {@link #setConfiguration(Element)}, or
    *         <tt>null</tt> if none was set
    *
    * @jmx.managed-attribute
    */
   public Element getConfiguration()
   {
      return xml;
   }

   //----------------------- protected

   /**
    * Starts heartbeating, creating the heartbeat timer if necessary.
    *
    * @param delay  milliseconds before the first heartbeat
    * @param period milliseconds between subsequent heartbeats
    */
   protected void startHeartbeat(long delay, long period)
   {
      if(heartbeatTimer == null)
      {
         heartbeatTimer = new Timer(false);
      }
      heartbeatTimer.schedule(new Heartbeat(), delay, period);
   }

   /**
    * Stops heartbeating, if started.
    */
   protected void stopHeartbeat()
   {
      if(heartbeatTimer != null)
      {
         try
         {
            heartbeatTimer.cancel();
         }
         catch(Exception ignored)
         {
            // best effort: the timer is discarded below either way
         }
         heartbeatTimer = null;
      }
   }

   /**
    * Returns the initial delay in milliseconds before the first heartbeat is fired.
    * Defaults to <tt>0</tt>.
    *
    * @return the initial heartbeat delay in milliseconds
    */
   protected long getHeartbeatDelay()
   {
      return 0;
   }

   /**
    * Returns the period in milliseconds between subsequent heartbeats. Defaults to the
    * configured heartbeat time delay (<tt>1000</tt> unless changed via
    * {@link #setHeartbeatTimeDelay(long)}).
    *
    * @return the heartbeat period in milliseconds
    */
   protected long getHeartbeatPeriod()
   {
      return heartbeatTimeDelay;
   }

   /**
    * Subclasses must implement this to provide the specific heartbeat protocol
    * for this server to send out to other servers on the network.
    */
   protected abstract void heartbeat();

   /**
    * Called when a remote detection from a peer is received by a detector.
    * <p>
    * A previously unknown peer is added to the NetworkRegistry. For a known peer, the last
    * detection time is refreshed, and the registry is updated if its locators changed. Detections
    * that fail {@link #isRemoteDetection(Detection)} are ignored.
    *
    * @param detection the received detection
    */
   protected void detect(Detection detection)
   {
      if(log.isTraceEnabled())
      {
         log.trace("Detection message received.");
         log.trace("Id = " + detection.getIdentity().getInstanceId());
         log.trace("isRemoteDetection() = " + isRemoteDetection(detection));
      }
      // we only track detections within our own domain and not ourself
      if(!isRemoteDetection(detection))
      {
         if(log.isTraceEnabled())
         {
            log.trace("detection from myself - ignored");
         }
         return;
      }
      try
      {
         boolean found;
         boolean changed = false;
         synchronized(servers)
         {
            Server server = (Server) servers.get(detection);
            found = server != null;
            if(!found)
            {
               servers.put(detection, new Server(detection));
            }
            else
            {
               server.lastDetection = System.currentTimeMillis();
               // check and rehash under the lock so that concurrent detections of the same peer
               // report a locator change only once
               if(server.changed(detection))
               {
                  server.rehash(detection);
                  changed = true;
               }
            }
         }
         if(!found)
         {
            if(log.isDebugEnabled())
            {
               log.debug("detected NEW server: " + detection.getIdentity());
            }
            mbeanserver.invoke(registryObjectName, "addServer", new Object[]{detection.getIdentity(), detection.getLocators()}, new String[]{Identity.class.getName(), InvokerLocator[].class.getName()});
         }
         else if(changed)
         {
            if(log.isTraceEnabled())
            {
               log.trace("detected UPDATE for server: " + detection.getIdentity());
            }
            mbeanserver.invoke(registryObjectName, "updateServer", new Object[]{detection.getIdentity(), detection.getLocators()}, new String[]{Identity.class.getName(), InvokerLocator[].class.getName()});
         }
      }
      catch(javax.management.InstanceNotFoundException inf)
      {
         // the NetworkRegistry is not (or no longer) registered, so there is nobody to notify
         if(log.isDebugEnabled())
         {
            log.debug("NetworkRegistry not found while processing detection of: " + detection);
         }
      }
      catch(Exception e)
      {
         log.error("Error during detection of: " + detection, e);
      }
   }

   /**
    * Checks whether a detection should be tracked.
    *
    * @param detection the received detection
    * @return true if the detection's domain is accepted (or all domains are accepted) and the
    *         detection does not come from this JVM
    */
   protected boolean isRemoteDetection(Detection detection)
   {
      String domain = detection.getIdentity().getDomain();
      // is detection domain in accepted domain collection and not local
      // if domains empty, then accept all
      return (domains.isEmpty() || domains.contains(domain)) &&
             myself.isSameJVM(detection.getIdentity()) == false;
   }

   /**
    * Pings the detection's locators to verify the server is alive. If no ping succeeds, the server
    * is removed from the NetworkRegistry and from the set of known servers.
    * <p>
    * Removal is deliberately aggressive: the first locator that fails aborts the check. A server
    * that is still alive will be picked up again by its next detection.
    *
    * @param detection the detection whose locators are pinged
    * @param cl        class loader (currently unused)
    * @return true if a ping succeeded
    */
   protected boolean checkInvokerServer(Detection detection, ClassLoader cl)
   {
      boolean ok = false;
      InvokerLocator il[] = detection.getLocators();
      // a detection without locators cannot be reached and is treated as down
      if(il != null)
      {
         for(int c = 0; c < il.length; c++)
         {
            try
            {
               ClientInvoker ci = InvokerRegistry.createClientInvoker(il[c]);
               if(ci.isConnected() == false)
               {
                  ci.connect();
               }
               Boolean b = (Boolean) ci.invoke(new InvocationRequest(getClass().getName(), Subsystem.SELF, "$PING$", null, null, null));
               if(b != null)
               {
                  // the transport was successful
                  ok = true;
                  break;
               }
            }
            catch(Throwable ig)
            {
               log.debug("failed calling ping on " + detection, ig);
               // remove the client invoker, it's not any good anymore
               InvokerRegistry.destroyClientInvoker(il[c]);
               // will be aggressive with removal. if any fail, remove it.
               // if still good, will pick up detection again next go around.
               break;
            }
         }
      }
      if(!ok)
      {
         // the server is down!
         try
         {
            mbeanserver.invoke(registryObjectName, "removeServer", new Object[]{detection.getIdentity()}, new String[]{Identity.class.getName()});
            if(log.isDebugEnabled())
            {
               log.debug("Removed detection " + detection);
            }
         }
         catch(Exception ex)
         {
            log.warn("Error removing server", ex);
         }
         finally
         {
            // remove this server, it isn't available any more; detect() mutates the same map
            synchronized(servers)
            {
               servers.remove(detection);
            }
         }
      }
      return ok;
   }

   /**
    * Timer task that pings every known server not detected within the default time delay.
    */
   private final class FailureDetector extends TimerTask
   {
      public void run()
      {
         // make a copy so we don't have to block incoming
         // notifications during failure check
         Map snapshot;
         synchronized(servers)
         {
            if(servers.isEmpty())
            {
               return;
            }
            snapshot = new HashMap(servers);
         }
         ClassLoader cl = AbstractDetector.this.getClass().getClassLoader();
         // walk through each server and see if it needs checking up on ...
         Iterator iter = snapshot.values().iterator();
         while(iter.hasNext())
         {
            Server server = (Server) iter.next();
            long lastDetection;
            Detection detection;
            synchronized(servers)
            {
               lastDetection = server.lastDetection;
               // use the most recent detection so its current locators get pinged
               detection = server.detection;
            }
            long duration = System.currentTimeMillis() - lastDetection;
            if(duration < defaultTimeDelay)
            {
               continue;
            }
            if(log.isDebugEnabled())
            {
               log.debug("detection for: " + detection + " has not been received in: " + defaultTimeDelay + " ms, contacting..");
            }
            // OK, we've exceeded the time delay since the last time we've detected
            // this dude, he might be down, let's walk through each of his transports and
            // see if any of them lead to a valid invocation
            try
            {
               if(checkInvokerServer(detection, cl))
               {
                  if(log.isDebugEnabled())
                  {
                     log.debug("detection for: " + detection + " recovered on ping");
                  }
                  synchronized(servers)
                  {
                     server.lastDetection = System.currentTimeMillis();
                  }
               }
            }
            catch(RuntimeException e)
            {
               // an exception escaping run() would kill the timer thread and stop all failure detection
               log.error("Error checking detection: " + detection, e);
            }
         }
      }
   }

   /**
    * Bookkeeping for one known remote server. All mutable fields are guarded by the enclosing
    * detector's servers map.
    */
   private static final class Server
   {
      /** Most recent detection whose locators differed from the previous one (or the first). */
      Detection detection;
      /** Order-insensitive hash of the locators last seen, used to notice locator changes. */
      private int locatorHash = 0;
      /** System.currentTimeMillis() at which this server was last heard from. */
      long lastDetection = System.currentTimeMillis();

      Server(Detection detection)
      {
         rehash(detection);
      }

      /** Records d as the current detection and remembers the hash of its locators. */
      private void rehash(Detection d)
      {
         this.detection = d;
         this.locatorHash = hash(d);
      }

      /** Sums the locator hash codes; a null locator array hashes to 0. */
      private int hash(Detection d)
      {
         int hc = 0;
         InvokerLocator locators[] = d.getLocators();
         if(locators != null)
         {
            for(int c = 0; c < locators.length; c++)
            {
               hc += locators[c].hashCode();
            }
         }
         return hc;
      }

      /** @return true if the locators of the given detection hash differently from the recorded ones */
      boolean changed(Detection detection)
      {
         return locatorHash != hash(detection);
      }
   }

   /**
    * Timer task that sends a heartbeat when this server has at least one registered server
    * locator.
    */
   private final class Heartbeat extends TimerTask
   {
      public void run()
      {
         try
         {
            InvokerLocator il[] = InvokerRegistry.getRegisteredServerLocators();
            if(il != null && il.length > 0)
            {
               // we only heartbeat if we have connectors and the ability for a
               // client to reach us back, otherwise it's sort of a moot point ..
               heartbeat();
            }
         }
         catch(RuntimeException e)
         {
            // an exception escaping run() would kill the timer thread and stop all heartbeats
            log.error("Error sending heartbeat", e);
         }
      }
   }
}
| AbstractDetector.java |