package org.jboss.ejb.plugins.jms;
import java.lang.reflect.Method;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collection;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.ServerSessionPool;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.ObjectName;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.jboss.deployment.DeploymentException;
import org.jboss.ejb.Container;
import org.jboss.ejb.EJBProxyFactory;
import org.jboss.invocation.Invocation;
import org.jboss.invocation.InvocationType;
import org.jboss.jms.ConnectionFactoryHelper;
import org.jboss.jms.asf.ServerSessionPoolFactory;
import org.jboss.jms.asf.StdServerSessionPool;
import org.jboss.jms.jndi.JMSProviderAdapter;
import org.jboss.logging.Logger;
import org.jboss.metadata.ActivationConfigPropertyMetaData;
import org.jboss.metadata.InvokerProxyBindingMetaData;
import org.jboss.metadata.MessageDestinationMetaData;
import org.jboss.metadata.MessageDrivenMetaData;
import org.jboss.metadata.MetaData;
import org.jboss.system.ServiceMBeanSupport;
import org.w3c.dom.Element;
public class JMSContainerInvoker
extends ServiceMBeanSupport
implements EJBProxyFactory, JMSContainerInvokerMBean
{
/** Class-wide logger. */
private static final Logger log = Logger.getLogger(JMSContainerInvoker.class);
/** Notification type sent by startService() before a connection attempt is made. */
private static final String CONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTING";
/** Notification type sent by startService() once the connection has been started. */
private static final String CONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.CONNECTED";
/** Notification type sent by innerStop() before the connection is stopped. */
private static final String DISCONNECTING_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTING";
/** Notification type sent at the end of destroyService(). */
private static final String DISCONNECTED_NOTIFICATION = "org.jboss.ejb.plugins.jms.DISCONNECTED";
/** Notification type sent by the reconnect loop; the recorded failure is attached as user data. */
private static final String FAILURE_NOTIFICATION = "org.jboss.ejb.plugins.jms.FAILURE";
/** Reflective handle on MessageListener.onMessage(Message); resolved once in the static initializer below. */
protected static Method ON_MESSAGE;
/** Destination type assumed by getDestinationType() when the type cannot be determined from JNDI. */
protected final static String DEFAULT_DESTINATION_TYPE = "javax.jms.Topic";
// Resolve ON_MESSAGE at class-load time. A failure is printed to stderr and
// aborts class initialization with an ExceptionInInitializerError.
static
{
try
{
final Class type = MessageListener.class;
final Class arg = Message.class;
ON_MESSAGE = type.getMethod("onMessage", new Class[]{arg});
}
catch (Exception e)
{
e.printStackTrace();
throw new ExceptionInInitializerError(e);
}
}
/** Flag stored by setOptimized() and reported by isOptimized(); not otherwise read in this file. */
protected boolean optimize;
/** Max-messages argument passed to createConnectionConsumer(); overridable via the "MaxMessages" element. */
protected int maxMessagesNr = 1;
/** Minimum session count handed to the server session pool factory ("MinimumSize" element). */
protected int minPoolSize = 1;
/** Keep-alive handed to the server session pool factory ("KeepAliveMillis" element). */
protected long keepAlive = 30 * 1000;
/** Maximum session count handed to the server session pool factory ("MaximumSize" element). */
protected int maxPoolSize = 15;
/** Milliseconds the reconnect loop sleeps between attempts ("ReconnectIntervalSec" element multiplied by 1000). */
protected long reconnectInterval = 10000;
/** True when the MDBConfig element contains a DLQConfig child (see importXml). */
protected boolean useDLQ = false;
/** JNDI name of the JMSProviderAdapter; importXml() prefixes it with "java:/" when missing. */
protected String providerAdapterJNDI;
/** JNDI name of the ServerSessionPoolFactory; importXml() prefixes it with "java:/" when missing. */
protected String serverSessionPoolFactoryJNDI;
/** Acknowledge mode taken from the bean metadata or the "acknowledgeMode" activation config property. */
protected int acknowledgeMode;
/** Copied from MessageDrivenMetaData.isContainerManagedTx() in innerCreate(). */
protected boolean isContainerManagedTx;
/** True when the transaction type of onMessage(Message) is MetaData.TX_NOT_SUPPORTED. */
protected boolean isNotSupportedTx;
/** Container this invoker delivers messages to. */
protected Container container;
/** JMS connection created by innerCreate(); nulled by destroyService(). */
protected Connection connection;
/** Connection consumer created by innerCreate(); nulled by destroyService(). */
protected ConnectionConsumer connectionConsumer;
/** Transaction manager obtained from the container in innerCreate(). */
protected TransactionManager tm;
/** Server session pool created by createSessionPool(). */
protected ServerSessionPool pool;
/** Listener that reacts to provider failures by running the reconnect loop; created in createService(). */
protected ExceptionListenerImpl exListener;
/** Dead-letter-queue handler; only created when useDLQ is true. */
protected DLQHandler dlqHandler;
/** Deep clone of the DLQConfig element captured by importXml(). */
protected Element dlqConfig;
/** Invoker proxy binding metadata whose proxy factory config is parsed in createService(). */
protected InvokerProxyBindingMetaData invokerMetaData;
/** Binding name stored by setInvokerBinding(); not read anywhere in this file. */
protected String invokerBinding;
/** When false, startService() returns without connecting ("DeliveryActive" element, start/stopDelivery). */
protected boolean deliveryActive = true;
/** When false, a destination that cannot be resolved is a deployment error rather than being created ("CreateJBossMQDestination" element). */
protected boolean createJBossMQDestination = true;
/**
 * Stores the invoker proxy binding metadata; its proxy factory
 * configuration is parsed later by {@link #createService()}.
 *
 * @param imd the invoker proxy binding metadata
 */
public void setInvokerMetaData(InvokerProxyBindingMetaData imd)
{
   this.invokerMetaData = imd;
}

/**
 * Stores the name of the invoker binding.
 *
 * @param binding the invoker binding name
 */
public void setInvokerBinding(String binding)
{
   this.invokerBinding = binding;
}

/**
 * Associates this invoker with the container it delivers messages to.
 *
 * @param container the owning container
 */
public void setContainer(final Container container)
{
   this.container = container;
}
/**
 * @return the minimum session count passed to the server session pool factory
 */
public int getMinPoolSize()
{
   return this.minPoolSize;
}

/**
 * @param minPoolSize the minimum session count passed to the server session pool factory
 */
public void setMinPoolSize(int minPoolSize)
{
   this.minPoolSize = minPoolSize;
}

/**
 * @return the maximum session count passed to the server session pool factory
 */
public int getMaxPoolSize()
{
   return this.maxPoolSize;
}

/**
 * @param maxPoolSize the maximum session count passed to the server session pool factory
 */
public void setMaxPoolSize(int maxPoolSize)
{
   this.maxPoolSize = maxPoolSize;
}

/**
 * @return the keep-alive value passed to the server session pool factory
 */
public long getKeepAliveMillis()
{
   return this.keepAlive;
}

/**
 * @param keepAlive the keep-alive value passed to the server session pool factory
 */
public void setKeepAliveMillis(long keepAlive)
{
   this.keepAlive = keepAlive;
}

/**
 * @return the max-messages value used when creating the connection consumer
 */
public int getMaxMessages()
{
   return this.maxMessagesNr;
}

/**
 * @param maxMessages the max-messages value used when creating the connection consumer
 */
public void setMaxMessages(int maxMessages)
{
   this.maxMessagesNr = maxMessages;
}
/**
 * Returns the container's bean metadata, cast to the message-driven flavour.
 *
 * @return the message-driven bean metadata of the owning container
 */
public MessageDrivenMetaData getMetaData()
{
   return (MessageDrivenMetaData) container.getBeanMetaData();
}

/**
 * @return whether message delivery is currently enabled
 */
public boolean getDeliveryActive()
{
   return this.deliveryActive;
}

/**
 * @return whether an unresolvable destination may be created on the fly
 */
public boolean getCreateJBossMQDestination()
{
   return this.createJBossMQDestination;
}
/**
 * Enables message delivery and connects to the JMS provider.
 * Does nothing when delivery is already active.
 *
 * @throws IllegalStateException if the service is not in the STARTED state
 * @throws Exception whatever {@link #startService()} propagates
 */
public void startDelivery()
   throws Exception
{
   if (getState() != STARTED)
   {
      throw new IllegalStateException("The MDB is not started");
   }

   if (!deliveryActive)
   {
      deliveryActive = true;
      startService();
   }
}
/**
 * Disables message delivery and releases the JMS resources.
 * Does nothing when delivery is already inactive.
 *
 * <p>{@link #stopService()} only stops the connection. A later
 * {@link #startDelivery()} runs {@code innerCreate()}, which assigns a
 * brand-new connection, connection consumer, session pool and DLQ handler.
 * Without closing the old ones here they would be overwritten and leaked, so
 * the resources are torn down with {@link #destroyService()} - the same
 * stop/destroy/start sequence the reconnect loop uses.
 *
 * @throws IllegalStateException if the service is not in the STARTED state
 * @throws Exception whatever {@link #stopService()} or
 *         {@link #destroyService()} propagates
 */
public void stopDelivery()
   throws Exception
{
   if (getState() != STARTED)
      throw new IllegalStateException("The MDB is not started");
   if (deliveryActive == false)
      return;
   deliveryActive = false;
   try
   {
      stopService();
   }
   finally
   {
      // Always close the consumer/pool/connection, even if stopping failed,
      // so that a subsequent startDelivery() starts from a clean slate.
      destroyService();
   }
}
/**
 * Records the optimize flag, logging the new value at debug level.
 *
 * @param optimize the new flag value
 */
public void setOptimized(final boolean optimize)
{
   final boolean debugEnabled = log.isDebugEnabled();
   if (debugEnabled)
   {
      log.debug("Container Invoker optimize set to " + optimize);
   }
   this.optimize = optimize;
}
// The following EJBProxyFactory operations deal with home/remote/entity
// objects. This invoker rejects every one of them by throwing an Error.

/**
 * Not valid for message-driven beans.
 *
 * @throws Error always
 */
public boolean isIdentical(Container container, Invocation mi)
{
throw new Error("Not valid for MessageDriven beans");
}
/**
 * Not valid for message-driven beans.
 *
 * @throws Error always
 */
public Object getEJBHome()
{
throw new Error("Not valid for MessageDriven beans");
}
/**
 * Not valid for message-driven beans.
 *
 * @throws Error always
 */
public javax.ejb.EJBMetaData getEJBMetaData()
{
throw new Error("Not valid for MessageDriven beans");
}
/**
 * Not valid for message-driven beans.
 *
 * @throws Error always
 */
public Collection getEntityCollection(Collection ids)
{
throw new Error("Not valid for MessageDriven beans");
}
/**
 * Not valid for message-driven beans.
 *
 * @throws Error always
 */
public Object getEntityEJBObject(Object id)
{
throw new Error("Not valid for MessageDriven beans");
}
/**
 * Not valid for message-driven beans.
 *
 * @throws Error always
 */
public Object getStatefulSessionEJBObject(Object id)
{
throw new Error("Not valid for MessageDriven beans");
}
/**
 * Not valid for message-driven beans.
 *
 * @throws Error always
 */
public Object getStatelessSessionEJBObject()
{
throw new Error("Not valid for MessageDriven beans");
}
/**
 * Reports the optimize flag, logging its value at debug level.
 *
 * @return the current optimize flag
 */
public boolean isOptimized()
{
   final boolean debugEnabled = log.isDebugEnabled();
   if (debugEnabled)
   {
      log.debug("Optimize in action: " + optimize);
   }
   return this.optimize;
}
/**
 * Reads the proxy factory configuration of this invoker.
 *
 * <p>The elements {@code CreateJBossMQDestination}, {@code MaxMessages},
 * {@code MinimumSize}, {@code MaximumSize}, {@code KeepAliveMillis},
 * {@code ReconnectIntervalSec} and {@code DeliveryActive} are optional: when
 * one is missing or cannot be parsed, the current value of the matching
 * field is kept. {@code MDBConfig}, {@code JMSProviderAdapterJNDI} and
 * {@code ServerSessionPoolFactoryJNDI} are looked up without a guard, so any
 * exception raised while reading them propagates to the caller.
 *
 * @param element the proxy factory configuration element
 * @throws Exception if one of the unguarded elements cannot be read
 */
public void importXml(final Element element) throws Exception
{
   try
   {
      if ("false".equalsIgnoreCase(MetaData.getElementContent(MetaData.getUniqueChild(element, "CreateJBossMQDestination"))))
      {
         createJBossMQDestination = false;
      }
   }
   catch (Exception ignored)
   {
      // optional element: keep the default
   }

   try
   {
      String maxMessages = MetaData.getElementContent
         (MetaData.getUniqueChild(element, "MaxMessages"));
      maxMessagesNr = Integer.parseInt(maxMessages);
   }
   catch (Exception ignored)
   {
      // optional element: keep the default
   }

   try
   {
      String minSize = MetaData.getElementContent
         (MetaData.getUniqueChild(element, "MinimumSize"));
      minPoolSize = Integer.parseInt(minSize);
   }
   catch (Exception ignored)
   {
      // optional element: keep the default
   }

   try
   {
      String maxSize = MetaData.getElementContent
         (MetaData.getUniqueChild(element, "MaximumSize"));
      maxPoolSize = Integer.parseInt(maxSize);
   }
   catch (Exception ignored)
   {
      // optional element: keep the default
   }

   try
   {
      String keepAliveMillis = MetaData.getElementContent
         (MetaData.getUniqueChild(element, "KeepAliveMillis"));
      // keepAlive is a long: parse it as one so that values beyond the int
      // range are honoured instead of being silently dropped by the catch.
      keepAlive = Long.parseLong(keepAliveMillis);
   }
   catch (Exception ignored)
   {
      // optional element: keep the default
   }

   Element mdbConfig = MetaData.getUniqueChild(element, "MDBConfig");

   try
   {
      String reconnect = MetaData.getElementContent
         (MetaData.getUniqueChild(mdbConfig, "ReconnectIntervalSec"));
      // configured in seconds, stored in milliseconds
      reconnectInterval = Long.parseLong(reconnect) * 1000;
   }
   catch (Exception ignored)
   {
      // optional element: keep the default
   }

   try
   {
      if ("false".equalsIgnoreCase(MetaData.getElementContent(MetaData.getUniqueChild(mdbConfig, "DeliveryActive"))))
      {
         deliveryActive = false;
      }
   }
   catch (Exception ignored)
   {
      // optional element: keep the default
   }

   // The presence of a DLQConfig child switches dead-letter handling on; the
   // element is deep-cloned and kept in dlqConfig for the DLQ handler.
   Element dlqEl = MetaData.getOptionalChild(mdbConfig, "DLQConfig");
   if (dlqEl != null)
   {
      dlqConfig = (Element) dlqEl.cloneNode(true);
      useDLQ = true;
   }
   else
   {
      useDLQ = false;
   }

   providerAdapterJNDI = MetaData.getElementContent
      (MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
   serverSessionPoolFactoryJNDI = MetaData.getElementContent
      (MetaData.getUniqueChild(element, "ServerSessionPoolFactoryJNDI"));

   // Both names are placed in the java:/ namespace unless already qualified.
   if (!providerAdapterJNDI.startsWith("java:/"))
   {
      providerAdapterJNDI = "java:/" + providerAdapterJNDI;
   }
   if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
   {
      serverSessionPoolFactoryJNDI = "java:/" + serverSessionPoolFactoryJNDI;
   }
}
/**
 * Create-phase hook: parses the proxy factory configuration and
 * instantiates the exception listener.
 *
 * @throws Exception if the configuration cannot be imported
 */
protected void createService() throws Exception
{
   final Element proxyFactoryConfig = invokerMetaData.getProxyFactoryConfig();
   importXml(proxyFactoryConfig);

   exListener = new ExceptionListenerImpl(this);
}
/**
 * Builds everything needed to receive messages: the optional DLQ handler,
 * the JMS connection, the destination, the server session pool and the
 * connection consumer. The connection is created but not started here.
 *
 * <p>Values from the bean metadata are overridden by the activation config
 * properties {@code messageSelector}, {@code destinationType},
 * {@code acknowledgeMode}, {@code destination}, {@code clientID},
 * {@code subscriptionDurability} and {@code subscriptionName} when present.
 * When neither source names a destination, the message-destination-link is
 * consulted. When no destination type is configured it is guessed through
 * {@link #getDestinationType(Context, String)}.
 *
 * <p>The JNDI context obtained from the provider adapter is closed on every
 * exit path. On the success path a failure to close it still propagates; on
 * a failure path the close error is only logged so that it cannot mask the
 * original exception.
 *
 * @throws DeploymentException if the connection factory or the destination
 *         has an unexpected type, the destination cannot be determined, the
 *         destination type is unknown, or topic/queue setup fails
 * @throws Exception for any other lookup or provider failure
 */
private void innerCreate() throws Exception
{
   log.debug("Initializing");

   JMSProviderAdapter adapter = getJMSProviderAdapter();
   log.debug("Provider adapter: " + adapter);

   if (useDLQ)
   {
      dlqHandler = new DLQHandler(adapter);
      dlqHandler.importXml(dlqConfig);
      dlqHandler.create();
   }

   tm = container.getTransactionManager();

   MessageDrivenMetaData config = getMetaData();

   // Each setting: bean metadata first, activation config property wins.
   String messageSelector = config.getMessageSelector();
   String activationConfig = getActivationConfigProperty("messageSelector");
   if (activationConfig != null)
      messageSelector = activationConfig;

   String destinationType = config.getDestinationType();
   activationConfig = getActivationConfigProperty("destinationType");
   if (activationConfig != null)
      destinationType = activationConfig;

   isContainerManagedTx = config.isContainerManagedTx();
   acknowledgeMode = config.getAcknowledgeMode();
   activationConfig = getActivationConfigProperty("acknowledgeMode");
   if (activationConfig != null)
   {
      // Anything other than DUPS_OK_ACKNOWLEDGE is treated as auto-acknowledge.
      if (activationConfig.equals("DUPS_OK_ACKNOWLEDGE"))
         acknowledgeMode = MessageDrivenMetaData.DUPS_OK_ACKNOWLEDGE_MODE;
      else
         acknowledgeMode = MessageDrivenMetaData.AUTO_ACKNOWLEDGE_MODE;
   }

   byte txType = config.getMethodTransactionType("onMessage",
      new Class[]{Message.class},
      InvocationType.LOCAL);
   isNotSupportedTx = txType == MetaData.TX_NOT_SUPPORTED;

   String destinationJNDI = config.getDestinationJndiName();
   activationConfig = getActivationConfigProperty("destination");
   if (activationConfig != null)
      destinationJNDI = activationConfig;

   // Last resort: resolve the destination through the message-destination-link.
   if (destinationJNDI == null)
   {
      String link = config.getDestinationLink();
      if (link != null)
      {
         link = link.trim();
         if (link.length() > 0)
         {
            MessageDestinationMetaData destinationMetaData = container.getMessageDestination(link);
            if (destinationMetaData == null)
               log.warn("Unresolved message-destination-link '" + link + "' no message-destination in ejb-jar.xml");
            else
            {
               String jndiName = destinationMetaData.getJNDIName();
               if (jndiName == null)
                  log.warn("The message-destination '" + link + "' has no jndi-name in jboss.xml");
               else
                  destinationJNDI = jndiName;
            }
         }
      }
   }

   String user = config.getUser();
   String password = config.getPasswd();

   Context context = adapter.getInitialContext();
   log.debug("context: " + context);
   if (context == null)
   {
      throw new RuntimeException("Failed to get the root context");
   }

   boolean initialized = false;
   try
   {
      String jndiSuffix = parseJndiSuffix(destinationJNDI, config.getEjbName());
      log.debug("jndiSuffix: " + jndiSuffix);

      if (destinationType == null)
      {
         log.warn("No message-driven-destination given; using; guessing type");
         destinationType = getDestinationType(context, destinationJNDI);
      }

      if ("javax.jms.Topic".equals(destinationType))
      {
         log.debug("Got destination type Topic for " + config.getEjbName());

         Object factory = context.lookup(adapter.getTopicFactoryRef());
         TopicConnection tConnection = null;
         try
         {
            tConnection = ConnectionFactoryHelper.createTopicConnection(factory, user, password);
            connection = tConnection;
         }
         catch (ClassCastException e)
         {
            throw new DeploymentException("Expected a TopicConnection check your provider adaptor: "
               + adapter.getTopicFactoryRef(), e);
         }

         try
         {
            String clientId = config.getClientId();
            activationConfig = getActivationConfigProperty("clientID");
            if (activationConfig != null)
               clientId = activationConfig;
            log.debug("Using client id: " + clientId);
            if (clientId != null && clientId.length() > 0)
               connection.setClientID(clientId);

            Topic topic = null;
            try
            {
               if (destinationJNDI != null)
                  topic = (Topic) context.lookup(destinationJNDI);
               else if (createJBossMQDestination == false)
                  throw new DeploymentException("Unable to determine destination for '" + container.getBeanMetaData().getEjbName()
                     + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link");
            }
            catch (NamingException e)
            {
               if (createJBossMQDestination == false)
                  throw new DeploymentException("Could not find the topic destination-jndi-name=" + destinationJNDI, e);
               log.warn("Could not find the topic destination-jndi-name=" + destinationJNDI, e);
            }
            catch (ClassCastException e)
            {
               throw new DeploymentException("Expected a Topic destination-jndi-name=" + destinationJNDI, e);
            }

            // Not bound (or not named): fall back to topic/<suffix>, creating it if needed.
            if (topic == null)
               topic = (Topic) createDestination(Topic.class,
                  context,
                  "topic/" + jndiSuffix,
                  jndiSuffix);

            pool = createSessionPool(
               topic,
               tConnection,
               minPoolSize,
               maxPoolSize,
               keepAlive,
               true, acknowledgeMode,
               new MessageListenerImpl(this));

            int subscriptionDurablity = config.getSubscriptionDurability();
            activationConfig = getActivationConfigProperty("subscriptionDurability");
            if (activationConfig != null)
            {
               if (activationConfig.equals("Durable"))
                  subscriptionDurablity = MessageDrivenMetaData.DURABLE_SUBSCRIPTION;
               else
                  subscriptionDurablity = MessageDrivenMetaData.NON_DURABLE_SUBSCRIPTION;
            }

            if (subscriptionDurablity != MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
            {
               connectionConsumer =
                  tConnection.createConnectionConsumer(topic,
                     messageSelector,
                     pool,
                     maxMessagesNr);
            }
            else
            {
               String durableName = config.getSubscriptionId();
               activationConfig = getActivationConfigProperty("subscriptionName");
               if (activationConfig != null)
                  durableName = activationConfig;

               connectionConsumer =
                  tConnection.createDurableConnectionConsumer(topic,
                     durableName,
                     messageSelector,
                     pool,
                     maxMessagesNr);
            }
            log.debug("Topic connectionConsumer set up");
         }
         catch (Throwable t)
         {
            // Do not leave a half-configured connection open.
            try
            {
               tConnection.close();
            }
            catch (Throwable ignored)
            {
            }
            DeploymentException.rethrowAsDeploymentException("Error during topic setup", t);
         }
      }
      else if ("javax.jms.Queue".equals(destinationType))
      {
         log.debug("Got destination type Queue for " + config.getEjbName());

         Object qFactory = context.lookup(adapter.getQueueFactoryRef());
         QueueConnection qConnection = null;
         try
         {
            qConnection = ConnectionFactoryHelper.createQueueConnection(qFactory, user, password);
            connection = qConnection;
         }
         catch (ClassCastException e)
         {
            throw new DeploymentException("Expected a QueueConnection check your provider adaptor: "
               + adapter.getQueueFactoryRef(), e);
         }

         try
         {
            String clientId = config.getClientId();
            activationConfig = getActivationConfigProperty("clientID");
            if (activationConfig != null)
               clientId = activationConfig;
            log.debug("Using client id: " + clientId);
            if (clientId != null && clientId.length() > 0)
               connection.setClientID(clientId);

            Queue queue = null;
            try
            {
               if (destinationJNDI != null)
                  queue = (Queue) context.lookup(destinationJNDI);
               else if (createJBossMQDestination == false)
                  throw new DeploymentException("Unable to determine destination for '" + container.getBeanMetaData().getEjbName()
                     + "' use destination-jndi-name in jboss.xml, an activation config property or a message-destination-link");
            }
            catch (NamingException e)
            {
               if (createJBossMQDestination == false)
                  throw new DeploymentException("Could not find the queue destination-jndi-name=" + destinationJNDI, e);
               log.warn("Could not find the queue destination-jndi-name=" + destinationJNDI, e);
            }
            catch (ClassCastException e)
            {
               throw new DeploymentException("Expected a Queue destination-jndi-name=" + destinationJNDI, e);
            }

            // Not bound (or not named): fall back to queue/<suffix>, creating it if needed.
            if (queue == null)
               queue = (Queue) createDestination(Queue.class,
                  context,
                  "queue/" + jndiSuffix,
                  jndiSuffix);

            pool = createSessionPool(
               queue,
               qConnection,
               minPoolSize,
               maxPoolSize,
               keepAlive,
               true, acknowledgeMode,
               new MessageListenerImpl(this));
            log.debug("Server session pool: " + pool);

            connectionConsumer =
               qConnection.createConnectionConsumer(queue,
                  messageSelector,
                  pool,
                  maxMessagesNr);
            log.debug("Connection consumer: " + connectionConsumer);
         }
         catch (Throwable t)
         {
            // Do not leave a half-configured connection open.
            try
            {
               qConnection.close();
            }
            catch (Throwable ignored)
            {
            }
            DeploymentException.rethrowAsDeploymentException("Error during queue setup", t);
         }
      }
      else
         throw new DeploymentException("Unknown destination-type " + destinationType);

      log.debug("Initialized with config " + toString());
      initialized = true;
   }
   finally
   {
      if (initialized)
      {
         // Success path: a close failure is reported to the caller, as before.
         context.close();
      }
      else
      {
         // Failure path: release the context without masking the original error.
         try
         {
            context.close();
         }
         catch (Throwable closeFailure)
         {
            log.debug("Failed to close JNDI context after initialization failure", closeFailure);
         }
      }
   }
}
/**
 * Start-phase hook: sets up the JMS resources and begins delivery.
 *
 * <p>Returns immediately when delivery is disabled. Any failure is handed to
 * the exception listener rather than thrown.
 *
 * @throws Exception declared by the service contract
 */
protected void startService() throws Exception
{
   if (!deliveryActive)
   {
      log.debug("Delivery is disabled");
      return;
   }

   sendNotification(CONNECTING_NOTIFICATION, null);

   try
   {
      innerCreate();
   }
   catch (final Throwable setupFailure)
   {
      // The listener is notified before the finally block below runs.
      exListener.handleFailure(setupFailure);
      return;
   }
   finally
   {
      // SecurityActions is defined outside this file; it is invoked on both
      // the success and the failure path of innerCreate().
      SecurityActions.clear();
   }

   beginDelivery();
}

/**
 * Starts the DLQ handler and the connection, then announces CONNECTED.
 * Failures are passed to the exception listener.
 */
private void beginDelivery()
{
   try
   {
      if (dlqHandler != null)
      {
         dlqHandler.start();
      }

      if (connection != null)
      {
         connection.setExceptionListener(exListener);
         connection.start();
      }

      sendNotification(CONNECTED_NOTIFICATION, null);
   }
   catch (Throwable startFailure)
   {
      exListener.handleFailure(startFailure);
   }
}
/**
 * Stop-phase hook: halts the reconnect logic, stops the connection and
 * finally stops the DLQ handler, in that order.
 *
 * @throws Exception if stopping the DLQ handler fails
 */
protected void stopService() throws Exception
{
   // Silence the reconnect loop first so it cannot race with the shutdown.
   final ExceptionListenerImpl listener = exListener;
   if (listener != null)
   {
      listener.stop();
   }

   innerStop();

   final DLQHandler handler = dlqHandler;
   if (handler != null)
   {
      handler.stop();
   }
}
/**
 * Announces DISCONNECTING, detaches the exception listener from the
 * connection and stops the connection. Each step is attempted independently;
 * failures are logged and never propagated.
 */
protected void innerStop()
{
   log.debug("innerStop");
   sendNotification(DISCONNECTING_NOTIFICATION, null);

   detachExceptionListener();
   stopConnection();
}

/** Removes the exception listener from the connection, if there is one. */
private void detachExceptionListener()
{
   try
   {
      if (connection == null)
      {
         return;
      }
      connection.setExceptionListener(null);
      log.debug("unset exception listener");
   }
   catch (Throwable t)
   {
      log.error("Could not set ExceptionListener to null", t);
   }
}

/** Stops the connection, if there is one. */
private void stopConnection()
{
   try
   {
      if (connection == null)
      {
         return;
      }
      connection.stop();
      log.debug("connection stopped");
   }
   catch (Throwable t)
   {
      log.error("Could not stop JMS connection", t);
   }
}
/**
 * Destroy-phase hook: releases the connection consumer, the server session
 * pool, the connection and the DLQ handler, then announces DISCONNECTED.
 *
 * <p>Every step is best effort: a failure is logged and the remaining
 * resources are still released. All four references are cleared afterwards,
 * so calling this method again (as the reconnect loop does on every
 * iteration) does not touch already-released resources.
 *
 * @throws Exception declared by the service contract
 */
protected void destroyService() throws Exception
{
   log.debug("destroyService");

   try
   {
      if (connectionConsumer != null)
      {
         connectionConsumer.close();
      }
   }
   catch (Throwable t)
   {
      log.error("Failed to close connection consumer", t);
   }
   connectionConsumer = null;

   try
   {
      // Only the standard pool implementation exposes clear().
      if (pool instanceof StdServerSessionPool)
      {
         StdServerSessionPool p = (StdServerSessionPool) pool;
         p.clear();
      }
   }
   catch (Throwable t)
   {
      log.error("Failed to clear session pool", t);
   }
   // Drop the reference like the other resources so a stale pool is not
   // cleared again on the next call.
   pool = null;

   if (connection != null)
   {
      try
      {
         connection.close();
      }
      catch (Throwable t)
      {
         log.error("Failed to close connection", t);
      }
   }
   connection = null;

   try
   {
      if (dlqHandler != null)
      {
         dlqHandler.destroy();
      }
   }
   catch (Throwable t)
   {
      log.error("Failed to close the dlq handler", t);
   }
   dlqHandler = null;

   sendNotification(DISCONNECTED_NOTIFICATION, null);
}
/**
 * Wraps the call in a LOCAL {@link Invocation} and dispatches it to the
 * container while the container's class loader is installed as the thread
 * context class loader. The previous loader is always restored.
 *
 * @param id         the invocation id
 * @param m          the method to invoke
 * @param args       the method arguments
 * @param tx         the transaction to associate with the invocation
 * @param identity   the caller principal
 * @param credential the caller credential
 * @return whatever the container returns
 * @throws Exception whatever the container throws
 */
public Object invoke(Object id,
   Method m,
   Object[] args,
   Transaction tx,
   Principal identity,
   Object credential)
   throws Exception
{
   final Invocation call = new Invocation(id, m, args, tx, identity, credential);
   call.setType(InvocationType.LOCAL);

   final ClassLoader previousLoader = TCLAction.UTIL.getContextClassLoader();
   TCLAction.UTIL.setContextClassLoader(container.getClassLoader());
   try
   {
      final Object result = container.invoke(call);
      return result;
   }
   finally
   {
      TCLAction.UTIL.setContextClassLoader(previousLoader);
   }
}
/**
 * Guesses the destination type by looking the destination up in JNDI and
 * inspecting the bound object.
 *
 * <p>The bound object is inspected with {@code instanceof} rather than being
 * cast. An object that is neither a Topic nor a Queue (including one that is
 * not a JMS destination at all) therefore yields the default type instead of
 * an unexpected {@link ClassCastException}.
 *
 * @param ctx             the context to search
 * @param destinationJNDI the JNDI name of the destination, may be null
 * @return {@code "javax.jms.Topic"} or {@code "javax.jms.Queue"};
 *         {@link #DEFAULT_DESTINATION_TYPE} when the name is null, the
 *         lookup fails or the bound object is of neither type
 */
protected String getDestinationType(Context ctx, String destinationJNDI)
{
   String destType = null;

   if (destinationJNDI != null)
   {
      try
      {
         Object dest = ctx.lookup(destinationJNDI);
         if (dest instanceof javax.jms.Topic)
         {
            destType = "javax.jms.Topic";
         }
         else if (dest instanceof javax.jms.Queue)
         {
            destType = "javax.jms.Queue";
         }
      }
      catch (NamingException ex)
      {
         log.debug("Could not do heristic lookup of destination ", ex);
      }
   }

   if (destType == null)
   {
      log.warn("Could not determine destination type, defaults to: " +
         DEFAULT_DESTINATION_TYPE);
      destType = DEFAULT_DESTINATION_TYPE;
   }

   return destType;
}
/**
 * Looks up the JMS provider adapter bound under
 * {@code providerAdapterJNDI} using a fresh initial context, which is
 * closed before returning.
 *
 * @return the provider adapter
 * @throws NamingException if the context cannot be created or closed, or the
 *         lookup fails
 */
protected JMSProviderAdapter getJMSProviderAdapter()
   throws NamingException
{
   final Context jndi = new InitialContext();
   try
   {
      log.debug("Looking up provider adapter: " + providerAdapterJNDI);
      final Object bound = jndi.lookup(providerAdapterJNDI);
      return (JMSProviderAdapter) bound;
   }
   finally
   {
      jndi.close();
   }
}
/**
 * Returns the destination bound under {@code jndiName}. When the lookup
 * fails, the destination is created by invoking {@code createTopic} or
 * {@code createQueue} on the {@code jboss.mq:service=DestinationManager}
 * MBean, after which the lookup is repeated.
 *
 * @param type       {@code Topic.class} or {@code Queue.class}
 * @param ctx        the context to search
 * @param jndiName   the full JNDI name of the destination
 * @param jndiSuffix the name passed to the destination manager
 * @return the destination
 * @throws IllegalArgumentException if {@code type} is neither Topic nor Queue
 * @throws Exception if the creation or the second lookup fails
 */
protected Destination createDestination(final Class type,
   final Context ctx,
   final String jndiName,
   final String jndiSuffix)
   throws Exception
{
   try
   {
      return (Destination) ctx.lookup(jndiName);
   }
   catch (NamingException notBound)
   {
      log.warn("destination not found: " + jndiName + " reason: " + notBound);
      log.warn("creating a new temporary destination: " + jndiName);
   }

   // Only reached when the first lookup failed.
   final MBeanServer mbeanServer = org.jboss.mx.util.MBeanServerLocator.locateJBoss();

   final String operation;
   if (Topic.class == type)
   {
      operation = "createTopic";
   }
   else if (Queue.class == type)
   {
      operation = "createQueue";
   }
   else
   {
      throw new IllegalArgumentException
         ("Expected javax.jms.Queue or javax.jms.Topic: " + type);
   }

   final ObjectName destinationManager = new ObjectName("jboss.mq:service=DestinationManager");
   final Object[] arguments = new Object[]{jndiSuffix};
   final String[] signature = new String[]{"java.lang.String"};
   mbeanServer.invoke(destinationManager, operation, arguments, signature);

   return (Destination) ctx.lookup(jndiName);
}
/**
 * Reads one activation config property from the bean metadata.
 *
 * @param property the property name
 * @return the property value, or null when the property is not defined
 */
protected String getActivationConfigProperty(String property)
{
   final ActivationConfigPropertyMetaData entry =
      getMetaData().getActivationConfigProperty(property);
   return (entry == null) ? null : entry.getValue();
}
/**
 * Obtains a server session pool from the factory bound under
 * {@code serverSessionPoolFactoryJNDI}. A fresh initial context is used for
 * the lookup and closed before returning.
 *
 * @param destination  the destination the sessions consume from
 * @param connection   the connection the sessions are created on
 * @param minSession   minimum number of sessions
 * @param maxSession   maximum number of sessions
 * @param keepAlive    keep-alive value handed to the factory
 * @param isTransacted whether the sessions are transacted
 * @param ack          the acknowledge mode
 * @param listener     the listener receiving the messages
 * @return the pool produced by the factory
 * @throws NamingException if the context cannot be created or closed, or the
 *         lookup fails
 * @throws JMSException if the factory fails to build the pool
 */
protected ServerSessionPool createSessionPool(
   final Destination destination,
   final Connection connection,
   final int minSession,
   final int maxSession,
   final long keepAlive,
   final boolean isTransacted,
   final int ack,
   final MessageListener listener)
   throws NamingException, JMSException
{
   final Context jndi = new InitialContext();
   try
   {
      log.debug("looking up session pool factory: " +
         serverSessionPoolFactoryJNDI);
      final ServerSessionPoolFactory poolFactory =
         (ServerSessionPoolFactory) jndi.lookup(serverSessionPoolFactoryJNDI);

      // Flag handed to the factory: true unless the bean is container
      // managed with a transaction type other than "not supported".
      final boolean notCmtOrTxNotSupported = !isContainerManagedTx || isNotSupportedTx;

      return poolFactory.getServerSessionPool(destination, connection, minSession, maxSession, keepAlive, isTransacted, ack, notCmtOrTxNotSupported, listener);
   }
   finally
   {
      jndi.close();
   }
}
/**
 * Builds and emits a JMX notification whose source is this service's name.
 *
 * @param event    the notification type
 * @param userData optional payload attached to the notification, may be null
 */
protected void sendNotification(String event, Object userData)
{
   final long sequenceNumber = getNextNotificationSequenceNumber();
   final Notification notification = new Notification(event, getServiceName(), sequenceNumber);
   notification.setUserData(userData);
   sendNotification(notification);
}
/**
 * Derives the destination name from a JNDI name by dropping everything up
 * to and including the first slash.
 *
 * @param jndiname     the JNDI name, may be null
 * @param defautSuffix the value returned when {@code jndiname} is null
 * @return the part after the first slash, the whole name when it contains
 *         no slash, or {@code defautSuffix} when the name is null
 */
protected String parseJndiSuffix(final String jndiname,
   final String defautSuffix)
{
   if (jndiname == null)
   {
      return defautSuffix;
   }

   final int firstSlash = jndiname.indexOf("/");
   return (firstSlash == -1) ? jndiname : jndiname.substring(firstSlash + 1);
}
/**
 * Listener handed to the server session pool; forwards every received
 * message to the container through the enclosing invoker.
 */
class MessageListenerImpl
   implements MessageListener
{
   /** The invoker that dispatches the message to the container. */
   JMSContainerInvoker invoker;

   /**
    * @param invoker the invoker used to dispatch messages
    */
   MessageListenerImpl(final JMSContainerInvoker invoker)
   {
      this.invoker = invoker;
   }

   /**
    * Dispatches the message to the bean's onMessage method within the
    * transaction currently associated with the thread. A redelivered message
    * is first offered to the DLQ handler when dead-letter handling is
    * enabled; if the handler reports it as handled, the bean is not invoked.
    * Exceptions are logged, never propagated.
    *
    * @param message the received message
    */
   public void onMessage(final Message message)
   {
      if (log.isTraceEnabled())
      {
         log.trace("processing message: " + message);
      }

      final Object id = invocationIdFor(message);

      try
      {
         final Transaction tx = tm.getTransaction();

         final boolean handledByDlq = useDLQ
            && message.getJMSRedelivered()
            && dlqHandler.handleRedeliveredMessage(message, tx);

         if (!handledByDlq)
         {
            invoker.invoke(id, ON_MESSAGE, new Object[]{message}, tx, null, null);
         }
      }
      catch (Exception e)
      {
         log.error("Exception in JMSCI message listener", e);
      }
   }

   /**
    * @return the JMS message id, or a fixed placeholder when it cannot be read
    */
   private Object invocationIdFor(final Message message)
   {
      try
      {
         return message.getJMSMessageID();
      }
      catch (JMSException e)
      {
         return "JMSContainerInvoker";
      }
   }
}
/**
 * Reacts to JMS provider failures by tearing the connection down and
 * re-establishing it on a daemon thread.
 *
 * <p>The state fields are written by one thread and read by another (the
 * retry thread runs {@link #run()}, whoever stops the service calls
 * {@link #stop()}), so they are declared volatile.
 *
 * <p>NOTE(review): once {@link #stop()} has been called the stop flag is
 * never reset in this file, so a later failure will not trigger a reconnect
 * - confirm whether that is intended for a stop/start cycle.
 */
class ExceptionListenerImpl
   implements ExceptionListener, Runnable
{
   /** The invoker whose connection is being supervised. */
   JMSContainerInvoker invoker;
   /** The retry thread currently inside run(), or null. */
   volatile Thread currentThread;
   /** Cleared by stop() to end the reconnect loop. */
   volatile boolean notStoped = true;
   /** The most recent failure, attached to the FAILURE notification. */
   volatile Throwable failure = null;

   /**
    * @param invoker the invoker to reconnect on failure
    */
   ExceptionListenerImpl(final JMSContainerInvoker invoker)
   {
      this.invoker = invoker;
   }

   /**
    * JMS callback for connection problems; delegates to
    * {@link #handleFailure(Throwable)}.
    *
    * @param ex the exception reported by the provider
    */
   public void onException(JMSException ex)
   {
      handleFailure(ex);
   }

   /**
    * Records the failure and starts a daemon thread running the reconnect
    * loop.
    *
    * @param t the failure that was detected
    */
   public void handleFailure(Throwable t)
   {
      log.warn("JMS provider failure detected: ", t);
      failure = t;

      MessageDrivenMetaData metaData = invoker.getMetaData();
      String name = "JMSContainerInvoker("+metaData.getEjbName()+") Reconnect";
      Thread retryThread = new Thread(this, name);
      retryThread.setDaemon(true);
      retryThread.start();
   }

   /**
    * Reconnect loop: stop and destroy the current resources, emit a FAILURE
    * notification, sleep for the reconnect interval, then call
    * startService(). The loop ends after startService() returns normally,
    * when stop() is called, or when the thread is interrupted while
    * sleeping.
    */
   public void run()
   {
      final Thread self = Thread.currentThread();
      currentThread = self;
      try
      {
         boolean tryIt = true;
         while (tryIt && notStoped)
         {
            try
            {
               invoker.innerStop();
            }
            catch (Throwable t)
            {
               log.error("Unhandled error stopping connection", t);
            }

            try
            {
               invoker.destroyService();
            }
            catch (Throwable t)
            {
               log.error("Unhandled error closing connection", t);
            }

            sendNotification(FAILURE_NOTIFICATION, failure);

            try
            {
               log.debug("Waiting for reconnect internal " + reconnectInterval + " ms");
               try
               {
                  Thread.sleep(reconnectInterval);
               }
               catch (InterruptedException ie)
               {
                  // Asked to give up: keep the interrupt status visible and leave.
                  self.interrupt();
                  return;
               }

               log.info("Trying to reconnect to JMS provider");
               invoker.startService();
               tryIt = false;
               log.info("Reconnected to JMS provider");
            }
            catch (Exception e)
            {
               log.error("Reconnect failed: JMS provider failure detected:", e);
            }
         }
      }
      finally
      {
         // Unregister on every exit path, but never wipe out the
         // registration of a newer retry thread.
         if (currentThread == self)
         {
            currentThread = null;
         }
      }
   }

   /**
    * Ends the reconnect loop and interrupts the retry thread if one is
    * currently running.
    */
   void stop()
   {
      log.debug("Stop requested");

      notStoped = false;
      // Read the field once: the retry thread may clear it concurrently.
      final Thread retryThread = currentThread;
      if (retryThread != null)
      {
         retryThread.interrupt();
         log.debug("Current thread interrupted");
      }
   }
}
/**
 * Describes this invoker: the inherited description followed by the main
 * configuration values and the destination JNDI name from the bean metadata.
 *
 * @return a human-readable summary of the configuration
 */
public String toString()
{
   final String configuredDestination = getMetaData().getDestinationJndiName();

   final StringBuffer text = new StringBuffer();
   text.append(super.toString());
   text.append("{ maxMessagesNr=").append(maxMessagesNr);
   text.append(", maxPoolSize=").append(maxPoolSize);
   text.append(", reconnectInterval=").append(reconnectInterval);
   text.append(", providerAdapterJNDI=").append(providerAdapterJNDI);
   text.append(", serverSessionPoolFactoryJNDI=").append(serverSessionPoolFactoryJNDI);
   text.append(", acknowledgeMode=").append(acknowledgeMode);
   text.append(", isContainerManagedTx=").append(isContainerManagedTx);
   text.append(", isNotSupportedTx=").append(isNotSupportedTx);
   text.append(", useDLQ=").append(useDLQ);
   text.append(", dlqHandler=").append(dlqHandler);
   text.append(", destinationJNDI=").append(configuredDestination);
   text.append(" }");
   return text.toString();
}
/**
 * Strategy for reading and replacing thread context class loaders, with one
 * implementation that calls the Thread API directly and one that wraps every
 * call in {@code AccessController.doPrivileged}. {@link UTIL} selects between
 * the two depending on whether a security manager is installed.
 */
interface TCLAction
{
   /** @return the context class loader of the current thread */
   ClassLoader getContextClassLoader();

   /**
    * @param thread the thread to inspect
    * @return the context class loader of the given thread
    */
   ClassLoader getContextClassLoader(Thread thread);

   /** @param cl the loader to install on the current thread */
   void setContextClassLoader(ClassLoader cl);

   /**
    * @param thread the thread to modify
    * @param cl     the loader to install on that thread
    */
   void setContextClassLoader(Thread thread, ClassLoader cl);

   /** Implementation that talks to the Thread API directly. */
   TCLAction NON_PRIVILEGED = new TCLAction()
   {
      public ClassLoader getContextClassLoader()
      {
         final Thread current = Thread.currentThread();
         return current.getContextClassLoader();
      }

      public ClassLoader getContextClassLoader(Thread thread)
      {
         return thread.getContextClassLoader();
      }

      public void setContextClassLoader(ClassLoader cl)
      {
         final Thread current = Thread.currentThread();
         current.setContextClassLoader(cl);
      }

      public void setContextClassLoader(Thread thread, ClassLoader cl)
      {
         thread.setContextClassLoader(cl);
      }
   };

   /** Implementation that performs every operation as a privileged action. */
   TCLAction PRIVILEGED = new TCLAction()
   {
      /** Reusable action returning the current thread's context class loader. */
      private final PrivilegedAction currentLoaderAction = new PrivilegedAction()
      {
         public Object run()
         {
            return Thread.currentThread().getContextClassLoader();
         }
      };

      public ClassLoader getContextClassLoader()
      {
         final Object loader = AccessController.doPrivileged(currentLoaderAction);
         return (ClassLoader) loader;
      }

      public ClassLoader getContextClassLoader(final Thread thread)
      {
         final PrivilegedAction readLoader = new PrivilegedAction()
         {
            public Object run()
            {
               return thread.getContextClassLoader();
            }
         };
         return (ClassLoader) AccessController.doPrivileged(readLoader);
      }

      public void setContextClassLoader(final ClassLoader cl)
      {
         final PrivilegedAction installLoader = new PrivilegedAction()
         {
            public Object run()
            {
               Thread.currentThread().setContextClassLoader(cl);
               return null;
            }
         };
         AccessController.doPrivileged(installLoader);
      }

      public void setContextClassLoader(final Thread thread, final ClassLoader cl)
      {
         final PrivilegedAction installLoader = new PrivilegedAction()
         {
            public Object run()
            {
               thread.setContextClassLoader(cl);
               return null;
            }
         };
         AccessController.doPrivileged(installLoader);
      }
   };

   /** Static entry points that delegate to the implementation matching the runtime. */
   class UTIL
   {
      /**
       * @return {@link #NON_PRIVILEGED} when no security manager is
       *         installed, otherwise {@link #PRIVILEGED}
       */
      static TCLAction getTCLAction()
      {
         if (System.getSecurityManager() == null)
         {
            return NON_PRIVILEGED;
         }
         return PRIVILEGED;
      }

      static ClassLoader getContextClassLoader()
      {
         final TCLAction action = getTCLAction();
         return action.getContextClassLoader();
      }

      static ClassLoader getContextClassLoader(Thread thread)
      {
         final TCLAction action = getTCLAction();
         return action.getContextClassLoader(thread);
      }

      static void setContextClassLoader(ClassLoader cl)
      {
         final TCLAction action = getTCLAction();
         action.setContextClassLoader(cl);
      }

      static void setContextClassLoader(Thread thread, ClassLoader cl)
      {
         final TCLAction action = getTCLAction();
         action.setContextClassLoader(thread, cl);
      }
   }
}
}