|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |
java.lang.Object java.util.concurrent.AbstractExecutorService java.util.concurrent.ThreadPoolExecutor org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor
public class OrderedMemoryAwareThreadPoolExecutor
A MemoryAwareThreadPoolExecutor
which maintains the
ChannelEvent
order for the same Channel
.
OrderedMemoryAwareThreadPoolExecutor
executes the queued task in the
same thread if an existing thread is running a task associated with the same
Channel
. This behavior is to make sure the events from the same
Channel
are handled in a correct order. A different thread might be
chosen only when the task queue of the Channel
is empty.
Although OrderedMemoryAwareThreadPoolExecutor
guarantees the order
of ChannelEvent
s. It does not guarantee that the invocation will be
made by the same thread for the same channel, but it does guarantee that
the invocation will be made sequentially for the events of the same channel.
For example, the events can be processed as depicted below:
-----------------------------------> Timeline -----------------------------------> Thread X: --- Channel A (Event 1) --. .-- Channel B (Event 2) --- Channel B (Event 3) ---> \ / X / \ Thread Y: --- Channel B (Event 1) --' '-- Channel A (Event 2) --- Channel A (Event 3) --->Please note that the events of different channels are independent from each other. That is, an event of Channel B will not be blocked by an event of Channel A and vice versa, unless the thread pool is exhausted.
If you want the events associated with the same channel to be executed
simultaneously, please use MemoryAwareThreadPoolExecutor
instead.
Channel
to maintain event order
OrderedMemoryAwareThreadPoolExecutor
uses a Channel
as a key
that is used for maintaining the event execution order, as explained in the
previous section. Alternatively, you can extend it to change its behavior.
For example, you can change the key to the remote IP of the peer:
public class RemoteAddressBasedOMATPE extends OrderedMemoryAwareThreadPoolExecutor {
... Constructors ...
protected ConcurrentMap<Object, Executor> new ChildExecutorMap() {
// The default implementation returns a special ConcurrentMap that
// uses identity comparison only (see IdentityHashMap
).
// Because SocketAddress does not work with identity comparison,
// we need to employ more generic implementation.
return new ConcurrentHashMap<Object, Executor>
}
protected Object getChildExecutorKey(ChannelEvent e) {
// Use the IP of the remote peer as a key.
return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
}
// Make public so that you can call from anywhere.
public boolean removeChildExecutor(Object key) {
super.removeChildExecutor(key);
}
}
Please be very careful of memory leak of the child executor map. You must
call removeChildExecutor(Object)
when the life cycle of the key
ends (e.g. all connections from the same IP were closed.) Also, please
keep in mind that the key can appear again after calling removeChildExecutor(Object)
(e.g. a new connection could come in from the same old IP after removal.)
If in doubt, prune the old unused or stall keys from the child executor map
periodically:
RemoteAddressBasedOMATPE executor = ...; on every 3 seconds: for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) { InetAddress ip = (InetAddress) i.next(); if (there is no active connection from 'ip' now && there has been no incoming connection from 'ip' for last 10 minutes) { i.remove(); } }If the expected maximum number of keys is small and deterministic, you could use a weak key map such as ConcurrentWeakHashMap or synchronized
WeakHashMap
instead of managing the life cycle of the
keys by yourself.
Nested Class Summary |
---|
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor |
---|
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy |
Constructor Summary | |
---|---|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize)
Creates a new instance. |
|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize,
long keepAliveTime,
TimeUnit unit)
Creates a new instance. |
|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize,
long keepAliveTime,
TimeUnit unit,
ObjectSizeEstimator objectSizeEstimator,
ThreadFactory threadFactory)
Creates a new instance. |
|
OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
long maxChannelMemorySize,
long maxTotalMemorySize,
long keepAliveTime,
TimeUnit unit,
ThreadFactory threadFactory)
Creates a new instance. |
Method Summary | |
---|---|
protected void |
doExecute(Runnable task)
Executes the specified task concurrently while maintaining the event order. |
protected Object |
getChildExecutorKey(ChannelEvent e)
|
protected Set<Object> |
getChildExecutorKeySet()
|
protected ConcurrentMap<Object,Executor> |
newChildExecutorMap()
|
protected boolean |
removeChildExecutor(Object key)
|
protected boolean |
shouldCount(Runnable task)
Returns true if and only if the specified task should
be counted to limit the global and per-channel memory consumption. |
Methods inherited from class org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor |
---|
beforeExecute, decreaseCounter, doUnorderedExecute, execute, getMaxChannelMemorySize, getMaxTotalMemorySize, getObjectSizeEstimator, increaseCounter, remove, setMaxChannelMemorySize, setMaxTotalMemorySize, setObjectSizeEstimator, terminated |
Methods inherited from class java.util.concurrent.AbstractExecutorService |
---|
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit |
Methods inherited from class java.lang.Object |
---|
clone, equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Constructor Detail |
---|
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.keepAliveTime
- the amount of time for an inactive thread to shut itself downunit
- the TimeUnit
of keepAliveTime
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.keepAliveTime
- the amount of time for an inactive thread to shut itself downunit
- the TimeUnit
of keepAliveTime
threadFactory
- the ThreadFactory
of this poolpublic OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
corePoolSize
- the maximum number of active threadsmaxChannelMemorySize
- the maximum total size of the queued events per channel.
Specify 0
to disable.maxTotalMemorySize
- the maximum total size of the queued events for this pool
Specify 0
to disable.keepAliveTime
- the amount of time for an inactive thread to shut itself downunit
- the TimeUnit
of keepAliveTime
threadFactory
- the ThreadFactory
of this poolobjectSizeEstimator
- the ObjectSizeEstimator
of this poolMethod Detail |
---|
protected ConcurrentMap<Object,Executor> newChildExecutorMap()
protected Object getChildExecutorKey(ChannelEvent e)
protected Set<Object> getChildExecutorKeySet()
protected boolean removeChildExecutor(Object key)
protected void doExecute(Runnable task)
doExecute
in class MemoryAwareThreadPoolExecutor
protected boolean shouldCount(Runnable task)
MemoryAwareThreadPoolExecutor
true
if and only if the specified task
should
be counted to limit the global and per-channel memory consumption.
To override this method, you must call super.shouldCount()
to
make sure important tasks are not counted.
shouldCount
in class MemoryAwareThreadPoolExecutor
|
||||||||||
PREV CLASS NEXT CLASS | FRAMES NO FRAMES | |||||||||
SUMMARY: NESTED | FIELD | CONSTR | METHOD | DETAIL: FIELD | CONSTR | METHOD |