View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import java.util.ArrayList;
19  import java.util.HashMap;
20  import java.util.LinkedHashMap;
21  import java.util.List;
22  import java.util.Map;
23  
24  import org.jboss.netty.logging.InternalLogger;
25  import org.jboss.netty.logging.InternalLoggerFactory;
26  import org.jboss.netty.util.internal.ConversionUtil;
27  
28  /**
29   * A {@link ChannelPipeline} that might perform better at the cost of
30   * disabled dynamic insertion and removal of {@link ChannelHandler}s.
31   * An attempt to insert, remove, or replace a handler in this pipeline will
32   * trigger an {@link UnsupportedOperationException}.
33   *
34   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
35   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
36   *
37   * @version $Rev: 2267 $, $Date: 2010-05-06 16:00:52 +0900 (Thu, 06 May 2010) $
38   *
39   */
40  public class StaticChannelPipeline implements ChannelPipeline {
41  
42      // FIXME Code duplication with DefaultChannelPipeline
43      static final InternalLogger logger = InternalLoggerFactory.getInstance(StaticChannelPipeline.class);
44  
45      private volatile Channel channel;
46      private volatile ChannelSink sink;
47      private final StaticChannelHandlerContext[] contexts;
48      private final int lastIndex;
49      private final Map<String, StaticChannelHandlerContext> name2ctx =
50          new HashMap<String, StaticChannelHandlerContext>(4);
51  
52      /**
53       * Creates a new pipeline from the specified handlers.
54       * The names of the specified handlers are generated automatically;
55       * the first handler's name is {@code "0"}, the second handler's name is
56       * {@code "1"}, the third handler's name is {@code "2"}, and so on.
57       */
58      public StaticChannelPipeline(ChannelHandler... handlers) {
59          if (handlers == null) {
60              throw new NullPointerException("handlers");
61          }
62          if (handlers.length == 0) {
63              throw new IllegalArgumentException("no handlers specified");
64          }
65  
66          // Get the number of first non-null handlers.
67          StaticChannelHandlerContext[] contexts =
68              new StaticChannelHandlerContext[handlers.length];
69          int nContexts;
70          for (nContexts = 0; nContexts < contexts.length; nContexts ++) {
71              ChannelHandler h = handlers[nContexts];
72              if (h == null) {
73                  break;
74              }
75          }
76  
77          if (nContexts == contexts.length) {
78              this.contexts = contexts;
79              lastIndex = contexts.length - 1;
80          } else {
81              this.contexts = contexts =
82                  new StaticChannelHandlerContext[nContexts];
83              lastIndex = nContexts - 1;
84          }
85  
86          // Initialize the first non-null handlers only.
87          for (int i = 0; i < nContexts; i ++) {
88              ChannelHandler h = handlers[i];
89              String name = ConversionUtil.toString(i);
90              StaticChannelHandlerContext ctx =
91                  new StaticChannelHandlerContext(i, name, h);
92              contexts[i] = ctx;
93              name2ctx.put(name, ctx);
94          }
95  
96          for (ChannelHandlerContext ctx: contexts) {
97              callBeforeAdd(ctx);
98              callAfterAdd(ctx);
99          }
100     }
101 
102     public Channel getChannel() {
103         return channel;
104     }
105 
106     public ChannelSink getSink() {
107         ChannelSink sink = this.sink;
108         if (sink == null) {
109             return DefaultChannelPipeline.discardingSink;
110         }
111         return sink;
112     }
113 
114     public void attach(Channel channel, ChannelSink sink) {
115         if (channel == null) {
116             throw new NullPointerException("channel");
117         }
118         if (sink == null) {
119             throw new NullPointerException("sink");
120         }
121         if (this.channel != null || this.sink != null) {
122             throw new IllegalStateException("attached already");
123         }
124         this.channel = channel;
125         this.sink = sink;
126     }
127 
128     public boolean isAttached() {
129         return sink != null;
130     }
131 
132     public void addFirst(String name, ChannelHandler handler) {
133         throw new UnsupportedOperationException();
134     }
135 
136     public void addLast(String name, ChannelHandler handler) {
137         throw new UnsupportedOperationException();
138     }
139 
140     public void addBefore(String baseName, String name, ChannelHandler handler) {
141         throw new UnsupportedOperationException();
142     }
143 
144     public void addAfter(String baseName, String name, ChannelHandler handler) {
145         throw new UnsupportedOperationException();
146     }
147 
148     public void remove(ChannelHandler handler) {
149         throw new UnsupportedOperationException();
150     }
151 
152     public ChannelHandler remove(String name) {
153         throw new UnsupportedOperationException();
154     }
155 
156     public <T extends ChannelHandler> T remove(Class<T> handlerType) {
157         throw new UnsupportedOperationException();
158     }
159 
160     public ChannelHandler removeFirst() {
161         throw new UnsupportedOperationException();
162     }
163 
164     public ChannelHandler removeLast() {
165         throw new UnsupportedOperationException();
166     }
167 
168     public void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
169         throw new UnsupportedOperationException();
170     }
171 
172     public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
173         throw new UnsupportedOperationException();
174     }
175 
176     public <T extends ChannelHandler> T replace(
177             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
178         throw new UnsupportedOperationException();
179     }
180 
181     private void callBeforeAdd(ChannelHandlerContext ctx) {
182         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
183             return;
184         }
185 
186         LifeCycleAwareChannelHandler h =
187             (LifeCycleAwareChannelHandler) ctx.getHandler();
188 
189         try {
190             h.beforeAdd(ctx);
191         } catch (Throwable t) {
192             throw new ChannelHandlerLifeCycleException(
193                     h.getClass().getName() +
194                     ".beforeAdd() has thrown an exception; not adding.", t);
195         }
196     }
197 
198     private void callAfterAdd(ChannelHandlerContext ctx) {
199         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
200             return;
201         }
202 
203         LifeCycleAwareChannelHandler h =
204             (LifeCycleAwareChannelHandler) ctx.getHandler();
205 
206         try {
207             h.afterAdd(ctx);
208         } catch (Throwable t) {
209             boolean removed = false;
210             try {
211                 callBeforeRemove(ctx);
212                 callAfterRemove(ctx);
213                 removed = true;
214             } catch (Throwable t2) {
215                 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
216             }
217 
218             if (removed) {
219                 throw new ChannelHandlerLifeCycleException(
220                         h.getClass().getName() +
221                         ".afterAdd() has thrown an exception; removed.", t);
222             } else {
223                 throw new ChannelHandlerLifeCycleException(
224                         h.getClass().getName() +
225                         ".afterAdd() has thrown an exception; also failed to remove.", t);
226             }
227         }
228     }
229 
230     private void callBeforeRemove(ChannelHandlerContext ctx) {
231         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
232             return;
233         }
234 
235         LifeCycleAwareChannelHandler h =
236             (LifeCycleAwareChannelHandler) ctx.getHandler();
237 
238         try {
239             h.beforeRemove(ctx);
240         } catch (Throwable t) {
241             throw new ChannelHandlerLifeCycleException(
242                     h.getClass().getName() +
243                     ".beforeRemove() has thrown an exception; not removing.", t);
244         }
245     }
246 
247     private void callAfterRemove(ChannelHandlerContext ctx) {
248         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
249             return;
250         }
251 
252         LifeCycleAwareChannelHandler h =
253             (LifeCycleAwareChannelHandler) ctx.getHandler();
254 
255         try {
256             h.afterRemove(ctx);
257         } catch (Throwable t) {
258             throw new ChannelHandlerLifeCycleException(
259                     h.getClass().getName() +
260                     ".afterRemove() has thrown an exception.", t);
261         }
262     }
263 
264     public ChannelHandler getFirst() {
265         return contexts[0].getHandler();
266     }
267 
268     public ChannelHandler getLast() {
269         return contexts[contexts.length - 1].getHandler();
270     }
271 
272     public ChannelHandler get(String name) {
273         StaticChannelHandlerContext ctx = name2ctx.get(name);
274         if (ctx == null) {
275             return null;
276         } else {
277             return ctx.getHandler();
278         }
279     }
280 
281     @SuppressWarnings("unchecked")
282     public <T extends ChannelHandler> T get(Class<T> handlerType) {
283         ChannelHandlerContext ctx = getContext(handlerType);
284         if (ctx == null) {
285             return null;
286         } else {
287             return (T) ctx.getHandler();
288         }
289     }
290 
291     public ChannelHandlerContext getContext(String name) {
292         if (name == null) {
293             throw new NullPointerException("name");
294         }
295         return name2ctx.get(name);
296     }
297 
298     public ChannelHandlerContext getContext(ChannelHandler handler) {
299         if (handler == null) {
300             throw new NullPointerException("handler");
301         }
302         for (StaticChannelHandlerContext ctx: contexts) {
303             if (ctx.getHandler() == handler) {
304                 return ctx;
305             }
306         }
307         return null;
308     }
309 
310     public ChannelHandlerContext getContext(Class<? extends ChannelHandler> handlerType) {
311         if (handlerType == null) {
312             throw new NullPointerException("handlerType");
313         }
314         for (StaticChannelHandlerContext ctx: contexts) {
315             if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
316                 return ctx;
317             }
318         }
319         return null;
320     }
321 
322     public List<String> getNames() {
323         List<String> list = new ArrayList<String>();
324         for (StaticChannelHandlerContext ctx: contexts) {
325             list.add(ctx.getName());
326         }
327         return list;
328     }
329 
330     public Map<String, ChannelHandler> toMap() {
331         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
332         for (StaticChannelHandlerContext ctx: contexts) {
333             map.put(ctx.getName(), ctx.getHandler());
334         }
335         return map;
336     }
337 
338     /**
339      * Returns the {@link String} representation of this pipeline.
340      */
341     @Override
342     public String toString() {
343         StringBuilder buf = new StringBuilder();
344         buf.append(getClass().getSimpleName());
345         buf.append('{');
346 
347         for (StaticChannelHandlerContext ctx: contexts) {
348             buf.append('(');
349             buf.append(ctx.getName());
350             buf.append(" = ");
351             buf.append(ctx.getHandler().getClass().getName());
352             buf.append(')');
353             buf.append(", ");
354         }
355         buf.replace(buf.length() - 2, buf.length(), "}");
356         return buf.toString();
357     }
358 
359     public void sendUpstream(ChannelEvent e) {
360         StaticChannelHandlerContext head = getActualUpstreamContext(0);
361         if (head == null) {
362             logger.warn(
363                     "The pipeline contains no upstream handlers; discarding: " + e);
364             return;
365         }
366 
367         sendUpstream(head, e);
368     }
369 
370     void sendUpstream(StaticChannelHandlerContext ctx, ChannelEvent e) {
371         try {
372             ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
373         } catch (Throwable t) {
374             notifyHandlerException(e, t);
375         }
376     }
377 
378     public void sendDownstream(ChannelEvent e) {
379         StaticChannelHandlerContext tail = getActualDownstreamContext(lastIndex);
380         if (tail == null) {
381             try {
382                 getSink().eventSunk(this, e);
383                 return;
384             } catch (Throwable t) {
385                 notifyHandlerException(e, t);
386                 return;
387             }
388         }
389 
390         sendDownstream(tail, e);
391     }
392 
393     void sendDownstream(StaticChannelHandlerContext ctx, ChannelEvent e) {
394         if (e instanceof UpstreamMessageEvent) {
395             throw new IllegalArgumentException("cannot send an upstream event to downstream");
396         }
397 
398         try {
399             ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
400         } catch (Throwable t) {
401             // Unlike an upstream event, a downstream event usually has an
402             // incomplete future which is supposed to be updated by ChannelSink.
403             // However, if an exception is raised before the event reaches at
404             // ChannelSink, the future is not going to be updated, so we update
405             // here.
406             e.getFuture().setFailure(t);
407             notifyHandlerException(e, t);
408         }
409     }
410 
411     StaticChannelHandlerContext getActualUpstreamContext(int index) {
412         for (int i = index; i < contexts.length; i ++) {
413             StaticChannelHandlerContext ctx = contexts[i];
414             if (ctx.canHandleUpstream()) {
415                 return ctx;
416             }
417         }
418         return null;
419     }
420 
421     StaticChannelHandlerContext getActualDownstreamContext(int index) {
422         for (int i = index; i >= 0; i --) {
423             StaticChannelHandlerContext ctx = contexts[i];
424             if (ctx.canHandleDownstream()) {
425                 return ctx;
426             }
427         }
428         return null;
429     }
430 
431     protected void notifyHandlerException(ChannelEvent e, Throwable t) {
432         if (e instanceof ExceptionEvent) {
433             logger.warn(
434                     "An exception was thrown by a user handler " +
435                     "while handling an exception event (" + e + ")", t);
436             return;
437         }
438 
439         ChannelPipelineException pe;
440         if (t instanceof ChannelPipelineException) {
441             pe = (ChannelPipelineException) t;
442         } else {
443             pe = new ChannelPipelineException(t);
444         }
445 
446         try {
447             sink.exceptionCaught(this, e, pe);
448         } catch (Exception e1) {
449             logger.warn("An exception was thrown by an exception handler.", e1);
450         }
451     }
452 
453     private final class StaticChannelHandlerContext implements ChannelHandlerContext {
454         private final int index;
455         private final String name;
456         private final ChannelHandler handler;
457         private final boolean canHandleUpstream;
458         private final boolean canHandleDownstream;
459         private volatile Object attachment;
460 
461         StaticChannelHandlerContext(
462                 int index, String name, ChannelHandler handler) {
463 
464             if (name == null) {
465                 throw new NullPointerException("name");
466             }
467             if (handler == null) {
468                 throw new NullPointerException("handler");
469             }
470             canHandleUpstream = handler instanceof ChannelUpstreamHandler;
471             canHandleDownstream = handler instanceof ChannelDownstreamHandler;
472 
473 
474             if (!canHandleUpstream && !canHandleDownstream) {
475                 throw new IllegalArgumentException(
476                         "handler must be either " +
477                         ChannelUpstreamHandler.class.getName() + " or " +
478                         ChannelDownstreamHandler.class.getName() + '.');
479             }
480 
481             this.index = index;
482             this.name = name;
483             this.handler = handler;
484         }
485 
486         public Channel getChannel() {
487             return getPipeline().getChannel();
488         }
489 
490         public ChannelPipeline getPipeline() {
491             return StaticChannelPipeline.this;
492         }
493 
494         public boolean canHandleDownstream() {
495             return canHandleDownstream;
496         }
497 
498         public boolean canHandleUpstream() {
499             return canHandleUpstream;
500         }
501 
502         public ChannelHandler getHandler() {
503             return handler;
504         }
505 
506         public String getName() {
507             return name;
508         }
509 
510         public Object getAttachment() {
511             return attachment;
512         }
513 
514         public void setAttachment(Object attachment) {
515             this.attachment = attachment;
516         }
517 
518         public void sendDownstream(ChannelEvent e) {
519             StaticChannelHandlerContext prev = getActualDownstreamContext(index - 1);
520             if (prev == null) {
521                 try {
522                     getSink().eventSunk(StaticChannelPipeline.this, e);
523                 } catch (Throwable t) {
524                     notifyHandlerException(e, t);
525                 }
526             } else {
527                 StaticChannelPipeline.this.sendDownstream(prev, e);
528             }
529         }
530 
531         public void sendUpstream(ChannelEvent e) {
532             StaticChannelHandlerContext next = getActualUpstreamContext(index + 1);
533             if (next != null) {
534                 StaticChannelPipeline.this.sendUpstream(next, e);
535             }
536         }
537     }
538 }