1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.timeout;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.util.concurrent.TimeUnit;
21
22 import org.jboss.netty.bootstrap.ServerBootstrap;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelHandler;
25 import org.jboss.netty.channel.ChannelHandler.Sharable;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34 import org.jboss.netty.channel.WriteCompletionEvent;
35 import org.jboss.netty.util.ExternalResourceReleasable;
36 import org.jboss.netty.util.HashedWheelTimer;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 @Sharable
128 public class IdleStateHandler extends SimpleChannelUpstreamHandler
129 implements LifeCycleAwareChannelHandler,
130 ExternalResourceReleasable {
131
132 final Timer timer;
133
134 final long readerIdleTimeMillis;
135 final long writerIdleTimeMillis;
136 final long allIdleTimeMillis;
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 public IdleStateHandler(
158 Timer timer,
159 int readerIdleTimeSeconds,
160 int writerIdleTimeSeconds,
161 int allIdleTimeSeconds) {
162
163 this(timer,
164 readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
165 TimeUnit.SECONDS);
166 }
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190 public IdleStateHandler(
191 Timer timer,
192 long readerIdleTime, long writerIdleTime, long allIdleTime,
193 TimeUnit unit) {
194
195 if (timer == null) {
196 throw new NullPointerException("timer");
197 }
198 if (unit == null) {
199 throw new NullPointerException("unit");
200 }
201
202 this.timer = timer;
203 if (readerIdleTime <= 0) {
204 readerIdleTimeMillis = 0;
205 } else {
206 readerIdleTimeMillis = Math.max(unit.toMillis(readerIdleTime), 1);
207 }
208 if (writerIdleTime <= 0) {
209 writerIdleTimeMillis = 0;
210 } else {
211 writerIdleTimeMillis = Math.max(unit.toMillis(writerIdleTime), 1);
212 }
213 if (allIdleTime <= 0) {
214 allIdleTimeMillis = 0;
215 } else {
216 allIdleTimeMillis = Math.max(unit.toMillis(allIdleTime), 1);
217 }
218 }
219
220
221
222
223
224
225 public void releaseExternalResources() {
226 timer.stop();
227 }
228
229 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
230 if (ctx.getPipeline().isAttached()) {
231
232
233
234 initialize(ctx);
235 } else {
236
237
238 }
239 }
240
241 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
242
243 }
244
245 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
246 destroy(ctx);
247 }
248
249 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
250
251 }
252
253 @Override
254 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
255 throws Exception {
256
257
258
259 initialize(ctx);
260 ctx.sendUpstream(e);
261 }
262
263 @Override
264 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
265 throws Exception {
266 destroy(ctx);
267 ctx.sendUpstream(e);
268 }
269
270 @Override
271 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
272 throws Exception {
273 State state = (State) ctx.getAttachment();
274 state.lastReadTime = System.currentTimeMillis();
275 ctx.sendUpstream(e);
276 }
277
278 @Override
279 public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
280 throws Exception {
281 if (e.getWrittenAmount() > 0) {
282 State state = (State) ctx.getAttachment();
283 state.lastWriteTime = System.currentTimeMillis();
284 }
285 ctx.sendUpstream(e);
286 }
287
288 private void initialize(ChannelHandlerContext ctx) {
289 State state = new State();
290 ctx.setAttachment(state);
291
292 state.lastReadTime = state.lastWriteTime = System.currentTimeMillis();
293 if (readerIdleTimeMillis > 0) {
294 state.readerIdleTimeout = timer.newTimeout(
295 new ReaderIdleTimeoutTask(ctx),
296 readerIdleTimeMillis, TimeUnit.MILLISECONDS);
297 }
298 if (writerIdleTimeMillis > 0) {
299 state.writerIdleTimeout = timer.newTimeout(
300 new WriterIdleTimeoutTask(ctx),
301 writerIdleTimeMillis, TimeUnit.MILLISECONDS);
302 }
303 if (allIdleTimeMillis > 0) {
304 state.allIdleTimeout = timer.newTimeout(
305 new AllIdleTimeoutTask(ctx),
306 allIdleTimeMillis, TimeUnit.MILLISECONDS);
307 }
308 }
309
310 private void destroy(ChannelHandlerContext ctx) {
311 State state = (State) ctx.getAttachment();
312 if (state.readerIdleTimeout != null) {
313 state.readerIdleTimeout.cancel();
314 state.readerIdleTimeout = null;
315 }
316 if (state.writerIdleTimeout != null) {
317 state.writerIdleTimeout.cancel();
318 state.writerIdleTimeout = null;
319 }
320 if (state.allIdleTimeout != null) {
321 state.allIdleTimeout.cancel();
322 state.allIdleTimeout = null;
323 }
324 }
325
326 protected void channelIdle(
327 ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception {
328 ctx.sendUpstream(new DefaultIdleStateEvent(ctx.getChannel(), state, lastActivityTimeMillis));
329 }
330
331 private final class ReaderIdleTimeoutTask implements TimerTask {
332
333 private final ChannelHandlerContext ctx;
334
335 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
336 this.ctx = ctx;
337 }
338
339 public void run(Timeout timeout) throws Exception {
340 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
341 return;
342 }
343
344 State state = (State) ctx.getAttachment();
345 long currentTime = System.currentTimeMillis();
346 long lastReadTime = state.lastReadTime;
347 long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
348 if (nextDelay <= 0) {
349
350 state.readerIdleTimeout =
351 timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
352 try {
353 channelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
354 } catch (Throwable t) {
355 fireExceptionCaught(ctx, t);
356 }
357 } else {
358
359 state.readerIdleTimeout =
360 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
361 }
362 }
363
364 }
365
366 private final class WriterIdleTimeoutTask implements TimerTask {
367
368 private final ChannelHandlerContext ctx;
369
370 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
371 this.ctx = ctx;
372 }
373
374 public void run(Timeout timeout) throws Exception {
375 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
376 return;
377 }
378
379 State state = (State) ctx.getAttachment();
380 long currentTime = System.currentTimeMillis();
381 long lastWriteTime = state.lastWriteTime;
382 long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
383 if (nextDelay <= 0) {
384
385 state.writerIdleTimeout =
386 timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
387 try {
388 channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
389 } catch (Throwable t) {
390 fireExceptionCaught(ctx, t);
391 }
392 } else {
393
394 state.writerIdleTimeout =
395 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
396 }
397 }
398 }
399
400 private final class AllIdleTimeoutTask implements TimerTask {
401
402 private final ChannelHandlerContext ctx;
403
404 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
405 this.ctx = ctx;
406 }
407
408 public void run(Timeout timeout) throws Exception {
409 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
410 return;
411 }
412
413 State state = (State) ctx.getAttachment();
414 long currentTime = System.currentTimeMillis();
415 long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime);
416 long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
417 if (nextDelay <= 0) {
418
419
420 state.allIdleTimeout =
421 timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
422 try {
423 channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
424 } catch (Throwable t) {
425 fireExceptionCaught(ctx, t);
426 }
427 } else {
428
429
430 state.allIdleTimeout =
431 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
432 }
433 }
434 }
435
436 private static final class State {
437 State() {
438 super();
439 }
440
441 volatile Timeout readerIdleTimeout;
442 volatile long lastReadTime;
443
444 volatile Timeout writerIdleTimeout;
445 volatile long lastWriteTime;
446
447 volatile Timeout allIdleTimeout;
448 }
449 }