package org.jboss.messaging.channel.plugins.handler;
import org.jboss.messaging.interfaces.*;
import org.jboss.messaging.interfaces.Consumer;
import org.jboss.messaging.interfaces.MessageReference;
/**
 * Skeleton {@link ChannelHandler} that pairs message references with consumers.
 *
 * <p>References that cannot be handed to a consumer right away are kept in the
 * {@link MessageSet} supplied at construction time. How consumers are
 * registered, unregistered and selected is left to subclasses through
 * {@link #addConsumer}, {@link #removeConsumer} and {@link #findConsumer}.
 *
 * <p>Every access to the message set or to the subclass hooks made by this
 * class is bracketed by {@code messages.lock()} / {@code messages.unlock()}.
 * {@code Consumer.onMessage} is always invoked after the unlock call.
 */
public abstract class AbstractChannelHandler implements ChannelHandler
{
   /** The message set used by this handler; also the object that is locked and unlocked. */
   protected MessageSet messages;

   /**
    * Stores the message set and passes this handler to its {@code setConsumer} method.
    *
    * @param messages the message set to use
    */
   public AbstractChannelHandler(MessageSet messages)
   {
      this.messages = messages;
      this.messages.setConsumer(this);
   }

   /**
    * Accepts every reference; both arguments are ignored.
    *
    * @param reference the message reference (unused)
    * @param active unused
    * @return always {@code true}
    */
   public boolean accepts(MessageReference reference, boolean active)
   {
      return true;
   }

   /**
    * Forwards the reference to the consumer chosen by {@link #findConsumer}.
    * When no consumer is found this method does nothing further; in particular
    * it does not add the reference to the message set.
    *
    * @param reference the message reference to forward
    */
   public void onMessage(MessageReference reference)
   {
      Consumer target = findConsumerLocked(reference);
      if (target == null)
      {
         return;
      }
      target.onMessage(reference);
   }

   /**
    * Delivers the reference to a consumer if {@link #findConsumer} yields one,
    * otherwise adds it to the message set.
    *
    * @param reference the message reference to deliver or store
    */
   public void addMessage(MessageReference reference)
   {
      Consumer target;
      messages.lock();
      try
      {
         target = findConsumer(reference);
         if (target == null)
         {
            // Nobody to deliver to: keep the reference and stop here.
            messages.add(reference);
            return;
         }
      }
      finally
      {
         messages.unlock();
      }
      // Reached only when a consumer was found; the unlock above has already run.
      target.onMessage(reference);
   }

   /**
    * Removes a reference for the given consumer from the message set.
    *
    * @param consumer the consumer passed to {@code messages.remove}
    * @return whatever {@code messages.remove(consumer)} returned
    */
   public MessageReference removeMessage(Consumer consumer)
   {
      MessageReference removed;
      messages.lock();
      try
      {
         removed = messages.remove(consumer);
      }
      finally
      {
         messages.unlock();
      }
      return removed;
   }

   /**
    * Hands the consumer a reference from the message set if
    * {@code messages.remove(consumer)} returns one; otherwise registers the
    * consumer through {@link #addConsumer}.
    *
    * @param consumer the consumer asking for a message
    * @param wait passed unchanged to {@link #addConsumer}
    */
   public void waitMessage(Consumer consumer, long wait)
   {
      MessageReference pending;
      messages.lock();
      try
      {
         pending = messages.remove(consumer);
         if (pending == null)
         {
            // Nothing available: register the consumer instead.
            addConsumer(consumer, wait);
            return;
         }
      }
      finally
      {
         messages.unlock();
      }
      // Reached only when a reference was removed; the unlock above has already run.
      consumer.onMessage(pending);
   }

   /**
    * Unregisters the consumer through {@link #removeConsumer}.
    *
    * @param consumer the consumer to unregister
    */
   public void stopWaitMessage(Consumer consumer)
   {
      messages.lock();
      try
      {
         removeConsumer(consumer);
      }
      finally
      {
         messages.unlock();
      }
   }

   /**
    * Runs {@link #findConsumer} between {@code messages.lock()} and
    * {@code messages.unlock()}.
    *
    * @param reference the message reference to find a consumer for
    * @return the result of {@link #findConsumer}, possibly {@code null}
    */
   private Consumer findConsumerLocked(MessageReference reference)
   {
      messages.lock();
      try
      {
         return findConsumer(reference);
      }
      finally
      {
         messages.unlock();
      }
   }

   /**
    * Registers a consumer. Called by {@link #waitMessage} between
    * {@code messages.lock()} and {@code messages.unlock()}.
    *
    * @param consumer the consumer to register
    * @param wait the value given to {@link #waitMessage}; presumably a wait
    *        time, but its meaning and units are not visible here - confirm
    *        against implementations
    */
   protected abstract void addConsumer(Consumer consumer, long wait);

   /**
    * Unregisters a consumer. Called by {@link #stopWaitMessage} between
    * {@code messages.lock()} and {@code messages.unlock()}.
    *
    * @param consumer the consumer to unregister
    */
   protected abstract void removeConsumer(Consumer consumer);

   /**
    * Selects a consumer for the reference. Called by {@link #onMessage} and
    * {@link #addMessage} between {@code messages.lock()} and
    * {@code messages.unlock()}.
    *
    * @param reference the message reference needing a consumer
    * @return the chosen consumer, or {@code null} when there is none
    */
   protected abstract Consumer findConsumer(MessageReference reference);
}