View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.execution;
17  
18  import java.util.IdentityHashMap;
19  import java.util.LinkedList;
20  import java.util.Set;
21  import java.util.WeakHashMap;
22  import java.util.concurrent.ConcurrentMap;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ThreadFactory;
25  import java.util.concurrent.TimeUnit;
26  
27  import org.jboss.netty.channel.Channel;
28  import org.jboss.netty.channel.ChannelEvent;
29  import org.jboss.netty.channel.ChannelState;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.util.ObjectSizeEstimator;
32  import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
33  
34  /**
35   * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
36   * same {@link Channel} are executed sequentially.
37   * <p>
38   * <b>NOTE</b>: This thread pool inherits most characteristics of its super
39   * type, so please make sure to refer to {@link MemoryAwareThreadPoolExecutor}
40   * to understand how it works basically.
41   *
42   * <h3>Event execution order</h3>
43   *
44   * For example, let's say there are two executor threads that handle the events
45   * from the two channels:
46   * <pre>
47   *           -------------------------------------&gt; Timeline ------------------------------------&gt;
48   *
49   * Thread X: --- Channel A (Event A1) --.   .-- Channel B (Event B2) --- Channel B (Event B3) ---&gt;
50   *                                      \ /
51   *                                       X
52   *                                      / \
53   * Thread Y: --- Channel B (Event B1) --'   '-- Channel A (Event A2) --- Channel A (Event A3) ---&gt;
54   * </pre>
55   * As you see, the events from different channels are independent from each
56   * other.  That is, an event of Channel B will not be blocked by an event of
57   * Channel A and vice versa, unless the thread pool is exhausted.
58   * <p>
59   * Also, it is guaranteed that the invocation will be made sequentially for the
60   * events from the same channel.  For example, the event A2 is never executed
61   * before the event A1 is finished.  (Although not recommended, if you want the
62   * events from the same channel to be executed simultaneously, please use
63   * {@link MemoryAwareThreadPoolExecutor} instead.)
64   * <p>
65   * However, it is not guaranteed that the invocation will be made by the same
66   * thread for the same channel.  The events from the same channel can be
67   * executed by different threads.  For example, the Event A2 is executed by the
68   * thread Y while the event A1 was executed by the thread X.
69   *
70   * <h3>Using a different key other than {@link Channel} to maintain event order</h3>
71   * <p>
72   * {@link OrderedMemoryAwareThreadPoolExecutor} uses a {@link Channel} as a key
73   * that is used for maintaining the event execution order, as explained in the
74   * previous section.  Alternatively, you can extend it to change its behavior.
75   * For example, you can change the key to the remote IP of the peer:
76   *
77   * <pre>
78   * public class RemoteAddressBasedOMATPE extends {@link OrderedMemoryAwareThreadPoolExecutor} {
79   *
80   *     ... Constructors ...
81   *
82   *     {@code @Override}
83   *     protected ConcurrentMap&lt;Object, Executor&gt; newChildExecutorMap() {
84   *         // The default implementation returns a special ConcurrentMap that
85   *         // uses identity comparison only (see {@link IdentityHashMap}).
86   *         // Because SocketAddress does not work with identity comparison,
87   *         // we need to employ more generic implementation.
88   *         return new ConcurrentHashMap&lt;Object, Executor&gt;
89   *     }
90   *
91   *     protected Object getChildExecutorKey({@link ChannelEvent} e) {
92   *         // Use the IP of the remote peer as a key.
93   *         return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
94   *     }
95   *
96   *     // Make public so that you can call from anywhere.
97   *     public boolean removeChildExecutor(Object key) {
98   *         super.removeChildExecutor(key);
99   *     }
100  * }
101  * </pre>
102  *
103  * Please be very careful of memory leak of the child executor map.  You must
104  * call {@link #removeChildExecutor(Object)} when the life cycle of the key
105  * ends (e.g. all connections from the same IP were closed.)  Also, please
106  * keep in mind that the key can appear again after calling {@link #removeChildExecutor(Object)}
107  * (e.g. a new connection could come in from the same old IP after removal.)
108  * If in doubt, prune the old unused or stall keys from the child executor map
109  * periodically:
110  *
111  * <pre>
112  * RemoteAddressBasedOMATPE executor = ...;
113  *
114  * on every 3 seconds:
115  *
116  *   for (Iterator&lt;Object&gt; i = executor.getChildExecutorKeySet().iterator; i.hasNext();) {
117  *       InetAddress ip = (InetAddress) i.next();
118  *       if (there is no active connection from 'ip' now &&
119  *           there has been no incoming connection from 'ip' for last 10 minutes) {
120  *           i.remove();
121  *       }
122  *   }
123  * </pre>
124  *
125  * If the expected maximum number of keys is small and deterministic, you could
126  * use a weak key map such as <a href="http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/src/jsr166y/ConcurrentWeakHashMap.java?view=markup">ConcurrentWeakHashMap</a>
127  * or synchronized {@link WeakHashMap} instead of managing the life cycle of the
128  * keys by yourself.
129  *
130  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
131  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
132  * @author David M. Lloyd (david.lloyd@redhat.com)
133  *
134  * @version $Rev: 2308 $, $Date: 2010-06-17 23:23:59 +0900 (Thu, 17 Jun 2010) $
135  *
136  * @apiviz.landmark
137  */
138 public class OrderedMemoryAwareThreadPoolExecutor extends
139         MemoryAwareThreadPoolExecutor {
140 
141     // TODO Make OMATPE focus on the case where Channel is the key.
142     //      Add a new less-efficient TPE that allows custom key.
143 
144     private final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap();
145 
146     /**
147      * Creates a new instance.
148      *
149      * @param corePoolSize          the maximum number of active threads
150      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
151      *                              Specify {@code 0} to disable.
152      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
153      *                              Specify {@code 0} to disable.
154      */
155     public OrderedMemoryAwareThreadPoolExecutor(
156             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
157         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
158     }
159 
160     /**
161      * Creates a new instance.
162      *
163      * @param corePoolSize          the maximum number of active threads
164      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
165      *                              Specify {@code 0} to disable.
166      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
167      *                              Specify {@code 0} to disable.
168      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
169      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
170      */
171     public OrderedMemoryAwareThreadPoolExecutor(
172             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
173             long keepAliveTime, TimeUnit unit) {
174         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
175                 keepAliveTime, unit);
176     }
177 
178     /**
179      * Creates a new instance.
180      *
181      * @param corePoolSize          the maximum number of active threads
182      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
183      *                              Specify {@code 0} to disable.
184      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
185      *                              Specify {@code 0} to disable.
186      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
187      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
188      * @param threadFactory         the {@link ThreadFactory} of this pool
189      */
190     public OrderedMemoryAwareThreadPoolExecutor(
191             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
192             long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
193         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
194                 keepAliveTime, unit, threadFactory);
195     }
196 
197     /**
198      * Creates a new instance.
199      *
200      * @param corePoolSize          the maximum number of active threads
201      * @param maxChannelMemorySize  the maximum total size of the queued events per channel.
202      *                              Specify {@code 0} to disable.
203      * @param maxTotalMemorySize    the maximum total size of the queued events for this pool
204      *                              Specify {@code 0} to disable.
205      * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
206      * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
207      * @param threadFactory         the {@link ThreadFactory} of this pool
208      * @param objectSizeEstimator   the {@link ObjectSizeEstimator} of this pool
209      */
210     public OrderedMemoryAwareThreadPoolExecutor(
211             int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize,
212             long keepAliveTime, TimeUnit unit,
213             ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
214         super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
215                 keepAliveTime, unit, objectSizeEstimator, threadFactory);
216     }
217 
218     protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
219         return new ConcurrentIdentityWeakKeyHashMap<Object, Executor>();
220     }
221 
222     protected Object getChildExecutorKey(ChannelEvent e) {
223         return e.getChannel();
224     }
225 
226     protected Set<Object> getChildExecutorKeySet() {
227         return childExecutors.keySet();
228     }
229 
230     protected boolean removeChildExecutor(Object key) {
231         // FIXME: Succeed only when there is no task in the ChildExecutor's queue.
232         //        Note that it will need locking which might slow down task submission.
233         return childExecutors.remove(key) != null;
234     }
235 
236     /**
237      * Executes the specified task concurrently while maintaining the event
238      * order.
239      */
240     @Override
241     protected void doExecute(Runnable task) {
242         if (!(task instanceof ChannelEventRunnable)) {
243             doUnorderedExecute(task);
244         } else {
245             ChannelEventRunnable r = (ChannelEventRunnable) task;
246             getChildExecutor(r.getEvent()).execute(task);
247         }
248     }
249 
250     private Executor getChildExecutor(ChannelEvent e) {
251         Object key = getChildExecutorKey(e);
252         Executor executor = childExecutors.get(key);
253         if (executor == null) {
254             executor = new ChildExecutor();
255             Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
256             if (oldExecutor != null) {
257                 executor = oldExecutor;
258             }
259         }
260 
261         // Remove the entry when the channel closes.
262         if (e instanceof ChannelStateEvent) {
263             Channel channel = e.getChannel();
264             ChannelStateEvent se = (ChannelStateEvent) e;
265             if (se.getState() == ChannelState.OPEN &&
266                 !channel.isOpen()) {
267                 childExecutors.remove(channel);
268             }
269         }
270         return executor;
271     }
272 
273     @Override
274     protected boolean shouldCount(Runnable task) {
275         if (task instanceof ChildExecutor) {
276             return false;
277         }
278 
279         return super.shouldCount(task);
280     }
281 
282     void onAfterExecute(Runnable r, Throwable t) {
283         afterExecute(r, t);
284     }
285 
286     private final class ChildExecutor implements Executor, Runnable {
287         private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
288 
289         ChildExecutor() {
290             super();
291         }
292 
293         public void execute(Runnable command) {
294             boolean needsExecution;
295             synchronized (tasks) {
296                 needsExecution = tasks.isEmpty();
297                 tasks.add(command);
298             }
299 
300             if (needsExecution) {
301                 doUnorderedExecute(this);
302             }
303         }
304 
305         public void run() {
306             Thread thread = Thread.currentThread();
307             for (;;) {
308                 final Runnable task;
309                 synchronized (tasks) {
310                     task = tasks.getFirst();
311                 }
312 
313                 boolean ran = false;
314                 beforeExecute(thread, task);
315                 try {
316                     task.run();
317                     ran = true;
318                     onAfterExecute(task, null);
319                 } catch (RuntimeException e) {
320                     if (!ran) {
321                         onAfterExecute(task, e);
322                     }
323                     throw e;
324                 } finally {
325                     synchronized (tasks) {
326                         tasks.removeFirst();
327                         if (tasks.isEmpty()) {
328                             break;
329                         }
330                     }
331                 }
332             }
333         }
334     }
335 }