org.jboss.messaging.core
Class ChannelSupport

java.lang.Object
  extended by org.jboss.messaging.core.ChannelSupport
All Implemented Interfaces:
Channel, DeliveryObserver, Distributor, Receiver
Direct Known Subclasses:
Pipe, Queue

public abstract class ChannelSupport
extends java.lang.Object
implements Channel

Channel implementation. It supports atomicity, isolation and recoverability of reliable messages. The channel implementation here uses a "SEDA-type" approach, where requests to handle messages, deliver to receivers or acknowledge messages are not executed concurrently but placed on an event queue and executed serially by a single thread. This prevents lock contention since requests are executed serially, resulting in better scalability and higher throughput at the expense of some latency.

Version:
$Revision: 2238 $ $Id: ChannelSupport.java 2238 2007-02-09 12:31:23Z ovidiu.feodorov@jboss.com $
Author:
Ovidiu Feodorov, Tim Fox

Field Summary
protected  boolean acceptReliableMessages
           
protected  long channelID
           
protected  java.util.Set deliveries
           
protected  java.util.List downCache
           
protected  int downCacheSize
           
protected  QueuedExecutor executor
           
protected  int fullSize
           
protected  SynchronizedLong messageOrdering
           
protected  PrioritizedDeque messageRefs
           
protected  MemoryManager mm
           
protected  MessageStore ms
           
protected  int pageSize
           
protected  boolean paging
           
protected  PersistenceManager pm
           
protected  boolean receiversReady
           
protected  boolean recoverable
           
protected  int refsInStorage
           
protected  Router router
           
 
Constructor Summary
protected ChannelSupport(long channelID, MessageStore ms, PersistenceManager pm, MemoryManager mm, boolean acceptReliableMessages, boolean recoverable, int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor)
           
 
Method Summary
 boolean acceptReliableMessages()
          A non-recoverable channel cannot guarantee recoverability for reliable messages so by default it won't accept reliable messages.
 void acknowledge(Delivery d, Transaction tx)
           
protected  boolean acknowledgeInMemory(Delivery d)
           
protected  void acknowledgeInternal(Delivery d)
           
 boolean add(Receiver r)
          Add a local receiver to this distributor.
protected  void addReferenceInMemory(MessageReference ref)
           
protected  void addToDownCache(MessageReference ref)
           
 java.util.List browse()
           
 java.util.List browse(Filter filter)
           
 void cancel(Delivery d)
           
protected  void cancelInternal(Delivery del)
           
protected  void checkMemory()
           
 void clear()
          Clears non-recoverable state but not persisted state, so a recovery of the channel is possible. TODO: really?
 void close()
          Close the channel
 boolean contains(Receiver r)
           
 void deliver(boolean synchronous)
          Delivers as many references as possible to its router until no more deliveries are returned
 java.util.List delivering(Filter filter)
          Get a list of message references of messages in the process of being delivered, subject to the filter
protected  void deliverInternal()
           
 int downCacheCount()
           
protected  void flushDownCache()
           
protected  org.jboss.messaging.core.ChannelSupport.InMemoryCallback getCallback(Transaction tx)
           
 long getChannelID()
           
 Delivery handle(DeliveryObserver sender, Routable r, Transaction tx)
          A receiver can return an active, "done" or null delivery.
protected  Delivery handleInternal(DeliveryObserver sender, Routable r, Transaction tx)
           
 boolean isPaging()
           
 boolean isRecoverable()
           
 java.util.Iterator iterator()
           
 void load()
          Load the channel state from storage
protected  void load(int number)
           
 int memoryDeliveryCount()
           
 int memoryRefCount()
           
 int messageCount()
          Returns the count of messages stored AND being delivered.
protected  MessageReference obtainReference(Routable r)
           
protected  void processMessageBeforeStorage(MessageReference reference)
          Give subclass a chance to process the message before storing it internally.
 boolean remove(Receiver r)
          Remove a local receiver from this distributor.
 void removeAllReferences()
          Remove all the references in the channel
protected  MessageReference removeFirstInMemory()
           
 java.lang.String toString()
           
 java.util.List undelivered(Filter filter)
          Get a list of message references of messages not in the process of being delivered, subject to the filter
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

channelID

protected long channelID

router

protected Router router

ms

protected MessageStore ms

executor

protected QueuedExecutor executor

receiversReady

protected boolean receiversReady

messageRefs

protected PrioritizedDeque messageRefs

deliveries

protected java.util.Set deliveries

downCache

protected java.util.List downCache

acceptReliableMessages

protected boolean acceptReliableMessages

recoverable

protected boolean recoverable

messageOrdering

protected SynchronizedLong messageOrdering

pm

protected PersistenceManager pm

mm

protected MemoryManager mm

fullSize

protected int fullSize

pageSize

protected int pageSize

downCacheSize

protected int downCacheSize

paging

protected boolean paging

refsInStorage

protected int refsInStorage
Constructor Detail

ChannelSupport

protected ChannelSupport(long channelID,
                         MessageStore ms,
                         PersistenceManager pm,
                         MemoryManager mm,
                         boolean acceptReliableMessages,
                         boolean recoverable,
                         int fullSize,
                         int pageSize,
                         int downCacheSize,
                         QueuedExecutor executor)
Parameters:
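acceptReliableMessages - - it only makes sense for a non-recoverable channel (originally stated as "if tl is null"; note that this constructor has no parameter named tl). Otherwise ignored (a recoverable channel always accepts reliable messages)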
Method Detail

handle

public Delivery handle(DeliveryObserver sender,
                       Routable r,
                       Transaction tx)
Description copied from interface: Receiver
A receiver can return an active, "done" or null delivery. The method returns null in case the receiver doesn't accept the message. The return value is unspecified when the message is submitted in the context of a transaction (tx not null).

Specified by:
handle in interface Receiver
Parameters:
sender - - the component the delivery should be acknowledged to.
See Also:
Delivery, DeliveryObserver

acknowledge

public void acknowledge(Delivery d,
                        Transaction tx)
                 throws java.lang.Throwable
Specified by:
acknowledge in interface DeliveryObserver
Throws:
java.lang.Throwable

cancel

public void cancel(Delivery d)
            throws java.lang.Throwable
Specified by:
cancel in interface DeliveryObserver
Throws:
java.lang.Throwable

add

public boolean add(Receiver r)
Description copied from interface: Distributor
Add a local receiver to this distributor.

Specified by:
add in interface Distributor
Returns:
true if the distributor did not already contain the specified receiver and the receiver was added to the distributor, false otherwise.

remove

public boolean remove(Receiver r)
Description copied from interface: Distributor
Remove a local receiver from this distributor.

Specified by:
remove in interface Distributor
Returns:
true if this distributor contained the specified receiver.

clear

public void clear()
Description copied from interface: Channel
Clears non-recoverable state but not persisted state, so a recovery of the channel is possible. TODO: really?

Specified by:
clear in interface Channel

contains

public boolean contains(Receiver r)
Specified by:
contains in interface Distributor

iterator

public java.util.Iterator iterator()
Specified by:
iterator in interface Distributor
Returns:
an iterator of local receivers

getChannelID

public long getChannelID()
Specified by:
getChannelID in interface Channel
Returns:
the unique ID of the channel

isRecoverable

public boolean isRecoverable()
Specified by:
isRecoverable in interface Channel
Returns:
true if the channel can guarantee recoverability for reliable messages. Recoverability is not guaranteed for non-reliable messages (and should not be provided by default, for performance reasons), even if the channel is recoverable.

acceptReliableMessages

public boolean acceptReliableMessages()
Description copied from interface: Channel
A non-recoverable channel cannot guarantee recoverability for reliable messages so by default it won't accept reliable messages. However, there are situations when discarding a reliable message is acceptable for a specific instance of a channel, so there should be a way to configure the channel to do so. A channel indicates unequivocally whether it accepts reliable messages or not by returning true or false as the result of this method. A recoverable channel must always accept reliable messages, so this method must always return true for a recoverable channel.

Specified by:
acceptReliableMessages in interface Channel
Returns:
false if the channel doesn't accept reliable messages.
See Also:
State#acceptReliableMessages()

browse

public java.util.List browse()
Specified by:
browse in interface Channel
Returns:
a List containing messages being held by the channel. The list includes messages in the process of being delivered and messages for which delivery hasn't been attempted yet.

browse

public java.util.List browse(Filter filter)
Specified by:
browse in interface Channel
Parameters:
filter - - may be null, in which case no filter is applied.
Returns:
a List containing message references of messages whose state is maintained by this State instance. The list includes references of messages in the process of being delivered and references of messages for which delivery has not been attempted yet.

deliver

public void deliver(boolean synchronous)
Description copied from interface: Channel
Delivers as many references as possible to its router until no more deliveries are returned

Specified by:
deliver in interface Channel

close

public void close()
Description copied from interface: Channel
Close the channel

Specified by:
close in interface Channel

removeAllReferences

public void removeAllReferences()
                         throws java.lang.Throwable
Description copied from interface: Channel
Remove all the references in the channel

Specified by:
removeAllReferences in interface Channel
Throws:
java.lang.Throwable

load

public void load()
          throws java.lang.Exception
Description copied from interface: Channel
Load the channel state from storage

Specified by:
load in interface Channel
Throws:
java.lang.Exception

delivering

public java.util.List delivering(Filter filter)
Description copied from interface: Channel
Get a list of message references of messages in the process of being delivered, subject to the filter

Specified by:
delivering in interface Channel
Parameters:
filter -
Returns:
the list

undelivered

public java.util.List undelivered(Filter filter)
Description copied from interface: Channel
Get a list of message references of messages not in the process of being delivered, subject to the filter

Specified by:
undelivered in interface Channel
Parameters:
filter -
Returns:
the list

messageCount

public int messageCount()
Returns the count of messages stored AND being delivered.

Specified by:
messageCount in interface Channel
Returns:
the number of messages stored and being delivered.

memoryRefCount

public int memoryRefCount()

memoryDeliveryCount

public int memoryDeliveryCount()

downCacheCount

public int downCacheCount()

isPaging

public boolean isPaging()

toString

public java.lang.String toString()

deliverInternal

protected void deliverInternal()

handleInternal

protected Delivery handleInternal(DeliveryObserver sender,
                                  Routable r,
                                  Transaction tx)

acknowledgeInternal

protected void acknowledgeInternal(Delivery d)
                            throws java.lang.Exception
Throws:
java.lang.Exception

cancelInternal

protected void cancelInternal(Delivery del)
                       throws java.lang.Exception
Throws:
java.lang.Exception

removeFirstInMemory

protected MessageReference removeFirstInMemory()
                                        throws java.lang.Exception
Throws:
java.lang.Exception

obtainReference

protected MessageReference obtainReference(Routable r)

checkMemory

protected void checkMemory()

addReferenceInMemory

protected void addReferenceInMemory(MessageReference ref)
                             throws java.lang.Exception
Throws:
java.lang.Exception

addToDownCache

protected void addToDownCache(MessageReference ref)
                       throws java.lang.Exception
Throws:
java.lang.Exception

flushDownCache

protected void flushDownCache()
                       throws java.lang.Exception
Throws:
java.lang.Exception

acknowledgeInMemory

protected boolean acknowledgeInMemory(Delivery d)

load

protected void load(int number)
             throws java.lang.Exception
Throws:
java.lang.Exception

getCallback

protected org.jboss.messaging.core.ChannelSupport.InMemoryCallback getCallback(Transaction tx)

processMessageBeforeStorage

protected void processMessageBeforeStorage(MessageReference reference)
Give subclass a chance to process the message before storing it internally. Useful to get rid of the REMOTE_ROUTABLE header in a distributed case, for example.



Copyright © 2006 JBoss Inc. All Rights Reserved.