View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.execution;
17  
18  import java.util.concurrent.Executor;
19  
20  import org.jboss.netty.bootstrap.ServerBootstrap;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelDownstreamHandler;
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelHandler;
25  import org.jboss.netty.channel.ChannelHandlerContext;
26  import org.jboss.netty.channel.ChannelPipeline;
27  import org.jboss.netty.channel.ChannelPipelineFactory;
28  import org.jboss.netty.channel.ChannelState;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.ChannelUpstreamHandler;
31  import org.jboss.netty.channel.Channels;
32  import org.jboss.netty.channel.ChannelHandler.Sharable;
33  import org.jboss.netty.util.ExternalResourceReleasable;
34  import org.jboss.netty.util.internal.ExecutorUtil;
35  
36  /**
37   * Forwards an upstream {@link ChannelEvent} to an {@link Executor}.
38   * <p>
39   * {@link ExecutionHandler} is often used when your {@link ChannelHandler}
40   * performs a blocking operation that takes a long time or accesses a resource
41   * which is not CPU-bound business logic such as DB access.  Running such
42   * operations in a pipeline without an {@link ExecutionHandler} will result in
43   * unwanted hiccup during I/O because an I/O thread cannot perform I/O until
44   * your handler returns the control to the I/O thread.
45   * <p>
46   * In most cases, an {@link ExecutionHandler} is coupled with an
47   * {@link OrderedMemoryAwareThreadPoolExecutor} because it guarantees the
48   * correct event execution order and prevents an {@link OutOfMemoryError}
49   * under load:
50   * <pre>
51   * public class DatabaseGatewayPipelineFactory implements {@link ChannelPipelineFactory} {
52   *
53   *     <b>private final {@link ExecutionHandler} executionHandler;</b>
54   *
55   *     public DatabaseGatewayPipelineFactory({@link ExecutionHandler} executionHandler) {
56   *         this.executionHandler = executionHandler;
57   *     }
58   *
59   *     public {@link ChannelPipeline} getPipeline() {
60   *         return {@link Channels}.pipeline(
61   *                 new DatabaseGatewayProtocolEncoder(),
62   *                 new DatabaseGatewayProtocolDecoder(),
63   *                 <b>executionHandler, // Must be shared</b>
64   *                 new DatabaseQueryingHandler());
65   *     }
66   * }
67   * ...
68   *
69   * public static void main(String[] args) {
70   *     {@link ServerBootstrap} bootstrap = ...;
71   *     ...
72   *     <b>{@link ExecutionHandler} executionHandler = new {@link ExecutionHandler}(
73   *             new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576));
74   *     bootstrap.setPipelineFactory(
75   *             new DatabaseGatewayPipelineFactory(executionHandler));</b>
76   *     ...
77   *     bootstrap.bind(...);
78   *     ...
79   *
80   *     while (!isServerReadyToShutDown()) {
81   *         // ... wait ...
82   *     }
83   *
84   *     bootstrap.releaseExternalResources();
85   *     <b>executionHandler.releaseExternalResources();</b>
86   * }
87   * </pre>
88   *
89   * Please refer to {@link OrderedMemoryAwareThreadPoolExecutor} for the
90   * detailed information about how the event order is guaranteed.
91   *
92   * <h3>SEDA (Staged Event-Driven Architecture)</h3>
93   * You can implement an alternative thread model such as
94   * <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a>
95   * by adding more than one {@link ExecutionHandler} to the pipeline.
96   *
97   * <h3>Using other {@link Executor} implementations</h3>
98   *
99   * Although it's recommended to use {@link OrderedMemoryAwareThreadPoolExecutor},
100  * you can use other {@link Executor} implementations.  However, you must note
101  * that other {@link Executor} implementations might break your application
102  * because they often do not maintain event execution order nor interact with
103  * I/O threads to control the incoming traffic and avoid {@link OutOfMemoryError}.
104  *
105  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
106  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
107  *
108  * @version $Rev: 2295 $, $Date: 2010-06-03 18:01:46 +0900 (Thu, 03 Jun 2010) $
109  *
110  * @apiviz.landmark
111  * @apiviz.has java.util.concurrent.ThreadPoolExecutor
112  */
113 @Sharable
114 public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable {
115 
116     private final Executor executor;
117 
118     /**
119      * Creates a new instance with the specified {@link Executor}.
120      * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
121      */
122     public ExecutionHandler(Executor executor) {
123         if (executor == null) {
124             throw new NullPointerException("executor");
125         }
126         this.executor = executor;
127     }
128 
129     /**
130      * Returns the {@link Executor} which was specified with the constructor.
131      */
132     public Executor getExecutor() {
133         return executor;
134     }
135 
136     /**
137      * Shuts down the {@link Executor} which was specified with the constructor
138      * and wait for its termination.
139      */
140     public void releaseExternalResources() {
141         ExecutorUtil.terminate(getExecutor());
142     }
143 
144     public void handleUpstream(
145             ChannelHandlerContext context, ChannelEvent e) throws Exception {
146         executor.execute(new ChannelEventRunnable(context, e));
147     }
148 
149     public void handleDownstream(
150             ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
151         if (e instanceof ChannelStateEvent) {
152             ChannelStateEvent cse = (ChannelStateEvent) e;
153             if (cse.getState() == ChannelState.INTEREST_OPS &&
154                 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
155 
156                 // setReadable(true) requested
157                 boolean readSuspended = ctx.getAttachment() != null;
158                 if (readSuspended) {
159                     // Drop the request silently if MemoryAwareThreadPool has
160                     // set the flag.
161                     e.getFuture().setSuccess();
162                     return;
163                 }
164             }
165         }
166 
167         ctx.sendDownstream(e);
168     }
169 }