View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.timeout;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.util.concurrent.TimeUnit;
21  
22  import org.jboss.netty.bootstrap.ServerBootstrap;
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelHandler;
25  import org.jboss.netty.channel.ChannelHandler.Sharable;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelPipelineFactory;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.Channels;
31  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32  import org.jboss.netty.channel.MessageEvent;
33  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34  import org.jboss.netty.channel.WriteCompletionEvent;
35  import org.jboss.netty.util.ExternalResourceReleasable;
36  import org.jboss.netty.util.HashedWheelTimer;
37  import org.jboss.netty.util.Timeout;
38  import org.jboss.netty.util.Timer;
39  import org.jboss.netty.util.TimerTask;
40  
41  /**
42   * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
43   * read, write, or both operation for a while.
44   *
45   * <h3>Supported idle states</h3>
46   * <table border="1">
47   * <tr>
48   * <th>Property</th><th>Meaning</th>
49   * </tr>
50   * <tr>
51   * <td>{@code readerIdleTime}</td>
52   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
53   *     will be triggered when no read was performed for the specified period of
54   *     time.  Specify {@code 0} to disable.</td>
55   * </tr>
56   * <tr>
57   * <td>{@code writerIdleTime}</td>
58   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
59   *     will be triggered when no write was performed for the specified period of
60   *     time.  Specify {@code 0} to disable.</td>
61   * </tr>
62   * <tr>
63   * <td>{@code allIdleTime}</td>
64   * <td>an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
65   *     will be triggered when neither read nor write was performed for the
66   *     specified period of time.  Specify {@code 0} to disable.</td>
67   * </tr>
68   * </table>
69   *
70   * <pre>
71   * // An example that sends a ping message when there is no outbound traffic
72   * // for 30 seconds.  The connection is closed when there is no inbound traffic
73   * // for 60 seconds.
74   *
75   * public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
76   *
77   *     private final {@link Timer} timer;
78   *     private final {@link ChannelHandler} idleStateHandler;
79   *
80   *     public MyPipelineFactory({@link Timer} timer) {
81   *         this.timer = timer;
82   *         this.idleStateHandler = <b>new {@link IdleStateHandler}(timer, 60, 30, 0), // timer must be shared.</b>
83   *     }
84   *
85   *     public {@link ChannelPipeline} getPipeline() {
86   *         return {@link Channels}.pipeline(
87   *             idleStateHandler,
88   *             new MyHandler());
89   *     }
90   * }
91   *
92   * // Handler should handle the {@link IdleStateEvent} triggered by {@link IdleStateHandler}.
93   * public class MyHandler extends {@link IdleStateAwareChannelHandler} {
94   *
95   *     {@code @Override}
96   *     public void channelIdle({@link ChannelHandlerContext} ctx, {@link IdleStateEvent} e) {
97   *         if (e.getState() == {@link IdleState}.READER_IDLE) {
98   *             e.getChannel().close();
99   *         } else if (e.getState() == {@link IdleState}.WRITER_IDLE) {
100  *             e.getChannel().write(new PingMessage());
101  *         }
102  *     }
103  * }
104  *
105  * {@link ServerBootstrap} bootstrap = ...;
106  * {@link Timer} timer = new {@link HashedWheelTimer}();
107  * ...
108  * bootstrap.setPipelineFactory(new MyPipelineFactory(timer));
109  * ...
110  * </pre>
111  *
112  * The {@link Timer} which was specified when the {@link IdleStateHandler} is
113  * created should be stopped manually by calling {@link #releaseExternalResources()}
114  * or {@link Timer#stop()} when your application shuts down.
115  *
116  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
117  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
118  * @version $Rev: 2224 $, $Date: 2010-03-30 17:02:32 +0900 (Tue, 30 Mar 2010) $
119  *
120  * @see ReadTimeoutHandler
121  * @see WriteTimeoutHandler
122  *
123  * @apiviz.landmark
124  * @apiviz.uses org.jboss.netty.util.HashedWheelTimer
125  * @apiviz.has org.jboss.netty.handler.timeout.IdleStateEvent oneway - - triggers
126  */
127 @Sharable
128 public class IdleStateHandler extends SimpleChannelUpstreamHandler
129                              implements LifeCycleAwareChannelHandler,
130                                         ExternalResourceReleasable {
131 
132     final Timer timer;
133 
134     final long readerIdleTimeMillis;
135     final long writerIdleTimeMillis;
136     final long allIdleTimeMillis;
137 
138     /**
139      * Creates a new instance.
140      *
141      * @param timer
142      *        the {@link Timer} that is used to trigger the scheduled event.
143      *        The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
144      * @param readerIdleTimeSeconds
145      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
146      *        will be triggered when no read was performed for the specified
147      *        period of time.  Specify {@code 0} to disable.
148      * @param writerIdleTimeSeconds
149      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
150      *        will be triggered when no write was performed for the specified
151      *        period of time.  Specify {@code 0} to disable.
152      * @param allIdleTimeSeconds
153      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
154      *        will be triggered when neither read nor write was performed for
155      *        the specified period of time.  Specify {@code 0} to disable.
156      */
157     public IdleStateHandler(
158             Timer timer,
159             int readerIdleTimeSeconds,
160             int writerIdleTimeSeconds,
161             int allIdleTimeSeconds) {
162 
163         this(timer,
164              readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
165              TimeUnit.SECONDS);
166     }
167 
168     /**
169      * Creates a new instance.
170      *
171      * @param timer
172      *        the {@link Timer} that is used to trigger the scheduled event.
173      *        The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
174      * @param readerIdleTime
175      *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
176      *        will be triggered when no read was performed for the specified
177      *        period of time.  Specify {@code 0} to disable.
178      * @param writerIdleTime
179      *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
180      *        will be triggered when no write was performed for the specified
181      *        period of time.  Specify {@code 0} to disable.
182      * @param allIdleTime
183      *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
184      *        will be triggered when neither read nor write was performed for
185      *        the specified period of time.  Specify {@code 0} to disable.
186      * @param unit
187      *        the {@link TimeUnit} of {@code readerIdleTime},
188      *        {@code writeIdleTime}, and {@code allIdleTime}
189      */
190     public IdleStateHandler(
191             Timer timer,
192             long readerIdleTime, long writerIdleTime, long allIdleTime,
193             TimeUnit unit) {
194 
195         if (timer == null) {
196             throw new NullPointerException("timer");
197         }
198         if (unit == null) {
199             throw new NullPointerException("unit");
200         }
201 
202         this.timer = timer;
203         if (readerIdleTime <= 0) {
204             readerIdleTimeMillis = 0;
205         } else {
206             readerIdleTimeMillis = Math.max(unit.toMillis(readerIdleTime), 1);
207         }
208         if (writerIdleTime <= 0) {
209             writerIdleTimeMillis = 0;
210         } else {
211             writerIdleTimeMillis = Math.max(unit.toMillis(writerIdleTime), 1);
212         }
213         if (allIdleTime <= 0) {
214             allIdleTimeMillis = 0;
215         } else {
216             allIdleTimeMillis = Math.max(unit.toMillis(allIdleTime), 1);
217         }
218     }
219 
220     /**
221      * Stops the {@link Timer} which was specified in the constructor of this
222      * handler.  You should not call this method if the {@link Timer} is in use
223      * by other objects.
224      */
225     public void releaseExternalResources() {
226         timer.stop();
227     }
228 
229     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
230         if (ctx.getPipeline().isAttached()) {
231             // channelOpen event has been fired already, which means
232             // this.channelOpen() will not be invoked.
233             // We have to initialize here instead.
234             initialize(ctx);
235         } else {
236             // channelOpen event has not been fired yet.
237             // this.channelOpen() will be invoked and initialization will occur there.
238         }
239     }
240 
241     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
242         // NOOP
243     }
244 
245     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
246         destroy(ctx);
247     }
248 
249     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
250         // NOOP
251     }
252 
253     @Override
254     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
255             throws Exception {
256         // This method will be invoked only if this handler was added
257         // before channelOpen event is fired.  If a user adds this handler
258         // after the channelOpen event, initialize() will be called by beforeAdd().
259         initialize(ctx);
260         ctx.sendUpstream(e);
261     }
262 
263     @Override
264     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
265             throws Exception {
266         destroy(ctx);
267         ctx.sendUpstream(e);
268     }
269 
270     @Override
271     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
272             throws Exception {
273         State state = (State) ctx.getAttachment();
274         state.lastReadTime = System.currentTimeMillis();
275         ctx.sendUpstream(e);
276     }
277 
278     @Override
279     public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
280             throws Exception {
281         if (e.getWrittenAmount() > 0) {
282             State state = (State) ctx.getAttachment();
283             state.lastWriteTime = System.currentTimeMillis();
284         }
285         ctx.sendUpstream(e);
286     }
287 
288     private void initialize(ChannelHandlerContext ctx) {
289         State state = new State();
290         ctx.setAttachment(state);
291 
292         state.lastReadTime = state.lastWriteTime = System.currentTimeMillis();
293         if (readerIdleTimeMillis > 0) {
294             state.readerIdleTimeout = timer.newTimeout(
295                     new ReaderIdleTimeoutTask(ctx),
296                     readerIdleTimeMillis, TimeUnit.MILLISECONDS);
297         }
298         if (writerIdleTimeMillis > 0) {
299             state.writerIdleTimeout = timer.newTimeout(
300                     new WriterIdleTimeoutTask(ctx),
301                     writerIdleTimeMillis, TimeUnit.MILLISECONDS);
302         }
303         if (allIdleTimeMillis > 0) {
304             state.allIdleTimeout = timer.newTimeout(
305                     new AllIdleTimeoutTask(ctx),
306                     allIdleTimeMillis, TimeUnit.MILLISECONDS);
307         }
308     }
309 
310     private void destroy(ChannelHandlerContext ctx) {
311         State state = (State) ctx.getAttachment();
312         if (state.readerIdleTimeout != null) {
313             state.readerIdleTimeout.cancel();
314             state.readerIdleTimeout = null;
315         }
316         if (state.writerIdleTimeout != null) {
317             state.writerIdleTimeout.cancel();
318             state.writerIdleTimeout = null;
319         }
320         if (state.allIdleTimeout != null) {
321             state.allIdleTimeout.cancel();
322             state.allIdleTimeout = null;
323         }
324     }
325 
326     protected void channelIdle(
327             ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception {
328         ctx.sendUpstream(new DefaultIdleStateEvent(ctx.getChannel(), state, lastActivityTimeMillis));
329     }
330 
331     private final class ReaderIdleTimeoutTask implements TimerTask {
332 
333         private final ChannelHandlerContext ctx;
334 
335         ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
336             this.ctx = ctx;
337         }
338 
339         public void run(Timeout timeout) throws Exception {
340             if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
341                 return;
342             }
343 
344             State state = (State) ctx.getAttachment();
345             long currentTime = System.currentTimeMillis();
346             long lastReadTime = state.lastReadTime;
347             long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
348             if (nextDelay <= 0) {
349                 // Reader is idle - set a new timeout and notify the callback.
350                 state.readerIdleTimeout =
351                     timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
352                 try {
353                     channelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
354                 } catch (Throwable t) {
355                     fireExceptionCaught(ctx, t);
356                 }
357             } else {
358                 // Read occurred before the timeout - set a new timeout with shorter delay.
359                 state.readerIdleTimeout =
360                     timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
361             }
362         }
363 
364     }
365 
366     private final class WriterIdleTimeoutTask implements TimerTask {
367 
368         private final ChannelHandlerContext ctx;
369 
370         WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
371             this.ctx = ctx;
372         }
373 
374         public void run(Timeout timeout) throws Exception {
375             if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
376                 return;
377             }
378 
379             State state = (State) ctx.getAttachment();
380             long currentTime = System.currentTimeMillis();
381             long lastWriteTime = state.lastWriteTime;
382             long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
383             if (nextDelay <= 0) {
384                 // Writer is idle - set a new timeout and notify the callback.
385                 state.writerIdleTimeout =
386                     timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
387                 try {
388                     channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
389                 } catch (Throwable t) {
390                     fireExceptionCaught(ctx, t);
391                 }
392             } else {
393                 // Write occurred before the timeout - set a new timeout with shorter delay.
394                 state.writerIdleTimeout =
395                     timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
396             }
397         }
398     }
399 
400     private final class AllIdleTimeoutTask implements TimerTask {
401 
402         private final ChannelHandlerContext ctx;
403 
404         AllIdleTimeoutTask(ChannelHandlerContext ctx) {
405             this.ctx = ctx;
406         }
407 
408         public void run(Timeout timeout) throws Exception {
409             if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
410                 return;
411             }
412 
413             State state = (State) ctx.getAttachment();
414             long currentTime = System.currentTimeMillis();
415             long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime);
416             long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
417             if (nextDelay <= 0) {
418                 // Both reader and writer are idle - set a new timeout and
419                 // notify the callback.
420                 state.allIdleTimeout =
421                     timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
422                 try {
423                     channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
424                 } catch (Throwable t) {
425                     fireExceptionCaught(ctx, t);
426                 }
427             } else {
428                 // Either read or write occurred before the timeout - set a new
429                 // timeout with shorter delay.
430                 state.allIdleTimeout =
431                     timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
432             }
433         }
434     }
435 
436     private static final class State {
437         State() {
438             super();
439         }
440 
441         volatile Timeout readerIdleTimeout;
442         volatile long lastReadTime;
443 
444         volatile Timeout writerIdleTimeout;
445         volatile long lastWriteTime;
446 
447         volatile Timeout allIdleTimeout;
448     }
449 }