| AbstractChannelHandler.java |
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.messaging.channel.plugins.handler;
import org.jboss.messaging.interfaces.*;
import org.jboss.messaging.interfaces.Consumer;
import org.jboss.messaging.interfaces.MessageReference;
/**
* An abstract channel handler
*
* @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
* @version $Revision: 1.1 $
*/
public abstract class AbstractChannelHandler implements ChannelHandler
{
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
/** The message set */
protected MessageSet messages;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
/**
* Create a new AbstractChannelHandler.
*
* @param messages the messages
*/
public AbstractChannelHandler(MessageSet messages)
{
this.messages = messages;
messages.setConsumer(this);
}
// Public --------------------------------------------------------
// Consumer implementation ---------------------------------------
public boolean accepts(MessageReference reference, boolean active)
{
// We accept all messages
return true;
}
public void onMessage(MessageReference reference)
{
Consumer consumer;
messages.lock();
try
{
consumer = findConsumer(reference);
}
finally
{
messages.unlock();
}
if (consumer != null)
consumer.onMessage(reference);
}
// ChannelHandler implementation ---------------------------------
public void addMessage(MessageReference reference)
{
Consumer consumer;
messages.lock();
try
{
consumer = findConsumer(reference);
if (consumer == null)
messages.add(reference);
}
finally
{
messages.unlock();
}
if (consumer != null)
consumer.onMessage(reference);
}
public MessageReference removeMessage(Consumer consumer)
{
messages.lock();
try
{
return messages.remove(consumer);
}
finally
{
messages.unlock();
}
}
public void waitMessage(Consumer consumer, long wait)
{
MessageReference message;
messages.lock();
try
{
message = messages.remove(consumer);
// Nothing found, wait
if (message == null)
addConsumer(consumer, wait);
}
finally
{
messages.unlock();
}
// We found a message, deliver it
if (message != null)
consumer.onMessage(message);
}
public void stopWaitMessage(Consumer consumer)
{
messages.lock();
try
{
removeConsumer(consumer);
}
finally
{
messages.unlock();
}
}
// Protected -----------------------------------------------------
/**
* Add a consumer
*
* @param consumer the consumer to wait for a message
* @param wait the length of time to wait
*/
protected abstract void addConsumer(Consumer consumer, long wait);
/**
* Remove a consumer
*
* @param consumer the consumer to remove
*/
protected abstract void removeConsumer(Consumer consumer);
/**
* Find a consumer for a message
*
* @param reference the message
* @return the consumer or null if there are none for the message
*/
protected abstract Consumer findConsumer(MessageReference reference);
// Package Private -----------------------------------------------
// Private -------------------------------------------------------
// Inner Classes -------------------------------------------------
}
| AbstractChannelHandler.java |