JBoss.org Community Documentation

7.1.3. A Pub-Sub With Durable Topic Example

JMS supports a messaging model that is a cross between the P2P and pub-sub models. When a pub-sub client wants to receive all messages posted to the topic it subscribes to, even when it is not actively listening to the topic, it can achieve this behavior by using a durable topic. Let's look at a variation of the preceding subscriber client that uses a durable topic to ensure that it receives all messages, including those published when the client is not listening to the topic. Example 7.5, “A durable topic JMS client example” shows the durable topic client. The key differences from the Example 7.4, “A JMS subscriber client” client are the username and password passed to createTopicConnection and the call to createDurableSubscriber.

package org.jboss.book.jms.ex1;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
import javax.jms.TopicSession;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/**
 *  A JMS client example program that synchronously receives a message from a Topic
 *     
 *  @author Scott.Stark@jboss.org
 *  @version $Revision: 1.9 $
 */
public class DurableTopicRecvClient
{
    TopicConnection conn = null;
    TopicSession session = null;
    Topic topic = null;
    
    public void setupPubSub()
        throws JMSException, NamingException
    {
        InitialContext iniCtx = new InitialContext();
        Object tmp = iniCtx.lookup("ConnectionFactory");

        TopicConnectionFactory tcf = (TopicConnectionFactory) tmp;
        conn = tcf.createTopicConnection("john", "needle");
        topic = (Topic) iniCtx.lookup("topic/testTopic");

        session = conn.createTopicSession(false,
                                          TopicSession.AUTO_ACKNOWLEDGE);
        conn.start();
    }
    
    public void recvSync()
        throws JMSException, NamingException
    {
        System.out.println("Begin recvSync");
        // Set up the pub/sub connection and session
        setupPubSub();
        // Create (or reattach to) the durable subscription named "jms-ex1dtps"
        TopicSubscriber recv = session.createDurableSubscriber(topic, "jms-ex1dtps");
        // Wait up to 5 seconds for the message
        Message msg = recv.receive(5000);
        if (msg == null) {
            System.out.println("Timed out waiting for msg");
        } else {
            System.out.println("DurableTopicRecvClient.recv, msgt=" + msg);
        } 
    }
    
    public void stop() 
        throws JMSException
    {
        conn.stop();
        session.close();
        conn.close();
    }
    
    public static void main(String args[]) 
        throws Exception
    {
        System.out.println("Begin DurableTopicRecvClient, now=" + 
                           System.currentTimeMillis());
        DurableTopicRecvClient client = new DurableTopicRecvClient();
        client.recvSync();
        client.stop();
        System.out.println("End DurableTopicRecvClient");
        System.exit(0);
    }
    
}

Example 7.5. A durable topic JMS client example


Now run the previous topic publisher with the durable topic subscriber as follows:

[examples]$ ant -Dchap=jms -Dex=1psdt run-example
...                
run-example1psdt:
     [java] Begin DurableTopicSetup
     [java] End DurableTopicSetup
     [java] Begin TopicSendClient, now=1102899834273
     [java] Begin sendAsync
     [java] sendAsync, sent text=A text msg, now=1102899834345
     [java] End sendAsync
     [java] End TopicSendClient
     [java] Begin DurableTopicRecvClient, now=1102899840043
     [java] Begin recvSync
     [java] DurableTopicRecvClient.recv, msgt=SpyTextMessage {
     [java] Header { 
     [java]    jmsDestination  : TOPIC.testTopic.DurableSubscription[
               clientId=DurableSubscriberExample name=jms-ex1dtps selector=null]
     [java]    jmsDeliveryMode : 2
     [java]    jmsExpiration   : 0
     [java]    jmsPriority     : 4
     [java]    jmsMessageID    : ID:3-11028998375501
     [java]    jmsTimeStamp    : 1102899837550
     [java]    jmsCorrelationID: null
     [java]    jmsReplyTo      : null
     [java]    jmsType         : null
     [java]    jmsRedelivered  : false
     [java]    jmsProperties   : {}
     [java]    jmsPropReadWrite: false
     [java]    msgReadOnly     : true
     [java]    producerClientId: ID:3
     [java] }
     [java] Body {
     [java]    text            :A text msg, now=1102899834345
     [java] }
     [java] }
     [java] End DurableTopicRecvClient
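
The two now= values in this output give a rough measure of how long the message waited on the server before the subscriber started. The following is a small worked calculation, not part of the example sources. The class name TimestampGap is made up for illustration, and it assumes both values come from System.currentTimeMillis() on the same machine (the TopicSendClient source is not shown here).

```java
class TimestampGap {
    // Values copied from the run output above.
    static final long SENT_AT = 1102899834345L;             // "sendAsync, sent text=..., now="
    static final long RECEIVER_STARTED_AT = 1102899840043L; // "Begin DurableTopicRecvClient, now="

    /** Milliseconds between the send and the start of the receiving client. */
    static long gapMillis() {
        return RECEIVER_STARTED_AT - SENT_AT;
    }

    public static void main(String[] args) {
        // prints "Receiver started 5698 ms after the send"
        System.out.println("Receiver started " + gapMillis() + " ms after the send");
    }
}
```

The receiving client therefore started roughly 5.7 seconds after the message was sent, and still received it.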

Items of note for the durable topic example include:

  • The TopicConnection in the durable topic client was created with a username and password, and the TopicSubscriber was created using the createDurableSubscriber(Topic, String) method. Both are required for durable topic subscribers: the messaging server needs to know which client is requesting the durable topic and what the name of the durable topic subscription is. We will discuss the details of durable topic setup in the configuration section.

  • An org.jboss.book.jms.DurableTopicSetup client was run prior to the TopicSendClient. The reason is that a durable topic subscriber must have registered a subscription at some point in the past in order for the messaging server to save messages. JBoss supports dynamic durable topic subscribers, and the DurableTopicSetup client simply creates a durable subscription receiver and then exits. This leaves a durable topic subscription registered on the topic/testTopic, and the messaging server knows that any messages posted to this topic must be saved for later delivery.

  • The TopicSendClient does not change for the durable topic. The notion of a durable topic is a subscriber-only notion.

  • The DurableTopicRecvClient sees the message published to the topic/testTopic even though it was not listening to the topic at the time the message was published.
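
The ordering described in these items (register the subscription, publish, receive later) can be illustrated with a toy in-memory model. This is only a sketch of the semantics and not how JBoss messaging is implemented. ToyTopic and ToyDurableDemo are made-up names, and the model ignores persistence, acknowledgement, and non-durable subscribers. The client ID and subscription name are the ones that appear in the run output above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy stand-in for a topic on the messaging server (hypothetical, for illustration only). */
class ToyTopic {
    // One backlog of saved messages per durable subscription, keyed by "clientId/name".
    private final Map<String, Queue<String>> durableBacklogs = new HashMap<>();

    /** Registers a durable subscription if it does not exist yet. */
    void registerDurable(String clientId, String name) {
        durableBacklogs.putIfAbsent(clientId + "/" + name, new ArrayDeque<>());
    }

    /** The publisher knows nothing about subscriptions; the topic saves a copy for each one. */
    void publish(String text) {
        for (Queue<String> backlog : durableBacklogs.values()) {
            backlog.add(text);
        }
    }

    /** Returns the next saved message for the subscription, or null if none is waiting. */
    String receive(String clientId, String name) {
        Queue<String> backlog = durableBacklogs.get(clientId + "/" + name);
        return backlog == null ? null : backlog.poll();
    }
}

class ToyDurableDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        ToyTopic topic = new ToyTopic();

        // Published before any subscription is registered: nothing saves it.
        topic.publish("before registration");

        // Plays the role of DurableTopicSetup: register the subscription, then go away.
        topic.registerDurable("DurableSubscriberExample", "jms-ex1dtps");

        // Plays the role of TopicSendClient: no subscriber is listening right now.
        topic.publish("A text msg");

        // Plays the role of DurableTopicRecvClient: connect later and drain the backlog.
        received.add(topic.receive("DurableSubscriberExample", "jms-ex1dtps"));
        received.add(topic.receive("DurableSubscriberExample", "jms-ex1dtps"));
        return received;
    }

    public static void main(String[] args) {
        // prints "[A text msg, null]"
        System.out.println(run());
    }
}
```

In this model only the message published after registration is delivered, which mirrors why the setup client has to run before the publisher in the example.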