package org.jboss.remoting.detection.multicast;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.InvokerRegistry;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.detection.AbstractDetector;
import org.jboss.remoting.detection.Detection;
import org.jboss.remoting.ident.Identity;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.ArrayList;
import java.util.List;
/**
 * Detector that announces the locally started server invokers over IP
 * multicast and listens on the same group/port for announcements sent by
 * other detectors.
 *
 * <p>Outgoing announcements are produced by {@link #heartbeat()}; incoming
 * datagrams are read by a dedicated {@link Listener} thread and passed to
 * <code>detect(...)</code>.
 *
 * <p>NOTE(review): detection messages are exchanged with Java serialization,
 * so anything able to send datagrams to the group/port can feed bytes to
 * <code>ObjectInputStream</code>. Only use this on a trusted network.
 */
public class MulticastDetector extends AbstractDetector implements MulticastDetectorMBean
{
   /** Size in bytes of the receive buffer; larger datagrams are truncated on receipt. */
   private static final int RECEIVE_BUFFER_SIZE = 4000;

   /** Group address used by {@link #start()} when no address has been set explicitly. */
   private String defaultIP = "224.1.9.1";
   /** Multicast group that is joined and to which heartbeats are sent. */
   private InetAddress addr;
   /** Optional interface address applied to the multicast socket. */
   private InetAddress bindAddr;
   /** Port the socket is opened on and to which heartbeats are addressed. */
   private int port = 2410;
   /** Active socket, or null while stopped. Volatile: shared with the listener thread. */
   private volatile MulticastSocket socket;
   /** Receiving thread, or null after {@link #stop()}. */
   private Listener listener = new Listener();

   /**
    * @return the group address used when no explicit address is configured
    */
   public String getDefaultIP()
   {
      return defaultIP;
   }

   /**
    * @param defaultIP the group address to fall back to in {@link #start()}
    *                  when no explicit address has been set
    */
   public void setDefaultIP(String defaultIP)
   {
      this.defaultIP = defaultIP;
   }

   /**
    * @return the multicast group address, or null if not yet set/resolved
    */
   public InetAddress getAddress ()
   {
      return addr;
   }

   /**
    * @param ip the multicast group address to join and send to
    */
   public void setAddress (InetAddress ip)
   {
      this.addr = ip;
   }

   /**
    * @return the interface address applied to the socket, or null if none
    */
   public InetAddress getBindAddress()
   {
      return bindAddr;
   }

   /**
    * @param ip the interface address to apply to the multicast socket
    */
   public void setBindAddress(InetAddress ip)
   {
      this.bindAddr = ip;
   }

   /**
    * @return the multicast port
    */
   public int getPort ()
   {
      return port;
   }

   /**
    * @param port the multicast port to open and send heartbeats to
    */
   public void setPort (int port)
   {
      this.port = port;
   }

   /**
    * Opens the multicast socket, joins the group, starts the superclass and
    * then starts the listener thread.
    *
    * @throws Exception if the address cannot be resolved, the socket cannot
    *                   be opened or joined, or the superclass fails to start
    */
   public void start () throws Exception
   {
      if (addr == null)
      {
         this.addr = InetAddress.getByName(defaultIP);
      }

      // When the local host resolves to loopback and no interface was chosen,
      // use the local host address as the socket's interface.
      InetAddress localHost = InetAddress.getLocalHost ();
      if (bindAddr == null && localHost.getHostAddress().equals("127.0.0.1"))
      {
         this.bindAddr = localHost;
      }

      MulticastSocket s = new MulticastSocket(port);
      socket = s;
      try
      {
         if (bindAddr != null)
         {
            s.setInterface(bindAddr);
         }
         s.joinGroup(addr);
      }
      catch (Exception e)
      {
         // Do not leak the freshly opened socket when it cannot be configured.
         socket = null;
         s.close();
         throw e;
      }

      super.start ();

      if (listener == null)
      {
         listener = new Listener();
      }
      listener.start();
   }

