package org.jboss.test.jbossmq.test;
import org.jboss.test.JBossTestCase;
import org.jboss.test.JBossTestSetup;
import org.apache.log4j.Category;
import junit.framework.TestSuite;
import junit.framework.Test;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Message;
import javax.jms.TopicConnectionFactory;
import javax.jms.QueueConnectionFactory;
import javax.jms.MessageListener;
import javax.jms.QueueSender;
import javax.jms.ObjectMessage;
import javax.jms.JMSException;
public class JBossSessionRecoverUnitTestCase
extends JBossTestCase
{
String QUEUE_FACTORY = "ConnectionFactory";
String TEST_QUEUE = "queue/testQueue";
Context context;
QueueConnection queueConnection;
QueueSession session;
int counter=0;
Exception exception=null;
public JBossSessionRecoverUnitTestCase(String name) throws Exception
{
super(name);
}
protected void setUp()
throws Exception
{
this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory started");
}
protected void tearDown() throws Exception
{
this.getLog().debug("JBossSessionRecoverUnitTestCase, ConnectionFactory done");
}
private void drainQueue() throws Exception
{
QueueSession session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup(TEST_QUEUE);
QueueReceiver receiver = session.createReceiver(queue);
Message message = receiver.receive( 1000 );
int c=0;
while( message != null )
{
message = receiver.receive( 1000 );
c++;
}
if( c!=0 )
getLog().debug(" Drained "+c+" messages from the queue");
session.close();
}
static public void main ( String []args )
{
String newArgs[] = { "org.jboss.test.jbossmq.test.JBossSessionRecoverUnitTestCase" };
junit.swingui.TestRunner.main(newArgs);
}
protected void connect() throws Exception
{
if( context == null )
{
context = new InitialContext();
}
QueueConnectionFactory queueFactory = (QueueConnectionFactory) context.lookup(QUEUE_FACTORY);
queueConnection = queueFactory.createQueueConnection();
getLog().debug("Connection to JBossMQ established.");
}
public void testQueueSessionRecovermessageListener() throws Exception
{
counter = 0;
getLog().debug("Starting session.recover() Message Listener test");
connect();
queueConnection.start();
drainQueue();
session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup(TEST_QUEUE);
QueueSender sender = session.createSender(queue);
for ( int i=0; i<20; i++ )
{
sender.send(session.createObjectMessage(new Integer(i)));
}
session.close();
queueConnection.stop();
session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
QueueReceiver receiver = session.createReceiver( queue );
MessageListener messagelistener = new MessageListener()
{
public void onMessage(Message message)
{
processMessage( message );
}
};
receiver.setMessageListener( messagelistener );
queueConnection.start();
while ( counter < 40 && exception == null )
{
try
{
Thread.sleep( 500 );
}
catch ( InterruptedException ie )
{
}
}
if ( exception != null )
{
queueConnection.close();
throw exception;
}
queueConnection.close();
getLog().debug("session.recover() Message Listener passed");
}
private void processMessage ( Message message )
{
try
{
if ( message instanceof ObjectMessage )
{
counter++;
ObjectMessage objectmessage = (ObjectMessage)message;
Integer integer = (Integer)objectmessage.getObject();
int mynumber = integer.intValue();
getLog().debug("message object " + integer + " counter=" + counter );
if ( mynumber == 19 )
{
if (counter == 20)
{
session.recover();
}
else
{
message.acknowledge();
}
}
}
}
catch ( JMSException e )
{
exception = e;
}
}
class Synch
{
boolean waiting = false;
public synchronized void doWait(long timeout)
throws InterruptedException
{
waiting = true;
this.wait(timeout);
}
public synchronized void doNotify()
throws InterruptedException
{
while (waiting == false)
wait(100);
this.notifyAll();
}
}
public void testQueueSessionRecoverMessageListenerOrder()
throws Exception
{
counter = 0;
exception = null;
getLog().debug("Starting session.recover() Message Listener Order test");
connect();
queueConnection.start();
drainQueue();
session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup(TEST_QUEUE);
QueueSender sender = session.createSender(queue);
for (int i=0; i<4; ++i)
{
sender.send(session.createObjectMessage(new Integer(i)));
}
QueueReceiver receiver = session.createReceiver( queue );
final Synch synch = new Synch();
MessageListener messagelistener = new MessageListener()
{
public void onMessage(Message message)
{
checkMessagesInOrder(session, message, synch);
}
};
receiver.setMessageListener( messagelistener );
queueConnection.start();
synch.doWait(10000);
if ( exception != null )
{
queueConnection.close();
throw exception;
}
queueConnection.close();
getLog().debug("session.recover() Message Listener Order passed");
}
private void checkMessagesInOrder(Session session, Message message, Synch synch)
{
try
{
ObjectMessage objectmessage = (ObjectMessage)message;
Integer integer = (Integer)objectmessage.getObject();
int mynumber = integer.intValue();
if (message.getJMSRedelivered() == false)
{
log.debug("Recovering " + mynumber);
session.recover();
return;
}
log.debug("Checking " + mynumber);
assertTrue("Expected messages in order", mynumber == counter);
counter++;
if (counter == 4)
synch.doNotify();
}
catch (Exception e)
{
exception = e;
}
}
public void testQueueSessionRecoverReceive() throws Exception
{
counter = 0;
getLog().debug("Starting session.recover() receive test");
connect();
queueConnection.start();
drainQueue();
session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup(TEST_QUEUE);
QueueSender sender = session.createSender(queue);
for ( int i=0; i<20; i++ )
{
sender.send(session.createObjectMessage(new Integer(i)));
}
session.close();
queueConnection.stop();
session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
QueueReceiver receiver = session.createReceiver( queue );
queueConnection.start();
Message message = receiver.receive( 1000 );
int messagecounter=0;
while( message != null )
{
message = receiver.receive( 1000 );
messagecounter++;
}
if ( messagecounter != 20 )
{
throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
}
session.recover();
message = receiver.receive();
messagecounter=0;
while( message != null )
{
if ( !message.getJMSRedelivered() )
{
throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
}
message.acknowledge();
messagecounter++;
if ( messagecounter < 15 )
{
message = receiver.receive();
}
else
{
message = receiver.receive ( 1000 );
}
}
if ( messagecounter != 20 )
{
throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
}
queueConnection.close();
getLog().debug("session.recover() receive passed");
}
public void testQueueSessionRecoverReceiveTimeout() throws Exception
{
counter = 0;
getLog().debug("Starting session.recover() receive(timeout) test");
connect();
queueConnection.start();
drainQueue();
session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup(TEST_QUEUE);
QueueSender sender = session.createSender(queue);
for ( int i=0; i<20; i++ )
{
sender.send(session.createObjectMessage(new Integer(i)));
}
session.close();
queueConnection.stop();
session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
QueueReceiver receiver = session.createReceiver( queue );
queueConnection.start();
Message message = receiver.receive( 1000 );
int messagecounter=0;
while( message != null )
{
message = receiver.receive( 1000 );
messagecounter++;
}
if ( messagecounter != 20 )
{
throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
}
session.recover();
message = receiver.receive(1000);
messagecounter=0;
while( message != null )
{
if ( !message.getJMSRedelivered() )
{
throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
}
message.acknowledge();
messagecounter++;
message = receiver.receive( 1000 );
}
if ( messagecounter != 20 )
{
throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
}
queueConnection.close();
getLog().debug("session.recover() receive(timeout) passed");
}
public void testQueueSessionRecoverReceiveNoWait() throws Exception
{
counter = 0;
getLog().debug("Starting session.recover() receiveNoWait test");
connect();
queueConnection.start();
drainQueue();
session = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup(TEST_QUEUE);
QueueSender sender = session.createSender(queue);
for ( int i=0; i<20; i++ )
{
sender.send(session.createObjectMessage(new Integer(i)));
}
session.close();
queueConnection.stop();
session = queueConnection.createQueueSession( false, Session.CLIENT_ACKNOWLEDGE );
QueueReceiver receiver = session.createReceiver( queue );
queueConnection.start();
Message message = receiver.receiveNoWait();
int messagecounter=0;
while( message != null )
{
message = receiver.receiveNoWait();
messagecounter++;
}
if ( messagecounter != 20 )
{
throw new Exception ( "Not all sent messages were delivered! messagecounter=" + messagecounter );
}
session.recover();
message = receiver.receiveNoWait();
messagecounter=0;
while( message != null )
{
if ( !message.getJMSRedelivered() )
{
throw new Exception ( "Message was not marked as redelivered! messagecounter=" + messagecounter );
}
message.acknowledge();
messagecounter++;
message = receiver.receiveNoWait();
}
if ( messagecounter != 20 )
{
throw new Exception ( "Not all unacknowledged messages were redelivered! messagecounter=" + messagecounter );
}
queueConnection.close();
getLog().debug("session.recover() receiveNoWait passed");
}
}