The JMS API stands for Java Message Service Application Programming Interface, and it is used by applications to send asynchronous business-quality messages to other applications. In the JMS world, messages are not sent directly to other applications. Instead, messages are sent to destinations, also known as queues or topics. Applications sending messages do not need to worry about whether the receiving applications are up and running, and conversely, receiving applications do not need to worry about the sending application's status. Both senders and receivers interact only with the destinations.
The JMS API is the standardized interface to a JMS provider, sometimes called a Message Oriented Middleware (MOM) system. JBoss comes with a JMS 1.0.2b compliant JMS provider called JBoss Messaging or JBossMQ. When you use the JMS API with JBoss, you are using the JBoss Messaging engine transparently. JBoss Messaging fully implements the JMS specification; therefore, the best JBoss Messaging user guide is the JMS specification. For more information about the JMS API please visit the JMS Tutorial or JMS Downloads & Specifications.
This chapter focuses on the JBoss specific aspects of using JMS and message driven beans as well as the JBoss Messaging configuration and MBeans.
In this section we discuss the basics needed to use the JBoss JMS implementation. JMS leaves the details of accessing JMS connection factories and destinations as provider specific details. What you need to know to use the JBoss Messaging layer is:
In the following subsections we will look at examples of the various JMS messaging models and message driven beans. The chapter example source is located under the src/main/org/jboss/chap6 directory of the book examples.
Let's start out with a point-to-point (P2P) example. In the P2P model, a sender delivers messages to a queue and a single receiver pulls the message off of the queue. The receiver does not need to be listening to the queue at the time the message is sent. Example 6.1, “A P2P JMS client example” shows a complete P2P example that sends a javax.jms.TextMessage to the queue queue/testQueue and asynchronously receives the message from the same queue.
Example 6.1. A P2P JMS client example
package org.jboss.chap6.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**
 * A complete JMS client example program that sends a TextMessage to
 * a Queue and asynchronously receives the message from the same
 * Queue.
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.10 $
 */
public class SendRecvClient {
    static CountDown done = new CountDown(1);
    QueueConnection conn;
    QueueSession session;
    Queue que;

    public static class ExListener implements MessageListener {
        public void onMessage(Message msg) {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text=" + tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void setupPTP() throws JMSException, NamingException {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        que = (Queue) iniCtx.lookup("queue/testQueue");
        session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void sendRecvAsync(String text) throws JMSException, NamingException {
        System.out.println("Begin sendRecvAsync");
        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener
        QueueReceiver recv = session.createReceiver(que);
        recv.setMessageListener(new ExListener());

        // Send a text msg
        QueueSender send = session.createSender(que);
        TextMessage tm = session.createTextMessage(text);
        send.send(tm);
        System.out.println("sendRecvAsync, sent text=" + tm.getText());
        send.close();
        System.out.println("End sendRecvAsync");
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        SendRecvClient client = new SendRecvClient();
        client.sendRecvAsync("A text msg");
        client.done.acquire();
        client.stop();
        System.exit(0);
    }
}
The client may be run using the following command line:
[nr@toki examples]$ ant -Dchap=chap6 -Dex=1p2p run-example
...
run-example1p2p:
     [java] [INFO,SendRecvClient] Begin SendRecvClient, now=1098416473521
     [java] [INFO,SendRecvClient] Begin sendRecvAsync
     [java] [INFO,SendRecvClient] onMessage, recv text=A text msg
     [java] [INFO,SendRecvClient] sendRecvAsync, sent text=A text msg
     [java] [INFO,SendRecvClient] End sendRecvAsync
     [java] [INFO,SendRecvClient] End SendRecvClient
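The client blocks in main on a CountDown from the util.concurrent library until the listener has been called. On a JDK that ships java.util.concurrent, CountDownLatch can play the same role. The following is a minimal sketch of that pattern, not part of the book examples: the class and method names are hypothetical, and a plain thread stands in for the provider thread that would call onMessage, so that the sketch runs without a JMS provider.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: CountDownLatch in the role of CountDown(1).
final class LatchSketch {

    /** Hands text to a background "listener" thread and waits until it signals. */
    static String deliverAndWait(final String text) {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<String> received = new AtomicReference<String>();

        // Stand-in for the JMS delivery thread that would invoke onMessage()
        Thread listener = new Thread(new Runnable() {
            public void run() {
                received.set(text);
                done.countDown();   // counterpart of done.release()
            }
        });
        listener.start();

        try {
            // Counterpart of done.acquire(), bounded so a lost message cannot hang the client
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for the listener");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("recv text=" + deliverAndWait("A text msg"));
    }
}
```

The bounded await is a design choice of the sketch; the original client waits indefinitely.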
The JMS publish/subscribe (Pub-Sub) message model is a one-to-many model. A publisher sends a message to a topic and all active subscribers of the topic receive the message. Subscribers that are not actively listening to the topic will miss the published message. Example 6.2, “A Pub-Sub JMS client example” shows a complete JMS client that sends a javax.jms.TextMessage to a topic and asynchronously receives the message from the same topic.
Example 6.2. A Pub-Sub JMS client example
package org.jboss.chap6.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**
 * A complete JMS client example program that sends a TextMessage to
 * a Topic and asynchronously receives the message from the same
 * Topic.
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.10 $
 */
public class TopicSendRecvClient {
    static CountDown done = new CountDown(1);
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public static class ExListener implements MessageListener {
        public void onMessage(Message msg) {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text=" + tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void setupPubSub() throws JMSException, NamingException {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void sendRecvAsync(String text) throws JMSException, NamingException {
        System.out.println("Begin sendRecvAsync");
        // Setup the PubSub connection, session
        setupPubSub();

        // Set the async listener
        TopicSubscriber recv = session.createSubscriber(topic);
        recv.setMessageListener(new ExListener());

        // Send a text msg
        TopicPublisher send = session.createPublisher(topic);
        TextMessage tm = session.createTextMessage(text);
        send.publish(tm);
        System.out.println("sendRecvAsync, sent text=" + tm.getText());
        send.close();
        System.out.println("End sendRecvAsync");
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin TopicSendRecvClient, now=" + System.currentTimeMillis());
        TopicSendRecvClient client = new TopicSendRecvClient();
        client.sendRecvAsync("A text msg, now="+System.currentTimeMillis());
        client.done.acquire();
        client.stop();
        System.out.println("End TopicSendRecvClient");
        System.exit(0);
    }
}
The client may be run using the following command line:
[nr@toki examples]$ ant -Dchap=chap6 -Dex=1ps run-example
...
run-example1ps:
     [java] Begin TopicSendRecvClient, now=1098416563162
     [java] Begin sendRecvAsync
     [java] onMessage, recv text=A text msg, now=1098416563171
     [java] sendRecvAsync, sent text=A text msg, now=1098416563171
     [java] End sendRecvAsync
     [java] End TopicSendRecvClient
Now let's break the publisher and subscribers into separate programs to demonstrate that subscribers only receive messages while they are listening to a topic. Example 6.3, “A JMS publisher client” shows a variation of the previous pub-sub client that only publishes messages to the topic/testTopic topic. The subscriber-only client is shown in Example 6.4, “A JMS subscriber client”.
Example 6.3. A JMS publisher client
package org.jboss.chap6.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * A JMS client example program that sends a TextMessage to a Topic
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.10 $
 */
public class TopicSendClient {
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public void setupPubSub() throws JMSException, NamingException {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void sendAsync(String text) throws JMSException, NamingException {
        System.out.println("Begin sendAsync");
        // Setup the pub/sub connection, session
        setupPubSub();

        // Send a text msg
        TopicPublisher send = session.createPublisher(topic);
        TextMessage tm = session.createTextMessage(text);
        send.publish(tm);
        System.out.println("sendAsync, sent text=" + tm.getText());
        send.close();
        System.out.println("End sendAsync");
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin TopicSendClient, now=" + System.currentTimeMillis());
        TopicSendClient client = new TopicSendClient();
        client.sendAsync("A text msg, now="+System.currentTimeMillis());
        client.stop();
        System.out.println("End TopicSendClient");
        System.exit(0);
    }
}
Example 6.4. A JMS subscriber client
package org.jboss.chap6.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * A JMS client example program that synchronously receives a message
 * from a Topic
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.10 $
 */
public class TopicRecvClient {
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public void setupPubSub() throws JMSException, NamingException {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection();
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void recvSync() throws JMSException, NamingException {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();

        // Wait up to 5 seconds for the message
        TopicSubscriber recv = session.createSubscriber(topic);
        Message msg = recv.receive(5000);
        if (msg == null) {
            System.out.println("Timed out waiting for msg");
        } else {
            System.out.println("TopicSubscriber.recv, msgt="+msg);
        }
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin TopicRecvClient, now=" + System.currentTimeMillis());
        TopicRecvClient client = new TopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End TopicRecvClient");
        System.exit(0);
    }
}
Run the TopicSendClient followed by the TopicRecvClient as follows:
[nr@toki examples]$ ant -Dchap=chap6 -Dex=1ps2 run-example
...
run-example1ps2:
     [java] Begin TopicSendClient, now=1098416676618
     [java] Begin sendAsync
     [java] sendAsync, sent text=A text msg, now=1098416676621
     [java] End sendAsync
     [java] End TopicSendClient
     [java] Begin TopicRecvClient, now=1098416683857
     [java] Begin recvSync
     [java] Timed out waiting for msg
     [java] End TopicRecvClient
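The output shows that the topic subscriber client (TopicRecvClient) fails to receive the message sent by the publisher. The subscriber was not listening to the topic when the message was published, so its receive call times out after five seconds.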
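The delivery rule behind this result can be modeled without a JMS provider. The following is a minimal in-memory sketch, with a hypothetical TopicSketch class that is not part of JBoss or the JMS API. It assumes that a non-durable topic hands each message to the subscribers registered at publish time and retains nothing for later subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory model of a non-durable topic.
final class TopicSketch {
    private final List<Consumer<String>> subscribers =
        new CopyOnWriteArrayList<Consumer<String>>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Delivers to whoever is subscribed right now; nothing is kept afterwards. */
    void publish(String text) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(text);
        }
    }

    public static void main(String[] args) {
        TopicSketch topic = new TopicSketch();
        List<String> early = new ArrayList<String>();
        List<String> late = new ArrayList<String>();

        topic.subscribe(early::add);   // listening before the publish
        topic.publish("A text msg");
        topic.subscribe(late::add);    // arrives after the publish, like TopicRecvClient

        System.out.println("early=" + early + ", late=" + late);
        // prints: early=[A text msg], late=[]
    }
}
```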
JMS supports a messaging model that is a cross between the P2P and pub-sub models. When a pub-sub client wants to receive all messages posted to the topic it subscribes to, even when it is not actively listening to the topic, the client may achieve this behavior using a durable topic. Let's look at a variation of the preceding subscriber client that uses a durable topic to ensure that it receives all messages, including those published when the client is not listening to the topic. Example 6.5, “A durable topic JMS client example” shows the durable topic client. Compared with the client in Example 6.4, “A JMS subscriber client”, the key differences are the createTopicConnection("john", "needle") call, which supplies a username and password, and the createDurableSubscriber(topic, "chap6-ex1dtps") call, which replaces createSubscriber(topic).
Example 6.5. A durable topic JMS client example
package org.jboss.chap6.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * A JMS client example program that synchronously receives a message
 * from a Topic
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.10 $
 */
public class DurableTopicRecvClient {
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;

    public void setupPubSub() throws JMSException, NamingException {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("john", "needle");
        topic = (Topic) iniCtx.lookup("topic/testTopic");
        session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void recvSync() throws JMSException, NamingException {
        System.out.println("Begin recvSync");
        // Setup the pub/sub connection, session
        setupPubSub();

        // Wait up to 5 seconds for the message
        TopicSubscriber recv = session.createDurableSubscriber(topic, "chap6-ex1dtps");
        Message msg = recv.receive(5000);
        if (msg == null) {
            System.out.println("Timed out waiting for msg");
        } else {
            System.out.println("DurableTopicRecvClient.recv, msgt=" + msg);
        }
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin DurableTopicRecvClient, now=" + System.currentTimeMillis());
        DurableTopicRecvClient client = new DurableTopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }
}
Now run the previous topic publisher with the durable topic subscriber as follows:
[nr@toki examples]$ ant -Dchap=chap6 -Dex=1psdt run-example
run-example1psdt:
     [java] Begin DurableTopicSetup
     [java] End DurableTopicSetup
     [java] Begin TopicSendClient, now=1098420531772
     [java] Begin sendAsync
     [java] sendAsync, sent text=A text msg, now=1098420531775
     [java] End sendAsync
     [java] End TopicSendClient
     [java] Begin DurableTopicRecvClient, now=1098420538269
     [java] Begin recvSync
     [java] DurableTopicRecvClient.recv, msgt=SpyTextMessage {
     [java] Header {
     [java]    jmsDestination  : TOPIC.testTopic.DurableSubscription[clientId=DurableSubscriberExample name=chap6-ex1dtps selector=null]
     [java]    jmsDeliveryMode : 2
     [java]    jmsExpiration   : 0
     [java]    jmsPriority     : 4
     [java]    jmsMessageID    : ID:29-10984205372121
     [java]    jmsTimeStamp    : 1098420537212
     [java]    jmsCorrelationID: null
     [java]    jmsReplyTo      : null
     [java]    jmsType         : null
     [java]    jmsRedelivered  : false
     [java]    jmsProperties   : {}
     [java]    jmsPropReadWrite: false
     [java]    msgReadOnly     : true
     [java]    producerClientId: ID:29
     [java] }
     [java] Body {
     [java]    text            :A text msg, now=1098420531775
     [java] }
     [java] }
     [java] End DurableTopicRecvClient
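The run starts with a DurableTopicSetup step, so the durable subscription already exists when the publisher runs. The effect can be modeled as a per-subscription backlog that fills while the subscriber is away. The following is a minimal in-memory sketch under that assumption. DurableTopicSketch and its methods are hypothetical names, not JBossMQ classes, and a real provider would persist the backlog rather than hold it in a map.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of durable subscriptions on one topic.
final class DurableTopicSketch {
    // One backlog per durable subscription name, kept while the subscriber is offline
    private final Map<String, Queue<String>> backlogs = new HashMap<String, Queue<String>>();

    /** Registers the subscription once; later messages accumulate under this name. */
    void registerDurable(String name) {
        if (!backlogs.containsKey(name)) {
            backlogs.put(name, new ArrayDeque<String>());
        }
    }

    void publish(String text) {
        for (Queue<String> backlog : backlogs.values()) {
            backlog.add(text);
        }
    }

    /** Called when the subscriber connects: returns and clears what accumulated. */
    List<String> drain(String name) {
        List<String> delivered = new ArrayList<String>();
        Queue<String> backlog = backlogs.get(name);
        if (backlog != null) {
            delivered.addAll(backlog);
            backlog.clear();
        }
        return delivered;
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.registerDurable("chap6-ex1dtps");   // the setup step
        topic.publish("A text msg");               // subscriber is not connected
        System.out.println(topic.drain("chap6-ex1dtps"));
    }
}
```

A subscription name that was never registered receives nothing in this model, which mirrors why the setup step has to run before the publisher.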
Items of note for the durable topic example include:
The EJB 2.0 specification added the notion of message driven beans (MDB). An MDB is a business component that may be invoked asynchronously. As of the EJB 2.0 specification, JMS was the only mechanism by which MDBs could be accessed. Example 6.6, “A TextMessage processing MDB” shows an MDB that transforms the TextMessages it receives and sends the transformed messages to the queue found in the incoming message JMSReplyTo header.
Example 6.6. A TextMessage processing MDB
package org.jboss.chap6.ex2;

import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.ejb.EJBException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 * An MDB that transforms the TextMessages it receives and sends the
 * transformed messages to the Queue found in the incoming message
 * JMSReplyTo header.
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.10 $
 */
public class TextMDB implements MessageDrivenBean, MessageListener {
    private MessageDrivenContext ctx = null;
    private QueueConnection conn;
    private QueueSession session;

    public TextMDB() {
        System.out.println("TextMDB.ctor, this="+hashCode());
    }

    public void setMessageDrivenContext(MessageDrivenContext ctx) {
        this.ctx = ctx;
        System.out.println("TextMDB.setMessageDrivenContext, this=" + hashCode());
    }

    public void ejbCreate() {
        System.out.println("TextMDB.ejbCreate, this="+hashCode());
        try {
            setupPTP();
        } catch (Exception e) {
            throw new EJBException("Failed to init TextMDB", e);
        }
    }

    public void ejbRemove() {
        System.out.println("TextMDB.ejbRemove, this="+hashCode());
        ctx = null;
        try {
            if (session != null) {
                session.close();
            }
            if (conn != null) {
                conn.close();
            }
        } catch(JMSException e) {
            e.printStackTrace();
        }
    }

    public void onMessage(Message msg) {
        System.out.println("TextMDB.onMessage, this="+hashCode());
        try {
            TextMessage tm = (TextMessage) msg;
            String text = tm.getText() + "processed by: "+hashCode();
            Queue dest = (Queue) msg.getJMSReplyTo();
            sendReply(text, dest);
        } catch(Throwable t) {
            t.printStackTrace();
        }
    }

    private void setupPTP() throws JMSException, NamingException {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("java:comp/env/jms/QCF");
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    private void sendReply(String text, Queue dest) throws JMSException {
        System.out.println("TextMDB.sendReply, this=" + hashCode() + ", dest="+dest);
        QueueSender sender = session.createSender(dest);
        TextMessage tm = session.createTextMessage(text);
        sender.send(tm);
        sender.close();
    }
}
The MDB ejb-jar.xml and jboss.xml deployment descriptors are shown in Example 6.7, “The MDB ejb-jar.xml descriptor” and Example 6.8, “The MDB jboss.xml descriptor”.
Example 6.7. The MDB ejb-jar.xml descriptor
<?xml version="1.0"?>
<!DOCTYPE ejb-jar PUBLIC
          "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN"
          "http://java.sun.com/dtd/ejb-jar_2_0.dtd">
<ejb-jar>
    <enterprise-beans>
        <message-driven>
            <ejb-name>TextMDB</ejb-name>
            <ejb-class>org.jboss.chap6.ex2.TextMDB</ejb-class>
            <transaction-type>Container</transaction-type>
            <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode>
            <message-driven-destination>
                <destination-type>javax.jms.Queue</destination-type>
            </message-driven-destination>
            <resource-ref>
                <res-ref-name>jms/QCF</res-ref-name>
                <res-type>javax.jms.QueueConnectionFactory</res-type>
                <res-auth>Container</res-auth>
            </resource-ref>
        </message-driven>
    </enterprise-beans>
</ejb-jar>
Example 6.8. The MDB jboss.xml descriptor
<?xml version="1.0"?>
<jboss>
    <enterprise-beans>
        <message-driven>
            <ejb-name>TextMDB</ejb-name>
            <destination-jndi-name>queue/B</destination-jndi-name>
            <resource-ref>
                <res-ref-name>jms/QCF</res-ref-name>
                <jndi-name>ConnectionFactory</jndi-name>
            </resource-ref>
        </message-driven>
    </enterprise-beans>
</jboss>
Example 6.9, “A JMS client that interacts with the TextMDB” shows a variation of the P2P client that sends several messages to the queue/B destination and asynchronously receives the messages as modified by TextMDB from the queue/A destination.
Example 6.9. A JMS client that interacts with the TextMDB
package org.jboss.chap6.ex2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**
 * A complete JMS client example program that sends N TextMessages to
 * a Queue B and asynchronously receives the messages as modified by
 * TextMDB from Queue A.
 *
 * @author Scott.Stark@jboss.org
 * @version $Revision: 1.10 $
 */
public class SendRecvClient {
    static final int N = 10;
    static CountDown done = new CountDown(N);

    QueueConnection conn;
    QueueSession session;
    Queue queA;
    Queue queB;

    public static class ExListener implements MessageListener {
        public void onMessage(Message msg) {
            done.release();
            TextMessage tm = (TextMessage) msg;
            try {
                System.out.println("onMessage, recv text="+tm.getText());
            } catch(Throwable t) {
                t.printStackTrace();
            }
        }
    }

    public void setupPTP() throws JMSException, NamingException {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");
        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
        conn = qcf.createQueueConnection();
        queA = (Queue) iniCtx.lookup("queue/A");
        queB = (Queue) iniCtx.lookup("queue/B");
        session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }

    public void sendRecvAsync(String textBase)
        throws JMSException, NamingException, InterruptedException {
        System.out.println("Begin sendRecvAsync");

        // Setup the PTP connection, session
        setupPTP();

        // Set the async listener for queA
        QueueReceiver recv = session.createReceiver(queA);
        recv.setMessageListener(new ExListener());

        // Send N text msgs to queB, matching the CountDown(N) the client waits on
        QueueSender send = session.createSender(queB);
        for(int m = 0; m < N; m ++) {
            TextMessage tm = session.createTextMessage(textBase+"#"+m);
            tm.setJMSReplyTo(queA);
            send.send(tm);
            System.out.println("sendRecvAsync, sent text="+tm.getText());
        }
        System.out.println("End sendRecvAsync");
    }

    public void stop() throws JMSException {
        conn.stop();
        session.close();
        conn.close();
    }

    public static void main(String args[]) throws Exception {
        System.out.println("Begin SendRecvClient,now=" + System.currentTimeMillis());
        SendRecvClient client = new SendRecvClient();
        client.sendRecvAsync("A text msg");
        client.done.acquire();
        client.stop();
        System.exit(0);
        // Not reached: System.exit(0) ends the JVM before this line runs
        System.out.println("End SendRecvClient");
    }
}
Run the client as follows:
[nr@toki examples]$ ant -Dchap=chap6 -Dex=2 run-example
...
run-example2:
     [copy] Copying 1 file to /tmp/jboss-3.2.6/server/default/deploy
     [echo] Waiting 5 seconds for deploy...
     [java] Begin SendRecvClient, now=1098419197580
     [java] Begin sendRecvAsync
     [java] onMessage, recv text=A text msg#0processed by: 13929978
     [java] sendRecvAsync, sent text=A text msg#0
     [java] sendRecvAsync, sent text=A text msg#1
     [java] onMessage, recv text=A text msg#2processed by: 5495387
     [java] sendRecvAsync, sent text=A text msg#2
     [java] sendRecvAsync, sent text=A text msg#3
     [java] onMessage, recv text=A text msg#1processed by: 13929978
     [java] sendRecvAsync, sent text=A text msg#4
     [java] sendRecvAsync, sent text=A text msg#5
     [java] onMessage, recv text=A text msg#5processed by: 5495387
     [java] sendRecvAsync, sent text=A text msg#6
     [java] sendRecvAsync, sent text=A text msg#7
     [java] onMessage, recv text=A text msg#4processed by: 13929978
     [java] sendRecvAsync, sent text=A text msg#8
     [java] sendRecvAsync, sent text=A text msg#9
     [java] End sendRecvAsync
     [java] onMessage, recv text=A text msg#3processed by: 15690844
     [java] onMessage, recv text=A text msg#8processed by: 15690844
     [java] onMessage, recv text=A text msg#7processed by: 13929978
     [java] onMessage, recv text=A text msg#6processed by: 5495387
     [java] onMessage, recv text=A text msg#9processed by: 14089812
The corresponding JBoss server console output is:
23:26:36,720 INFO [EjbModule] Deploying TextMDB
23:26:37,073 INFO [EJBDeployer] Deployed: file:/private/tmp/jboss-3.2.6/server/default/deploy/chap6-ex2.jar
23:26:43,216 INFO [TextMDB] TextMDB.ctor, this=13929978
23:26:43,224 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=13929978
23:26:43,299 INFO [TextMDB] TextMDB.ejbCreate, this=13929978
23:26:43,401 INFO [TextMDB] TextMDB.onMessage, this=13929978
23:26:43,408 INFO [TextMDB] TextMDB.sendReply, this=13929978, dest=QUEUE.A
23:26:43,494 INFO [TextMDB] TextMDB.onMessage, this=13929978
23:26:43,518 INFO [TextMDB] TextMDB.sendReply, this=13929978, dest=QUEUE.A
23:26:43,571 INFO [TextMDB] TextMDB.ctor, this=5495387
23:26:43,573 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=5495387
23:26:43,574 INFO [TextMDB] TextMDB.ejbCreate, this=5495387
23:26:43,596 INFO [TextMDB] TextMDB.onMessage, this=5495387
23:26:43,597 INFO [TextMDB] TextMDB.sendReply, this=5495387, dest=QUEUE.A
23:26:43,802 INFO [TextMDB] TextMDB.onMessage, this=13929978
23:26:43,803 INFO [TextMDB] TextMDB.sendReply, this=13929978, dest=QUEUE.A
23:26:43,825 INFO [TextMDB] TextMDB.onMessage, this=5495387
23:26:43,825 INFO [TextMDB] TextMDB.sendReply, this=5495387, dest=QUEUE.A
23:26:43,880 INFO [TextMDB] TextMDB.ctor, this=15690844
23:26:43,884 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=15690844
23:26:43,887 INFO [TextMDB] TextMDB.ejbCreate, this=15690844
23:26:43,944 INFO [TextMDB] TextMDB.onMessage, this=15690844
23:26:43,945 INFO [TextMDB] TextMDB.sendReply, this=15690844, dest=QUEUE.A
23:26:44,022 INFO [TextMDB] TextMDB.onMessage, this=13929978
23:26:44,022 INFO [TextMDB] TextMDB.sendReply, this=13929978, dest=QUEUE.A
23:26:44,041 INFO [TextMDB] TextMDB.onMessage, this=15690844
23:26:44,041 INFO [TextMDB] TextMDB.sendReply, this=15690844, dest=QUEUE.A
23:26:44,065 INFO [TextMDB] TextMDB.ctor, this=14089812
23:26:44,069 INFO [TextMDB] TextMDB.setMessageDrivenContext, this=14089812
23:26:44,069 INFO [TextMDB] TextMDB.ejbCreate, this=14089812
23:26:44,200 INFO [TextMDB] TextMDB.onMessage, this=5495387
23:26:44,201 INFO [TextMDB] TextMDB.sendReply, this=5495387, dest=QUEUE.A
23:26:44,249 INFO [TextMDB] TextMDB.onMessage, this=14089812
23:26:44,250 INFO [TextMDB] TextMDB.sendReply, this=14089812, dest=QUEUE.A
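The server log shows four TextMDB instances (13929978, 5495387, 15690844 and 14089812) handling messages, and the client output shows replies arriving in a different order than the requests were sent. The following minimal sketch models that behavior with a fixed thread pool standing in for the pool of bean instances and an in-memory queue standing in for queue/A. ReplyPoolSketch is a hypothetical name, and the sketch makes no claim about how the JBoss MDB container schedules instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model: several workers consume requests and post replies concurrently.
final class ReplyPoolSketch {

    /** Sends n requests to a pool of workers and collects the n replies as they arrive. */
    static List<String> sendAndCollect(int n, int poolSize) {
        final BlockingQueue<String> replyQueue = new LinkedBlockingQueue<String>(); // stands in for queue/A
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);               // stands in for the MDB instances
        try {
            for (int m = 0; m < n; m++) {
                final String text = "A text msg#" + m;
                pool.execute(new Runnable() {
                    public void run() {
                        // Same transformation as TextMDB.onMessage, with the thread name as the instance id
                        replyQueue.add(text + "processed by: " + Thread.currentThread().getName());
                    }
                });
            }
            List<String> replies = new ArrayList<String>();
            for (int m = 0; m < n; m++) {
                String reply = replyQueue.poll(5, TimeUnit.SECONDS);
                if (reply == null) {
                    throw new IllegalStateException("Timed out waiting for reply " + m);
                }
                replies.add(reply);
            }
            return replies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String reply : sendAndCollect(10, 4)) {
            System.out.println("onMessage, recv text=" + reply);
        }
    }
}
```

Because the workers run concurrently, the order of the collected replies can differ from run to run; a client that needs to match replies to requests should not rely on arrival order.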
Items of note in this example include:
JBossMQ is composed of several services working together to provide JMS API level services to client applications. The services that make up the JBossMQ JMS implementation are introduced in this section.
The Invocation Layer (IL) services are responsible for handling the communication protocols that clients use to send and receive messages. JBossMQ can support running different types of Invocation Layers concurrently. All Invocation Layers support bidirectional communication which allows clients to send and receive messages concurrently. ILs only handle the transport details of messaging. They delegate messages to the JMS server JMX gateway service known as the invoker. This is similar to how the detached invokers expose the EJB container via different transports.
Each IL service binds a JMS connection factory to a specific location in the JNDI tree. Clients choose the protocol they wish to use by the JNDI location used to obtain the JMS connection factory. JBossMQ currently has six different invocation layers, and they are introduced in the following sections.
The first IL that was developed was based on Java's Remote Method Invocation (RMI). This is a robust IL since it is based on standard RMI technology, but it has a high overhead compared to other ILs and will likely be dropped in future releases.
NOTE: This IL will try to establish a TCP/IP socket from the server to the client. Therefore, clients that sit behind firewalls or have security restrictions prohibiting the use of ServerSockets should not use this IL.
The next IL that was developed was the Optimized IL (OIL). The OIL uses a custom TCP/IP protocol and serialization protocol that has very low overhead. This was the recommended socket based protocol until the addition of the UIL2 protocol.
NOTE: This IL will try to establish a TCP/IP socket from the server to the client. Therefore, clients that sit behind firewalls or have security restrictions prohibiting the use of ServerSockets should not use this IL.
The Unified Invocation Layer (UIL) was developed for clients that cannot accept a connection from the server back to the client because of firewall or other restrictions. It is almost identical to the OIL protocol except that a multiplexing layer is used to provide the bidirectional communication. The multiplexing layer creates two virtual sockets over one physical socket. This IL is slower than the OIL due to the higher overhead incurred by the multiplexing layer. This invocation layer is now deprecated in favor of UIL2.
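To illustrate what carrying two virtual sockets over one physical socket involves, here is a minimal sketch of frame-based multiplexing. The frame layout (one channel byte, a four-byte length, then the payload) and the MuxSketch class are assumptions made for illustration only; they do not describe the actual UIL wire format. Byte arrays stand in for the physical socket so the sketch runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical framing: [channel id: 1 byte][length: 4 bytes][payload bytes]
final class MuxSketch {

    /** Interleaves payloads for several virtual channels into one byte stream. */
    static byte[] mux(int[] channels, String[] payloads) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(wire)) {
            for (int i = 0; i < channels.length; i++) {
                byte[] bytes = payloads[i].getBytes(StandardCharsets.UTF_8);
                out.writeByte(channels[i]);
                out.writeInt(bytes.length);
                out.write(bytes);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return wire.toByteArray();
    }

    /** Reads frames until the stream ends and groups the payloads by channel id. */
    static Map<Integer, List<String>> demux(byte[] wire) {
        Map<Integer, List<String>> byChannel = new HashMap<Integer, List<String>>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            while (in.available() > 0) {
                int channel = in.readUnsignedByte();
                byte[] bytes = new byte[in.readInt()];
                in.readFully(bytes);
                List<String> payloads = byChannel.get(channel);
                if (payloads == null) {
                    payloads = new ArrayList<String>();
                    byChannel.put(channel, payloads);
                }
                payloads.add(new String(bytes, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return byChannel;
    }

    public static void main(String[] args) {
        // Channel 0: client requests; channel 1: server deliveries (labels are illustrative)
        byte[] wire = mux(new int[] {0, 1, 0},
                          new String[] {"send m1", "deliver m7", "send m2"});
        System.out.println(demux(wire));
    }
}
```

Every payload pays for the extra header bytes and the demultiplexing step, which gives some intuition for the overhead the text attributes to the multiplexing layer.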
The Unified version 2 Invocation Layer (UIL2) is a variation of the UIL protocol that also uses a single socket between the client and server. However, unlike the other socket based invocation layers (RMI, UIL and OIL), which use a blocking round-trip message at the socket level, the UIL2 protocol uses true asynchronous send and receive messaging at the transport level. This provides improved throughput and utilization, and as such it is the preferred socket invocation layer.
The Java Virtual Machine (JVM) Invocation Layer was developed to cut out the TCP/IP overhead when the JMS client is running in the same JVM as the server. This IL uses direct method calls for the server to service the client requests. This increases efficiency since no sockets are created and there is no need for the associated worker threads. This is the IL that should be used by Message Driven Beans (MDB) or any other component that runs in the same virtual machine as the server such as servlets, MBeans, or EJBs.
The HTTP Invocation Layer (HTTPIL) allows for accessing the JBossMQ service over the HTTP or HTTPS protocols. This IL relies on the servlet deployed in the deploy/jms/jbossmq-httpil.sar to handle the HTTP traffic. This IL is useful for access to JMS through a firewall when the only port allowed requires HTTP.
The JBossMQ SecurityManager is the service that enforces an access control list to guard access to your destinations. This subsystem works closely with the StateManager service.
The DestinationManager can be thought of as the central service in JBossMQ. It keeps track of all the destinations that have been created on the server. It also keeps track of the other key services such as the MessageCache, StateManager, and PersistenceManager.
Messages created in the server are passed to the MessageCache for memory management. JVM memory usage goes up as messages are added to a destination that does not have any receivers. These messages are held in the main memory until the receiver picks them up. If the MessageCache notices that the JVM memory usage starts passing the defined limits, the MessageCache starts moving those messages from memory to persistent storage on disk. The MessageCache uses a least recently used (LRU) algorithm to determine which messages should go to disk.
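The least recently used policy described above can be sketched in a few lines of plain Java. This is only an illustration of the idea, not the JBossMQ MessageCache implementation: the class name LruSpillCache is hypothetical, the limit is a simple entry count rather than a JVM memory threshold, and an in-memory map stands in for persistent storage.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative LRU cache that moves the least recently used entries to a "disk" map. */
final class LruSpillCache<K, V> {
    private final int maxInMemory;                   // hypothetical limit (entry count)
    private final Map<K, V> disk = new HashMap<>();  // stand-in for persistent storage
    private final LinkedHashMap<K, V> memory;

    LruSpillCache(int maxInMemory) {
        this.maxInMemory = maxInMemory;
        // accessOrder=true keeps iteration order from least to most recently used
        this.memory = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > LruSpillCache.this.maxInMemory) {
                    // save the least recently used entry before it leaves memory
                    disk.put(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    void put(K key, V value) {
        disk.remove(key);
        memory.put(key, value);
    }

    V get(K key) {
        V value = memory.get(key);       // a hit refreshes the entry's recency
        if (value == null && disk.containsKey(key)) {
            value = disk.remove(key);    // reload from "disk"
            memory.put(key, value);      // may in turn push another entry out
        }
        return value;
    }

    boolean inMemory(K key) { return memory.containsKey(key); }

    boolean onDisk(K key) { return disk.containsKey(key); }
}
```

With a limit of two entries, adding a third message pushes out whichever of the first two was touched least recently, and reading it back later swaps it in again.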
The StateManager (SM) is in charge of keeping track of who is allowed to log into the server and what their durable subscriptions are.
The PersistenceManager (PM) is used by a destination to store messages marked as being persistent. JBossMQ has several different implementations of the persistence manager, but only one can be enabled per server instance. You should enable the persistence manager that best matches your requirements.
The File PM is a robust persistence manager that comes with JBossMQ. It creates a separate directory for each destination created on the server, and stores each persistent message as a separate file in the appropriate directory. It has poor performance characteristics since it is frequently opening and closing files.
The Rolling Logged PM is also a file based persistence manager that has better performance than the File PM because it stores multiple messages in one file, reducing the overhead of opening/closing multiple files. This is a very fast PM but it is less transactionally reliable than the File PM due to its use of the FileOutputStream.flush() method call. On some operating systems/JVMs the FileOutputStream.flush() method does not guarantee that the data has been written to disk by the time the call returns.
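The difference between flushing a stream and forcing data to the storage device can be seen with the standard java.io classes. The following is a minimal sketch, not code from the Rolling Logged PM; the class name DurableAppend and the log file are hypothetical. It appends a record, calls flush(), and optionally calls FileDescriptor.sync(), which asks the operating system to write its buffers to the underlying device before returning.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class DurableAppend {
    /** Appends one record; when sync is true, also forces it to the storage device. */
    static void append(File log, String record, boolean sync) throws IOException {
        try (FileOutputStream out = new FileOutputStream(log, true)) {
            out.write(record.getBytes(StandardCharsets.UTF_8));
            out.flush();            // hands the bytes to the OS; they may still sit in OS buffers
            if (sync) {
                out.getFD().sync(); // blocks until the OS reports its buffers written to the device
            }
        }
    }
}
```

The extra sync() call trades throughput for durability, which is the same trade-off that separates the two file based persistence managers.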
The JDBC2 PM is the second version of the original JDBC PM in JBossMQ 2.4.x. It has been substantially simplified and improved. This PM allows you to store persistent messages to a relational database using JDBC. The performance of this PM is directly related to the performance that can be obtained from the database. This PM has a very low memory overhead compared to the other persistence managers. Furthermore it is also highly integrated with the MessageCache to provide efficient persistence on a system that has a very active MessageCache.
A destination is the object on the JBossMQ server that clients use to send and receive messages. There are two types of destination objects, Queues and Topics. References to the destinations created by JBossMQ are stored in JNDI.
Clients that are in the Point-to-Point paradigm typically use Queues. They expect that a message sent to a Queue will be received by only one other client, once and only once. If multiple clients are receiving messages from a single queue, the messages will be load balanced across the receivers. Queue objects, by default, will be stored under the JNDI queue/ sub context.
Topics are used in the publish-subscribe paradigm. When a client publishes a message to a topic, it expects that a copy of the message will be delivered to each client that has subscribed to the topic. Topic messages are delivered in the same manner a television show is delivered. Unless you have the TV on and are watching the show, you will miss it. Similarly, if the client is not up, running, and receiving messages from the topic, it will miss messages published to the topic. To get around this problem of missing messages, clients can start a durable subscription. This is like having a VCR record a show you cannot watch at its scheduled time so that you can see what you missed when you turn your TV back on.
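The difference between a regular and a durable subscription can be made concrete with a toy model. The class below is purely illustrative: ToyTopic and its methods are hypothetical names, it does not use the JMS API, and it keeps everything in memory. It only mirrors the TV and VCR analogy: an offline regular subscriber misses a message, while an offline durable subscriber has it recorded for later delivery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy in-memory model of topic delivery; not JBossMQ code. */
final class ToyTopic {
    private final Map<String, List<String>> inboxes = new HashMap<>();
    private final Map<String, List<String>> backlog = new HashMap<>();
    private final Set<String> online = new HashSet<>();
    private final Set<String> durable = new HashSet<>();

    /** Connects a subscriber; a durable one first receives what it missed. */
    void subscribe(String name, boolean isDurable) {
        inboxes.putIfAbsent(name, new ArrayList<>());
        online.add(name);
        if (isDurable) {
            durable.add(name);
            List<String> missed = backlog.remove(name);
            if (missed != null) {
                inboxes.get(name).addAll(missed);
            }
        }
    }

    void disconnect(String name) {
        online.remove(name);
    }

    void publish(String message) {
        for (String name : inboxes.keySet()) {
            if (online.contains(name)) {
                inboxes.get(name).add(message);
            } else if (durable.contains(name)) {
                // recorded for later, like the VCR in the analogy
                backlog.computeIfAbsent(name, k -> new ArrayList<>()).add(message);
            }
            // an offline, non-durable subscriber simply misses the message
        }
    }

    List<String> received(String name) {
        return inboxes.get(name);
    }
}
```

In a real client the same distinction is made through the JMS API when the subscription is created; the model above only shows the delivery semantics.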
This section defines the MBean services that correspond to the components introduced in the previous section along with their MBean attributes. The configuration and service files that make up the JBossMQ system include:
We will discuss the associated MBeans in the following subsections.
The org.jboss.mq.il.jvm.JVMServerILService MBean is used to configure the JVM IL. The configurable attributes are as follows:
The org.jboss.mq.il.rmi.RMIServerILService is used to configure the RMI IL. The configurable attributes are as follows:
The org.jboss.mq.il.oil.OILServerILService is used to configure the OIL IL. The configurable attributes are as follows:
The org.jboss.mq.il.uil.UILServerILService is used to configure the UIL IL. Note that this service has been removed from the default distribution in JBoss 3.2.2, but an example configuration file can be found in the docs/examples/jca directory.
The configurable attributes of the UILServerILService are as follows:
The org.jboss.mq.il.uil2.UILServerILService is used to configure the UIL2 IL. The configurable attributes are as follows:
ClientAddress: The address passed to the client as the address that should be used to connect to the server.
The UIL2 and OIL services support the use of SSL through custom socket factories that integrate JSSE using the security domain associated with the IL service. An example UIL2 service descriptor fragment that illustrates the use of the custom JBoss SSL socket factories is shown in Example 6.10, “An example UIL2 config fragment for using SSL”.
Example 6.10. An example UIL2 config fragment for using SSL
<mbean code="org.jboss.mq.il.uil2.UILServerILService"
       name="jboss.mq:service=InvocationLayer,type=HTTPSUIL2">
    <depends optional-attribute-name="Invoker">jboss.mq:service=Invoker</depends>
    <attribute name="ConnectionFactoryJNDIRef">SSLConnectionFactory</attribute>
    <attribute name="XAConnectionFactoryJNDIRef">SSLXAConnectionFactory</attribute>
    <!-- ... -->
    <!-- SSL Socket Factories -->
    <attribute name="ClientSocketFactory">
        org.jboss.security.ssl.ClientSocketFactory
    </attribute>
    <attribute name="ServerSocketFactory">
        org.jboss.security.ssl.DomainServerSocketFactory
    </attribute>
    <!-- Security domain - see below -->
    <attribute name="SecurityDomain">java:/jaas/SSL</attribute>
</mbean>
<!-- Configures the keystore on the "SSL" security domain
     This mbean is better placed in conf/jboss-service.xml where it
     can be used by other services, but it will work from anywhere.
     Use keytool from the sdk to create the keystore. -->
<mbean code="org.jboss.security.plugins.JaasSecurityDomain"
       name="jboss.security:service=JaasSecurityDomain,domain=SSL">
    <!-- This must correlate with the java:/jaas/SSL above -->
    <constructor>
        <arg type="java.lang.String" value="SSL"/>
    </constructor>
    <!-- The location of the keystore resource: loads from the
         classpath and the server conf dir is a good default -->
    <attribute name="KeyStoreURL">resource:uil2.keystore</attribute>
    <attribute name="KeyStorePass">changeme</attribute>
</mbean>
There are several system properties that a JMS client using the UIL2 transport can set to control the client connection back to the server:
org.jboss.mq.il.uil2.useServerHost: This system property allows a client to connect to the server InetAddress.getHostName rather than the InetAddress.getHostAddress value. This will only make a difference if name-to-address resolution differs between the server and client environments.
org.jboss.mq.il.uil2.localAddr: This system property allows a client to define the local interface to which its sockets should be bound.
org.jboss.mq.il.uil2.localPort: This system property allows a client to define the local port to which its sockets should be bound.
org.jboss.mq.il.uil2.serverAddr: This system property allows a client to override the address to which it attempts to connect. This is useful for networks where NAT is occurring between the client and the JMS server.
org.jboss.mq.il.uil2.serverPort: This system property allows a client to override the port to which it attempts to connect. This is useful for networks where port forwarding is occurring between the client and the JMS server.
org.jboss.mq.il.uil2.retryCount: This system property controls the number of attempts to retry connecting to the JMS server. Retries are only made for java.net.ConnectException failures. A value <= 0 means no retry attempts will be made.
org.jboss.mq.il.uil2.retryDelay: This system property controls the delay in milliseconds between retries due to ConnectException failures.
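As a rough illustration, a client could set these properties programmatically before it creates its first JMS connection, instead of passing -D options on the command line. The property names below come from the list above; everything else is an assumption made for the sketch: the class name Uil2ClientSettings, the address and port values, and the defaults of 0 used when a property is unset. The connectWithRetry method is not the JBossMQ client code. It only mirrors the retry semantics described above, where a count of N allows up to N further attempts after the first failure.

```java
import java.net.ConnectException;
import java.util.concurrent.Callable;

final class Uil2ClientSettings {
    /** Sets UIL2 client properties; all values are placeholders. */
    static void apply() {
        System.setProperty("org.jboss.mq.il.uil2.serverAddr", "jms.example.com"); // hypothetical NAT address
        System.setProperty("org.jboss.mq.il.uil2.serverPort", "8093");            // hypothetical forwarded port
        System.setProperty("org.jboss.mq.il.uil2.retryCount", "3");
        System.setProperty("org.jboss.mq.il.uil2.retryDelay", "10");              // milliseconds
    }

    /** Sketch of the described retry semantics for an arbitrary connect action. */
    static <T> T connectWithRetry(Callable<T> connect) throws Exception {
        // defaults of 0 are an assumption of this sketch
        int retries = Integer.getInteger("org.jboss.mq.il.uil2.retryCount", 0);
        long delayMs = Long.getLong("org.jboss.mq.il.uil2.retryDelay", 0L);
        int retriesUsed = 0;
        while (true) {
            try {
                return connect.call();
            } catch (ConnectException e) {
                // only ConnectException is retried; a count <= 0 disables retries
                if (retries <= 0 || retriesUsed >= retries) {
                    throw e;
                }
                retriesUsed++;
                Thread.sleep(delayMs);
            }
        }
    }
}
```

Any other exception type propagates immediately, matching the statement that retries are only made for java.net.ConnectException failures.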
The org.jboss.mq.il.http.HTTPServerILService is used to manage the HTTP/S IL. This IL allows for the use of the JMS service over HTTP or HTTPS connections. It relies on the servlet deployed in the deploy/jms/jbossmq-httpil.sar to handle the HTTP traffic. The configurable attributes are as follows:
The org.jboss.mq.server.jmx.Invoker is used to pass IL requests down to the destination manager service through an interceptor stack. The configurable attributes are as follows:
The org.jboss.mq.server.jmx.InterceptorLoader is used to load a generic interceptor and make it part of the interceptor stack. This MBean is typically used to load custom interceptors like org.jboss.mq.server.TracingInterceptor, which can be used to efficiently log all client requests via trace level log messages. The configurable attributes are as follows:
The org.jboss.mq.sm.file.DynamicStateManager MBean is used as the default state manager assigned to the DestinationManager service. It manages an XML user security store that provides the authentication, authorization and durable subscriber information. The configurable attributes are as follows:
If the org.jboss.mq.security.SecurityManager is part of the interceptor stack, then it will enforce the access control lists assigned to the destinations. The SecurityManager uses JAAS, and as such requires that an application policy be set up for it in the JBoss login-config.xml file. The default configuration is shown below.
<application-policy name="jbossmq">
    <authentication>
        <login-module code="org.jboss.mq.sm.file.DynamicLoginModule"
                      flag="required">
            <module-option name="unauthenticatedIdentity">guest</module-option>
            <module-option name="sm.objectname">
                jboss.mq:service=StateManager
            </module-option>
        </login-module>
    </authentication>
</application-policy>
This integrates the DynamicStateManager jbossmq-state.xml security store into the JAAS based framework through the org.jboss.mq.sm.file.DynamicLoginModule. The configuration also maps any unauthenticated JBossMQ client to the guest role.
The configurable attributes of the SecurityManager are as follows:
You may be uncomfortable having to maintain your authentication and authorization information in an XML file. You can use any standard security store such as a database or LDAP server by simply updating the JAAS login-config.xml to provide the same username to password and user to role mappings as the DynamicStateManager. For example, to use a JDBC database, the following sample database tables and login-config.xml entry would work.
Table 6.2. An example JMSRoles username to acl roles table
username | role |
---|---|
jduke | create |
jduke | read |
jduke | write |
Example 6.11. An alternate login-config.xml configuration for JBossMQ
<application-policy name="jbossmq">
    <authentication>
        <login-module code="org.jboss.security.auth.spi.DatabaseServerLoginModule"
                      flag="required">
            <module-option name="unauthenticatedIdentity">guest
            </module-option>
            <module-option name="dsJndiName">java:/DefaultDS
            </module-option>
            <module-option name="principalsQuery">
                select password from JMSPasswords where username = ?
            </module-option>
            <module-option name="rolesQuery">
                select role, "Roles" from JMSRoles where username= ?
            </module-option>
        </login-module>
    </authentication>
</application-policy>
For a complete description of the DatabaseServerLoginModule see Section 8.4.6.4, “org.jboss.security.auth.spi.DatabaseServerLoginModule”.
The org.jboss.mq.server.jmx.DestinationManager must be the last interceptor in the interceptor stack. The configurable attributes are as follows:
Additional read-only attributes and operations that support monitoring include:
String listMessageCounter(): This operation generates an HTML table that contains:
The server determines when to move messages to secondary storage by using the org.jboss.mq.server.MessageCache MBean. The configurable attributes are as follows:
Additional read-only cache attributes that provide statistics include:
The org.jboss.mq.pm.file.CacheStore MBean should be used as the cache store for the MessageCache service when you are using the File or Rolling Logged PM. The configurable attributes are as follows:
The org.jboss.mq.pm.file.PersistenceManager should be used as the Persistence Manager assigned to the DestinationManager if you wish to use the File PM. The configurable attributes are as follows:
The org.jboss.mq.pm.rollinglogged.PersistenceManager should be used as the PersistenceManager assigned to the DestinationManager if you wish to use the Rolling Logged PM. The configurable attributes are as follows:
The org.jboss.mq.pm.jdbc.PersistenceManager should be used as the persistence manager assigned to the DestinationManager if you wish to store messages in a database. This PM has been tested against the Hypersonic SQL, MS SQL, Oracle, MySQL and Postgres databases. The configurable attributes are as follows:
Example 6.12. Default JDBC2 PersistenceManager SqlProperties
<attribute name="SqlProperties">
    BLOB_TYPE=OBJECT_BLOB
    INSERT_TX = INSERT INTO JMS_TRANSACTIONS (TXID) values(?)
    INSERT_MESSAGE = INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)
    SELECT_ALL_UNCOMMITED_TXS = SELECT TXID FROM JMS_TRANSACTIONS
    SELECT_MAX_TX = SELECT MAX(TXID) FROM JMS_MESSAGES
    SELECT_MESSAGES_IN_DEST = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES \
        WHERE DESTINATION=?
    SELECT_MESSAGE = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE \
        MESSAGEID=? AND DESTINATION=?
    MARK_MESSAGE = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND \
        DESTINATION=?
    UPDATE_MESSAGE = UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND \
        DESTINATION=?
    UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?
    UPDATE_MARKED_MESSAGES_WITH_TX = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?
    DELETE_MARKED_MESSAGES_WITH_TX = DELETE FROM JMS_MESSAGES WHERE TXID IN \
        (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?
    DELETE_TX = DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?
    DELETE_MARKED_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?
    DELETE_MESSAGE = DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?
    CREATE_MESSAGE_TABLE = CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, \
        DESTINATION VARCHAR(255) NOT NULL, TXID INTEGER, TXOP CHAR(1), \
        MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )
    CREATE_TX_TABLE = CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER )
</attribute>
Example 6.13. A sample JDBC2 PersistenceManager SqlProperties for Oracle
<attribute name="SqlProperties">
    BLOB_TYPE=BINARYSTREAM_BLOB
    INSERT_TX = INSERT INTO JMS_TRANSACTIONS (TXID) values(?)
    INSERT_MESSAGE = INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, \
        MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)
    SELECT_ALL_UNCOMMITED_TXS = SELECT TXID FROM JMS_TRANSACTIONS
    SELECT_MAX_TX = SELECT MAX(TXID) FROM JMS_MESSAGES
    SELECT_MESSAGES_IN_DEST = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES \
        WHERE DESTINATION=?
    SELECT_MESSAGE = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE \
        MESSAGEID=? AND DESTINATION=?
    MARK_MESSAGE = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? \
        AND DESTINATION=?
    UPDATE_MESSAGE = UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? \
        AND DESTINATION=?
    UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?
    UPDATE_MARKED_MESSAGES_WITH_TX = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE \
        TXOP=? AND TXID=?
    DELETE_MARKED_MESSAGES_WITH_TX = DELETE FROM JMS_MESSAGES WHERE TXID IN \
        (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?
    DELETE_TX = DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?
    DELETE_MARKED_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?
    DELETE_MESSAGE = DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?
    CREATE_MESSAGE_TABLE = CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, \
        DESTINATION VARCHAR(255) NOT NULL, TXID INTEGER, TXOP CHAR(1), \
        MESSAGEBLOB BLOB, PRIMARY KEY (MESSAGEID, DESTINATION) )
    CREATE_TX_TABLE = CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER )
</attribute>
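The key = value layout and the trailing backslashes in the SqlProperties listings follow the conventions of the java.util.Properties text format, in which a backslash at the end of a line continues the value on the next line. Assuming the attribute body is read with those semantics (this section does not say so explicitly), the following sketch shows how a continued statement collapses into a single SQL string. The class name SqlPropertiesDemo is hypothetical and the sample contains only three of the keys.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

final class SqlPropertiesDemo {
    /** Parses a SqlProperties-style body using the standard Properties format. */
    static Properties parse(String body) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(body));
        return props;
    }

    /** A shortened sample; "\\\n" is a literal backslash followed by a line break. */
    static String sample() {
        return "BLOB_TYPE=OBJECT_BLOB\n"
             + "SELECT_MESSAGES_IN_DEST = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES \\\n"
             + "    WHERE DESTINATION=?\n"
             + "DELETE_TX = DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?\n";
    }

    public static void main(String[] args) throws IOException {
        Properties props = parse(sample());
        // The continuation line is joined to the first line; its leading spaces are dropped
        System.out.println(props.getProperty("SELECT_MESSAGES_IN_DEST"));
    }
}
```

When adapting the statements for another database, keep the space before each trailing backslash, since the leading whitespace of the continuation line is discarded when the lines are joined.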
This section describes the destination MBeans used in the jbossmq-destinations-service.xml and jbossmq-service.xml descriptors.
The org.jboss.mq.server.jmx.Queue is used to define a Queue Destination on the JBossMQ server. The name attribute of the JMX object name of this MBean is used to determine the destination name. For example, if the JMX MBean begins with:
<mbean code="org.jboss.mq.server.jmx.Queue" name="jboss.mq.destination:service=Queue,name=testQueue">
Then, the JMX object name is jboss.mq.destination:service=Queue,name=testQueue and the name of the queue is "testQueue". The configurable attributes are as follows:
Additional read-only attributes that provide statistics information include:
String listMessageCounter(): This operation generates an HTML table that contains:
The org.jboss.mq.server.jmx.Topic is used to define a topic destination on the JBossMQ server. The name attribute of the JMX object name of this MBean is used to determine the destination name. For example, if the JMX MBean begins with:
<mbean code="org.jboss.mq.server.jmx.Topic" name="jboss.mq.destination:service=Topic,name=testTopic">
Then, the JMX object name is jboss.mq.destination:service=Topic,name=testTopic and the name of the topic is testTopic. The configurable attributes are as follows:
Additional read-only attributes that provide statistics information include:
String listMessageCounter(): This operation generates an HTML table that contains:
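Returning to the object names used in the Queue and Topic MBean declarations, the relationship between a JMX object name and the destination name can be checked with the standard javax.management.ObjectName class. The sketch below only parses the names; it does not contact a server, and the class name DestinationNames is hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class DestinationNames {
    /** Returns the value of the "name" key, which the text says determines the destination name. */
    static String destinationName(String jmxObjectName) throws MalformedObjectNameException {
        return new ObjectName(jmxObjectName).getKeyProperty("name");
    }

    /** Returns the value of the "service" key (Queue or Topic in the examples above). */
    static String destinationType(String jmxObjectName) throws MalformedObjectNameException {
        return new ObjectName(jmxObjectName).getKeyProperty("service");
    }

    public static void main(String[] args) throws MalformedObjectNameException {
        String queue = "jboss.mq.destination:service=Queue,name=testQueue";
        System.out.println(destinationType(queue) + " " + destinationName(queue));
    }
}
```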
JBossMQ statistics and several management functions are accessible via JMX. JMX can be accessed interactively via a Web Application or programmatically via the JMX API. It is recommended that you use the http://localhost:8080/jmx-console web application to get familiar with all the JBossMQ JMX MBeans running inside the server and how to invoke methods on those MBeans via the JMX Console. This section will outline the most common runtime management tasks that administrators must perform.
Applications that require the dynamic creation of queues at runtime can use the Destination Manager's MBean createQueue method: void createQueue(String name, String jndiLocation)
This method creates a queue with the given name and binds it in JNDI at the jndiLocation. Queues created via this method exist until the server is restarted. To destroy a previously created Queue, you would call: void destroyQueue(String name)
Applications that require the dynamic creation of topics at runtime can use the Destination Manager's MBean createTopic method: void createTopic(String name, String jndiLocation)
This method creates a topic with the given name and binds it in JNDI at the jndiLocation. Topics created via this method exist until the server is restarted. To destroy a previously created Topic, you would call: void destroyTopic(String name)
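Programmatic access to these operations goes through the standard JMX invoke call. The following is a minimal sketch rather than a definitive recipe. The operation names and signatures are the ones quoted above, while the class name DestinationAdmin and the object name jboss.mq:service=DestinationManager are assumptions; check the JMX console for the name under which the DestinationManager is registered in your configuration.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class DestinationAdmin {
    // Assumption: the JMX object name of the DestinationManager MBean
    static final String DESTINATION_MANAGER = "jboss.mq:service=DestinationManager";
    private static final String STRING = String.class.getName();

    static void createQueue(MBeanServerConnection server, String name, String jndiLocation)
            throws Exception {
        invoke(server, "createQueue", name, jndiLocation);
    }

    static void destroyQueue(MBeanServerConnection server, String name) throws Exception {
        invoke(server, "destroyQueue", name);
    }

    static void createTopic(MBeanServerConnection server, String name, String jndiLocation)
            throws Exception {
        invoke(server, "createTopic", name, jndiLocation);
    }

    static void destroyTopic(MBeanServerConnection server, String name) throws Exception {
        invoke(server, "destroyTopic", name);
    }

    /** Invokes an operation whose parameters are all of type String. */
    private static void invoke(MBeanServerConnection server, String operation, String... args)
            throws Exception {
        String[] signature = new String[args.length];
        java.util.Arrays.fill(signature, STRING);
        server.invoke(new ObjectName(DESTINATION_MANAGER), operation, args, signature);
    }
}
```

A call such as DestinationAdmin.createQueue(server, "orders", "queue/orders") would then create a queue named orders bound at queue/orders, where both names are placeholders. The server argument is whatever MBeanServerConnection your environment provides: the MBeanServer itself for code running inside JBoss, or a remote connection for external tools.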
The org.jboss.mq.sm.file.DynamicStateManager's MBean can be used to add and remove user ids and roles at runtime. To add a user id, you would use: void addUser(String name, String password, String clientID)
This method creates a user id with the given name and password and configures it to have the given clientID. To remove a previously created user id, you would call the following method: void removeUser(String name)
To manage the roles that the user ids belong to, you would use the following set of methods to create roles, remove roles, add users to roles, and remove users from roles:
Up to this point we have looked at the standard JMS client/server architecture. The JMS specification defines an advanced set of interfaces that allow for concurrent processing of a destination's messages, and collectively this functionality is referred to as application server facilities (ASF). Two of the interfaces that support concurrent message processing, javax.jms.ServerSessionPool and javax.jms.ServerSession, must be provided by the application server in which the processing will occur. Thus, the set of components that make up the JBossMQ ASF involves both JBossMQ components as well as JBoss server components. The JBoss server MDB container utilizes the JMS service's ASF to concurrently process messages sent to MDBs.
The responsibilities of the ASF domains are well defined by the JMS specification and so we won't go into a discussion of how the ASF components are implemented. Rather, we want to discuss how ASF components used by the JBoss MDB layer are integrated using MBeans that allow either the application server interfaces, or the JMS provider interfaces to be replaced with alternate implementations.
Let's start with the org.jboss.jms.jndi.JMSProviderLoader MBean. This MBean is responsible for loading an instance of the org.jboss.jms.jndi.JMSProviderAdapter interface into the JBoss server and binding it into JNDI. The JMSProviderAdapter interface is an abstraction that defines how to get the root JNDI context for the JMS provider, along with accessors for the Context.PROVIDER_URL of the root InitialContext and for the JNDI names of the QueueConnectionFactory and TopicConnectionFactory in the root context. This is all that is really necessary to bootstrap use of a JMS provider. By abstracting this information into an interface, alternate JMS ASF provider implementations can be used with the JBoss MDB container. The org.jboss.jms.jndi.JBossMQProvider is the default implementation of the JMSProviderAdapter interface, and provides the adaptor for the JBossMQ JMS provider. To replace the JBossMQ provider with an alternate JMS ASF implementation, simply create an implementation of the JMSProviderAdapter interface and configure the JMSProviderLoader with the class name of the implementation. We'll see an example of this in the configuration section.
In addition to being able to replace the JMS provider used for MDBs, you can also replace the javax.jms.ServerSessionPool interface implementation. This is possible by configuring the class name of the org.jboss.jms.asf.ServerSessionPoolFactory implementation using the org.jboss.jms.asf.ServerSessionPoolLoader MBean PoolFactoryClass attribute. The default ServerSessionPoolFactory implementation is the JBoss org.jboss.jms.asf.StdServerSessionPoolFactory class.
The JMSProviderLoader MBean service creates a JMS provider adaptor and binds it into JNDI. A JMS provider adaptor is a class that implements the org.jboss.jms.jndi.JMSProviderAdapter interface. It is used by the message driven bean container to access a JMS service provider in a provider independent manner. The configurable attributes of the JMSProviderLoader service are:
Example 6.14. A JMSProviderLoader for accessing a remote JBossMQ server
<mbean code="org.jboss.jms.jndi.JMSProviderLoader"
       name="jboss.mq:service=JMSProviderLoader,name=RemoteJBossMQProvider">
    <attribute name="ProviderName">RemoteJMSProvider</attribute>
    <attribute name="ProviderUrl">jnp://remotehost:1099</attribute>
    <attribute name="ProviderAdapterClass">
        org.jboss.jms.jndi.JBossMQProvider
    </attribute>
    <attribute name="QueueFactoryRef">XAConnectionFactory</attribute>
    <attribute name="TopicFactoryRef">XAConnectionFactory</attribute>
</mbean>
The RemoteJMSProvider can be referenced in the MDB invoker configuration as shown in the jboss.xml fragment given in Example 6.15, “A jboss.xml fragment for specifying the MDB JMS provider adaptor”.
Example 6.15. A jboss.xml fragment for specifying the MDB JMS provider adaptor
<proxy-factory-config>
    <JMSProviderAdapterJNDI>RemoteJMSProvider</JMSProviderAdapterJNDI>
    <ServerSessionPoolFactoryJNDI>StdJMSPool</ServerSessionPoolFactoryJNDI>
    <MaximumSize>15</MaximumSize>
    <MaxMessages>1</MaxMessages>
    <MDBConfig>
        <ReconnectIntervalSec>10</ReconnectIntervalSec>
        <DLQConfig>
            <DestinationQueue>queue/DLQ</DestinationQueue>
            <MaxTimesRedelivered>10</MaxTimesRedelivered>
            <TimeToLive>0</TimeToLive>
        </DLQConfig>
    </MDBConfig>
</proxy-factory-config>
Incidentally, because one can specify multiple invoker-proxy-binding elements, an MDB can listen to the same queue/topic on multiple servers by configuring multiple bindings with different JMSProviderAdapterJNDI settings.
Alternatively, one can integrate the JMS provider using JCA configuration like that shown in Example 6.16, “A jms-ds.xml descriptor for integrating a JMS provider adaptor via JCA”.
Example 6.16. A jms-ds.xml descriptor for integrating a JMS provider adaptor via JCA
<tx-connection-factory>
    <jndi-name>RemoteJmsXA</jndi-name>
    <xa-transaction/>
    <adapter-display-name>JMS Adapter</adapter-display-name>
    <config-property name="JMSProviderAdapterJNDI"
                     type="java.lang.String">RemoteJMSProvider</config-property>
    <config-property name="SessionDefaultType"
                     type="java.lang.String">javax.jms.Topic</config-property>
    <security-domain-and-application>JmsXARealm</security-domain-and-application>
</tx-connection-factory>
The ServerSessionPoolLoader MBean service manages a factory for javax.jms.ServerSessionPool objects used by the message driven bean container. The configurable attributes of the ServerSessionPoolLoader service are:
We have mentioned that one can replace the JBossMQ JMS implementation with a foreign implementation. Here we summarize the various approaches one can take to do the replacement: