/***************************************
 *                                     *
 *  JBoss: The OpenSource J2EE WebOS   *
 *                                     *
 *  Distributable under LGPL license.  *
 *  See terms of license at gnu.org.   *
 *                                     *
 ***************************************/
package org.jboss.remoting.detection;


import org.jboss.logging.Logger;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.InvokerRegistry;
import org.jboss.remoting.Subsystem;
import org.jboss.remoting.ident.Identity;
import org.jboss.remoting.network.NetworkRegistryFinder;
import org.jboss.remoting.transport.ClientInvoker;
import org.jboss.system.ConfigurationException;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;


/**
 * AbstractDetector
 *
 * @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
 * @version $Revision: 1.15.8.2 $
 */
public abstract class AbstractDetector implements Detector, AbstractDetectorMBean
{
    private long defaultTimeDelay = 5000;
    private long heartbeatTimeDelay = 1000;
    protected final Logger log = Logger.getLogger(getClass());
    protected MBeanServer mbeanserver;
    protected ObjectName objectName;
    protected ObjectName registryObjectName;

    private Identity myself;
    private Timer heartbeatTimer;
    private Timer failureTimer;
    private Map servers = new HashMap();
    private Element xml;
    private Set domains = new HashSet();

    public AbstractDetector()
    {
    }

   /**
    * The amount of time to wait between sending (and sometimes receiving) detection messages.
    * @param heartbeatTimeDelay
    * @throws ConfigurationException
    */
   public void setHeartbeatTimeDelay(long heartbeatTimeDelay) throws ConfigurationException
   {
      if(heartbeatTimeDelay > 0 && heartbeatTimeDelay < defaultTimeDelay)
      {
         this.heartbeatTimeDelay = heartbeatTimeDelay;
      }
      else
      {
         throw new ConfigurationException("Can not set heartbeat time delay (" + heartbeatTimeDelay + ") to a negative number or " +
                                          "to a number greater than the default time delay (" + defaultTimeDelay + ").");
      }
   }

   /**
    * The amount of time to wait between sending (and sometimes receiving) detection messages.
    * @return
    */
   public long getHeartbeatTimeDelay()
   {
      return heartbeatTimeDelay;
   }

   /**
    * The amount of time which can elapse without receiving a detection event before a server
    * will be suspected as being dead and peroforming an explicit invocation on it to verify it is alive.
    * @param defaultTimeDelay time in milliseconds
    * @throws ConfigurationException
    */
   public void setDefaultTimeDelay(long defaultTimeDelay) throws ConfigurationException
   {
      if(defaultTimeDelay >= heartbeatTimeDelay)
      {
         this.defaultTimeDelay = defaultTimeDelay;
      }
      else
      {
         throw new ConfigurationException("Can not set the default time delay (" + defaultTimeDelay + ") to be less" +
                                          " than that of the heartbeat time delay (" + heartbeatTimeDelay + ").");
      }
   }

   /**
    *
    * @return The amount of time which can elapse without receiving a detection event before a server
    * will be suspected as being dead and peroforming an explicit invocation on it to verify it is alive.
    */
   public long getDefaultTimeDelay()
   {
      return defaultTimeDelay;
   }

    /**
     * called by MBeanServer to start the mbean lifecycle
     *
     * @throws Exception
     */
    public void start() throws Exception
    {
        // get our own identity
        myself = Identity.get(mbeanserver);

        // add my domain if domains empty and xml not set
        if(domains.isEmpty() && xml == null)
        {
            domains.add(myself.getDomain());
        }

        // find our NetworkRegistry
        registryObjectName = NetworkRegistryFinder.find(mbeanserver);
        if(registryObjectName == null)
        {
            log.warn("Detector: " + getClass().getName() + " could not be loaded because the NetworkRegistry isn't registered");
            return;
        }

        startPinger(getPingerDelay(), getPingerPeriod());
        startHeartbeat(getHeartbeatDelay(), getHeartbeatPeriod());
    }

    /**
     * return the delay in milliseconds between when the timer is created to when the first pinger thread runs.
     * defaults to <tt>5000</tt>
     *
     * @return
     */
    protected long getPingerDelay()
    {
        return 5000;
    }

    /**
     * return the period in milliseconds between checking lost servers against the last detection timestamp.
     * defaults to <tt>1500</tt>
     *
     * @return
     */
    protected long getPingerPeriod()
    {
        return 1500;
    }

    /**
     * start the pinger timer thread
     *
     * @param delay
     * @param period
     */
    protected void startPinger(long delay, long period)
    {
        failureTimer = new Timer(false);
        failureTimer.schedule(new FailureDetector(), delay, period);
    }

    /**
     * stop the pinger timer thread
     */
    protected void stopPinger()
    {
        if(failureTimer != null)
        {
            failureTimer.cancel();
            failureTimer = null;
        }
    }

