/***************************************
 *                                     *
 *  JBoss: The OpenSource J2EE WebOS   *
 *                                     *
 *  Distributable under LGPL license.  *
 *  See terms of license at gnu.org.   *
 *                                     *
 ***************************************/
package org.jboss.remoting.detection.multicast;

import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.InvokerRegistry;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.detection.AbstractDetector;
import org.jboss.remoting.detection.Detection;
import org.jboss.remoting.ident.Identity;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.ArrayList;
import java.util.List;

/**
 * MulticastDetector
 *
 * @author <a href="mailto:jhaynie@vocalocity.net">Jeff Haynie</a>
 * @author <a href="mailto:adrian.brock@happeningtimes.com">Adrian Brock</a>
 * @author <a href="mailto:tom.elrod@jboss.com">Tom Elrod</a>
 * @version $Revision: 1.7.10.3 $
 */
public class MulticastDetector extends AbstractDetector implements MulticastDetectorMBean
{

    private String defaultIP = "224.1.9.1";

    private InetAddress addr;
    private InetAddress bindAddr;
    private int port = 2410;
    private MulticastSocket socket;
//    private Identity identity;
    private Listener listener = new Listener();


   /**
    *
    * @return The IP that is used to broadcast detection messages on via multicast.
    */
   public String getDefaultIP()
   {
      return defaultIP;
   }

   /**
    *
    * @param defaultIP The IP that is used to broadcast detection messages on via multicast.
    */
   public void setDefaultIP(String defaultIP)
   {
      this.defaultIP = defaultIP;
   }

    /**
     * return the multicast address of the detector
     *
     * @return
     */
    public InetAddress getAddress ()
    {
        return addr;
    }

    /**
     * set the interface address of the multicast
     *
     * @param ip
     */
    public void setAddress (InetAddress ip)
    {
        this.addr = ip;
    }

    /**
     * return the bind address of the detector
     *
     * @return
     */
    public InetAddress getBindAddress()
    {
       return bindAddr;
    }

    /**
     * set the bind address of the multicast
     *
     * @param ip
     */
    public void setBindAddress(InetAddress ip)
    {
       this.bindAddr = ip;
    }

    /**
     * get the port that the detector is multicasting to
     *
     * @return
     */
    public int getPort ()
    {
        return port;
    }

    /**
     * set the port for detections to be multicast to
     *
     * @param port
     */
    public void setPort (int port)
    {
        this.port = port;
    }

    /**
     * called by MBeanServer to start the mbean lifecycle
     *
     * @throws Exception
     */
    public void start () throws Exception
    {
        if (addr==null)
        {
            this.addr = InetAddress.getByName(defaultIP);
        }
        // check to see if we're running on a machine with loopback and no NIC
        InetAddress localHost = InetAddress.getLocalHost ();
        if (bindAddr==null && localHost.getHostAddress().equals("127.0.0.1"))
        {
           // use this to bind so multicast will work w/o network
           this.bindAddr = localHost;
        }
        socket = new MulticastSocket(port);
        if (bindAddr != null)
           socket.setInterface(bindAddr);
        socket.joinGroup(addr);

       super.start ();
       
        if (listener==null)
        {
            listener = new Listener();
        }
        listener.start();
    }

    /**
     * called by the MBeanServer to stop the mbean lifecycle
     *
     * @throws Exception
     */
    public void stop () throws Exception
    {
        super.stop ();
        listener.running=false;
        listener.interrupt();
        listener=null;
        socket.leaveGroup(addr);
        socket.close();
        socket = null;
    }

    /**
     * subclasses must implement to provide the specific heartbeat protocol
     * for this server to send out to other servers on the network
     */
    protected void heartbeat ()
    {
        ServerInvoker invokers[] = InvokerRegistry.getServerInvokers();
        if (invokers==null || invokers.length<=0)
        {
            return;
        }
        List l = new ArrayList(invokers.length);
        for (int c=0;c<invokers.length;c++)
        {
            if (invokers[c].isStarted())
            {
                l.add(invokers[c].getLocator());
            }
        }
        if (l.isEmpty())
        {
            return;
        }
        InvokerLocator locators[]=(InvokerLocator[])l.toArray(new InvokerLocator[l.size()]);
        if (socket!=null)
        {
          Detection msg=new Detection(Identity.get(mbeanserver),locators);
          try
          {
            if (log.isTraceEnabled())
            {
                log.trace("sending heartbeat: "+msg);
            }
            ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
            ObjectOutputStream objectOut = new ObjectOutputStream(byteOut);
            objectOut.writeObject(msg);
            objectOut.flush();
            byteOut.flush();
            byte buf[] = byteOut.toByteArray();
            DatagramPacket p = new DatagramPacket(buf, buf.length, addr, port);
            socket.send(p);
          }
          catch (Throwable ex)
          {
            // its failed
            log.debug("heartbeat failed",ex);
          }
        }
    }

    private void listen(DatagramPacket p, byte[] buf)
    {
       if (socket!=null)
       {
          try
          {
             // should block until we get a multicast
             socket.receive(p);

             // take the multicast, and deserialize into the detection event
             ByteArrayInputStream byteInput = new ByteArrayInputStream(buf);
             ObjectInputStream objectInput = new ObjectInputStream(byteInput);
             Detection msg = (Detection)objectInput.readObject();
             if (log.isTraceEnabled())
                log.trace("received detection: "+msg);

             // let the subclass do the hard work off handling detection
             detect(msg);
          }
          catch (Throwable e)
          {
             if (e instanceof java.io.InvalidClassException)
             {
                 return;
             }
             if (socket!=null)
             {
                log.debug("Error receiving detection",e);
             }
          }
       }
    }

    private final class Listener extends Thread
    {
       boolean running = true;

       public void run()
       {
          byte[] buf = new byte[4000];
          DatagramPacket p = new DatagramPacket(buf, 0, buf.length);
          //p.setAddress(addr);
          //p.setPort(port);
          while (running)
          {
             listen(p, buf);
          }
       }
   }
}