package org.jboss.test.jbossmq.stress;
import javax.jms.Message;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import junit.framework.TestSuite;
import junit.framework.Assert;
import org.jboss.test.jbossmq.MQBase;
public class MassiveTest extends MQBase
{
static byte[] PERFORMANCE_TEST_DATA_PAYLOAD = new byte[10 * 1024];
public MassiveTest(String name)
{
super(name);
}
public void runMassiveTest() throws Exception
{
drainTopic();
int ic = getIterationCount();
IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.BytesMessage.class,
"MASSIVE_NR",
0,
ic);
TopicWorker sub1 = new TopicWorker(SUBSCRIBER,
TRANS_NONE,
f1);
Thread t1 = new Thread(sub1);
t1.start();
ByteIntRangeMessageCreator c1 = new ByteIntRangeMessageCreator("MASSIVE_NR",
0);
TopicWorker pub1 = new TopicWorker(PUBLISHER,
TRANS_NONE,
c1,
ic);
pub1.connect();
pub1.publish();
Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(),
ic,
pub1.getMessageHandled());
log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
try
{
Thread.sleep(ic * 10);
}
catch (InterruptedException e)
{
}
log.debug("Woke up");
Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(),
ic,
sub1.getMessageHandled());
sub1.close();
t1.interrupt();
pub1.close();
}
public void runMassivTestFailingSub() throws Exception
{
drainTopic();
int ic = getIterationCount();
IntRangeMessageFilter f1 = new IntRangeMessageFilter(javax.jms.BytesMessage.class,
"MASSIVE_NR",
0,
ic);
TopicWorker sub1 = new TopicWorker(SUBSCRIBER,
TRANS_NONE,
f1);
Thread t1 = new Thread(sub1);
t1.start();
FailingSubWorker sub2 = new FailingSubWorker();
sub2.setSubscriberAttrs(SUBSCRIBER,
TRANS_NONE,
f1);
Thread tf = new Thread(sub2);
tf.start();
ByteIntRangeMessageCreator c1 = new ByteIntRangeMessageCreator("MASSIVE_NR",
0);
TopicWorker pub1 = new TopicWorker(PUBLISHER,
TRANS_NONE,
c1,
ic);
pub1.connect();
pub1.publish();
Assert.assertEquals("Publisher did not publish correct number of messages " + pub1.getMessageHandled(),
ic,
pub1.getMessageHandled());
log.debug("Sleeping for " + ((ic * 10) / 60000) + " minutes");
try
{
Thread.sleep(ic * 10);
}
catch (InterruptedException e)
{
}
log.debug("Woke up");
Assert.assertEquals("Subscriber did not get correct number of messages " + sub1.getMessageHandled(),
ic,
sub1.getMessageHandled());
sub1.close();
t1.interrupt();
pub1.close();
sub2.setStoped();
tf.interrupt();
tf.interrupt();
sub2.close();
}
public static junit.framework.Test suite() throws Exception
{
TestSuite suite = new TestSuite();
suite.addTest(new MassiveTest("runMassiveTest"));
return suite;
}
public static void main(String[] args)
{
try
{
MassiveTest t = new MassiveTest("runMassiveTest");
t.setUp();
t.runMassiveTest();
}
catch (Exception ex)
{
System.err.println("Ex: " + ex);
ex.printStackTrace();
}
}
public class ByteIntRangeMessageCreator extends IntRangeMessageCreator
{
int start = 0;
public ByteIntRangeMessageCreator(String property)
{
super(property);
}
public ByteIntRangeMessageCreator(String property, int start)
{
super(property);
this.start = start;
}
public Message createMessage(int nr) throws JMSException
{
if (session == null)
throw new JMSException("Session not allowed to be null");
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(PERFORMANCE_TEST_DATA_PAYLOAD);
msg.setStringProperty(property, String.valueOf(start + nr));
return msg;
}
}
public class FailingSubWorker extends TopicWorker
{
int check = 0;
public void onMessage(Message msg)
{
check++;
if (check > 1)
log.warn("Got called while asleep!! " + check);
while (!stopRequested)
{
sleep(2000);
}
}
}
}