    /**
     * called by the MBeanServer to stop the mbean lifecycle
     *
     * @throws Exception
     */
    public void stop() throws Exception
    {
        stopPinger();
        stopHeartbeat();
        stopPinger();
    }

    public void postDeregister()
    {
    }

    public void postRegister(Boolean aBoolean)
    {
    }

    public void preDeregister() throws Exception
    {
    }

    public ObjectName preRegister(MBeanServer mBeanServer, ObjectName objectName) throws Exception
    {
        this.mbeanserver = mBeanServer;
        this.objectName = objectName;
        return objectName;
    }

    /**
     * set the configuration for the domains to be recognized by detector
     *
     * @param xml
     *
     * @jmx.managed-attribute description="Configuration is an xml element indicating domains to be recognized by detector"
     *     access="read-write"
     */
    public void setConfiguration(Element xml)
            throws Exception
    {
        this.xml = xml;

        // check configuration xml
        if(xml != null)
        {
            // clearing collection of domains since have new ones to set
            domains.clear();

            NodeList domainNodes = xml.getElementsByTagName("domain");
            if(domainNodes == null || domainNodes.getLength() <= 0)
            {
                // no domains specified, so will accept all domains
                if(log.isDebugEnabled())
                {
                    log.debug("No domains specified.  Will accept all domains.");
                }
            }
            int len = domainNodes.getLength();
            for(int c = 0; c < len; c++)
            {
                Node node = domainNodes.item(c);
                String domain = node.getFirstChild().getNodeValue();
                domains.add(domain);
                if(log.isDebugEnabled())
                {
                    log.debug("Added domain " + domain + " to detector list.");
                }
            }
        }
    }

    /**
     * The <code>getConfiguration</code> method
     *
     * @return an <code>Element</code> value
     *
     * @jmx.managed-attribute
     */
    public Element getConfiguration()
    {
        return xml;
    }

    //----------------------- protected

    /**
     * start heartbeating
     *
     * @param delay
     * @param period
     */
    protected void startHeartbeat(long delay, long period)
    {
        if(heartbeatTimer == null)
        {
            heartbeatTimer = new Timer(false);
        }
        heartbeatTimer.schedule(new Heartbeat(), delay, period);
    }

    /**
     * stop heartbeating
     *
     */
    protected void stopHeartbeat()
    {
        if(heartbeatTimer != null)
        {
            try
            {
                heartbeatTimer.cancel();
            }
            catch(Exception eg)
            {
            }
            heartbeatTimer = null;
        }
    }

    /**
     * return the initial delay in milliseconds before the initial heartbeat is fired.
     * Defaults to <tt>0</tt>
     *
     * @return
     */
    protected long getHeartbeatDelay()
    {
        return 0;
    }

    /**
     * return the period in milliseconds between subsequent heartbeats. Defaults to
     * <tt>1000</tt>
     *
     * @return
     */
    protected long getHeartbeatPeriod()
    {
        return heartbeatTimeDelay;
    }

    /**
     * subclasses must implement to provide the specific heartbeat protocol
     * for this server to send out to other servers on the network
     */
    protected abstract void heartbeat();

    /**
     * called when a remote detection from a peer is received by a detector
     *
     * @param detection
     */
    protected void detect(Detection detection)
    {
        if(log.isTraceEnabled())
        {
            log.trace("Detection message received.");
            log.trace("Id = " + detection.getIdentity().getInstanceId());
            log.trace("isRemoteDetection() = " + isRemoteDetection(detection));
        }
        // we only track detections within our own domain and not ourself
        if(isRemoteDetection(detection))
        {
            try
            {
                boolean found = false;
                Server server = null;

                synchronized(servers)
                {
                    server = (Server) servers.get(detection);
                    found = server != null;
                    if(!found)
                    {
                        // update either way the timestamp and the detection
                        servers.put(detection, (server = new Server(detection)));
                    }
                    else
                    {
                        server.lastDetection = System.currentTimeMillis();
                    }
                }
                if(found == false)
                {
                    if(log.isDebugEnabled())
                    {
                        log.debug("detected NEW server: " + detection.getIdentity());
                    }
                    mbeanserver.invoke(registryObjectName, "addServer", new Object[]{detection.getIdentity(), detection.getLocators()}, new String[]{Identity.class.getName(), InvokerLocator[].class.getName()});
                }
                else
                {
                    if(server.changed(detection))
                    {
                        // update hash
                        server.rehash(detection);
                        if(log.isTraceEnabled())
                        {
                            log.trace("detected UPDATE for server: " + detection.getIdentity());
                        }
                        mbeanserver.invoke(registryObjectName, "updateServer", new Object[]{detection.getIdentity(), detection.getLocators()}, new String[]{Identity.class.getName(), InvokerLocator[].class.getName()});
                    }
                }
            }
            catch(javax.management.InstanceNotFoundException inf)
            {
                return;
            }
            catch(Exception e)
            {
                log.error("Error during detection of: " + detection, e);
            }
        }
        else if(log.isTraceEnabled())
        {
            log.trace("detection from myself - ignored");
        }
    }

