View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.embedder;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.lang.reflect.Array;
21  import java.util.ConcurrentModificationException;
22  import java.util.LinkedList;
23  import java.util.Queue;
24  
25  import org.jboss.netty.buffer.ChannelBufferFactory;
26  import org.jboss.netty.channel.Channel;
27  import org.jboss.netty.channel.ChannelEvent;
28  import org.jboss.netty.channel.ChannelHandler;
29  import org.jboss.netty.channel.ChannelHandlerContext;
30  import org.jboss.netty.channel.ChannelPipeline;
31  import org.jboss.netty.channel.ChannelPipelineException;
32  import org.jboss.netty.channel.ChannelSink;
33  import org.jboss.netty.channel.ChannelUpstreamHandler;
34  import org.jboss.netty.channel.DefaultChannelPipeline;
35  import org.jboss.netty.channel.ExceptionEvent;
36  import org.jboss.netty.channel.MessageEvent;
37  
38  /**
39   * A skeletal {@link CodecEmbedder} implementation.
40   *
41   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
42   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
43   * @version $Rev: 2355 $, $Date: 2010-08-26 14:38:34 +0900 (Thu, 26 Aug 2010) $
44   */
45  abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
46  
47      private final Channel channel;
48      private final ChannelPipeline pipeline;
49      private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
50  
51      final Queue<Object> productQueue = new LinkedList<Object>();
52  
53      /**
54       * Creates a new embedder whose pipeline is composed of the specified
55       * handlers.
56       */
57      protected AbstractCodecEmbedder(ChannelHandler... handlers) {
58          pipeline = new EmbeddedChannelPipeline();
59          configurePipeline(handlers);
60          channel = new EmbeddedChannel(pipeline, sink);
61          fireInitialEvents();
62      }
63  
64      /**
65       * Creates a new embedder whose pipeline is composed of the specified
66       * handlers.
67       *
68       * @param bufferFactory the {@link ChannelBufferFactory} to be used when
69       *                      creating a new buffer.
70       */
71      protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
72          this(handlers);
73          getChannel().getConfig().setBufferFactory(bufferFactory);
74      }
75  
76      private void fireInitialEvents() {
77          // Fire the typical initial events.
78          fireChannelOpen(channel);
79          fireChannelBound(channel, channel.getLocalAddress());
80          fireChannelConnected(channel, channel.getRemoteAddress());
81      }
82  
83      private void configurePipeline(ChannelHandler... handlers) {
84          if (handlers == null) {
85              throw new NullPointerException("handlers");
86          }
87  
88          if (handlers.length == 0) {
89              throw new IllegalArgumentException(
90                      "handlers should contain at least one " +
91                      ChannelHandler.class.getSimpleName() + '.');
92          }
93  
94          for (int i = 0; i < handlers.length; i ++) {
95              ChannelHandler h = handlers[i];
96              if (h == null) {
97                  throw new NullPointerException("handlers[" + i + "]");
98              }
99              pipeline.addLast(String.valueOf(i), handlers[i]);
100         }
101         pipeline.addLast("SINK", sink);
102     }
103 
104     public boolean finish() {
105         close(channel);
106         fireChannelDisconnected(channel);
107         fireChannelUnbound(channel);
108         fireChannelClosed(channel);
109         return !productQueue.isEmpty();
110     }
111 
112     /**
113      * Returns the virtual {@link Channel} which will be used as a mock
114      * during encoding and decoding.
115      */
116     protected final Channel getChannel() {
117         return channel;
118     }
119 
120     /**
121      * Returns {@code true} if and only if the produce queue is empty and
122      * therefore {@link #poll()} will return {@code null}.
123      */
124     protected final boolean isEmpty() {
125         return productQueue.isEmpty();
126     }
127 
128     @SuppressWarnings("unchecked")
129     public final E poll() {
130         return (E) productQueue.poll();
131     }
132 
133     @SuppressWarnings("unchecked")
134     public final E peek() {
135         return (E) productQueue.peek();
136     }
137 
138     public final Object[] pollAll() {
139         final int size = size();
140         Object[] a = new Object[size];
141         for (int i = 0; i < size; i ++) {
142             E product = poll();
143             if (product == null) {
144                 throw new ConcurrentModificationException();
145             }
146             a[i] = product;
147         }
148         return a;
149     }
150 
151     @SuppressWarnings("unchecked")
152     public final <T> T[] pollAll(T[] a) {
153         if (a == null) {
154             throw new NullPointerException("a");
155         }
156 
157         final int size = size();
158 
159         // Create a new array if the specified one is too small.
160         if (a.length < size) {
161             a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
162         }
163 
164         for (int i = 0;; i ++) {
165             T product = (T) poll();
166             if (product == null) {
167                 break;
168             }
169             a[i] = product;
170         }
171 
172         // Put the terminator if necessary.
173         if (a.length > size) {
174             a[size] = null;
175         }
176 
177         return a;
178     }
179 
180     public final int size() {
181         return productQueue.size();
182     }
183 
184     public ChannelPipeline getPipeline() {
185         return pipeline;
186     }
187 
188     private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
189         EmbeddedChannelSink() {
190             super();
191         }
192 
193         public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
194             handleEvent(e);
195         }
196 
197         public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
198             handleEvent(e);
199         }
200 
201         private void handleEvent(ChannelEvent e) {
202             if (e instanceof MessageEvent) {
203                 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
204                 assert offered;
205             } else if (e instanceof ExceptionEvent) {
206                 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
207             }
208 
209             // Swallow otherwise.
210         }
211 
212         public void exceptionCaught(
213                 ChannelPipeline pipeline, ChannelEvent e,
214                 ChannelPipelineException cause) throws Exception {
215             Throwable actualCause = cause.getCause();
216             if (actualCause == null) {
217                 actualCause = cause;
218             }
219 
220             throw new CodecEmbedderException(actualCause);
221         }
222     }
223 
224     private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
225 
226         EmbeddedChannelPipeline() {
227             super();
228         }
229 
230         @Override
231         protected void notifyHandlerException(ChannelEvent e, Throwable t) {
232             while (t instanceof ChannelPipelineException && t.getCause() != null) {
233                 t = t.getCause();
234             }
235             if (t instanceof CodecEmbedderException) {
236                 throw (CodecEmbedderException) t;
237             } else {
238                 throw new CodecEmbedderException(t);
239             }
240         }
241     }
242 }