package org.jboss.remoting.detection;
import org.jboss.logging.Logger;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.InvokerRegistry;
import org.jboss.remoting.Subsystem;
import org.jboss.remoting.ident.Identity;
import org.jboss.remoting.network.NetworkRegistryFinder;
import org.jboss.remoting.transport.ClientInvoker;
import org.jboss.system.ConfigurationException;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
public abstract class AbstractDetector implements Detector, AbstractDetectorMBean
{
private long defaultTimeDelay = 5000;
private long heartbeatTimeDelay = 1000;
protected final Logger log = Logger.getLogger(getClass());
protected MBeanServer mbeanserver;
protected ObjectName objectName;
protected ObjectName registryObjectName;
private Identity myself;
private Timer heartbeatTimer;
private Timer failureTimer;
private Map servers = new HashMap();
private Element xml;
private Set domains = new HashSet();
public AbstractDetector()
{
}
public void setHeartbeatTimeDelay(long heartbeatTimeDelay) throws ConfigurationException
{
if(heartbeatTimeDelay > 0 && heartbeatTimeDelay < defaultTimeDelay)
{
this.heartbeatTimeDelay = heartbeatTimeDelay;
}
else
{
throw new ConfigurationException("Can not set heartbeat time delay (" + heartbeatTimeDelay + ") to a negative number or " +
"to a number greater than the default time delay (" + defaultTimeDelay + ").");
}
}
public long getHeartbeatTimeDelay()
{
return heartbeatTimeDelay;
}
public void setDefaultTimeDelay(long defaultTimeDelay) throws ConfigurationException
{
if(defaultTimeDelay >= heartbeatTimeDelay)
{
this.defaultTimeDelay = defaultTimeDelay;
}
else
{
throw new ConfigurationException("Can not set the default time delay (" + defaultTimeDelay + ") to be less" +
" than that of the heartbeat time delay (" + heartbeatTimeDelay + ").");
}
}
public long getDefaultTimeDelay()
{
return defaultTimeDelay;
}
public void start() throws Exception
{
myself = Identity.get(mbeanserver);
if(domains.isEmpty() && xml == null)
{
domains.add(myself.getDomain());
}
registryObjectName = NetworkRegistryFinder.find(mbeanserver);
if(registryObjectName == null)
{
log.warn("Detector: " + getClass().getName() + " could not be loaded because the NetworkRegistry isn't registered");
return;
}
startPinger(getPingerDelay(), getPingerPeriod());
startHeartbeat(getHeartbeatDelay(), getHeartbeatPeriod());
}
protected long getPingerDelay()
{
return 5000;
}
protected long getPingerPeriod()
{
return 1500;
}
protected void startPinger(long delay, long period)
{
failureTimer = new Timer(false);
failureTimer.schedule(new FailureDetector(), delay, period);
}
protected void stopPinger()
{
if(failureTimer != null)
{
failureTimer.cancel();
failureTimer = null;
}
}
public void stop() throws Exception
{
stopPinger();
stopHeartbeat();
stopPinger();
}
public void postDeregister()
{
}
public void postRegister(Boolean aBoolean)
{
}
public void preDeregister() throws Exception
{
}
public ObjectName preRegister(MBeanServer mBeanServer, ObjectName objectName) throws Exception
{
this.mbeanserver = mBeanServer;
this.objectName = objectName;
return objectName;
}
public void setConfiguration(Element xml)
throws Exception
{
this.xml = xml;
if(xml != null)
{
domains.clear();
NodeList domainNodes = xml.getElementsByTagName("domain");
if(domainNodes == null || domainNodes.getLength() <= 0)
{
if(log.isDebugEnabled())
{
log.debug("No domains specified. Will accept all domains.");
}
}
int len = domainNodes.getLength();
for(int c = 0; c < len; c++)
{
Node node = domainNodes.item(c);
String domain = node.getFirstChild().getNodeValue();
domains.add(domain);
if(log.isDebugEnabled())
{
log.debug("Added domain " + domain + " to detector list.");
}
}
}
}
public Element getConfiguration()
{
return xml;
}
protected void startHeartbeat(long delay, long period)
{
if(heartbeatTimer == null)
{
heartbeatTimer = new Timer(false);
}
heartbeatTimer.schedule(new Heartbeat(), delay, period);
}
protected void stopHeartbeat()
{
if(heartbeatTimer != null)
{
try
{
heartbeatTimer.cancel();
}
catch(Exception eg)
{
}
heartbeatTimer = null;
}
}
protected long getHeartbeatDelay()
{
return 0;
}
protected long getHeartbeatPeriod()
{
return heartbeatTimeDelay;
}
protected abstract void heartbeat();
protected void detect(Detection detection)
{
if(log.isTraceEnabled())
{
log.trace("Detection message received.");
log.trace("Id = " + detection.getIdentity().getInstanceId());
log.trace("isRemoteDetection() = " + isRemoteDetection(detection));
}
if(isRemoteDetection(detection))
{
try
{
boolean found = false;
Server server = null;
synchronized(servers)
{
server = (Server) servers.get(detection);
found = server != null;
if(!found)
{
servers.put(detection, (server = new Server(detection)));
}
else
{
server.lastDetection = System.currentTimeMillis();
}
}
if(found == false)
{
if(log.isDebugEnabled())
{
log.debug("detected NEW server: " + detection.getIdentity());
}
mbeanserver.invoke(registryObjectName, "addServer", new Object[]{detection.getIdentity(), detection.getLocators()}, new String[]{Identity.class.getName(), InvokerLocator[].class.getName()});
}
else
{
if(server.changed(detection))
{
server.rehash(detection);
if(log.isTraceEnabled())
{
log.trace("detected UPDATE for server: " + detection.getIdentity());
}
mbeanserver.invoke(registryObjectName, "updateServer", new Object[]{detection.getIdentity(), detection.getLocators()}, new String[]{Identity.class.getName(), InvokerLocator[].class.getName()});
}
}
}
catch(javax.management.InstanceNotFoundException inf)
{
return;
}
catch(Exception e)
{
log.error("Error during detection of: " + detection, e);
}
}
else if(log.isTraceEnabled())
{
log.trace("detection from myself - ignored");
}
}
protected boolean isRemoteDetection(Detection detection)
{
String domain = detection.getIdentity().getDomain();
return (domains.isEmpty() || domains.contains(domain)) &&
myself.isSameJVM(detection.getIdentity()) == false;
}
protected boolean checkInvokerServer(Detection detection, ClassLoader cl)
{
boolean ok = false;
InvokerLocator il[] = detection.getLocators();
for(int c = 0; c < il.length; c++)
{
try
{
ClientInvoker ci = InvokerRegistry.createClientInvoker(il[c]);
if(ci.isConnected() == false)
{
ci.connect();
}
Boolean b = (Boolean) ci.invoke(new InvocationRequest(getClass().getName(), Subsystem.SELF, "$PING$", null, null, null));
if(b != null)
{
ok = true;
break;
}
}
catch(Throwable ig)
{
log.debug("failed calling ping on " + detection, ig);
InvokerRegistry.destroyClientInvoker(il[c]);
break;
}
}
if(ok == false)
{
try
{
mbeanserver.invoke(registryObjectName, "removeServer", new Object[]{detection.getIdentity()}, new String[]{Identity.class.getName()});
if(log.isDebugEnabled())
{
log.debug("Removed detection " + detection);
}
}
catch(Exception ex)
{
log.warn("Error removing server", ex);
}
finally
{
servers.remove(detection);
}
}
return ok;
}
private final class FailureDetector extends TimerTask
{
public void run()
{
if(servers.isEmpty())
{
return;
}
Map map = null;
synchronized(servers)
{
map = new HashMap(servers);
}
ClassLoader cl = AbstractDetector.this.getClass().getClassLoader();
Iterator iter = map.keySet().iterator();
while(iter.hasNext())
{
Detection detection = (Detection) iter.next();
long lastDetection = 0;
Server server = null;
synchronized(servers)
{
server = (Server) map.get(detection);
lastDetection = server.lastDetection;
}
long duration = System.currentTimeMillis() - lastDetection;
if(duration >= defaultTimeDelay)
{
if(log.isDebugEnabled())
{
log.debug("detection for: " + detection + " has not been received in: " + defaultTimeDelay + " ms, contacting..");
}
if(checkInvokerServer(detection, cl))
{
if(log.isDebugEnabled())
{
log.debug("detection for: " + detection + " recovered on ping");
}
server.lastDetection = System.currentTimeMillis();
}
}
}
}
}
private final class Server
{
Detection detection;
private int hashCode = 0;
long lastDetection = System.currentTimeMillis();
Server(Detection detection)
{
rehash(detection);
}
private void rehash(Detection d)
{
this.hashCode = hash(d);
}
private int hash(Detection d)
{
int hc = 0;
InvokerLocator locators[] = d.getLocators();
if(locators != null)
{
for(int c = 0; c < locators.length; c++)
{
hc += locators[c].hashCode();
}
}
return hc;
}
boolean changed(Detection detection)
{
return hashCode != hash(detection);
}
public boolean equals(Object obj)
{
return obj instanceof Server && hashCode == obj.hashCode();
}
public int hashCode()
{
return hashCode;
}
}
private final class Heartbeat extends TimerTask
{
public void run()
{
InvokerLocator il[] = InvokerRegistry.getRegisteredServerLocators();
if(il != null && il.length > 0)
{
heartbeat();
}
}
}
}