    protected boolean isRemoteDetection(Detection detection)
    {
        String domain = detection.getIdentity().getDomain();
        // is detection domain in accepted domain collection and not local
        // if domains empty, then accept all
        return (domains.isEmpty() || domains.contains(domain)) &&
                myself.isSameJVM(detection.getIdentity()) == false;
    }

    protected boolean checkInvokerServer(Detection detection, ClassLoader cl)
    {
        boolean ok = false;
        InvokerLocator il[] = detection.getLocators();
        for(int c = 0; c < il.length; c++)
        {
            try
            {
                ClientInvoker ci = InvokerRegistry.createClientInvoker(il[c]);

                if(ci.isConnected() == false)
                {
                    ci.connect();
                }

                Boolean b = (Boolean) ci.invoke(new InvocationRequest(getClass().getName(), Subsystem.SELF, "$PING$", null, null, null));
                if(b != null)
                {
                    // the transport was successful
                    ok = true;
                    break;
                }

            }
            catch(Throwable ig)
            {
                log.debug("failed calling ping on " + detection, ig);
                // remove the client invoker, it's not any good anymore
                InvokerRegistry.destroyClientInvoker(il[c]);
                // will be aggressive with removal.  if any fail, remove it.
                // if still good, will pick up detection again next go around.
                break;
            }
        }
        if(ok == false)
        {
            // the server is down!
            try
            {
                mbeanserver.invoke(registryObjectName, "removeServer", new Object[]{detection.getIdentity()}, new String[]{Identity.class.getName()});
                if(log.isDebugEnabled())
                {
                    log.debug("Removed detection " + detection);
                }
            }
            catch(Exception ex)
            {
                log.warn("Error removing server", ex);
            }
            finally
            {
                // remove this server, it isn't available any more
                servers.remove(detection);
            }
        }

        return ok;
    }


    private final class FailureDetector extends TimerTask
    {
        public void run()
        {
            if(servers.isEmpty())
            {
                return;
            }
            // make a copy so we don't have to block incoming
            // notifications during failure check
            Map map = null;
            synchronized(servers)
            {
                map = new HashMap(servers);
            }
            ClassLoader cl = AbstractDetector.this.getClass().getClassLoader();
            // walk through each detection and see if it needs checking up on ...
            Iterator iter = map.keySet().iterator();
            while(iter.hasNext())
            {
                Detection detection = (Detection) iter.next();
                long lastDetection = 0;
                Server server = null;
                synchronized(servers)
                {
                    server = (Server) map.get(detection);
                    lastDetection = server.lastDetection;
                }
                long duration = System.currentTimeMillis() - lastDetection;
                if(duration >= defaultTimeDelay)
                {
                    if(log.isDebugEnabled())
                    {
                        log.debug("detection for: " + detection + " has not been received in: " + defaultTimeDelay + " ms, contacting..");
                    }
                    // OK, we've exceeded the time delay since the last time we've detected
                    // this dude, he might be down, let's walk through each of his transports and
                    // see if any of them lead to a valid invocation
                    if(checkInvokerServer(detection, cl))
                    {
                        if(log.isDebugEnabled())
                        {
                            log.debug("detection for: " + detection + " recovered on ping");
                        }
                        server.lastDetection = System.currentTimeMillis();
                    }
                }
            }
        }

    }

    private final class Server
    {
        Detection detection;
        private int hashCode = 0;
        long lastDetection = System.currentTimeMillis();

        Server(Detection detection)
        {
            rehash(detection);
        }

        private void rehash(Detection d)
        {
            this.hashCode = hash(d);
        }

        private int hash(Detection d)
        {
            int hc = 0;
            InvokerLocator locators[] = d.getLocators();
            if(locators != null)
            {
                for(int c = 0; c < locators.length; c++)
                {
                    hc += locators[c].hashCode();
                }
            }
            return hc;
        }

        boolean changed(Detection detection)
        {
            return hashCode != hash(detection);
        }

        public boolean equals(Object obj)
        {
            return obj instanceof Server && hashCode == obj.hashCode();
        }

        public int hashCode()
        {
            return hashCode;
        }
    }

    private final class Heartbeat extends TimerTask
    {
        public void run()
        {
            InvokerLocator il[] = InvokerRegistry.getRegisteredServerLocators();
            if(il != null && il.length > 0)
            {
                // we only heartbeat if we have connectors and the ability for a
                // client to reach us back, otherwise its sort of a mute point ..
                heartbeat();
            }
        }
    }
}