package org.jboss.test.jbossmq.perf;
import javax.jms.*;
import javax.management.ObjectName;
import javax.naming.Context;
import javax.naming.InitialContext;
import junit.textui.TestRunner;
import org.apache.log4j.Category;
import org.jboss.test.JBossTestCase;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
public class InvocationLayerStressTest extends JBossTestCase
{
Context context;
QueueConnection queueConnection;
TopicConnection topicConnection;
static final int WORKER_COUNT = Integer.parseInt(System.getProperty("jbosstest.threadcount", "10"));
static final int MESSAGE_COUNT = Integer.parseInt(System.getProperty("jbosstest.iterationcount", "500"));
Semaphore exitSemaphore;
public InvocationLayerStressTest(String name) throws Exception
{
super(name);
}
public void createQueue(String name)
{
try
{
ObjectName objn = new ObjectName("jboss.mq:service=DestinationManager");
getServer().invoke(objn, "createQueue", new Object[]{name, name}, new String[]{String.class.getName(), String.class.getName()});
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void createTopic(String name)
{
try
{
ObjectName objn = new ObjectName("jboss.mq:service=DestinationManager");
getServer().invoke(objn, "createTopic", new Object[]{name, name}, new String[]{String.class.getName(), String.class.getName()});
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void deleteQueue(String name)
{
try
{
ObjectName objn = new ObjectName("jboss.mq:service=DestinationManager");
getServer().invoke(objn, "destroyQueue", new Object[]{name}, new String[]{String.class.getName()});
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void deleteTopic(String name)
{
try
{
ObjectName objn = new ObjectName("jboss.mq:service=DestinationManager");
getServer().invoke(objn, "destroyTopic", new Object[]{name}, new String[]{String.class.getName()});
}
catch (Exception e)
{
e.printStackTrace();
}
}
protected void connect(String queueLoc, String topicLoc) throws Exception
{
context = new InitialContext();
QueueConnectionFactory queueFactory = (QueueConnectionFactory) context.lookup(queueLoc);
queueConnection = queueFactory.createQueueConnection();
TopicConnectionFactory topicFactory = (TopicConnectionFactory) context.lookup(topicLoc);
topicConnection = topicFactory.createTopicConnection();
}
protected void disconnect() throws Exception
{
queueConnection.close();
topicConnection.close();
}
class QueueWorker extends Thread
{
String queueName;
Throwable exception;
Object signal = new Object();
Category log = Category.getInstance(QueueWorker.class);
QueueWorker(String queueName, String ilType)
{
super(queueName);
this.queueName = queueName;
this.log = Category.getInstance("QueueWorker."+queueName+"."+ilType);
}
public void run()
{
log.info("QueueWorker Running: " + queueName);
try
{
work();
}
catch (Throwable e)
{
exception = e;
log.error("Exception:", e);
}
log.debug("Notifying main thread: ");
exitSemaphore.release();
log.info("QueueWorker Done: " + queueName);
}
void work() throws Exception
{
createQueue(queueName);
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) context.lookup(queueName);
QueueSender sender = session.createSender(queue);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message message = session.createTextMessage("Test Message");
for (int i = 0; i < MESSAGE_COUNT; i++)
{
sender.send(message);
log.debug("Sent message " + i + " to queue :" + queueName);
}
QueueReceiver receiver = session.createReceiver(queue);
for (int i = 0; i < MESSAGE_COUNT; i++)
{
message = receiver.receive(5000);
log.debug("Received message " + i + " from queue :" + queueName);
if( message == null )
fail("Received of msg timedout");
}
session.close();
deleteQueue(queueName);
}
}
}