package org.jboss.ejb.plugins;
import java.lang.reflect.Method;
import java.security.Principal;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import javax.transaction.Transaction;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.jms.DeliveryMode;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.jboss.ejb.Container;
import org.jboss.invocation.Invocation;
import org.jboss.logging.Logger;
import org.jboss.monitor.MetricsConstants;
public class MetricsInterceptor extends AbstractInterceptor
implements MetricsConstants
{
private String applicationName = "<undefined>";
private String beanName = "<undefined>";
private Thread publisher = null;
private List msgQueue = new ArrayList(2000);
public void setContainer(Container container)
{
super.setContainer(container);
if (container != null)
{
applicationName = container.getEjbModule().getName();
beanName = container.getBeanMetaData().getJndiName();
}
}
public Object invokeHome(Invocation mi) throws Exception
{
long begin = System.currentTimeMillis();
try
{
return super.invokeHome(mi);
}
finally
{
if (mi.getMethod() != null)
{
addEntry(mi, begin, System.currentTimeMillis());
}
}
}
public Object invoke(Invocation mi) throws Exception
{
long begin = System.currentTimeMillis();
try
{
return super.invoke(mi);
}
finally
{
if (mi.getMethod() != null)
{
addEntry(mi, begin, System.currentTimeMillis());
}
}
}
public void create()
{
publisher = new Thread(new Publisher());
publisher.setName("Metrics Publisher Thread for " + beanName + ".");
publisher.setDaemon(true);
publisher.start();
}
public void destroy()
{
publisher.interrupt();
}
private final void addEntry(Invocation mi, long begin, long end)
{
Transaction tx = mi.getTransaction();
Principal princ = mi.getPrincipal();
Method method = mi.getMethod();
Entry start = new Entry(princ, method, tx, begin, "START");
Entry stop = new Entry(princ, method, tx, end, "STOP");
synchronized (msgQueue)
{
msgQueue.add(start);
msgQueue.add(stop);
}
}
private Message createMessage(Session session, String principal, int txID,
String method, String checkpoint, long time)
{
try
{
Message msg = session.createMessage();
msg.setJMSType(INVOCATION_METRICS);
msg.setStringProperty(CHECKPOINT, checkpoint);
msg.setStringProperty(BEAN, beanName);
msg.setObjectProperty(METHOD, method);
msg.setLongProperty(TIME, time);
if (txID != -1)
msg.setStringProperty("ID", String.valueOf(txID));
if (principal != null)
msg.setStringProperty("PRINCIPAL", principal);
return msg;
}
catch (Exception e)
{
return null;
}
}
public void sample(Object s)
{
}
public Map retrieveStatistic()
{
return null;
}
public void resetStatistic()
{
}
private class Publisher implements Runnable
{
private boolean running = true;
private int delay = 2000;
private TopicConnection connection = null;
public void run()
{
try
{
final boolean IS_TRANSACTED = true;
final int ACKNOWLEDGE_MODE = Session.DUPS_OK_ACKNOWLEDGE;
Context namingContext = new InitialContext();
TopicConnectionFactory fact = (TopicConnectionFactory) namingContext.lookup("java:/ConnectionFactory");
connection = fact.createTopicConnection();
Topic topic = (Topic) namingContext.lookup("topic/metrics");
TopicSession session = connection.createTopicSession(IS_TRANSACTED, ACKNOWLEDGE_MODE);
TopicPublisher pub = session.createPublisher(topic);
pub.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
pub.setPriority(Message.DEFAULT_PRIORITY);
pub.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
connection.start();
while (running)
{
Object[] array;
long sleepTime = delay;
try
{
Thread.sleep(sleepTime);
long begin = System.currentTimeMillis();
synchronized (msgQueue)
{
array = msgQueue.toArray();
msgQueue.clear();
}
for (int i = 0; i < array.length; ++i)
{
Message msg = createMessage(session,
((Entry) array[i]).principal,
((Entry) array[i]).id,
((Entry) array[i]).method,
((Entry) array[i]).checkpoint,
((Entry) array[i]).time
);
pub.publish(msg);
}
try
{
session.commit();
}
catch (Exception e)
{
}
long end = System.currentTimeMillis();
sleepTime = delay - (end - begin);
}
catch (InterruptedException e)
{
running = false;
}
}
}
catch (NamingException e)
{
log.warn(e);
}
catch (JMSException e)
{
log.warn(e);
}
finally
{
try
{
if (connection != null)
connection.close();
}
catch (JMSException e)
{
log.warn(e);
}
}
}
}
private final class Entry
{
int id = -1;
long time;
String principal = null;
String checkpoint;
String method;
Entry(Principal principal, Method method, Transaction tx, long time, String checkpoint)
{
this.time = time;
this.checkpoint = checkpoint;
this.method = method.getName();
if (tx != null)
this.id = tx.hashCode();
if (principal != null)
this.principal = principal.getName();
}
}
}