package org.jboss.test.jbossmq.stress;
import junit.framework.TestSuite;
import junit.framework.Assert;
import org.jboss.test.jbossmq.MQBase;
public class DurableSubscriberTest extends MQBase
{
public DurableSubscriberTest(String name)
{
super(name);
}
public void runDurableSubscriberPartOne() throws Exception
{
try
{
drainTopic();
int ic = getIterationCount();
IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message.class,
"DURABLE_NR",
0,
ic / 2);
TopicWorker sub1 = new TopicWorker(SUBSCRIBER,
TRANS_NONE,
f1);
sub1.setDurable("john", "needle", "sub2");
Thread t1 = new Thread(sub1);
t1.start();
IntRangeMessageCreator c1 = new IntRangeMessageCreator("DURABLE_NR",
0);
TopicWorker pub1 = new TopicWorker(PUBLISHER,
TRANS_NONE,
c1,
ic / 2);
pub1.connect();
pub1.publish();
Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(),
ic / 2,
pub1.getMessageHandled());
log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
sleep(ic * 10);
Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(),
ic / 2,
sub1.getMessageHandled());
sub1.close();
t1.interrupt();
pub1.publish(ic / 2);
Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(), ic,
pub1.getMessageHandled());
pub1.close();
}
catch (Throwable t)
{
log.error("Error in test: " + t, t);
throw new Exception(t.getMessage());
}
}
public void runDurableSubscriberPartTwo() throws Exception
{
try
{
int ic = getIterationCount();
IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.Message.class,
"DURABLE_NR",
0,
ic / 2);
TopicWorker sub1 = new TopicWorker(SUBSCRIBER,
TRANS_NONE,
f1);
sub1.setDurable("john", "needle", "sub2");
Thread t2 = new Thread(sub1);
t2.start();
log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
sleep(ic * 10);
Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(), ic / 2,
sub1.getMessageHandled());
sub1.unsubscribe();
sub1.close();
t2.interrupt();
}
catch (Throwable t)
{
log.error("Error in test: " + t, t);
throw new Exception(t.getMessage());
}
}
public void testDurableSubscriber() throws Exception
{
runDurableSubscriberPartOne();
runDurableSubscriberPartTwo();
}
public void runGoodClient() throws Exception
{
TopicWorker sub1 = new TopicWorker(CONNECTOR,
TRANS_NONE,
null);
sub1.setDurable("john", "needle", "sub2");
Thread t1 = new Thread(sub1);
t1.start();
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
}
t1.interrupt();
sub1.close();
Assert.assertNull("Error in connecting durable sub", sub1.getException());
}
public void runBadClient() throws Exception
{
TopicWorker sub1 = new TopicWorker(CONNECTOR,
TRANS_NONE,
null);
sub1.setDurable("john", "needle", "sub2");
Thread t1 = new Thread(sub1);
t1.start();
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
}
t1.interrupt();
Assert.assertNull("Error in connecting durable sub", sub1.getException());
}
public static junit.framework.Test suite() throws Exception
{
TestSuite suite = new TestSuite();
suite.addTest(new DurableSubscriberTest("runGoodClient"));
suite.addTest(new DurableSubscriberTest("testDurableSubscriber"));
return suite;
}
public static void main(String[] args)
{
}
}