1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import java.util.ArrayList;
19 import java.util.HashMap;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map;
23
24 import org.jboss.netty.logging.InternalLogger;
25 import org.jboss.netty.logging.InternalLoggerFactory;
26 import org.jboss.netty.util.internal.ConversionUtil;
27
28
29
30
31
32
33
34
35
36
37
38
39
40 public class StaticChannelPipeline implements ChannelPipeline {
41
42
43 static final InternalLogger logger = InternalLoggerFactory.getInstance(StaticChannelPipeline.class);
44
45 private volatile Channel channel;
46 private volatile ChannelSink sink;
47 private final StaticChannelHandlerContext[] contexts;
48 private final int lastIndex;
49 private final Map<String, StaticChannelHandlerContext> name2ctx =
50 new HashMap<String, StaticChannelHandlerContext>(4);
51
52
53
54
55
56
57
58 public StaticChannelPipeline(ChannelHandler... handlers) {
59 if (handlers == null) {
60 throw new NullPointerException("handlers");
61 }
62 if (handlers.length == 0) {
63 throw new IllegalArgumentException("no handlers specified");
64 }
65
66
67 StaticChannelHandlerContext[] contexts =
68 new StaticChannelHandlerContext[handlers.length];
69 int nContexts;
70 for (nContexts = 0; nContexts < contexts.length; nContexts ++) {
71 ChannelHandler h = handlers[nContexts];
72 if (h == null) {
73 break;
74 }
75 }
76
77 if (nContexts == contexts.length) {
78 this.contexts = contexts;
79 lastIndex = contexts.length - 1;
80 } else {
81 this.contexts = contexts =
82 new StaticChannelHandlerContext[nContexts];
83 lastIndex = nContexts - 1;
84 }
85
86
87 for (int i = 0; i < nContexts; i ++) {
88 ChannelHandler h = handlers[i];
89 String name = ConversionUtil.toString(i);
90 StaticChannelHandlerContext ctx =
91 new StaticChannelHandlerContext(i, name, h);
92 contexts[i] = ctx;
93 name2ctx.put(name, ctx);
94 }
95
96 for (ChannelHandlerContext ctx: contexts) {
97 callBeforeAdd(ctx);
98 callAfterAdd(ctx);
99 }
100 }
101
102 public Channel getChannel() {
103 return channel;
104 }
105
106 public ChannelSink getSink() {
107 ChannelSink sink = this.sink;
108 if (sink == null) {
109 return DefaultChannelPipeline.discardingSink;
110 }
111 return sink;
112 }
113
114 public void attach(Channel channel, ChannelSink sink) {
115 if (channel == null) {
116 throw new NullPointerException("channel");
117 }
118 if (sink == null) {
119 throw new NullPointerException("sink");
120 }
121 if (this.channel != null || this.sink != null) {
122 throw new IllegalStateException("attached already");
123 }
124 this.channel = channel;
125 this.sink = sink;
126 }
127
128 public boolean isAttached() {
129 return sink != null;
130 }
131
132 public void addFirst(String name, ChannelHandler handler) {
133 throw new UnsupportedOperationException();
134 }
135
136 public void addLast(String name, ChannelHandler handler) {
137 throw new UnsupportedOperationException();
138 }
139
140 public void addBefore(String baseName, String name, ChannelHandler handler) {
141 throw new UnsupportedOperationException();
142 }
143
144 public void addAfter(String baseName, String name, ChannelHandler handler) {
145 throw new UnsupportedOperationException();
146 }
147
148 public void remove(ChannelHandler handler) {
149 throw new UnsupportedOperationException();
150 }
151
152 public ChannelHandler remove(String name) {
153 throw new UnsupportedOperationException();
154 }
155
156 public <T extends ChannelHandler> T remove(Class<T> handlerType) {
157 throw new UnsupportedOperationException();
158 }
159
160 public ChannelHandler removeFirst() {
161 throw new UnsupportedOperationException();
162 }
163
164 public ChannelHandler removeLast() {
165 throw new UnsupportedOperationException();
166 }
167
168 public void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
169 throw new UnsupportedOperationException();
170 }
171
172 public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
173 throw new UnsupportedOperationException();
174 }
175
176 public <T extends ChannelHandler> T replace(
177 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
178 throw new UnsupportedOperationException();
179 }
180
181 private void callBeforeAdd(ChannelHandlerContext ctx) {
182 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
183 return;
184 }
185
186 LifeCycleAwareChannelHandler h =
187 (LifeCycleAwareChannelHandler) ctx.getHandler();
188
189 try {
190 h.beforeAdd(ctx);
191 } catch (Throwable t) {
192 throw new ChannelHandlerLifeCycleException(
193 h.getClass().getName() +
194 ".beforeAdd() has thrown an exception; not adding.", t);
195 }
196 }
197
198 private void callAfterAdd(ChannelHandlerContext ctx) {
199 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
200 return;
201 }
202
203 LifeCycleAwareChannelHandler h =
204 (LifeCycleAwareChannelHandler) ctx.getHandler();
205
206 try {
207 h.afterAdd(ctx);
208 } catch (Throwable t) {
209 boolean removed = false;
210 try {
211 callBeforeRemove(ctx);
212 callAfterRemove(ctx);
213 removed = true;
214 } catch (Throwable t2) {
215 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
216 }
217
218 if (removed) {
219 throw new ChannelHandlerLifeCycleException(
220 h.getClass().getName() +
221 ".afterAdd() has thrown an exception; removed.", t);
222 } else {
223 throw new ChannelHandlerLifeCycleException(
224 h.getClass().getName() +
225 ".afterAdd() has thrown an exception; also failed to remove.", t);
226 }
227 }
228 }
229
230 private void callBeforeRemove(ChannelHandlerContext ctx) {
231 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
232 return;
233 }
234
235 LifeCycleAwareChannelHandler h =
236 (LifeCycleAwareChannelHandler) ctx.getHandler();
237
238 try {
239 h.beforeRemove(ctx);
240 } catch (Throwable t) {
241 throw new ChannelHandlerLifeCycleException(
242 h.getClass().getName() +
243 ".beforeRemove() has thrown an exception; not removing.", t);
244 }
245 }
246
247 private void callAfterRemove(ChannelHandlerContext ctx) {
248 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
249 return;
250 }
251
252 LifeCycleAwareChannelHandler h =
253 (LifeCycleAwareChannelHandler) ctx.getHandler();
254
255 try {
256 h.afterRemove(ctx);
257 } catch (Throwable t) {
258 throw new ChannelHandlerLifeCycleException(
259 h.getClass().getName() +
260 ".afterRemove() has thrown an exception.", t);
261 }
262 }
263
264 public ChannelHandler getFirst() {
265 return contexts[0].getHandler();
266 }
267
268 public ChannelHandler getLast() {
269 return contexts[contexts.length - 1].getHandler();
270 }
271
272 public ChannelHandler get(String name) {
273 StaticChannelHandlerContext ctx = name2ctx.get(name);
274 if (ctx == null) {
275 return null;
276 } else {
277 return ctx.getHandler();
278 }
279 }
280
281 @SuppressWarnings("unchecked")
282 public <T extends ChannelHandler> T get(Class<T> handlerType) {
283 ChannelHandlerContext ctx = getContext(handlerType);
284 if (ctx == null) {
285 return null;
286 } else {
287 return (T) ctx.getHandler();
288 }
289 }
290
291 public ChannelHandlerContext getContext(String name) {
292 if (name == null) {
293 throw new NullPointerException("name");
294 }
295 return name2ctx.get(name);
296 }
297
298 public ChannelHandlerContext getContext(ChannelHandler handler) {
299 if (handler == null) {
300 throw new NullPointerException("handler");
301 }
302 for (StaticChannelHandlerContext ctx: contexts) {
303 if (ctx.getHandler() == handler) {
304 return ctx;
305 }
306 }
307 return null;
308 }
309
310 public ChannelHandlerContext getContext(Class<? extends ChannelHandler> handlerType) {
311 if (handlerType == null) {
312 throw new NullPointerException("handlerType");
313 }
314 for (StaticChannelHandlerContext ctx: contexts) {
315 if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
316 return ctx;
317 }
318 }
319 return null;
320 }
321
322 public List<String> getNames() {
323 List<String> list = new ArrayList<String>();
324 for (StaticChannelHandlerContext ctx: contexts) {
325 list.add(ctx.getName());
326 }
327 return list;
328 }
329
330 public Map<String, ChannelHandler> toMap() {
331 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
332 for (StaticChannelHandlerContext ctx: contexts) {
333 map.put(ctx.getName(), ctx.getHandler());
334 }
335 return map;
336 }
337
338
339
340
341 @Override
342 public String toString() {
343 StringBuilder buf = new StringBuilder();
344 buf.append(getClass().getSimpleName());
345 buf.append('{');
346
347 for (StaticChannelHandlerContext ctx: contexts) {
348 buf.append('(');
349 buf.append(ctx.getName());
350 buf.append(" = ");
351 buf.append(ctx.getHandler().getClass().getName());
352 buf.append(')');
353 buf.append(", ");
354 }
355 buf.replace(buf.length() - 2, buf.length(), "}");
356 return buf.toString();
357 }
358
359 public void sendUpstream(ChannelEvent e) {
360 StaticChannelHandlerContext head = getActualUpstreamContext(0);
361 if (head == null) {
362 logger.warn(
363 "The pipeline contains no upstream handlers; discarding: " + e);
364 return;
365 }
366
367 sendUpstream(head, e);
368 }
369
370 void sendUpstream(StaticChannelHandlerContext ctx, ChannelEvent e) {
371 try {
372 ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
373 } catch (Throwable t) {
374 notifyHandlerException(e, t);
375 }
376 }
377
378 public void sendDownstream(ChannelEvent e) {
379 StaticChannelHandlerContext tail = getActualDownstreamContext(lastIndex);
380 if (tail == null) {
381 try {
382 getSink().eventSunk(this, e);
383 return;
384 } catch (Throwable t) {
385 notifyHandlerException(e, t);
386 return;
387 }
388 }
389
390 sendDownstream(tail, e);
391 }
392
393 void sendDownstream(StaticChannelHandlerContext ctx, ChannelEvent e) {
394 if (e instanceof UpstreamMessageEvent) {
395 throw new IllegalArgumentException("cannot send an upstream event to downstream");
396 }
397
398 try {
399 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
400 } catch (Throwable t) {
401
402
403
404
405
406 e.getFuture().setFailure(t);
407 notifyHandlerException(e, t);
408 }
409 }
410
411 StaticChannelHandlerContext getActualUpstreamContext(int index) {
412 for (int i = index; i < contexts.length; i ++) {
413 StaticChannelHandlerContext ctx = contexts[i];
414 if (ctx.canHandleUpstream()) {
415 return ctx;
416 }
417 }
418 return null;
419 }
420
421 StaticChannelHandlerContext getActualDownstreamContext(int index) {
422 for (int i = index; i >= 0; i --) {
423 StaticChannelHandlerContext ctx = contexts[i];
424 if (ctx.canHandleDownstream()) {
425 return ctx;
426 }
427 }
428 return null;
429 }
430
431 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
432 if (e instanceof ExceptionEvent) {
433 logger.warn(
434 "An exception was thrown by a user handler " +
435 "while handling an exception event (" + e + ")", t);
436 return;
437 }
438
439 ChannelPipelineException pe;
440 if (t instanceof ChannelPipelineException) {
441 pe = (ChannelPipelineException) t;
442 } else {
443 pe = new ChannelPipelineException(t);
444 }
445
446 try {
447 sink.exceptionCaught(this, e, pe);
448 } catch (Exception e1) {
449 logger.warn("An exception was thrown by an exception handler.", e1);
450 }
451 }
452
453 private final class StaticChannelHandlerContext implements ChannelHandlerContext {
454 private final int index;
455 private final String name;
456 private final ChannelHandler handler;
457 private final boolean canHandleUpstream;
458 private final boolean canHandleDownstream;
459 private volatile Object attachment;
460
461 StaticChannelHandlerContext(
462 int index, String name, ChannelHandler handler) {
463
464 if (name == null) {
465 throw new NullPointerException("name");
466 }
467 if (handler == null) {
468 throw new NullPointerException("handler");
469 }
470 canHandleUpstream = handler instanceof ChannelUpstreamHandler;
471 canHandleDownstream = handler instanceof ChannelDownstreamHandler;
472
473
474 if (!canHandleUpstream && !canHandleDownstream) {
475 throw new IllegalArgumentException(
476 "handler must be either " +
477 ChannelUpstreamHandler.class.getName() + " or " +
478 ChannelDownstreamHandler.class.getName() + '.');
479 }
480
481 this.index = index;
482 this.name = name;
483 this.handler = handler;
484 }
485
486 public Channel getChannel() {
487 return getPipeline().getChannel();
488 }
489
490 public ChannelPipeline getPipeline() {
491 return StaticChannelPipeline.this;
492 }
493
494 public boolean canHandleDownstream() {
495 return canHandleDownstream;
496 }
497
498 public boolean canHandleUpstream() {
499 return canHandleUpstream;
500 }
501
502 public ChannelHandler getHandler() {
503 return handler;
504 }
505
506 public String getName() {
507 return name;
508 }
509
510 public Object getAttachment() {
511 return attachment;
512 }
513
514 public void setAttachment(Object attachment) {
515 this.attachment = attachment;
516 }
517
518 public void sendDownstream(ChannelEvent e) {
519 StaticChannelHandlerContext prev = getActualDownstreamContext(index - 1);
520 if (prev == null) {
521 try {
522 getSink().eventSunk(StaticChannelPipeline.this, e);
523 } catch (Throwable t) {
524 notifyHandlerException(e, t);
525 }
526 } else {
527 StaticChannelPipeline.this.sendDownstream(prev, e);
528 }
529 }
530
531 public void sendUpstream(ChannelEvent e) {
532 StaticChannelHandlerContext next = getActualUpstreamContext(index + 1);
533 if (next != null) {
534 StaticChannelPipeline.this.sendUpstream(next, e);
535 }
536 }
537 }
538 }