1 /* 2 * Copyright 2009 Red Hat, Inc. 3 * 4 * Red Hat licenses this file to you under the Apache License, version 2.0 5 * (the "License"); you may not use this file except in compliance with the 6 * License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.execution; 17 18 import java.util.IdentityHashMap; 19 import java.util.LinkedList; 20 import java.util.Set; 21 import java.util.WeakHashMap; 22 import java.util.concurrent.ConcurrentMap; 23 import java.util.concurrent.Executor; 24 import java.util.concurrent.ThreadFactory; 25 import java.util.concurrent.TimeUnit; 26 27 import org.jboss.netty.channel.Channel; 28 import org.jboss.netty.channel.ChannelEvent; 29 import org.jboss.netty.channel.ChannelState; 30 import org.jboss.netty.channel.ChannelStateEvent; 31 import org.jboss.netty.util.ObjectSizeEstimator; 32 import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap; 33 34 /** 35 * A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the 36 * same {@link Channel} are executed sequentially. 37 * <p> 38 * <b>NOTE</b>: This thread pool inherits most characteristics of its super 39 * type, so please make sure to refer to {@link MemoryAwareThreadPoolExecutor} 40 * to understand how it works basically. 41 * 42 * <h3>Event execution order</h3> 43 * 44 * For example, let's say there are two executor threads that handle the events 45 * from the two channels: 46 * <pre> 47 * -------------------------------------> Timeline ------------------------------------> 48 * 49 * Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) ---> 50 * \ / 51 * X 52 * / \ 53 * Thread Y: --- Channel B (Event B1) --' '-- Channel A (Event A2) --- Channel A (Event A3) ---> 54 * </pre> 55 * As you see, the events from different channels are independent from each 56 * other. That is, an event of Channel B will not be blocked by an event of 57 * Channel A and vice versa, unless the thread pool is exhausted. 58 * <p> 59 * Also, it is guaranteed that the invocation will be made sequentially for the 60 * events from the same channel. For example, the event A2 is never executed 61 * before the event A1 is finished. (Although not recommended, if you want the 62 * events from the same channel to be executed simultaneously, please use 63 * {@link MemoryAwareThreadPoolExecutor} instead.) 64 * <p> 65 * However, it is not guaranteed that the invocation will be made by the same 66 * thread for the same channel. The events from the same channel can be 67 * executed by different threads. For example, the Event A2 is executed by the 68 * thread Y while the event A1 was executed by the thread X. 69 * 70 * <h3>Using a different key other than {@link Channel} to maintain event order</h3> 71 * <p> 72 * {@link OrderedMemoryAwareThreadPoolExecutor} uses a {@link Channel} as a key 73 * that is used for maintaining the event execution order, as explained in the 74 * previous section. Alternatively, you can extend it to change its behavior. 75 * For example, you can change the key to the remote IP of the peer: 76 * 77 * <pre> 78 * public class RemoteAddressBasedOMATPE extends {@link OrderedMemoryAwareThreadPoolExecutor} { 79 * 80 * ... Constructors ... 81 * 82 * {@code @Override} 83 * protected ConcurrentMap<Object, Executor> newChildExecutorMap() { 84 * // The default implementation returns a special ConcurrentMap that 85 * // uses identity comparison only (see {@link IdentityHashMap}). 86 * // Because SocketAddress does not work with identity comparison, 87 * // we need to employ more generic implementation. 88 * return new ConcurrentHashMap<Object, Executor> 89 * } 90 * 91 * protected Object getChildExecutorKey({@link ChannelEvent} e) { 92 * // Use the IP of the remote peer as a key. 93 * return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress(); 94 * } 95 * 96 * // Make public so that you can call from anywhere. 97 * public boolean removeChildExecutor(Object key) { 98 * super.removeChildExecutor(key); 99 * } 100 * } 101 * </pre> 102 * 103 * Please be very careful of memory leak of the child executor map. You must 104 * call {@link #removeChildExecutor(Object)} when the life cycle of the key 105 * ends (e.g. all connections from the same IP were closed.) Also, please 106 * keep in mind that the key can appear again after calling {@link #removeChildExecutor(Object)} 107 * (e.g. a new connection could come in from the same old IP after removal.) 108 * If in doubt, prune the old unused or stall keys from the child executor map 109 * periodically: 110 * 111 * <pre> 112 * RemoteAddressBasedOMATPE executor = ...; 113 * 114 * on every 3 seconds: 115 * 116 * for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator; i.hasNext();) { 117 * InetAddress ip = (InetAddress) i.next(); 118 * if (there is no active connection from 'ip' now && 119 * there has been no incoming connection from 'ip' for last 10 minutes) { 120 * i.remove(); 121 * } 122 * } 123 * </pre> 124 * 125 * If the expected maximum number of keys is small and deterministic, you could 126 * use a weak key map such as <a href="http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/src/jsr166y/ConcurrentWeakHashMap.java?view=markup">ConcurrentWeakHashMap</a> 127 * or synchronized {@link WeakHashMap} instead of managing the life cycle of the 128 * keys by yourself. 129 * 130 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a> 131 * @author <a href="http://gleamynode.net/">Trustin Lee</a> 132 * @author David M. Lloyd (david.lloyd@redhat.com) 133 * 134 * @version $Rev: 2308 $, $Date: 2010-06-17 23:23:59 +0900 (Thu, 17 Jun 2010) $ 135 * 136 * @apiviz.landmark 137 */ 138 public class OrderedMemoryAwareThreadPoolExecutor extends 139 MemoryAwareThreadPoolExecutor { 140 141 // TODO Make OMATPE focus on the case where Channel is the key. 142 // Add a new less-efficient TPE that allows custom key. 143 144 private final ConcurrentMap<Object, Executor> childExecutors = newChildExecutorMap(); 145 146 /** 147 * Creates a new instance. 148 * 149 * @param corePoolSize the maximum number of active threads 150 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 151 * Specify {@code 0} to disable. 152 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 153 * Specify {@code 0} to disable. 154 */ 155 public OrderedMemoryAwareThreadPoolExecutor( 156 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) { 157 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); 158 } 159 160 /** 161 * Creates a new instance. 162 * 163 * @param corePoolSize the maximum number of active threads 164 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 165 * Specify {@code 0} to disable. 166 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 167 * Specify {@code 0} to disable. 168 * @param keepAliveTime the amount of time for an inactive thread to shut itself down 169 * @param unit the {@link TimeUnit} of {@code keepAliveTime} 170 */ 171 public OrderedMemoryAwareThreadPoolExecutor( 172 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, 173 long keepAliveTime, TimeUnit unit) { 174 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 175 keepAliveTime, unit); 176 } 177 178 /** 179 * Creates a new instance. 180 * 181 * @param corePoolSize the maximum number of active threads 182 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 183 * Specify {@code 0} to disable. 184 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 185 * Specify {@code 0} to disable. 186 * @param keepAliveTime the amount of time for an inactive thread to shut itself down 187 * @param unit the {@link TimeUnit} of {@code keepAliveTime} 188 * @param threadFactory the {@link ThreadFactory} of this pool 189 */ 190 public OrderedMemoryAwareThreadPoolExecutor( 191 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, 192 long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { 193 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 194 keepAliveTime, unit, threadFactory); 195 } 196 197 /** 198 * Creates a new instance. 199 * 200 * @param corePoolSize the maximum number of active threads 201 * @param maxChannelMemorySize the maximum total size of the queued events per channel. 202 * Specify {@code 0} to disable. 203 * @param maxTotalMemorySize the maximum total size of the queued events for this pool 204 * Specify {@code 0} to disable. 205 * @param keepAliveTime the amount of time for an inactive thread to shut itself down 206 * @param unit the {@link TimeUnit} of {@code keepAliveTime} 207 * @param threadFactory the {@link ThreadFactory} of this pool 208 * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool 209 */ 210 public OrderedMemoryAwareThreadPoolExecutor( 211 int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, 212 long keepAliveTime, TimeUnit unit, 213 ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { 214 super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 215 keepAliveTime, unit, objectSizeEstimator, threadFactory); 216 } 217 218 protected ConcurrentMap<Object, Executor> newChildExecutorMap() { 219 return new ConcurrentIdentityWeakKeyHashMap<Object, Executor>(); 220 } 221 222 protected Object getChildExecutorKey(ChannelEvent e) { 223 return e.getChannel(); 224 } 225 226 protected Set<Object> getChildExecutorKeySet() { 227 return childExecutors.keySet(); 228 } 229 230 protected boolean removeChildExecutor(Object key) { 231 // FIXME: Succeed only when there is no task in the ChildExecutor's queue. 232 // Note that it will need locking which might slow down task submission. 233 return childExecutors.remove(key) != null; 234 } 235 236 /** 237 * Executes the specified task concurrently while maintaining the event 238 * order. 239 */ 240 @Override 241 protected void doExecute(Runnable task) { 242 if (!(task instanceof ChannelEventRunnable)) { 243 doUnorderedExecute(task); 244 } else { 245 ChannelEventRunnable r = (ChannelEventRunnable) task; 246 getChildExecutor(r.getEvent()).execute(task); 247 } 248 } 249 250 private Executor getChildExecutor(ChannelEvent e) { 251 Object key = getChildExecutorKey(e); 252 Executor executor = childExecutors.get(key); 253 if (executor == null) { 254 executor = new ChildExecutor(); 255 Executor oldExecutor = childExecutors.putIfAbsent(key, executor); 256 if (oldExecutor != null) { 257 executor = oldExecutor; 258 } 259 } 260 261 // Remove the entry when the channel closes. 262 if (e instanceof ChannelStateEvent) { 263 Channel channel = e.getChannel(); 264 ChannelStateEvent se = (ChannelStateEvent) e; 265 if (se.getState() == ChannelState.OPEN && 266 !channel.isOpen()) { 267 childExecutors.remove(channel); 268 } 269 } 270 return executor; 271 } 272 273 @Override 274 protected boolean shouldCount(Runnable task) { 275 if (task instanceof ChildExecutor) { 276 return false; 277 } 278 279 return super.shouldCount(task); 280 } 281 282 void onAfterExecute(Runnable r, Throwable t) { 283 afterExecute(r, t); 284 } 285 286 private final class ChildExecutor implements Executor, Runnable { 287 private final LinkedList<Runnable> tasks = new LinkedList<Runnable>(); 288 289 ChildExecutor() { 290 super(); 291 } 292 293 public void execute(Runnable command) { 294 boolean needsExecution; 295 synchronized (tasks) { 296 needsExecution = tasks.isEmpty(); 297 tasks.add(command); 298 } 299 300 if (needsExecution) { 301 doUnorderedExecute(this); 302 } 303 } 304 305 public void run() { 306 Thread thread = Thread.currentThread(); 307 for (;;) { 308 final Runnable task; 309 synchronized (tasks) { 310 task = tasks.getFirst(); 311 } 312 313 boolean ran = false; 314 beforeExecute(thread, task); 315 try { 316 task.run(); 317 ran = true; 318 onAfterExecute(task, null); 319 } catch (RuntimeException e) { 320 if (!ran) { 321 onAfterExecute(task, e); 322 } 323 throw e; 324 } finally { 325 synchronized (tasks) { 326 tasks.removeFirst(); 327 if (tasks.isEmpty()) { 328 break; 329 } 330 } 331 } 332 } 333 } 334 } 335 }