org.jboss.netty.handler.execution
Class OrderedMemoryAwareThreadPoolExecutor

java.lang.Object
  extended by java.util.concurrent.AbstractExecutorService
      extended by java.util.concurrent.ThreadPoolExecutor
          extended by org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor
              extended by org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor
All Implemented Interfaces:
Executor, ExecutorService

public class OrderedMemoryAwareThreadPoolExecutor
extends MemoryAwareThreadPoolExecutor

A MemoryAwareThreadPoolExecutor which maintains the ChannelEvent order for the same Channel.

OrderedMemoryAwareThreadPoolExecutor executes the queued task in the same thread if an existing thread is running a task associated with the same Channel. This behavior is to make sure the events from the same Channel are handled in a correct order. A different thread might be chosen only when the task queue of the Channel is empty.

Although OrderedMemoryAwareThreadPoolExecutor guarantees the order of ChannelEvents. It does not guarantee that the invocation will be made by the same thread for the same channel, but it does guarantee that the invocation will be made sequentially for the events of the same channel. For example, the events can be processed as depicted below:

           -----------------------------------> Timeline ----------------------------------->

 Thread X: --- Channel A (Event 1) --.   .-- Channel B (Event 2) --- Channel B (Event 3) --->
                                      \ /
                                       X
                                      / \
 Thread Y: --- Channel B (Event 1) --'   '-- Channel A (Event 2) --- Channel A (Event 3) --->
 
Please note that the events of different channels are independent from each other. That is, an event of Channel B will not be blocked by an event of Channel A and vice versa, unless the thread pool is exhausted.

If you want the events associated with the same channel to be executed simultaneously, please use MemoryAwareThreadPoolExecutor instead.

Using a different key other than Channel to maintain event order

OrderedMemoryAwareThreadPoolExecutor uses a Channel as a key that is used for maintaining the event execution order, as explained in the previous section. Alternatively, you can extend it to change its behavior. For example, you can change the key to the remote IP of the peer:

 public class RemoteAddressBasedOMATPE extends OrderedMemoryAwareThreadPoolExecutor {

     ... Constructors ...

     protected ConcurrentMap<Object, Executor> new ChildExecutorMap() {
         // The default implementation returns a special ConcurrentMap that
         // uses identity comparison only (see IdentityHashMap).
         // Because SocketAddress does not work with identity comparison,
         // we need to employ more generic implementation.
         return new ConcurrentHashMap<Object, Executor>
     }

     protected Object getChildExecutorKey(ChannelEvent e) {
         // Use the IP of the remote peer as a key.
         return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
     }

     // Make public so that you can call from anywhere.
     public boolean removeChildExecutor(Object key) {
         super.removeChildExecutor(key);
     }
 }
 
Please be very careful of memory leak of the child executor map. You must call removeChildExecutor(Object) when the life cycle of the key ends (e.g. all connections from the same IP were closed.) Also, please keep in mind that the key can appear again after calling removeChildExecutor(Object) (e.g. a new connection could come in from the same old IP after removal.) If in doubt, prune the old unused or stall keys from the child executor map periodically:
 RemoteAddressBasedOMATPE executor = ...;

 on every 3 seconds:

   for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) {
       InetAddress ip = (InetAddress) i.next();
       if (there is no active connection from 'ip' now &&
           there has been no incoming connection from 'ip' for last 10 minutes) {
           i.remove();
       }
   }
 
If the expected maximum number of keys is small and deterministic, you could use a weak key map such as ConcurrentWeakHashMap or synchronized WeakHashMap instead of managing the life cycle of the keys by yourself.

Version:
$Rev: 1685 $, $Date: 2009-08-28 16:15:49 +0900 (금, 28 8 2009) $
Author:
The Netty Project (netty-dev@lists.jboss.org), Trustin Lee (tlee@redhat.com), David M. Lloyd (david.lloyd@redhat.com)

Nested Class Summary
 
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
 
Constructor Summary
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
          Creates a new instance.
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
          Creates a new instance.
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
          Creates a new instance.
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
          Creates a new instance.
 
Method Summary
protected  void doExecute(Runnable task)
          Executes the specified task concurrently while maintaining the event order.
protected  Object getChildExecutorKey(ChannelEvent e)
           
protected  Set<Object> getChildExecutorKeySet()
           
protected  ConcurrentMap<Object,Executor> newChildExecutorMap()
           
protected  boolean removeChildExecutor(Object key)
           
protected  boolean shouldCount(Runnable task)
          Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption.
 
Methods inherited from class org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor
beforeExecute, decreaseCounter, doUnorderedExecute, execute, getMaxChannelMemorySize, getMaxTotalMemorySize, getObjectSizeEstimator, increaseCounter, remove, setMaxChannelMemorySize, setMaxTotalMemorySize, setObjectSizeEstimator, terminated
 
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, shutdownNow
 
Methods inherited from class java.util.concurrent.AbstractExecutorService
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
 
Methods inherited from class java.lang.Object
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

OrderedMemoryAwareThreadPoolExecutor

public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
                                            long maxChannelMemorySize,
                                            long maxTotalMemorySize)
Creates a new instance.

Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool Specify 0 to disable.

OrderedMemoryAwareThreadPoolExecutor

public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
                                            long maxChannelMemorySize,
                                            long maxTotalMemorySize,
                                            long keepAliveTime,
                                            TimeUnit unit)
Creates a new instance.

Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime

OrderedMemoryAwareThreadPoolExecutor

public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
                                            long maxChannelMemorySize,
                                            long maxTotalMemorySize,
                                            long keepAliveTime,
                                            TimeUnit unit,
                                            ThreadFactory threadFactory)
Creates a new instance.

Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
threadFactory - the ThreadFactory of this pool

OrderedMemoryAwareThreadPoolExecutor

public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
                                            long maxChannelMemorySize,
                                            long maxTotalMemorySize,
                                            long keepAliveTime,
                                            TimeUnit unit,
                                            ObjectSizeEstimator objectSizeEstimator,
                                            ThreadFactory threadFactory)
Creates a new instance.

Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
threadFactory - the ThreadFactory of this pool
objectSizeEstimator - the ObjectSizeEstimator of this pool
Method Detail

newChildExecutorMap

protected ConcurrentMap<Object,Executor> newChildExecutorMap()

getChildExecutorKey

protected Object getChildExecutorKey(ChannelEvent e)

getChildExecutorKeySet

protected Set<Object> getChildExecutorKeySet()

removeChildExecutor

protected boolean removeChildExecutor(Object key)

doExecute

protected void doExecute(Runnable task)
Executes the specified task concurrently while maintaining the event order.

Overrides:
doExecute in class MemoryAwareThreadPoolExecutor

shouldCount

protected boolean shouldCount(Runnable task)
Description copied from class: MemoryAwareThreadPoolExecutor
Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption. To override this method, you must call super.shouldCount() to make sure important tasks are not counted.

Overrides:
shouldCount in class MemoryAwareThreadPoolExecutor


Copyright © 2008-2009 JBoss, by Red Hat. All Rights Reserved.