package org.jboss.mx.remoting.tracker;
import java.util.Iterator;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.Collection;
import java.lang.reflect.UndeclaredThrowableException;
import javax.management.*;
import org.jboss.logging.Logger;
import org.jboss.mx.remoting.MBeanLocator;
import org.jboss.mx.remoting.MBeanServerLocator;
import org.jboss.mx.remoting.event.ClassQueryExp;
import org.jboss.mx.remoting.event.CompositeQueryExp;
import org.jboss.mx.remoting.event.CompositeEventFilter;
import org.jboss.mx.remoting.JMXUtil;
import org.jboss.remoting.ConnectionFailedException;
import org.jboss.remoting.ident.Identity;
import org.jboss.remoting.network.NetworkRegistryFinder;
import org.jboss.remoting.network.NetworkNotification;
import org.jboss.remoting.network.NetworkRegistryMBean;
import org.jboss.remoting.network.NetworkInstance;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
public class MBeanTracker implements NotificationListener
{
// When the system property "jboss.mx.tracker.debug" is "true", register/unregister
// fan-out to actions is logged at debug level (see fireRegister / fireUnregister).
private static final boolean logEvents = Boolean.getBoolean("jboss.mx.tracker.debug");
private static final transient Logger log = Logger.getLogger(MBeanTracker.class.getName());
// Query handed to MBeanServer.queryMBeans() for every server that is found.
private final QueryExp query;
// When false, the constructor also subscribes to the network registry and tracks the servers it lists.
private final boolean localOnly;
// Passed to MBeanTrackerFilter and used in handleNotification to decide whether
// ordinary notifications are forwarded to the actions.
private final boolean wantNotifications;
// Optional caller supplied filter; combined with the tracker filter in createFilterForServer.
private final NotificationFilter filter;
// Incremented in addMBean and decremented in removeMBean.
private final SynchronizedInt count=new SynchronizedInt(0);
// MBeanServerLocator -> Set of MBeanLocator; accessed while synchronized on the map itself.
private final Map mbeans=new HashMap();
// Names of the classes passed to the constructor, or null when none were given.
private final String classes[];
// Registered MBeanTrackerAction instances; accessed while synchronized on the list itself.
private final List actions=new ArrayList(1);
// ObjectName returned by NetworkRegistryFinder.find(myserver).
private final ObjectName networkRegistry;
// The MBeanServer this tracker was created with.
private final MBeanServer myserver;
/**
 * Creates a tracker with a single action, no extra notification filter and
 * notification forwarding switched off.
 *
 * @param myserver  the MBeanServer the tracker is attached to
 * @param cl        classes used to build the class query, may be null
 * @param query     additional query expression, may be null
 * @param localOnly when false the network registry is consulted as well
 * @param action    the action to register; a null action is skipped by the delegate constructor
 * @throws Exception if the delegate constructor fails
 */
public MBeanTracker (MBeanServer myserver, Class[] cl, QueryExp query, boolean localOnly, MBeanTrackerAction action)
    throws Exception
{
    this(myserver, cl, query, localOnly, null, false, new MBeanTrackerAction[] {action});
}
/**
 * Creates a tracker with several actions, no extra notification filter and
 * notification forwarding switched off.
 *
 * @param myserver  the MBeanServer the tracker is attached to
 * @param cl        classes used to build the class query, may be null
 * @param query     additional query expression, may be null
 * @param localOnly when false the network registry is consulted as well
 * @param actions   the actions to register, may be null
 * @throws Exception if the delegate constructor fails
 */
public MBeanTracker (MBeanServer myserver, Class[] cl, QueryExp query, boolean localOnly, MBeanTrackerAction[] actions)
    throws Exception
{
    this(myserver, cl, query, localOnly, null, false, actions);
}
/**
 * Creates a tracker for the given classes only (no extra query) with a single
 * action, no notification filter and notification forwarding switched off.
 *
 * @param myserver  the MBeanServer the tracker is attached to
 * @param cl        classes used to build the class query, may be null
 * @param localOnly when false the network registry is consulted as well
 * @param action    the action to register; a null action is skipped by the delegate constructor
 * @throws Exception if the delegate constructor fails
 */
public MBeanTracker (MBeanServer myserver, Class[] cl, boolean localOnly, MBeanTrackerAction action)
    throws Exception
{
    this(myserver, cl, null, localOnly, null, false, new MBeanTrackerAction[] {action});
}
/**
 * Creates a tracker for the given classes only (no extra query) with several
 * actions, no notification filter and notification forwarding switched off.
 *
 * @param myserver  the MBeanServer the tracker is attached to
 * @param cl        classes used to build the class query, may be null
 * @param localOnly when false the network registry is consulted as well
 * @param actions   the actions to register, may be null
 * @throws Exception if the delegate constructor fails
 */
public MBeanTracker (MBeanServer myserver, Class[] cl, boolean localOnly, MBeanTrackerAction[] actions)
    throws Exception
{
    this(myserver, cl, null, localOnly, null, false, actions);
}
/**
 * Creates a fully parameterised tracker with a single action.
 *
 * @param myserver          the MBeanServer the tracker is attached to
 * @param cl                classes used to build the class query, may be null
 * @param query             additional query expression, may be null
 * @param localOnly         when false the network registry is consulted as well
 * @param filter            extra notification filter, may be null
 * @param wantNotifications whether ordinary notifications are forwarded to the actions
 * @param action            the action to register; a null action is skipped by the delegate constructor
 * @throws Exception if the delegate constructor fails
 */
public MBeanTracker (MBeanServer myserver, Class[] cl, QueryExp query, boolean localOnly, NotificationFilter filter, boolean wantNotifications, MBeanTrackerAction action)
    throws Exception
{
    this(myserver, cl, query, localOnly, filter, wantNotifications, new MBeanTrackerAction[] {action});
}
/**
 * Creates a fully parameterised tracker without any initial action.
 *
 * @param myserver          the MBeanServer the tracker is attached to
 * @param cl                classes used to build the class query, may be null
 * @param query             additional query expression, may be null
 * @param localOnly         when false the network registry is consulted as well
 * @param filter            extra notification filter, may be null
 * @param wantNotifications whether ordinary notifications are forwarded to the actions
 * @throws Exception if the delegate constructor fails
 */
public MBeanTracker (MBeanServer myserver, Class[] cl, QueryExp query, boolean localOnly, NotificationFilter filter, boolean wantNotifications)
    throws Exception
{
    // the cast selects the array overload; a bare null would be ambiguous
    this(myserver, cl, query, localOnly, filter, wantNotifications, (MBeanTrackerAction[]) null);
}
/**
 * Creates a tracker, registers the supplied actions, and immediately starts
 * tracking the local server and (unless <code>localOnly</code>) every server
 * currently listed by the network registry.
 *
 * <p>The effective query is: the class query alone when only classes are given,
 * the caller's query alone when no classes are given, or a composite of both.</p>
 *
 * @param myserver          the MBeanServer the tracker is attached to
 * @param cl                classes used to build the class query, may be null
 * @param query             additional query expression, may be null
 * @param localOnly         when false the tracker also listens to the network registry
 * @param filter            extra notification filter, may be null
 * @param wantNotifications whether ordinary notifications are forwarded to the actions
 * @param actions           actions to register, may be null; null elements are skipped
 * @throws Exception if no network registry MBean can be found, or if subscribing to
 *                   or reading from the network registry fails
 */
public MBeanTracker (MBeanServer myserver, Class cl[], QueryExp query, boolean localOnly, NotificationFilter filter, boolean wantNotifications, MBeanTrackerAction actions[])
    throws Exception
{
    this.localOnly = localOnly;
    this.wantNotifications = wantNotifications;
    this.filter = filter;
    this.myserver = myserver;

    if (log.isDebugEnabled())
    {
        // Everything goes into the buffer so that the dump is emitted as one
        // log entry, framed by the separator lines.
        StringBuffer buf = new StringBuffer("creating an MBeanTracker with the following parameters:\n");
        buf.append("==========================================\n");
        buf.append("MBeanServer: " + myserver + "\n");
        if (cl == null)
        {
            buf.append("classes: none\n");
        }
        else
        {
            for (int c = 0; c < cl.length; c++)
            {
                buf.append("classes[" + c + "] " + cl[c].getName() + "\n");
            }
        }
        buf.append("QueryExp: " + query + "\n");
        buf.append("localOnly: " + localOnly + "\n");
        buf.append("filter: " + filter + "\n");
        buf.append("notifications: " + wantNotifications + "\n");
        if (actions == null)
        {
            buf.append("actions: none\n");
        }
        else
        {
            for (int c = 0; c < actions.length; c++)
            {
                buf.append("actions[" + c + "]: " + actions[c] + "\n");
            }
        }
        buf.append("==========================================\n");
        log.debug(buf.toString());
    }

    if (actions != null)
    {
        for (int c = 0; c < actions.length; c++)
        {
            if (actions[c] != null)
            {
                addActionListener(actions[c]);
            }
        }
    }

    // remember the class names for the per-server notification filter
    String names[] = null;
    if (cl != null)
    {
        names = new String[cl.length];
        for (int c = 0; c < cl.length; c++)
        {
            names[c] = cl[c].getName();
        }
    }
    this.classes = names;

    if (cl == null)
    {
        this.query = query;
    }
    else if (query == null)
    {
        this.query = new ClassQueryExp(cl);
    }
    else
    {
        this.query = new CompositeQueryExp(new QueryExp[] {new ClassQueryExp(cl, ClassQueryExp.OR), query});
    }

    networkRegistry = NetworkRegistryFinder.find(myserver);
    if (networkRegistry == null)
    {
        throw new Exception("NetworkRegistryMBean not found - MBeanTracker has a dependency on this MBean");
    }

    // the local server is always tracked
    foundMBeanServer(new MBeanServerLocator(Identity.get(myserver)));

    if (this.localOnly == false)
    {
        // follow servers joining/leaving, then pick up the ones already known
        myserver.addNotificationListener(networkRegistry, this, null, null);
        NetworkInstance instances[] = (NetworkInstance[]) myserver.getAttribute(networkRegistry, "Servers");
        if (instances != null)
        {
            for (int c = 0; c < instances.length; c++)
            {
                foundMBeanServer(new MBeanServerLocator(instances[c].getIdentity()));
            }
        }
    }
}
/**
 * Registers an action and replays the currently tracked MBeans to it; shorthand for
 * {@link #addActionListener(MBeanTrackerAction, boolean)} with <code>true</code>.
 *
 * @param action the action to register
 */
public void addActionListener (MBeanTrackerAction action)
{
    final boolean replayExisting = true;
    addActionListener(action, replayExisting);
}
/**
 * Registers an action with this tracker.
 *
 * <p>When <code>autoinitialregister</code> is true, every MBean tracked at the
 * time of the call is reported to the <em>new</em> action through
 * <code>mbeanRegistered</code>. Actions that were registered earlier are not
 * notified again, because they have already been told about these MBeans.</p>
 *
 * @param action              the action to register; null is ignored, as in the constructor
 * @param autoinitialregister whether the currently tracked MBeans are replayed to the action
 */
public void addActionListener (MBeanTrackerAction action, boolean autoinitialregister)
{
    if (action == null)
    {
        // a null entry would make every later fire*() call fail with an NPE
        return;
    }
    if (log.isDebugEnabled())
    {
        log.debug("adding action: "+action+", autoinitialregister:"+autoinitialregister);
    }
    synchronized (actions)
    {
        actions.add(action);
    }
    if (!autoinitialregister)
    {
        return;
    }
    // getMBeans() returns a private snapshot, so no lock is held while calling out
    for (Iterator iter = getMBeans().iterator(); iter.hasNext();)
    {
        MBeanLocator locator = (MBeanLocator) iter.next();
        if (logEvents && log.isDebugEnabled())
        {
            log.debug("firing initial register to action => "+action+" for locator => "+locator);
        }
        action.mbeanRegistered(locator);
    }
}
/**
 * Unregisters an action from this tracker. Every entry equal to the given
 * action is removed; removing an action that is not registered is a no-op.
 *
 * @param action the action to remove
 */
public void removeActionListener (MBeanTrackerAction action)
{
    if (log.isDebugEnabled())
    {
        log.debug("removing action: "+action);
    }
    // Operate on the live list: actions() hands out an iterator over a copy,
    // so removing through it would leave the registered list untouched.
    synchronized (actions)
    {
        while (actions.remove(action))
        {
            // keep going until no equal entry is left
        }
    }
}
/**
 * Builds the notification filter used when subscribing on the server with the
 * given id: a new MBeanTrackerFilter, combined with the caller supplied filter
 * in a CompositeEventFilter when one was given.
 *
 * @param id the server id handed to the MBeanTrackerFilter
 * @return the filter to pass to addNotificationListener
 */
private NotificationFilter createFilterForServer (String id)
{
    final NotificationFilter trackerFilter = new MBeanTrackerFilter(id, classes, wantNotifications);
    if (filter != null)
    {
        return new CompositeEventFilter(new NotificationFilter[] {trackerFilter, filter});
    }
    return trackerFilter;
}
/**
 * Releases the tracker's network registry subscription when the instance is
 * garbage collected.
 *
 * @throws Throwable whatever destroy() or the superclass finalizer throws
 */
protected void finalize () throws Throwable
{
    try
    {
        destroy();
    }
    finally
    {
        // the superclass finalizer must run even if destroy() fails
        super.finalize();
    }
}
/**
 * Removes this tracker's listener from the network registry. The removal is
 * best effort: a failure is logged at debug level and never propagated, since
 * this method is also invoked from the finalizer.
 */
public void destroy ()
{
    if (log.isDebugEnabled())
    {
        log.debug("destroy");
    }
    if (localOnly)
    {
        // the constructor only subscribes to the network registry when localOnly is false
        return;
    }
    try
    {
        myserver.removeNotificationListener(networkRegistry,this);
    }
    catch (Throwable ex)
    {
        if (log.isDebugEnabled())
        {
            log.debug("ignoring failure while removing network registry listener",ex);
        }
    }
}
/**
 * Tells whether the tracker currently tracks no MBean.
 *
 * @return true when {@link #count()} is zero or less
 */
public final boolean isEmpty ()
{
    final int tracked = count();
    return tracked <= 0;
}
/**
 * Returns the current value of the tracked-MBean counter.
 *
 * @return the counter value
 */
public final int count ()
{
    final int current = count.get();
    return current;
}
/**
 * Returns a snapshot of every tracked MBean across all servers.
 *
 * @return a new set holding the union of all per-server sets; changes to it
 *         do not affect the tracker
 */
public final Set getMBeans ()
{
    final Set snapshot = new HashSet();
    synchronized (mbeans)
    {
        for (Iterator perServer = mbeans.values().iterator(); perServer.hasNext();)
        {
            snapshot.addAll((Set) perServer.next());
        }
    }
    return snapshot;
}
/**
 * Returns an iterator over a snapshot of the tracked MBeans.
 *
 * @return iterator over the set produced by {@link #getMBeans()}
 */
public final Iterator iterator ()
{
    final Set snapshot = getMBeans();
    return snapshot.iterator();
}
/**
 * Subscribes this tracker to the given MBean when it is a
 * NotificationBroadcaster and not a NetworkRegistryMBean. Any failure is
 * logged as an error and otherwise ignored.
 *
 * @param server locator of the server hosting the MBean; also used as handback
 * @param mbean  name of the MBean to listen to
 */
private void tryAddListener (MBeanServerLocator server, ObjectName mbean)
{
    try
    {
        if (!server.getMBeanServer().isInstanceOf(mbean, NotificationBroadcaster.class.getName()))
        {
            return;
        }
        if (server.getMBeanServer().isInstanceOf(mbean, NetworkRegistryMBean.class.getName()))
        {
            return;
        }
        server.getMBeanServer().addNotificationListener(mbean, this, createFilterForServer(server.getServerId()), server);
        if (log.isDebugEnabled())
        {
            log.debug("added notification listener to: "+mbean+" on server: "+server);
        }
    }
    catch (Throwable failure)
    {
        log.error("Error registering listener for server:"+server+" and mbean:"+mbean, failure);
    }
}
/**
 * Unsubscribes this tracker from the given MBean when it is a
 * NotificationBroadcaster and not a NetworkRegistryMBean. Does nothing when the
 * locator yields no MBeanServer. A missing instance or a failed connection is
 * ignored silently, including when wrapped as described in
 * {@link #isIgnorableRemovalFailure(Exception)}; any other exception is logged
 * as a warning.
 *
 * @param server locator of the server hosting the MBean
 * @param mbean  name of the MBean to stop listening to
 */
private void tryRemoveListener (MBeanServerLocator server, ObjectName mbean)
{
    try
    {
        if (server.getMBeanServer() == null)
        {
            return;
        }
        if (server.getMBeanServer().isInstanceOf(mbean, NotificationBroadcaster.class.getName())
            && !server.getMBeanServer().isInstanceOf(mbean, NetworkRegistryMBean.class.getName()))
        {
            server.getMBeanServer().removeNotificationListener(mbean, this);
            if (log.isDebugEnabled())
            {
                log.debug("removed notification listener to: "+mbean+" on server: "+server);
            }
        }
    }
    catch (InstanceNotFoundException ignored)
    {
        // nothing to unsubscribe from
    }
    catch (ConnectionFailedException ignored)
    {
        // nothing to unsubscribe from
    }
    catch (Exception e)
    {
        if (!isIgnorableRemovalFailure(e))
        {
            log.warn("Error removing listener for server:"+server+" and mbean:"+mbean, e);
        }
    }
}

/**
 * Tells whether an exception raised while removing a listener merely wraps a
 * missing-instance or connection-failed condition.
 *
 * @param e the exception caught by tryRemoveListener
 * @return true for an UndeclaredThrowableException around a ReflectionException
 *         targeting InstanceNotFoundException/ConnectionFailedException, for an
 *         UndeclaredThrowableException around an MBeanException targeting
 *         ConnectionFailedException, or for an MBeanException targeting
 *         ConnectionFailedException; false otherwise
 */
private static boolean isIgnorableRemovalFailure (Exception e)
{
    if (e instanceof UndeclaredThrowableException)
    {
        Throwable wrapped = ((UndeclaredThrowableException) e).getUndeclaredThrowable();
        if (wrapped instanceof ReflectionException)
        {
            Exception target = ((ReflectionException) wrapped).getTargetException();
            if (target instanceof InstanceNotFoundException || target instanceof ConnectionFailedException)
            {
                return true;
            }
        }
        else if (wrapped instanceof MBeanException)
        {
            Exception target = ((MBeanException) wrapped).getTargetException();
            if (target instanceof ConnectionFailedException)
            {
                return true;
            }
        }
    }
    if (e instanceof MBeanException)
    {
        Exception target = ((MBeanException) e).getTargetException();
        if (target instanceof ConnectionFailedException)
        {
            return true;
        }
    }
    return false;
}
/**
 * Dispatches a notification received by this tracker.
 *
 * <ul>
 * <li>An MBeanServerNotification whose source is the MBeanServer delegate name
 *     adds the MBean on a registration notification and removes it on any other type.</li>
 * <li>A NetworkNotification adds or drops a server for SERVER_ADDED / SERVER_REMOVED;
 *     other network notification types are ignored.</li>
 * <li>An AttributeChangeNotification for the "State" attribute is forwarded to the
 *     actions, as a state change when its source is an ObjectName and as a plain
 *     notification when its source is an MBeanLocator.</li>
 * <li>Anything not handled above is forwarded as a plain notification when
 *     notifications are wanted, actions exist and the handback is non-null.</li>
 * </ul>
 *
 * Exceptions raised while dispatching are logged as warnings and not propagated.
 *
 * @param notification the notification received
 * @param o            the handback; cast to MBeanServerLocator in every branch that uses it
 */
public void handleNotification (Notification notification, Object o)
{
if (log.isDebugEnabled())
{
log.debug("tracker received notification="+notification+" with handback="+o);
}
try
{
// registration / unregistration reported by the MBeanServer delegate
if (notification instanceof MBeanServerNotification && JMXUtil.getMBeanServerObjectName().equals(notification.getSource()))
{
MBeanServerNotification n=(MBeanServerNotification)notification;
String type=n.getType();
ObjectName mbean= n.getMBeanName();
if (type.equals(MBeanServerNotification.REGISTRATION_NOTIFICATION))
{
addMBean((MBeanServerLocator)o,mbean);
}
else
{
// every non-registration type is treated as an unregistration
removeMBean((MBeanServerLocator)o,mbean);
}
return;
}
// a server joined or left according to the network registry
else if (notification instanceof NetworkNotification)
{
NetworkNotification nn=(NetworkNotification)notification;
String type=nn.getType();
if (type.equals(NetworkNotification.SERVER_ADDED))
{
Identity ident = nn.getIdentity();
MBeanServerLocator l=new MBeanServerLocator(ident);
foundMBeanServer(l);
}
else if (type.equals(NetworkNotification.SERVER_REMOVED))
{
Identity ident = nn.getIdentity();
MBeanServerLocator l=new MBeanServerLocator(ident);
lostMBeanServer(l);
}
return;
}
else if (notification instanceof AttributeChangeNotification)
{
AttributeChangeNotification ch=(AttributeChangeNotification)notification;
if (ch.getAttributeName().equals("State") && hasActions())
{
MBeanServerLocator server = (MBeanServerLocator)o;
Object src=ch.getSource();
if (src instanceof ObjectName)
{
ObjectName obj=(ObjectName)src;
// old and new values are cast to Integer; any other type ends up in the catch block below
fireStateChange(new MBeanLocator(server,obj),((Integer)ch.getOldValue()).intValue(),((Integer)ch.getNewValue()).intValue());
return;
}
else if (src instanceof MBeanLocator)
{
fireNotification((MBeanLocator)src,notification,o);
return;
}
}
// not a "State" change, no actions, or an unrecognised source: fall through to the generic forwarding
}
if (wantNotifications&&hasActions())
{
MBeanServerLocator server=(MBeanServerLocator)o;
if (server!=null)
{
Object src=notification.getSource();
if (src instanceof ObjectName)
{
ObjectName obj=(ObjectName)src;
MBeanLocator locator=new MBeanLocator(server,obj);
fireNotification(locator,notification,o);
return;
}
else if (src instanceof MBeanLocator)
{
fireNotification((MBeanLocator)src,notification,o);
return;
}
else
{
log.debug("Unknown source type for notification: "+src);
}
}
}
}
catch (Exception e)
{
log.warn("Error encountered receiving notification: "+notification,e);
}
}
/**
 * Tells whether at least one action is registered.
 *
 * @return true when the action list is not empty
 */
private boolean hasActions ()
{
    synchronized (actions)
    {
        return !actions.isEmpty();
    }
}
/**
 * Forwards a notification to every registered action through
 * <code>mbeanNotification</code>.
 *
 * @param locator the MBean the notification is attributed to
 * @param n       the notification
 * @param o       the handback received with the notification
 */
protected void fireNotification (MBeanLocator locator, Notification n, Object o)
{
    for (Iterator each = actions(); each.hasNext();)
    {
        MBeanTrackerAction action = (MBeanTrackerAction) each.next();
        if (wantNotifications && log.isDebugEnabled())
        {
            log.debug("forwarding tracker notification: "+n+" to action: "+action+" for tracker: "+this);
        }
        action.mbeanNotification(locator, n, o);
    }
}
/**
 * Reports a state change to every registered action through
 * <code>mbeanStateChanged</code>.
 *
 * @param locator the MBean whose state changed
 * @param ov      the old state value
 * @param nv      the new state value
 */
protected void fireStateChange (MBeanLocator locator, int ov, int nv)
{
    for (Iterator each = actions(); each.hasNext();)
    {
        MBeanTrackerAction action = (MBeanTrackerAction) each.next();
        if (wantNotifications && log.isDebugEnabled())
        {
            log.debug("forwarding tracker state change: "+nv+" ["+ov+"] to action: "+action+" for tracker: "+this);
        }
        action.mbeanStateChanged(locator, ov, nv);
    }
}
/**
 * Returns an iterator over a copy of the registered actions, taken while
 * holding the list's lock, so callers can iterate without holding it.
 * Removing through the iterator does not affect the registered list.
 *
 * @return iterator over the copy, or over the shared empty list when no action is registered
 */
private final Iterator actions ()
{
    synchronized (actions)
    {
        return actions.isEmpty()
            ? Collections.EMPTY_LIST.iterator()
            : new ArrayList(actions).iterator();
    }
}
/**
 * Tells every registered action that an MBean is gone, through
 * <code>mbeanUnregistered</code>.
 *
 * @param locator the MBean that was unregistered
 */
protected void fireUnregister (MBeanLocator locator)
{
    int position = 0;
    for (Iterator each = actions(); each.hasNext();)
    {
        MBeanTrackerAction action = (MBeanTrackerAction) each.next();
        if (logEvents && log.isDebugEnabled())
        {
            position++;
            log.debug("firing unregister to action ["+position+"] => "+action+" for locator => "+locator);
        }
        action.mbeanUnregistered(locator);
    }
}
/**
 * Tells every registered action about a newly tracked MBean, through
 * <code>mbeanRegistered</code>.
 *
 * @param locator the MBean that was registered
 */
protected void fireRegister (MBeanLocator locator)
{
    int position = 0;
    for (Iterator each = actions(); each.hasNext();)
    {
        MBeanTrackerAction action = (MBeanTrackerAction) each.next();
        if (logEvents && log.isDebugEnabled())
        {
            position++;
            log.debug("firing register to action ["+position+"] => "+action+" for locator => "+locator);
        }
        action.mbeanRegistered(locator);
    }
}
/**
 * Starts tracking a server: subscribes to its MBeanServer delegate and adds
 * every MBean currently matching the tracker's query. A server that is already
 * tracked is ignored.
 *
 * <p>The work is attempted up to three times. When the last attempt fails with
 * a ConnectionFailedException the server is dropped again through
 * {@link #lostMBeanServer(MBeanServerLocator)}. Other exceptions are logged as
 * warnings and the attempt is repeated.</p>
 *
 * @param theserver locator of the server that was found
 */
public void foundMBeanServer (MBeanServerLocator theserver)
{
    synchronized(mbeans)
    {
        if (mbeans.containsKey(theserver))
        {
            return;
        }
        mbeans.put(theserver,new HashSet());
    }
    final int maxAttempts = 3;
    // subscribe only once, otherwise a retry would deliver every notification twice
    boolean listening = false;
    for (int attempt=1;attempt<=maxAttempts;attempt++)
    {
        try
        {
            if (!listening)
            {
                theserver.getMBeanServer().addNotificationListener(JMXUtil.getMBeanServerObjectName(),this,createFilterForServer(theserver.getServerId()),theserver);
                listening = true;
            }
            Set beans=theserver.getMBeanServer().queryMBeans(new ObjectName("*:*"),query);
            if (beans.isEmpty()==false)
            {
                Iterator iter=beans.iterator();
                while(iter.hasNext())
                {
                    addMBean(theserver,((ObjectInstance)iter.next()).getObjectName());
                }
            }
            else
            {
                if (log.isDebugEnabled())
                {
                    log.debug("Queried server: "+theserver+", but found 0 mbeans matching query");
                }
            }
            break;
        }
        catch (ConnectionFailedException ce)
        {
            if (log.isDebugEnabled())
            {
                log.debug("while trying to add a listener and get info for: "+theserver+", i lost it",ce);
            }
            if (attempt>=maxAttempts)
            {
                // last attempt failed as well: stop tracking the unreachable server
                if (log.isDebugEnabled())
                {
                    log.debug("giving up on connection failed after "+attempt+" attempts... "+theserver);
                }
                lostMBeanServer(theserver);
            }
        }
        catch (Exception ex)
        {
            log.warn("Exception adding mbeans from server: "+theserver,ex);
        }
    }
}
/**
 * Starts tracking an MBean on a known server. Nothing happens when the server
 * has no entry in the map or the MBean is already in the server's set.
 * Otherwise the counter is incremented, a listener is attached where possible,
 * and the actions, if any, are told about the registration.
 *
 * @param server locator of the server hosting the MBean
 * @param mbean  name of the MBean
 */
private void addMBean (MBeanServerLocator server, ObjectName mbean)
{
    if (log.isDebugEnabled())
    {
        log.debug("addMBean called: "+server+", mbean: "+mbean);
    }
    final MBeanLocator locator = new MBeanLocator(server, mbean);
    synchronized (mbeans)
    {
        Set tracked = (Set) mbeans.get(server);
        if (tracked == null || !tracked.add(locator))
        {
            return;
        }
        count.increment();
    }
    tryAddListener(server, mbean);
    if (hasActions())
    {
        fireRegister(locator);
    }
}
/**
 * Stops tracking an MBean. When the server has a set in the map and the MBean
 * is not in it, nothing happens; when it is in it, it is removed and the
 * counter decremented. When the server has no set at all, the counter is left
 * alone. In every case that does not return early, the listener is detached
 * where possible and the actions, if any, are told about the unregistration.
 *
 * @param server locator of the server hosting the MBean
 * @param mbean  name of the MBean
 */
private void removeMBean (MBeanServerLocator server, ObjectName mbean)
{
    if (log.isDebugEnabled())
    {
        log.debug("removeMBean called: "+server+", mbean: "+mbean);
    }
    final MBeanLocator locator = new MBeanLocator(server, mbean);
    synchronized (mbeans)
    {
        Set tracked = (Set) mbeans.get(server);
        if (tracked != null)
        {
            if (!tracked.remove(locator))
            {
                return;
            }
            count.decrement();
        }
    }
    tryRemoveListener(server, mbean);
    if (hasActions())
    {
        fireUnregister(locator);
    }
}
/**
 * Stops tracking a server and every MBean that was tracked on it. For each
 * such MBean the counter is decremented, the listener is detached where
 * possible, and the actions, if any, are told about the unregistration.
 * Nothing happens when the server is not tracked.
 *
 * @param server locator of the server that went away
 */
public void lostMBeanServer (MBeanServerLocator server)
{
    if (wantNotifications && log.isDebugEnabled()) log.debug("lostMBeanServer: "+server+" for tracker: "+this);
    Set lost = null;
    synchronized(mbeans)
    {
        lost = (Set)mbeans.remove(server);
    }
    if (lost==null)
    {
        return;
    }
    if (log.isDebugEnabled())
    {
        log.debug("lost mbean server = "+server+", list = "+lost);
    }
    // The set is no longer reachable through the map, so the per-MBean cleanup
    // is done here rather than through removeMBean(), which would no longer
    // find the set and would therefore leave the counter untouched.
    Iterator iter=lost.iterator();
    while(iter.hasNext())
    {
        MBeanLocator locator=(MBeanLocator)iter.next();
        count.decrement();
        tryRemoveListener(server,locator.getObjectName());
        if (hasActions())
        {
            fireUnregister(locator);
        }
    }
}
}