   /**
    * Stops the superclass, signals the listener thread to finish and releases
    * the socket. Safe to call when the detector was never started or has
    * already been stopped.
    *
    * @throws Exception if the superclass fails to stop or the group cannot be
    *                   left; the socket is closed regardless
    */
   public void stop () throws Exception
   {
      try
      {
         super.stop ();
      }
      finally
      {
         Listener l = listener;
         listener = null;
         if (l != null)
         {
            l.running = false;
            l.interrupt();
         }

         // Clear the field before closing so the listener, whose blocked
         // receive() fails once the socket is closed, sees a null socket and
         // does not log the failure as an error.
         MulticastSocket s = socket;
         socket = null;
         if (s != null)
         {
            try
            {
               s.leaveGroup(addr);
            }
            finally
            {
               s.close();
            }
         }
      }
   }

   /**
    * Sends one {@link Detection} datagram listing the locators of all
    * currently started server invokers. Does nothing when there are no
    * started invokers or the socket is not open. Failures are logged at
    * debug level and otherwise ignored.
    */
   protected void heartbeat ()
   {
      ServerInvoker invokers[] = InvokerRegistry.getServerInvokers();
      if (invokers == null || invokers.length <= 0)
      {
         return;
      }

      List l = new ArrayList(invokers.length);
      for (int c = 0; c < invokers.length; c++)
      {
         if (invokers[c].isStarted())
         {
            l.add(invokers[c].getLocator());
         }
      }
      if (l.isEmpty())
      {
         return;
      }
      InvokerLocator locators[] = (InvokerLocator[]) l.toArray(new InvokerLocator[l.size()]);

      // Take a single snapshot: stop() may null the field concurrently.
      MulticastSocket s = socket;
      if (s == null)
      {
         return;
      }

      Detection msg = new Detection(Identity.get(mbeanserver), locators);
      try
      {
         if (log.isTraceEnabled())
         {
            log.trace("sending heartbeat: "+msg);
         }
         ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
         ObjectOutputStream objectOut = new ObjectOutputStream(byteOut);
         objectOut.writeObject(msg);
         // close() flushes the object stream into the byte array.
         objectOut.close();
         byte buf[] = byteOut.toByteArray();
         // NOTE(review): receivers read into a RECEIVE_BUFFER_SIZE buffer, so a
         // larger serialized message would be truncated on their side.
         DatagramPacket p = new DatagramPacket(buf, buf.length, addr, port);
         s.send(p);
      }
      catch (Throwable ex)
      {
         // Best effort: a failed heartbeat is simply retried on the next one.
         log.debug("heartbeat failed",ex);
      }
   }

   /**
    * Blocks for one datagram, deserializes it as a {@link Detection} and
    * passes it to <code>detect(...)</code>. Returns immediately when the
    * socket is not open.
    *
    * @param p   reusable packet to receive into
    * @param buf backing buffer for <code>p</code>
    */
   private void listen(DatagramPacket p, byte[] buf)
   {
      // Take a single snapshot: stop() may null the field concurrently.
      MulticastSocket s = socket;
      if (s == null)
      {
         return;
      }
      try
      {
         // receive() shrinks the packet length to the size of the datagram
         // read; restore the full capacity before every receive.
         p.setData(buf, 0, buf.length);
         s.receive(p);

         // Only expose the bytes that were actually received; the rest of the
         // buffer may still hold data from an earlier, longer datagram.
         ByteArrayInputStream byteInput =
            new ByteArrayInputStream(p.getData(), p.getOffset(), p.getLength());
         // NOTE(review): Java deserialization of data read from the network;
         // see the class comment.
         ObjectInputStream objectInput = new ObjectInputStream(byteInput);
         Detection msg = (Detection) objectInput.readObject();
         if (log.isTraceEnabled())
         {
            log.trace("received detection: "+msg);
         }
         detect(msg);
      }
      catch (Throwable e)
      {
         // Silently skipped; presumably sent by a peer with an incompatible
         // class version -- TODO confirm.
         if (e instanceof java.io.InvalidClassException)
         {
            return;
         }
         // A null socket means stop() is in progress and the failure is the
         // expected result of closing the socket under a blocked receive().
         if (socket != null)
         {
            log.debug("Error receiving detection",e);
         }
      }
   }

   /**
    * Thread that keeps receiving detection datagrams until {@link #running}
    * is cleared.
    */
   private final class Listener extends Thread
   {
      /** Cleared by stop() from another thread, hence volatile. */
      volatile boolean running = true;

      public void run()
      {
         byte[] buf = new byte[RECEIVE_BUFFER_SIZE];
         DatagramPacket p = new DatagramPacket(buf, 0, buf.length);
         while (running)
         {
            listen(p, buf);
         }
      }
   }
}