package org.jboss.cache.invalidation.bridges;
import java.io.Serializable;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
import javax.jms.MessageListener;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.jms.TopicPublisher;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TopicConnectionFactory;
import javax.jms.JMSException;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.naming.Context;
import org.jboss.cache.invalidation.InvalidationManager;
import org.jboss.cache.invalidation.InvalidationBridgeListener;
import org.jboss.cache.invalidation.BatchInvalidation;
import org.jboss.system.ServiceMBeanSupport;
public class JMSCacheInvalidationBridge
extends ServiceMBeanSupport
implements JMSCacheInvalidationBridgeMBean,
InvalidationBridgeListener,
MessageListener
{
public static final String JMS_CACHE_INVALIDATION_BRIDGE = "JMS_CACHE_INVALIDATION_BRIDGE";
protected org.jboss.cache.invalidation.InvalidationManagerMBean invalMgr = null;
protected org.jboss.cache.invalidation.BridgeInvalidationSubscription invalidationSubscription = null;
protected String invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME;
protected boolean publishingAuthorized = false;
protected String connectionFactoryName = "java:/ConnectionFactory";
protected String topicName = "topic/JMSCacheInvalidationBridge";
protected boolean transacted = true;
protected int acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; protected int propagationMode = JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION;
protected java.rmi.dgc.VMID serviceId = new java.rmi.dgc.VMID();
protected TopicConnection conn = null;
protected TopicSession session = null;
protected Topic topic = null;
protected TopicSubscriber subscriber = null;
protected TopicPublisher pub = null;
protected String providerUrl = null;
public JMSCacheInvalidationBridge () { super (); }
public String getInvalidationManager ()
{
return this.invalidationManagerName;
}
public void setInvalidationManager (String objectName)
{
this.invalidationManagerName = objectName;
}
public String getConnectionFactoryName ()
{
return this.connectionFactoryName;
}
public void setConnectionFactoryName (String factoryName)
{
this.connectionFactoryName = factoryName;
}
public String getTopicName ()
{
return this.topicName;
}
public void setTopicName (String topicName)
{
this.topicName = topicName;
}
public String getProviderUrl ()
{
return providerUrl;
}
public void setProviderUrl (String providerUrl)
{
this.providerUrl = providerUrl;
}
public boolean isTransacted ()
{
return this.transacted;
}
public void setTransacted (boolean isTransacted)
{
this.transacted = isTransacted;
}
public int getAcknowledgeMode ()
{
return this.acknowledgeMode;
}
public void setAcknowledgeMode (int ackMode)
{
if (ackMode > 3 || ackMode < 1)
throw new RuntimeException ("Value AcknowledgeMode must be between 1 and 3");
switch (ackMode)
{
case 1: this.acknowledgeMode = TopicSession.AUTO_ACKNOWLEDGE; break;
case 2: this.acknowledgeMode = TopicSession.CLIENT_ACKNOWLEDGE; break;
case 3: this.acknowledgeMode = TopicSession.DUPS_OK_ACKNOWLEDGE; break;
}
}
public int getPropagationMode ()
{
return this.propagationMode;
}
public void setPropagationMode (int propMode)
{
if (propMode > 3 || propMode < 1)
throw new RuntimeException ("Value PropagationMode must be between 1 and 3");
this.propagationMode = propMode;
}
public void onMessage(Message msg)
{
if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)
{
try
{
ObjectMessage objmsg = (ObjectMessage)msg;
if (!objmsg.getJMSType().equals(JMS_CACHE_INVALIDATION_BRIDGE)) return;
JMSCacheInvalidationMessage content = (JMSCacheInvalidationMessage)objmsg.getObject();
if (!content.emitter.equals (this.serviceId))
{
if(content.invalidateAllGroupName != null)
{
invalidationSubscription.invalidateAll(content.invalidateAllGroupName);
}
else
{
invalidationSubscription.batchInvalidate (content.getInvalidations ());
}
}
}
catch (Exception ex)
{
log.warn(ex.getMessage());
}
}
}
public void batchInvalidate (BatchInvalidation[] invalidations, boolean asynchronous)
{
if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
&& this.publishingAuthorized)
{
JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (this.serviceId, invalidations);
this.sendJMSInvalidationEvent (msg);
}
}
public void invalidate (String invalidationGroupName, Serializable[] keys, boolean asynchronous)
{
if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
&& this.publishingAuthorized)
{
JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
this.serviceId,
invalidationGroupName,
keys);
this.sendJMSInvalidationEvent (msg);
}
}
public void invalidate (String invalidationGroupName, Serializable key, boolean asynchronous)
{
if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
&& this.publishingAuthorized)
{
JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage (
this.serviceId,
invalidationGroupName,
new Serializable[] {key} );
this.sendJMSInvalidationEvent (msg);
}
}
public void invalidateAll(String groupName, boolean asynchronous)
{
if ( (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
&& this.publishingAuthorized)
{
JMSCacheInvalidationMessage msg = new JMSCacheInvalidationMessage(
this.serviceId,
groupName
);
this.sendJMSInvalidationEvent (msg);
}
}
public void newGroupCreated (String groupInvalidationName)
{
}
public void groupIsDropped (String groupInvalidationName)
{
}
protected void startService () throws Exception
{
log.info("Starting JMS cache invalidation bridge");
this.invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean)
org.jboss.system.Registry.lookup (this.invalidationManagerName);
this.invalidationSubscription = invalMgr.registerBridgeListener (this);
InitialContext iniCtx = getInitialContext ();
Object tmp = iniCtx.lookup(this.connectionFactoryName);
TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
conn = tcf.createTopicConnection();
topic = (Topic) iniCtx.lookup(this.topicName);
session = conn.createTopicSession(this.transacted,
this.acknowledgeMode);
conn.start();
if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)
{
this.subscriber = session.createSubscriber(topic);
this.subscriber.setMessageListener(this);
}
if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
{
this.pub = session.createPublisher(topic);
this.publishingAuthorized = true;
}
}
protected void stopService ()
{
log.info ("Stoping JMS cache invalidation bridge");
try
{
if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_ONLY_BRIDGE_PROPAGATION)
{
subscriber.close();
}
if (this.propagationMode == JMSCacheInvalidationBridgeMBean.IN_OUT_BRIDGE_PROPAGATION ||
this.propagationMode == JMSCacheInvalidationBridgeMBean.OUT_ONLY_BRIDGE_PROPAGATION)
{
this.publishingAuthorized = false;
pub.close();
}
conn.stop();
session.close();
conn.close();
}
catch (Exception ex)
{
log.warn("Failed to stop JMS resources associated with the JMS bridge: ", ex);
}
}
protected synchronized TopicSession getSession()
{
return this.session;
}
protected synchronized TopicPublisher getPublisher()
{
return this.pub;
}
protected void sendJMSInvalidationEvent(JMSCacheInvalidationMessage invalidationMsg)
{
try
{
if (log.isTraceEnabled ())
log.trace("sending JMS message for cache invalidation" + invalidationMsg);
try
{
ObjectMessage msg = getSession().createObjectMessage();
msg.setJMSType(JMS_CACHE_INVALIDATION_BRIDGE);
msg.setObject(invalidationMsg);
getPublisher().publish(msg);
}
catch (JMSException ex)
{
log.debug("failed to publish seppuku event: ", ex);
}
}
catch (Exception ex)
{
log.warn("failed to do cluster seppuku event: " , ex);
}
}
protected InitialContext getInitialContext()
throws NamingException
{
if (providerUrl == null)
{
return new InitialContext();
}
else
{
if (log.isDebugEnabled())
log.debug("Using Context.PROVIDER_URL: " + providerUrl);
java.util.Properties props = new java.util.Properties(System.getProperties());
props.put(Context.PROVIDER_URL, providerUrl);
return new InitialContext(props);
}
}
}