1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import java.util.ArrayList;
19 import java.util.HashMap;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.NoSuchElementException;
24
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27
28
29
30
31
32
33
34
35
36
37
38
39 public class DefaultChannelPipeline implements ChannelPipeline {
40
41 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
42 static final ChannelSink discardingSink = new DiscardingChannelSink();
43
44 private volatile Channel channel;
45 private volatile ChannelSink sink;
46 private volatile DefaultChannelHandlerContext head;
47 private volatile DefaultChannelHandlerContext tail;
48 private final Map<String, DefaultChannelHandlerContext> name2ctx =
49 new HashMap<String, DefaultChannelHandlerContext>(4);
50
51
52
53
54 public DefaultChannelPipeline() {
55 super();
56 }
57
58 public Channel getChannel() {
59 return channel;
60 }
61
62 public ChannelSink getSink() {
63 ChannelSink sink = this.sink;
64 if (sink == null) {
65 return discardingSink;
66 }
67 return sink;
68 }
69
70 public void attach(Channel channel, ChannelSink sink) {
71 if (channel == null) {
72 throw new NullPointerException("channel");
73 }
74 if (sink == null) {
75 throw new NullPointerException("sink");
76 }
77 if (this.channel != null || this.sink != null) {
78 throw new IllegalStateException("attached already");
79 }
80 this.channel = channel;
81 this.sink = sink;
82 }
83
84 public boolean isAttached() {
85 return sink != null;
86 }
87
88 public synchronized void addFirst(String name, ChannelHandler handler) {
89 if (name2ctx.isEmpty()) {
90 init(name, handler);
91 } else {
92 checkDuplicateName(name);
93 DefaultChannelHandlerContext oldHead = head;
94 DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
95
96 callBeforeAdd(newHead);
97
98 oldHead.prev = newHead;
99 head = newHead;
100 name2ctx.put(name, newHead);
101
102 callAfterAdd(newHead);
103 }
104 }
105
106 public synchronized void addLast(String name, ChannelHandler handler) {
107 if (name2ctx.isEmpty()) {
108 init(name, handler);
109 } else {
110 checkDuplicateName(name);
111 DefaultChannelHandlerContext oldTail = tail;
112 DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
113
114 callBeforeAdd(newTail);
115
116 oldTail.next = newTail;
117 tail = newTail;
118 name2ctx.put(name, newTail);
119
120 callAfterAdd(newTail);
121 }
122 }
123
124 public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
125 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
126 if (ctx == head) {
127 addFirst(name, handler);
128 } else {
129 checkDuplicateName(name);
130 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
131
132 callBeforeAdd(newCtx);
133
134 ctx.prev.next = newCtx;
135 ctx.prev = newCtx;
136 name2ctx.put(name, newCtx);
137
138 callAfterAdd(newCtx);
139 }
140 }
141
142 public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
143 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
144 if (ctx == tail) {
145 addLast(name, handler);
146 } else {
147 checkDuplicateName(name);
148 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
149
150 callBeforeAdd(newCtx);
151
152 ctx.next.prev = newCtx;
153 ctx.next = newCtx;
154 name2ctx.put(name, newCtx);
155
156 callAfterAdd(newCtx);
157 }
158 }
159
160 public synchronized void remove(ChannelHandler handler) {
161 remove(getContextOrDie(handler));
162 }
163
164 public synchronized ChannelHandler remove(String name) {
165 return remove(getContextOrDie(name)).getHandler();
166 }
167
168 @SuppressWarnings("unchecked")
169 public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
170 return (T) remove(getContextOrDie(handlerType)).getHandler();
171 }
172
173 private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
174 if (head == tail) {
175 head = tail = null;
176 name2ctx.clear();
177 } else if (ctx == head) {
178 removeFirst();
179 } else if (ctx == tail) {
180 removeLast();
181 } else {
182 callBeforeRemove(ctx);
183
184 DefaultChannelHandlerContext prev = ctx.prev;
185 DefaultChannelHandlerContext next = ctx.next;
186 prev.next = next;
187 next.prev = prev;
188 name2ctx.remove(ctx.getName());
189
190 callAfterRemove(ctx);
191 }
192 return ctx;
193 }
194
195 public synchronized ChannelHandler removeFirst() {
196 if (name2ctx.isEmpty()) {
197 throw new NoSuchElementException();
198 }
199
200 DefaultChannelHandlerContext oldHead = head;
201 if (oldHead == null) {
202 throw new NoSuchElementException();
203 }
204
205 callBeforeRemove(oldHead);
206
207 if (oldHead.next == null) {
208 head = tail = null;
209 name2ctx.clear();
210 } else {
211 oldHead.next.prev = null;
212 head = oldHead.next;
213 name2ctx.remove(oldHead.getName());
214 }
215
216 callAfterRemove(oldHead);
217
218 return oldHead.getHandler();
219 }
220
221 public synchronized ChannelHandler removeLast() {
222 if (name2ctx.isEmpty()) {
223 throw new NoSuchElementException();
224 }
225
226 DefaultChannelHandlerContext oldTail = tail;
227 if (oldTail == null) {
228 throw new NoSuchElementException();
229 }
230
231 callBeforeRemove(oldTail);
232
233 if (oldTail.prev == null) {
234 head = tail = null;
235 name2ctx.clear();
236 } else {
237 oldTail.prev.next = null;
238 tail = oldTail.prev;
239 name2ctx.remove(oldTail.getName());
240 }
241
242 callBeforeRemove(oldTail);
243
244 return oldTail.getHandler();
245 }
246
247 public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
248 replace(getContextOrDie(oldHandler), newName, newHandler);
249 }
250
251 public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
252 return replace(getContextOrDie(oldName), newName, newHandler);
253 }
254
255 @SuppressWarnings("unchecked")
256 public synchronized <T extends ChannelHandler> T replace(
257 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
258 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
259 }
260
261 private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
262 if (ctx == head) {
263 removeFirst();
264 addFirst(newName, newHandler);
265 } else if (ctx == tail) {
266 removeLast();
267 addLast(newName, newHandler);
268 } else {
269 boolean sameName = ctx.getName().equals(newName);
270 if (!sameName) {
271 checkDuplicateName(newName);
272 }
273
274 DefaultChannelHandlerContext prev = ctx.prev;
275 DefaultChannelHandlerContext next = ctx.next;
276 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
277
278 callBeforeRemove(ctx);
279 callBeforeAdd(newCtx);
280
281 prev.next = newCtx;
282 next.prev = newCtx;
283
284 if (!sameName) {
285 name2ctx.remove(ctx.getName());
286 name2ctx.put(newName, newCtx);
287 }
288
289 ChannelHandlerLifeCycleException removeException = null;
290 ChannelHandlerLifeCycleException addException = null;
291 boolean removed = false;
292 try {
293 callAfterRemove(ctx);
294 removed = true;
295 } catch (ChannelHandlerLifeCycleException e) {
296 removeException = e;
297 }
298
299 boolean added = false;
300 try {
301 callAfterAdd(newCtx);
302 added = true;
303 } catch (ChannelHandlerLifeCycleException e) {
304 addException = e;
305 }
306
307 if (!removed && !added) {
308 logger.warn(removeException.getMessage(), removeException);
309 logger.warn(addException.getMessage(), addException);
310 throw new ChannelHandlerLifeCycleException(
311 "Both " + ctx.getHandler().getClass().getName() +
312 ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
313 ".afterAdd() failed; see logs.");
314 } else if (!removed) {
315 throw removeException;
316 } else if (!added) {
317 throw addException;
318 }
319 }
320
321 return ctx.getHandler();
322 }
323
324 private void callBeforeAdd(ChannelHandlerContext ctx) {
325 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
326 return;
327 }
328
329 LifeCycleAwareChannelHandler h =
330 (LifeCycleAwareChannelHandler) ctx.getHandler();
331
332 try {
333 h.beforeAdd(ctx);
334 } catch (Throwable t) {
335 throw new ChannelHandlerLifeCycleException(
336 h.getClass().getName() +
337 ".beforeAdd() has thrown an exception; not adding.", t);
338 }
339 }
340
341 private void callAfterAdd(ChannelHandlerContext ctx) {
342 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
343 return;
344 }
345
346 LifeCycleAwareChannelHandler h =
347 (LifeCycleAwareChannelHandler) ctx.getHandler();
348
349 try {
350 h.afterAdd(ctx);
351 } catch (Throwable t) {
352 boolean removed = false;
353 try {
354 remove((DefaultChannelHandlerContext) ctx);
355 removed = true;
356 } catch (Throwable t2) {
357 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
358 }
359
360 if (removed) {
361 throw new ChannelHandlerLifeCycleException(
362 h.getClass().getName() +
363 ".afterAdd() has thrown an exception; removed.", t);
364 } else {
365 throw new ChannelHandlerLifeCycleException(
366 h.getClass().getName() +
367 ".afterAdd() has thrown an exception; also failed to remove.", t);
368 }
369 }
370 }
371
372 private void callBeforeRemove(ChannelHandlerContext ctx) {
373 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
374 return;
375 }
376
377 LifeCycleAwareChannelHandler h =
378 (LifeCycleAwareChannelHandler) ctx.getHandler();
379
380 try {
381 h.beforeRemove(ctx);
382 } catch (Throwable t) {
383 throw new ChannelHandlerLifeCycleException(
384 h.getClass().getName() +
385 ".beforeRemove() has thrown an exception; not removing.", t);
386 }
387 }
388
389 private void callAfterRemove(ChannelHandlerContext ctx) {
390 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
391 return;
392 }
393
394 LifeCycleAwareChannelHandler h =
395 (LifeCycleAwareChannelHandler) ctx.getHandler();
396
397 try {
398 h.afterRemove(ctx);
399 } catch (Throwable t) {
400 throw new ChannelHandlerLifeCycleException(
401 h.getClass().getName() +
402 ".afterRemove() has thrown an exception.", t);
403 }
404 }
405
406 public synchronized ChannelHandler getFirst() {
407 DefaultChannelHandlerContext head = this.head;
408 if (head == null) {
409 return null;
410 }
411 return head.getHandler();
412 }
413
414 public synchronized ChannelHandler getLast() {
415 DefaultChannelHandlerContext tail = this.tail;
416 if (tail == null) {
417 return null;
418 }
419 return tail.getHandler();
420 }
421
422 public synchronized ChannelHandler get(String name) {
423 DefaultChannelHandlerContext ctx = name2ctx.get(name);
424 if (ctx == null) {
425 return null;
426 } else {
427 return ctx.getHandler();
428 }
429 }
430
431 @SuppressWarnings("unchecked")
432 public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
433 ChannelHandlerContext ctx = getContext(handlerType);
434 if (ctx == null) {
435 return null;
436 } else {
437 return (T) ctx.getHandler();
438 }
439 }
440
441 public synchronized ChannelHandlerContext getContext(String name) {
442 if (name == null) {
443 throw new NullPointerException("name");
444 }
445 return name2ctx.get(name);
446 }
447
448 public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
449 if (handler == null) {
450 throw new NullPointerException("handler");
451 }
452 if (name2ctx.isEmpty()) {
453 return null;
454 }
455 DefaultChannelHandlerContext ctx = head;
456 for (;;) {
457 if (ctx.getHandler() == handler) {
458 return ctx;
459 }
460
461 ctx = ctx.next;
462 if (ctx == null) {
463 break;
464 }
465 }
466 return null;
467 }
468
469 public synchronized ChannelHandlerContext getContext(
470 Class<? extends ChannelHandler> handlerType) {
471 if (handlerType == null) {
472 throw new NullPointerException("handlerType");
473 }
474
475 if (name2ctx.isEmpty()) {
476 return null;
477 }
478 DefaultChannelHandlerContext ctx = head;
479 for (;;) {
480 if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
481 return ctx;
482 }
483
484 ctx = ctx.next;
485 if (ctx == null) {
486 break;
487 }
488 }
489 return null;
490 }
491
492 public List<String> getNames() {
493 List<String> list = new ArrayList<String>();
494 if (name2ctx.isEmpty()) {
495 return list;
496 }
497
498 DefaultChannelHandlerContext ctx = head;
499 for (;;) {
500 list.add(ctx.getName());
501 ctx = ctx.next;
502 if (ctx == null) {
503 break;
504 }
505 }
506 return list;
507 }
508
509 public Map<String, ChannelHandler> toMap() {
510 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
511 if (name2ctx.isEmpty()) {
512 return map;
513 }
514
515 DefaultChannelHandlerContext ctx = head;
516 for (;;) {
517 map.put(ctx.getName(), ctx.getHandler());
518 ctx = ctx.next;
519 if (ctx == null) {
520 break;
521 }
522 }
523 return map;
524 }
525
526
527
528
529 @Override
530 public String toString() {
531 StringBuilder buf = new StringBuilder();
532 buf.append(getClass().getSimpleName());
533 buf.append('{');
534 DefaultChannelHandlerContext ctx = head;
535 for (;;) {
536 buf.append('(');
537 buf.append(ctx.getName());
538 buf.append(" = ");
539 buf.append(ctx.getHandler().getClass().getName());
540 buf.append(')');
541 ctx = ctx.next;
542 if (ctx == null) {
543 break;
544 }
545 buf.append(", ");
546 }
547 buf.append('}');
548 return buf.toString();
549 }
550
551 public void sendUpstream(ChannelEvent e) {
552 DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
553 if (head == null) {
554 logger.warn(
555 "The pipeline contains no upstream handlers; discarding: " + e);
556 return;
557 }
558
559 sendUpstream(head, e);
560 }
561
562 void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
563 try {
564 ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
565 } catch (Throwable t) {
566 notifyHandlerException(e, t);
567 }
568 }
569
570 public void sendDownstream(ChannelEvent e) {
571 DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
572 if (tail == null) {
573 try {
574 getSink().eventSunk(this, e);
575 return;
576 } catch (Throwable t) {
577 notifyHandlerException(e, t);
578 return;
579 }
580 }
581
582 sendDownstream(tail, e);
583 }
584
585 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
586 if (e instanceof UpstreamMessageEvent) {
587 throw new IllegalArgumentException("cannot send an upstream event to downstream");
588 }
589
590 try {
591 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
592 } catch (Throwable t) {
593
594
595
596
597
598 e.getFuture().setFailure(t);
599 notifyHandlerException(e, t);
600 }
601 }
602
603 DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
604 if (ctx == null) {
605 return null;
606 }
607
608 DefaultChannelHandlerContext realCtx = ctx;
609 while (!realCtx.canHandleUpstream()) {
610 realCtx = realCtx.next;
611 if (realCtx == null) {
612 return null;
613 }
614 }
615
616 return realCtx;
617 }
618
619 DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
620 if (ctx == null) {
621 return null;
622 }
623
624 DefaultChannelHandlerContext realCtx = ctx;
625 while (!realCtx.canHandleDownstream()) {
626 realCtx = realCtx.prev;
627 if (realCtx == null) {
628 return null;
629 }
630 }
631
632 return realCtx;
633 }
634
635 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
636 if (e instanceof ExceptionEvent) {
637 logger.warn(
638 "An exception was thrown by a user handler " +
639 "while handling an exception event (" + e + ")", t);
640 return;
641 }
642
643 ChannelPipelineException pe;
644 if (t instanceof ChannelPipelineException) {
645 pe = (ChannelPipelineException) t;
646 } else {
647 pe = new ChannelPipelineException(t);
648 }
649
650 try {
651 sink.exceptionCaught(this, e, pe);
652 } catch (Exception e1) {
653 logger.warn("An exception was thrown by an exception handler.", e1);
654 }
655 }
656
657 private void init(String name, ChannelHandler handler) {
658 DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
659 callBeforeAdd(ctx);
660 head = tail = ctx;
661 name2ctx.clear();
662 name2ctx.put(name, ctx);
663 callAfterAdd(ctx);
664 }
665
666 private void checkDuplicateName(String name) {
667 if (name2ctx.containsKey(name)) {
668 throw new IllegalArgumentException("Duplicate handler name.");
669 }
670 }
671
672 private DefaultChannelHandlerContext getContextOrDie(String name) {
673 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
674 if (ctx == null) {
675 throw new NoSuchElementException(name);
676 } else {
677 return ctx;
678 }
679 }
680
681 private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
682 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
683 if (ctx == null) {
684 throw new NoSuchElementException(handler.getClass().getName());
685 } else {
686 return ctx;
687 }
688 }
689
690 private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
691 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
692 if (ctx == null) {
693 throw new NoSuchElementException(handlerType.getName());
694 } else {
695 return ctx;
696 }
697 }
698
699 private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
700 volatile DefaultChannelHandlerContext next;
701 volatile DefaultChannelHandlerContext prev;
702 private final String name;
703 private final ChannelHandler handler;
704 private final boolean canHandleUpstream;
705 private final boolean canHandleDownstream;
706 private volatile Object attachment;
707
708 DefaultChannelHandlerContext(
709 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
710 String name, ChannelHandler handler) {
711
712 if (name == null) {
713 throw new NullPointerException("name");
714 }
715 if (handler == null) {
716 throw new NullPointerException("handler");
717 }
718 canHandleUpstream = handler instanceof ChannelUpstreamHandler;
719 canHandleDownstream = handler instanceof ChannelDownstreamHandler;
720
721
722 if (!canHandleUpstream && !canHandleDownstream) {
723 throw new IllegalArgumentException(
724 "handler must be either " +
725 ChannelUpstreamHandler.class.getName() + " or " +
726 ChannelDownstreamHandler.class.getName() + '.');
727 }
728
729 this.prev = prev;
730 this.next = next;
731 this.name = name;
732 this.handler = handler;
733 }
734
735 public Channel getChannel() {
736 return getPipeline().getChannel();
737 }
738
739 public ChannelPipeline getPipeline() {
740 return DefaultChannelPipeline.this;
741 }
742
743 public boolean canHandleDownstream() {
744 return canHandleDownstream;
745 }
746
747 public boolean canHandleUpstream() {
748 return canHandleUpstream;
749 }
750
751 public ChannelHandler getHandler() {
752 return handler;
753 }
754
755 public String getName() {
756 return name;
757 }
758
759 public Object getAttachment() {
760 return attachment;
761 }
762
763 public void setAttachment(Object attachment) {
764 this.attachment = attachment;
765 }
766
767 public void sendDownstream(ChannelEvent e) {
768 DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
769 if (prev == null) {
770 try {
771 getSink().eventSunk(DefaultChannelPipeline.this, e);
772 } catch (Throwable t) {
773 notifyHandlerException(e, t);
774 }
775 } else {
776 DefaultChannelPipeline.this.sendDownstream(prev, e);
777 }
778 }
779
780 public void sendUpstream(ChannelEvent e) {
781 DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
782 if (next != null) {
783 DefaultChannelPipeline.this.sendUpstream(next, e);
784 }
785 }
786 }
787
788 private static final class DiscardingChannelSink implements ChannelSink {
789 DiscardingChannelSink() {
790 super();
791 }
792
793 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
794 logger.warn("Not attached yet; discarding: " + e);
795 }
796
797 public void exceptionCaught(ChannelPipeline pipeline,
798 ChannelEvent e, ChannelPipelineException cause) throws Exception {
799 throw cause;
800 }
801 }
802 }