package org.jboss.ejb.plugins.jms;
import java.util.Hashtable;
import java.util.HashMap;
import java.util.Map;
import java.util.Enumeration;
import java.util.Iterator;
import javax.naming.Context;
import javax.jms.Session;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.QueueSender;
import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.Destination;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import org.w3c.dom.Element;
import org.jboss.deployment.DeploymentException;
import org.jboss.metadata.MetaData;
import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.system.ServiceMBeanSupport;
public class DLQHandler
extends ServiceMBeanSupport
{
public static final String JBOSS_ORIG_DESTINATION = "JBOSS_ORIG_DESTINATION";
public static final String JBOSS_ORIG_MESSAGEID = "JBOSS_ORIG_MESSAGEID";
private static final String JMS_JBOSS_REDELIVERY_COUNT = "JMS_JBOSS_REDELIVERY_COUNT";
private static final String JMS_JBOSS_REDELIVERY_LIMIT = "JMS_JBOSS_REDELIVERY_LIMIT";
private String destinationJNDI = "queue/DLQ";
private int maxResent = 10;
private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
private int priority = Message.DEFAULT_PRIORITY;
private String dlqUser;
private String dlqPass;
private QueueConnection connection;
private Queue dlq;
private JMSProviderAdapter providerAdapter;
private Hashtable resentBuffer = new Hashtable();
public DLQHandler(final JMSProviderAdapter providerAdapter)
{
this.providerAdapter = providerAdapter;
}
protected void createService() throws Exception
{
Context ctx = providerAdapter.getInitialContext();
try
{
String factoryName = providerAdapter.getQueueFactoryRef();
QueueConnectionFactory factory = (QueueConnectionFactory)
ctx.lookup(factoryName);
log.debug("Using factory: " + factory);
if (dlqUser == null)
connection = factory.createQueueConnection();
else
connection = factory.createQueueConnection(dlqUser, dlqPass);
log.debug("Created connection: " + connection);
dlq = (Queue) ctx.lookup(destinationJNDI);
log.debug("Using Queue: " + dlq);
}
finally
{
ctx.close();
}
}
protected void startService() throws Exception
{
connection.start();
}
protected void stopService() throws Exception
{
connection.stop();
}
protected void destroyService() throws Exception
{
if (connection != null)
connection.close();
connection = null;
dlq = null;
providerAdapter = null;
}
public boolean handleRedeliveredMessage(final Message msg, final Transaction tx)
{
boolean handled = false;
int max = this.maxResent;
String id = null;
boolean jbossmq = true;
int count = 0;
try
{
if (msg.propertyExists(JMS_JBOSS_REDELIVERY_LIMIT))
max = msg.getIntProperty(JMS_JBOSS_REDELIVERY_LIMIT);
if (msg.propertyExists(JMS_JBOSS_REDELIVERY_COUNT))
count = msg.getIntProperty(JMS_JBOSS_REDELIVERY_COUNT);
else
{
id = msg.getJMSMessageID();
if (id == null)
{
log.error("Message id is null, can't handle message");
return false;
}
count = incrementResentCount(id);
jbossmq = false;
}
if (count > max)
{
id = msg.getJMSMessageID();
log.warn("Message resent too many times; sending it to DLQ; message id=" + id);
sendMessage(msg);
deleteFromBuffer(id);
handled = true;
}
else if (jbossmq == false && tx != null)
{
DLQSynchronization synch = new DLQSynchronization(id);
try
{
tx.registerSynchronization(synch);
}
catch (Exception e)
{
log.warn("Error registering DlQ Synchronization with transaction " + tx, e);
}
}
}
catch (JMSException e)
{
log.error("Could not send message to Dead Letter Queue", e);
}
return handled;
}
protected void sendMessage(Message msg) throws JMSException
{
boolean trace = log.isTraceEnabled();
QueueSession session = null;
QueueSender sender = null;
try
{
msg = makeWritable(msg, trace);
msg.setStringProperty(JBOSS_ORIG_MESSAGEID, msg.getJMSMessageID());
Destination d = msg.getJMSDestination();
if (d != null)
msg.setStringProperty(JBOSS_ORIG_DESTINATION, d.toString());
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
sender = session.createSender(dlq);
if (trace)
{
log.trace("Sending message to DLQ; destination=" +
dlq + ", session=" + session + ", sender=" + sender);
}
sender.send(msg, deliveryMode, priority, timeToLive);
if (trace)
{
log.trace("Message sent.");
}
}
finally
{
try
{
if (sender != null) sender.close();
if (session != null) session.close();
}
catch (Exception e)
{
log.warn("Failed to close sender or session; ignoring", e);
}
}
}
protected int incrementResentCount(String id)
{
BufferEntry entry = null;
boolean trace = log.isTraceEnabled();
if (!resentBuffer.containsKey(id))
{
if (trace)
log.trace("Making new entry for id " + id);
entry = new BufferEntry();
entry.id = id;
entry.count = 1;
resentBuffer.put(id, entry);
}
else
{
entry = (BufferEntry) resentBuffer.get(id);
entry.count++;
if (trace)
log.trace("Incremented old entry for id " + id + " count " + entry.count);
}
return entry.count;
}
protected void deleteFromBuffer(String id)
{
resentBuffer.remove(id);
}
protected Message makeWritable(Message msg, boolean trace) throws JMSException
{
HashMap tmp = new HashMap();
for (Enumeration en = msg.getPropertyNames(); en.hasMoreElements();)
{
String key = (String) en.nextElement();
tmp.put(key, msg.getObjectProperty(key));
}
msg.clearProperties();
Iterator i = tmp.entrySet().iterator();
while (i.hasNext())
{
Map.Entry me = (Map.Entry)i.next();
String key = (String) me.getKey();
try
{
msg.setObjectProperty(key, me.getValue());
}
catch (JMSException ignored)
{
if (trace)
log.trace("Could not copy message property " + key, ignored);
}
}
return msg;
}
public void importXml(final Element element) throws DeploymentException
{
destinationJNDI = MetaData.getElementContent
(MetaData.getUniqueChild(element, "DestinationQueue"));
try
{
String mr = MetaData.getElementContent
(MetaData.getUniqueChild(element, "MaxTimesRedelivered"));
maxResent = Integer.parseInt(mr);
}
catch (Exception ignore)
{
}
try
{
String ttl = MetaData.getElementContent
(MetaData.getUniqueChild(element, "TimeToLive"));
timeToLive = Long.parseLong(ttl);
if (timeToLive < 0)
{
log.warn("Invalid TimeToLive: " + timeToLive + "; using default");
timeToLive = Message.DEFAULT_TIME_TO_LIVE;
}
}
catch (Exception ignore)
{
}
dlqUser = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQUser"));
dlqPass = MetaData.getElementContent(MetaData.getOptionalChild(element, "DLQPassword"));
}
public String toString()
{
return super.toString() +
"{ destinationJNDI=" + destinationJNDI +
", maxResent=" + maxResent +
", timeToLive=" + timeToLive +
" }";
}
private static class BufferEntry
{
int count;
String id;
}
protected class DLQSynchronization
implements Synchronization
{
String id;
public DLQSynchronization(String id)
{
this.id = id;
}
public void beforeCompletion()
{
}
public void afterCompletion(int status)
{
if (status == Status.STATUS_COMMITTED)
deleteFromBuffer(id);
}
}
}