JBoss.org Community Documentation
The JMS publish/subscribe (Pub-Sub) message model is a one-to-many model. A publisher sends a message to a topic and all active subscribers of the topic receive the message. Subscribers that are not actively listening to the topic will miss the published message. shows a complete JMS client that sends a javax.jms.TextMessage to a topic and asynchronously receives the message from the same topic.
package org.jboss.book.jms.ex1;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import EDU.oswego.cs.dl.util.concurrent.CountDown;
/**
* A complete JMS client example program that sends a TextMessage to
* a Topic and asynchronously receives the message from the same
* Topic.
*
* @author Scott.Stark@jboss.org
* @version $Revision: 1.9 $
*/
public class TopicSendRecvClient
{
static CountDown done = new CountDown(1);
TopicConnection conn = null;
TopicSession session = null;
Topic topic = null;
public static class ExListener implements MessageListener
{
public void onMessage(Message msg)
{
done.release();
TextMessage tm = (TextMessage) msg;
try {
System.out.println("onMessage, recv text=" + tm.getText());
} catch(Throwable t) {
t.printStackTrace();
}
}
}
public void setupPubSub()
throws JMSException, NamingException
{
InitialContext iniCtx = new InitialContext();
Object tmp = iniCtx.lookup("ConnectionFactory");
TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
conn = tcf.createTopicConnection();
topic = (Topic) iniCtx.lookup("topic/testTopic");
session = conn.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
conn.start();
}
public void sendRecvAsync(String text)
throws JMSException, NamingException
{
System.out.println("Begin sendRecvAsync");
// Setup the PubSub connection, session
setupPubSub();
// Set the async listener
TopicSubscriber recv = session.createSubscriber(topic);
recv.setMessageListener(new ExListener());
// Send a text msg
TopicPublisher send = session.createPublisher(topic);
TextMessage tm = session.createTextMessage(text);
send.publish(tm);
System.out.println("sendRecvAsync, sent text=" + tm.getText());
send.close();
System.out.println("End sendRecvAsync");
}
public void stop() throws JMSException
{
conn.stop();
session.close();
conn.close();
}
public static void main(String args[]) throws Exception
{
System.out.println("Begin TopicSendRecvClient, now=" +
System.currentTimeMillis());
TopicSendRecvClient client = new TopicSendRecvClient();
client.sendRecvAsync("A text msg, now="+System.currentTimeMillis());
client.done.acquire();
client.stop();
System.out.println("End TopicSendRecvClient");
System.exit(0);
}
}
Example 7.2. A Pub-Sub JMS client example
The client may be run using the following command line:
[examples]$ ant -Dchap=jms -Dex=1ps run-example
...
run-example1ps:
[java] Begin TopicSendRecvClient, now=1102809427043
[java] Begin sendRecvAsync
[java] onMessage, recv text=A text msg, now=1102809427071
[java] sendRecvAsync, sent text=A text msg, now=1102809427071
[java] End sendRecvAsync
[java] End TopicSendRecvClient
Now let's break the publisher and subscribers into separate programs to demonstrate that subscribers only receive messages while they are listening to a topic. Example 7.3, “A JMS publisher client” shows a variation of the previous pub-sub client that only publishes messages to the topic/testTopic topic. The subscriber only client is shown in Example 7.4, “A JMS subscriber client”.
package org.jboss.book.jms.ex1;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSlistubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* A JMS client example program that sends a TextMessage to a Topic
*
* @author Scott.Stark@jboss.org
* @version $Revision: 1.9 $
*/
public class TopicSendClient
{
TopicConnection conn = null;
TopicSession session = null;
Topic topic = null;
public void setupPubSub()
throws JMSException, NamingException
{
InitialContext iniCtx = new InitialContext();
Object tmp = iniCtx.lookup("ConnectionFactory");
TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
conn = tcf.createTopicConnection();
topic = (Topic) iniCtx.lookup("topic/testTopic");
session = conn.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
conn.start();
}
public void sendAsync(String text)
throws JMSException, NamingException
{
System.out.println("Begin sendAsync");
// Setup the pub/sub connection, session
setupPubSub();
// Send a text msg
TopicPublisher send = session.createPublisher(topic);
TextMessage tm = session.createTextMessage(text);
send.publish(tm);
System.out.println("sendAsync, sent text=" + tm.getText());
send.close();
System.out.println("End sendAsync");
}
public void stop()
throws JMSException
{
conn.stop();
session.close();
conn.close();
}
public static void main(String args[])
throws Exception
{
System.out.println("Begin TopicSendClient, now=" +
System.currentTimeMillis());
TopicSendClient client = new TopicSendClient();
client.sendAsync("A text msg, now="+System.currentTimeMillis());
client.stop();
System.out.println("End TopicSendClient");
System.exit(0);
}
}
Example 7.3. A JMS publisher client
package org.jboss.book.jms.ex1;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* A JMS client example program that synchronously receives a message a Topic
*
* @author Scott.Stark@jboss.org
* @version $Revision: 1.9 $
*/
public class TopicRecvClient
{
TopicConnection conn = null;
TopicSession session = null;
Topic topic = null;
public void setupPubSub()
throws JMSException, NamingException
{
InitialContext iniCtx = new InitialContext();
Object tmp = iniCtx.lookup("ConnectionFactory");
TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
conn = tcf.createTopicConnection();
topic = (Topic) iniCtx.lookup("topic/testTopic");
session = conn.createTopicSession(false,
TopicSession.AUTO_ACKNOWLEDGE);
conn.start();
}
public void recvSync()
throws JMSException, NamingException
{
System.out.println("Begin recvSync");
// Setup the pub/sub connection, session
setupPubSub();
// Wait upto 5 seconds for the message
TopicSubscriber recv = session.createSubscriber(topic);
Message msg = recv.receive(5000);
if (msg == null) {
System.out.println("Timed out waiting for msg");
} else {
System.out.println("TopicSubscriber.recv, msgt="+msg);
}
}
public void stop()
throws JMSException
{
conn.stop();
session.close();
conn.close();
}
public static void main(String args[])
throws Exception
{
System.out.println("Begin TopicRecvClient, now=" +
System.currentTimeMillis());
TopicRecvClient client = new TopicRecvClient();
client.recvSync();
client.stop();
System.out.println("End TopicRecvClient");
System.exit(0);
}
}
Example 7.4. A JMS subscriber client
Run the TopicSendClient followed by the TopicRecvClient as follows:
[examples]$ ant -Dchap=jms -Dex=1ps2 run-example
...
run-example1ps2:
[java] Begin TopicSendClient, now=1102810007899
[java] Begin sendAsync
[java] sendAsync, sent text=A text msg, now=1102810007909
[java] End sendAsync
[java] End TopicSendClient
[java] Begin TopicRecvClient, now=1102810011524
[java] Begin recvSync
[java] Timed out waiting for msg
[java] End TopicRecvClient
The output shows that the topic subscriber client (TopicRecvClient) fails to receive the message sent by the publisher due to a timeout.