package org.jboss.mx.remoting;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import org.jboss.logging.Logger;
import org.jboss.remoting.*;
import org.jboss.remoting.invocation.NameBasedInvocation;
import org.jboss.remoting.transport.ClientInvoker;
import org.jboss.remoting.network.NetworkNotification;
import org.jboss.remoting.network.NetworkRegistryFinder;
import javax.management.*;
import java.util.*;
public class MBeanNotificationCache implements NotificationListener
{
/** Logger shared by the cache, its listeners and their sender threads. */
private static final Logger log = Logger.getLogger(MBeanNotificationCache.class.getName());
/** MBeanServer on which the remote listeners are registered and unregistered. */
private final MBeanServer server;
/** Registered {@link Listener} instances; every access in this class happens inside synchronized(listeners). */
private final List listeners=new ArrayList();
/** Pending NotificationQueue per session id for non-async listeners; every access happens inside synchronized(queue). */
private final Map queue=new HashMap();
/** ObjectName returned by NetworkRegistryFinder.find(server); this cache listens on it for network notifications. */
private final ObjectName networkRegistry;
/** Invoker whose isTransportBiDirectional() decides whether a new Listener attempts a call-back connection. */
private final ServerInvoker serverInvoker;
/** Value of JMXUtil.getServerId(server); set as the session id on the client before each async send. */
private final String localServerId;
public MBeanNotificationCache (ServerInvoker invoker, MBeanServer server)
throws Exception
{
this.server = server;
this.serverInvoker = invoker;
this.localServerId = JMXUtil.getServerId(server);
networkRegistry = NetworkRegistryFinder.find(server);
if (networkRegistry==null)
{
throw new Exception("Couldn't find the required NetworkRegistryMBean in this MBeanServer");
}
server.addNotificationListener(networkRegistry,this,null,this);
}
/**
 * Reacts to notifications from the network registry. When a
 * {@link NetworkNotification#SERVER_REMOVED} notification arrives, every
 * listener registered for the removed server's JMX id is removed and the
 * polling queue for that id is discarded.
 * <p>
 * Notifications that are not NetworkNotifications, or whose handback is not
 * this cache, are ignored.
 *
 * @param notification the notification delivered by the MBeanServer
 * @param o            handback supplied at registration; must equal this cache
 */
public void handleNotification (Notification notification, Object o)
{
    if (!(notification instanceof NetworkNotification) || o == null || !this.equals(o))
    {
        return;
    }
    // constant-first comparison so a notification without a type is ignored instead of raising an NPE
    if (!NetworkNotification.SERVER_REMOVED.equals(notification.getType()))
    {
        return;
    }

    NetworkNotification nn = (NetworkNotification) notification;
    String sessionId = nn.getIdentity().getJMXId();

    // collect first, remove afterwards: removeNotificationListener modifies 'listeners'
    List failed = new ArrayList();
    synchronized (listeners)
    {
        for (Iterator iter = listeners.iterator(); iter.hasNext();)
        {
            Listener listener = (Listener) iter.next();
            if (sessionId.equals(listener.sessionId))
            {
                failed.add(listener);
            }
        }
    }

    for (Iterator iter = failed.iterator(); iter.hasNext();)
    {
        Listener listener = (Listener) iter.next();
        try
        {
            removeNotificationListener(listener.locator, listener.sessionId, listener.objectName, listener.handback);
            if (log.isDebugEnabled())
            {
                log.debug("++ Removed orphaned listener because server failed: "+nn.getIdentity());
            }
        }
        catch (Exception e)
        {
            // best effort: the remote server is gone, so keep cleaning up the remaining listeners
            if (log.isDebugEnabled())
            {
                log.debug("failed to remove orphaned listener on objectName ["+listener.objectName+"] for sessionId ["+sessionId+"]", e);
            }
        }
    }

    synchronized (queue)
    {
        queue.remove(sessionId);
    }
}
/**
 * Shuts the cache down: removes and destroys every registered listener,
 * clears all polling queues and unsubscribes from the network registry.
 * <p>
 * The listeners are walked through a snapshot because
 * {@link #removeNotificationListener} removes entries from
 * <code>listeners</code>; iterating the live list while it is modified
 * either raises a ConcurrentModificationException or skips entries.
 * All steps are best effort: a failure is logged and the shutdown continues.
 */
public synchronized void destroy ()
{
    if (log.isDebugEnabled())
    {
        log.debug("destroy call on notification cache");
    }
    synchronized (listeners)
    {
        Object[] snapshot = listeners.toArray();
        for (int i = 0; i < snapshot.length; i++)
        {
            Listener l = (Listener) snapshot[i];
            try
            {
                removeNotificationListener(l.locator, l.sessionId, l.objectName, l.handback);
            }
            catch (Exception e)
            {
                if (log.isDebugEnabled())
                {
                    log.debug("failed to remove listener on objectName ["+l.objectName+"] for sessionId ["+l.sessionId+"] during destroy", e);
                }
            }
            try
            {
                // always release the listener's own resources, even when the removal
                // above failed or an earlier removal already took it out of the list
                l.destroy();
            }
            catch (Exception e)
            {
                if (log.isDebugEnabled())
                {
                    log.debug("failed to destroy listener for sessionId ["+l.sessionId+"]", e);
                }
            }
        }
        listeners.clear();
    }
    synchronized (queue)
    {
        queue.clear();
    }
    try
    {
        server.removeNotificationListener(networkRegistry, this);
    }
    catch (Exception e)
    {
        if (log.isDebugEnabled())
        {
            log.debug("failed to unregister from network registry ["+networkRegistry+"]", e);
        }
    }
}
/**
 * Registers a listener on behalf of a remote client. A new {@link Listener}
 * is created, recorded in <code>listeners</code> and registered with the
 * MBeanServer for <code>objectName</code>.
 * <p>
 * Every call creates a new, distinct Listener; no duplicate detection is
 * performed (Listener does not define equals, so instances only compare by
 * identity). If the MBeanServer rejects the registration, the listener is
 * taken out of the list again and destroyed before the exception propagates.
 *
 * @param clientLocator locator of the remote client
 * @param sessionId     id of the client's MBeanServer, used to key its notifications
 * @param objectName    MBean to listen on
 * @param filter        optional filter handed to the MBeanServer
 * @param handback      handback handed to the MBeanServer and attached to queued notifications
 * @throws InstanceNotFoundException if <code>objectName</code> is not registered in the MBeanServer
 */
public void addNotificationListener (InvokerLocator clientLocator, String sessionId, ObjectName objectName, NotificationFilter filter, Object handback)
    throws InstanceNotFoundException
{
    if (log.isDebugEnabled())
    {
        log.debug("remote notification listener added for client ["+clientLocator+"] on objectName ["+objectName+"] and mbeanServerId ["+sessionId+"], filter: "+filter+", handback: "+handback);
    }
    Listener l = new Listener(clientLocator, sessionId, objectName, filter, handback);
    synchronized (this.listeners)
    {
        this.listeners.add(l);
        boolean registered = false;
        try
        {
            server.addNotificationListener(objectName, l, filter, handback);
            registered = true;
        }
        finally
        {
            if (!registered)
            {
                // roll back so a rejected listener does not stay in the list with its
                // call-back connection and sender thread still alive
                this.listeners.remove(l);
                try
                {
                    l.destroy();
                }
                catch (RuntimeException e)
                {
                    // do not mask the registration failure that is about to propagate
                    if (log.isDebugEnabled())
                    {
                        log.debug("failed to destroy listener after registration failure on objectName ["+objectName+"]", e);
                    }
                }
            }
        }
    }
}
/**
 * Removes every listener registered for the given client locator, object
 * name and session id, unregisters each from the MBeanServer and destroys it.
 * <p>
 * Matching listeners are first detached from <code>listeners</code> and only
 * then unregistered and destroyed. This keeps the list iteration free of
 * re-entrant modification (Listener.destroy may call back into this method)
 * and guarantees that every detached listener is destroyed even when the
 * MBeanServer call fails. The first failure is rethrown after all matches
 * have been processed.
 *
 * @param clientLocator locator of the remote client
 * @param sessionId     id of the client's MBeanServer
 * @param objectName    MBean the listener was registered on
 * @param handback      not used for matching; each listener is unregistered with the
 *                      filter and handback instances it was registered with
 * @throws InstanceNotFoundException if <code>objectName</code> is no longer registered
 * @throws ListenerNotFoundException if the MBeanServer does not know the listener
 */
public void removeNotificationListener (InvokerLocator clientLocator, String sessionId, ObjectName objectName, Object handback)
    throws InstanceNotFoundException, ListenerNotFoundException
{
    if (log.isDebugEnabled())
    {
        log.debug("removeNotificationListener called with clientLocator: "+clientLocator+", sessionId: "+sessionId+", objectName: "+objectName);
    }
    Exception firstFailure = null;
    synchronized (this.listeners)
    {
        // pass 1: detach all matches from the list
        List matched = new ArrayList();
        for (Iterator iter = listeners.iterator(); iter.hasNext();)
        {
            Listener l = (Listener) iter.next();
            if (l.locator.equals(clientLocator) && l.objectName.equals(objectName) && l.sessionId.equals(sessionId))
            {
                iter.remove();
                matched.add(l);
            }
        }

        // pass 2: unregister and destroy each detached listener
        for (Iterator iter = matched.iterator(); iter.hasNext();)
        {
            Listener l = (Listener) iter.next();
            if (log.isDebugEnabled())
            {
                log.debug("remote notification listener removed for client ["+clientLocator+"] on objectName ["+objectName+"] and MBeanServerId ["+sessionId+"]");
            }
            try
            {
                // use the exact filter/handback instances given to the MBeanServer at registration
                server.removeNotificationListener(objectName, l, l.filter, l.handback);
            }
            catch (InstanceNotFoundException e)
            {
                if (firstFailure == null)
                {
                    firstFailure = e;
                }
            }
            catch (ListenerNotFoundException e)
            {
                if (firstFailure == null)
                {
                    firstFailure = e;
                }
            }
            catch (RuntimeException e)
            {
                if (firstFailure == null)
                {
                    firstFailure = e;
                }
            }
            try
            {
                l.destroy();
            }
            catch (RuntimeException e)
            {
                if (firstFailure == null)
                {
                    firstFailure = e;
                }
            }
        }
    }

    if (firstFailure instanceof InstanceNotFoundException)
    {
        throw (InstanceNotFoundException) firstFailure;
    }
    if (firstFailure instanceof ListenerNotFoundException)
    {
        throw (ListenerNotFoundException) firstFailure;
    }
    if (firstFailure instanceof RuntimeException)
    {
        throw (RuntimeException) firstFailure;
    }
}
/**
 * Hands over the notifications queued for a polling client. The queue is
 * removed from the cache, so each queued notification is returned once.
 *
 * @param sessionId id of the client's MBeanServer
 * @return the pending queue, or <code>null</code> if nothing is queued for that id
 */
public NotificationQueue getNotifications (String sessionId)
{
    final Object pending;
    synchronized (queue)
    {
        pending = queue.remove(sessionId);
    }
    return (NotificationQueue) pending;
}
private final class Listener implements NotificationListener
{
/** MBean this listener is registered on. */
final ObjectName objectName;
/** Handback given at registration; attached to every NotificationEntry created by this listener. */
final Object handback;
/** Filter given at registration; passed back to the MBeanServer when unregistering. */
final NotificationFilter filter;
/** Locator of the remote client; used to open the call-back connection. */
final InvokerLocator locator;
/** Session id of the remote client; key of its polling queue. */
final String sessionId;
/** Invoker created and connected by connectAsync(). */
private ClientInvoker clientInvoker;
/** Client used by the sender thread to push notifications; disconnected and set to null by destroy(). */
private Client client;
/** Set to true at the end of a successful connectAsync(); selects async delivery in handleNotification. */
private boolean asyncSend = false;
/** Queue feeding the sender thread; created by connectAsync(), set to null by destroy(). */
private LinkedQueue asyncQueue;
/** Incremented by the sender thread for each entry it takes from asyncQueue; only read for debug logging. */
private int counter=0;
/** Sender thread started by connectAsync(); stopped and set to null by destroy(). */
private BiDirectionClientNotificationSender biDirectionalSender;
Listener (InvokerLocator locator, String sessionId, ObjectName objectName, NotificationFilter filter, Object handback)
{
this.objectName = objectName;
this.filter = filter;
this.locator = locator;
this.sessionId = sessionId;
this.handback = handback;
if (serverInvoker.isTransportBiDirectional())
{
connectAsync();
}
}
/**
 * Releases everything this listener holds: its registration in the cache
 * (if it is still registered), the sender thread, the async queue and the
 * client connection. Safe to call more than once.
 * <p>
 * The cache is only asked to remove this listener while it is still present
 * in <code>listeners</code>. When destroy() is invoked by
 * removeNotificationListener the entry has already been taken out, and
 * calling back into it would iterate and modify the list underneath the
 * caller's own iterator.
 */
synchronized void destroy ()
{
    if (log.isDebugEnabled())
    {
        log.debug("destroy called on client ["+locator+"], session id ["+sessionId+"]");
    }
    boolean stillRegistered;
    synchronized (listeners)
    {
        // Listener does not define equals, so this is an identity lookup
        stillRegistered = listeners.contains(this);
    }
    if (stillRegistered)
    {
        try
        {
            removeNotificationListener(locator,sessionId,objectName,handback);
        }
        catch (Throwable e)
        {
            // best effort: resources below must be released regardless
            if (log.isDebugEnabled())
            {
                log.debug("failed to unregister listener for client ["+locator+"], session id ["+sessionId+"]", e);
            }
        }
    }
    if (biDirectionalSender!=null)
    {
        biDirectionalSender.running = false;
        biDirectionalSender.interrupt();
        biDirectionalSender=null;
        while (asyncQueue!=null && asyncQueue.isEmpty()==false)
        {
            try { asyncQueue.take(); } catch (InterruptedException ex) { break; }
        }
        asyncQueue=null;
    }
    if (client!=null)
    {
        try
        {
            client.disconnect();
        }
        finally
        {
            client = null;
        }
    }
}
/**
 * Tries to open a connection back to the client and start the sender thread.
 * On success <code>asyncSend</code> is switched on; on any failure the error
 * is logged at debug level and the listener stays in polling mode.
 */
private void connectAsync ()
{
    try
    {
        if (log.isDebugEnabled())
        {
            log.debug("attempting an bi-directional connection back to client ["+locator+"], server id ["+sessionId+"]");
        }

        // connect the transport first, then wrap it in a JMX subsystem client
        clientInvoker = InvokerRegistry.createClientInvoker(locator);
        clientInvoker.connect();
        final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        client = new Client(contextLoader, clientInvoker, Subsystem.JMX);

        // the queue must exist before the sender thread starts reading from it
        asyncQueue = new LinkedQueue();
        biDirectionalSender = new BiDirectionClientNotificationSender();
        biDirectionalSender.start();

        asyncSend = true;
    }
    catch (Throwable e)
    {
        log.debug("attempted a bi-directional connection back to client ["+locator+"], but it failed",e);
    }
}
/**
 * Thread that drains the listener's async queue and pushes the notifications
 * to the client in batches. A batch is sent when it holds
 * {@link #MAX_BATCH_SIZE} entries, when the queue has run empty, or when
 * {@link #MAX_BATCH_DELAY_MILLIS} have passed since the last send. When
 * nothing is pending the thread blocks on the queue.
 * <p>
 * The thread ends when <code>running</code> is cleared, when it is
 * interrupted, or when a send fails with a ConnectionFailedException (in
 * which case the owning listener is destroyed first).
 */
private final class BiDirectionClientNotificationSender extends Thread
{
    /** Number of entries after which a batch is sent without waiting for more. */
    private static final int MAX_BATCH_SIZE = 10;
    /** Longest time in milliseconds between two sends while entries are pending. */
    private static final long MAX_BATCH_DELAY_MILLIS = 2000;

