package org.jboss.test.jbossmq.stress;
import javax.jms.JMSException;
import junit.framework.Assert;
import org.jboss.test.jbossmq.MQBase;
public class MultipleDurableSubscribers extends MQBase
{
/**
 * Topic worker that does not set up JMS resources of its own but reuses the
 * connection, destination and session of an already existing worker.
 */
class PiggyBackWorker extends TopicWorker
{
   /**
    * Creates a worker that borrows the JMS resources of another worker.
    *
    * @param worker the worker whose connection, destination and session
    *        references are copied into this instance
    */
   public PiggyBackWorker(TopicWorker worker)
   {
      // The no-arg superclass constructor is invoked implicitly.
      this.session = worker.session;
      this.destination = worker.destination;
      this.connection = worker.connection;
   }

   /**
    * Replaces the inherited connect step with a no-op that only logs; the
    * connection was already copied from the other worker in the constructor.
    */
   public void connect()
   {
      log.debug("In null connect");
   }

   /**
    * Delegates to the inherited subscribe logic and then logs the resulting
    * consumer.
    *
    * @throws JMSException if the inherited subscribe call fails
    */
   public void subscribe() throws JMSException
   {
      super.subscribe();
      String consumerInfo = "Message consumer set up " + consumer;
      log.debug(consumerInfo);
   }
}
/**
 * Creates the test case.
 *
 * @param name the test name, handed unchanged to the superclass constructor
 */
public MultipleDurableSubscribers(String name)
{
   super(name);
}
/**
 * First phase of the durable subscriber test.
 * <p>
 * Drains the topic, starts two durable subscribers ("sub1" and "sub2") that
 * share one connection and session, publishes half of the iteration count and
 * checks that the publisher and both subscribers handled that many messages.
 * Both subscribers are then closed and the publisher sends a second batch of
 * the same size while no subscriber is running (presumably to be picked up
 * by {@link #runDurableSubscriberPartTwo()} -- confirm against TopicWorker).
 *
 * @throws Exception wrapping, as its cause, anything thrown during the run,
 *         including failed assertions
 */
public void runDurableSubscriberPartOne() throws Exception
{
   // NOTE(review): if an assertion or JMS call fails, the close()/interrupt()
   // calls further down are skipped, so the workers are not shut down here.
   try
   {
      drainTopic();

      int ic = getIterationCount();
      // Each publishing round sends this many messages.
      int half = ic / 2;

      // First durable subscriber; it runs in its own thread.
      IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message.class,
            "DURABLE_NR",
            0,
            half);
      TopicWorker sub1 = new TopicWorker(SUBSCRIBER,
            TRANS_NONE,
            f1);
      sub1.setDurable("john", "needle", "sub1");
      Thread t1 = new Thread(sub1);
      t1.start();
      log.debug("Sub1 set up");

      // Give sub1 time to start before sub2 copies its connection/session.
      sleep(5000L);

      // Second durable subscriber, piggy-backing on sub1's JMS resources.
      TopicWorker sub2 = new PiggyBackWorker(sub1);
      sub2.setSubscriberAttrs(SUBSCRIBER,
            TRANS_NONE,
            f1);
      sub2.setDurable("john", "needle", "sub2");
      Thread t2 = new Thread(sub2);
      t2.start();
      log.debug("Sub2 setup");

      // Publish the first batch.
      IntRangeMessageCreator c1 = new IntRangeMessageCreator("DURABLE_NR",
            0);
      TopicWorker pub1 = new TopicWorker(PUBLISHER,
            TRANS_NONE,
            c1,
            half);
      pub1.connect();
      pub1.publish();
      Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(),
            half,
            pub1.getMessageHandled());

      // Let the subscribers catch up before checking their counters.
      log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
      sleep(ic * 10);
      Assert.assertEquals("Subscriber1 did not get correct number of messages " + sub1.getMessageHandled(),
            half,
            sub1.getMessageHandled());
      // Report sub2's own counter (the message used to show sub1's count).
      Assert.assertEquals("Subscriber2 did not get correct number of messages " + sub2.getMessageHandled(),
            half,
            sub2.getMessageHandled());

      // Take both subscribers down.
      sub1.close();
      t1.interrupt();
      sub2.close();
      t2.interrupt();

      // Publish the second batch while no subscriber is running.
      pub1.publish(half);
      // Two batches of ic / 2 were requested; that equals ic only for an even
      // ic, so compare against the amount actually requested.
      Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(),
            2 * half,
            pub1.getMessageHandled());
      pub1.close();
   }
   catch (Throwable t)
   {
      log.error("Error in test: " + t, t);
      // Keep the original throwable as the cause so its stack trace survives.
      throw new Exception(t.getMessage(), t);
   }
}
/**
 * Second phase of the durable subscriber test.
 * <p>
 * Starts the durable subscribers "sub1" and "sub2" again (sub2 reusing
 * sub1's connection and session), waits, and checks that each of them
 * handled half of the iteration count. Afterwards both subscriptions are
 * unsubscribed and the workers are closed.
 *
 * @throws Exception wrapping, as its cause, anything thrown during the run,
 *         including failed assertions
 */
public void runDurableSubscriberPartTwo() throws Exception
{
   // NOTE(review): if an assertion or JMS call fails, the unsubscribe()/
   // close()/interrupt() calls further down are skipped.
   try
   {
      int ic = getIterationCount();
      // Number of messages each subscriber is expected to handle.
      int half = ic / 2;

      // First durable subscriber; it runs in its own thread.
      IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message.class,
            "DURABLE_NR",
            0,
            half);
      TopicWorker sub1 = new TopicWorker(SUBSCRIBER,
            TRANS_NONE,
            f1);
      sub1.setDurable("john", "needle", "sub1");
      Thread t1 = new Thread(sub1);
      t1.start();

      // Give sub1 time to start before sub2 copies its connection/session.
      sleep(5000L);

      // Second durable subscriber, piggy-backing on sub1's JMS resources.
      TopicWorker sub2 = new PiggyBackWorker(sub1);
      sub2.setSubscriberAttrs(SUBSCRIBER,
            TRANS_NONE,
            f1);
      sub2.setDurable("john", "needle", "sub2");
      Thread t2 = new Thread(sub2);
      t2.start();

      // Let the subscribers receive before checking their counters.
      log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
      sleep(ic * 10);
      Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(),
            half,
            sub1.getMessageHandled());
      // Report sub2's own counter (the message used to show sub1's count).
      Assert.assertEquals("Subscriber did not get correct number of messages " + sub2.getMessageHandled(),
            half,
            sub2.getMessageHandled());

      // Remove the durable subscriptions, then shut the workers down.
      sub1.unsubscribe();
      sub2.unsubscribe();
      sub1.close();
      t1.interrupt();
      sub2.close();
      t2.interrupt();
   }
   catch (Throwable t)
   {
      log.error("Error in test: " + t, t);
      // Keep the original throwable as the cause so its stack trace survives.
      throw new Exception(t.getMessage(), t);
   }
}
/**
 * Test entry point: runs part one and then part two of the durable
 * subscriber scenario, in that order.
 *
 * @throws Exception if either part throws
 */
public void testDurableSubscriber() throws Exception
{
   // Phase 1 first, phase 2 afterwards.
   runDurableSubscriberPartOne();
   runDurableSubscriberPartTwo();
}
/**
 * Command line entry point; the body is empty, so running it does nothing.
 *
 * @param args command line arguments, not used
 */
public static void main(String[] args)
{
   // No-op.
}
}