    /** Cleared by Listener.destroy() from another thread, hence volatile. */
    private volatile boolean running = true;

    public void run ()
    {
        // Read the shared fields once: Listener.destroy() sets them to null, and
        // re-reading them here would raise a NullPointerException mid-loop.
        final LinkedQueue pending = asyncQueue;
        final Client target = client;
        if (pending == null || target == null)
        {
            // the listener was destroyed before this thread got to run
            return;
        }

        NotificationQueue nq=new NotificationQueue(sessionId);
        int count = 0;
        long lastTx = 0;
        while (running)
        {
            try
            {
                while (count<MAX_BATCH_SIZE && !pending.isEmpty())
                {
                    NotificationEntry ne=(NotificationEntry)pending.take();
                    nq.add(ne);
                    count++;
                    counter++;
                }
                // '>=': count never exceeds MAX_BATCH_SIZE, so a '>' test could never fire and a
                // full batch with more entries still queued would spin until the delay elapsed
                boolean flushDue = count>=MAX_BATCH_SIZE
                    || pending.isEmpty()
                    || System.currentTimeMillis()-lastTx>=MAX_BATCH_DELAY_MILLIS;
                if (flushDue && nq.isEmpty()==false)
                {
                    try
                    {
                        if (log.isDebugEnabled())
                        {
                            log.debug("sending notification queue ["+nq+"] to client ["+locator+"] with sessionId ["+sessionId+"], counter="+counter+" ,count="+count);
                        }
                        lastTx = System.currentTimeMillis();
                        target.setSessionId(localServerId);
                        target.invoke(new NameBasedInvocation("$NOTIFICATIONS$",
                                                              new Object[]{nq},
                                                              new String[]{NotificationQueue.class.getName()}),
                                      null);
                    }
                    catch (Throwable t)
                    {
                        if (t instanceof ConnectionFailedException)
                        {
                            if (log.isDebugEnabled())
                            {
                                log.debug("Client is dead during invocation");
                            }
                            Listener.this.destroy();
                            break;
                        }
                        else
                        {
                            log.warn("Error sending async notifications to client: "+locator,t);
                        }
                    }
                    finally
                    {
                        // the batch is discarded whether or not the send succeeded
                        nq.clear();
                        count=0;
                    }
                }
                else if (pending.isEmpty())
                {
                    if (log.isDebugEnabled())
                    {
                        log.debug("blocking on more notifications to arrive");
                    }
                    NotificationEntry ne=(NotificationEntry)pending.take();
                    nq.add(ne);
                    count+=1;
                    counter++;
                }
            }
            catch (InterruptedException ex)
            {
                // interrupted by Listener.destroy(): stop sending
                break;
            }
        }
    }
}
/**
 * Receives a notification from the MBeanServer for the remote client.
 * In polling mode the notification is appended to the client's queue in the
 * cache, to be fetched through getNotifications. In async mode it is put on
 * the queue read by the sender thread.
 *
 * @param notification the notification emitted by the MBean
 * @param o            handback delivered by the MBeanServer; the handback stored at
 *                     construction is what gets attached to the queued entry
 */
public void handleNotification (Notification notification, Object o)
{
    if (log.isDebugEnabled())
    {
        log.debug("("+(asyncSend?"async":"polling")+") notification received ..."+notification+" for client ["+locator+"]");
    }
    if (asyncSend==false)
    {
        synchronized(queue)
        {
            NotificationQueue q = (NotificationQueue)queue.get(sessionId);
            if (q==null)
            {
                q = new NotificationQueue(sessionId);
                queue.put(sessionId,q);
            }
            if (log.isDebugEnabled())
            {
                log.debug("added notification to polling queue: "+notification+" for sessionId: "+sessionId);
            }
            q.add (new NotificationEntry(notification,handback));
        }
        return;
    }

    // Read the field once: destroy() may set it to null between a null check and the put.
    final LinkedQueue target = asyncQueue;
    if (target == null)
    {
        // listener already destroyed, nothing to deliver to
        return;
    }
    try
    {
        target.put(new NotificationEntry(notification,handback));
    }
    catch (InterruptedException ie)
    {
        // keep the interrupt visible to the calling thread instead of swallowing it
        Thread.currentThread().interrupt();
        if (log.isDebugEnabled())
        {
            log.debug("interrupted while queueing notification: "+notification+" for sessionId: "+sessionId);
        }
    }
}
}
}