View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.frame;
17  
18  import java.net.SocketAddress;
19  
20  import org.jboss.netty.buffer.ChannelBuffer;
21  import org.jboss.netty.buffer.ChannelBuffers;
22  import org.jboss.netty.channel.Channel;
23  import org.jboss.netty.channel.ChannelHandler;
24  import org.jboss.netty.channel.ChannelHandlerContext;
25  import org.jboss.netty.channel.ChannelPipeline;
26  import org.jboss.netty.channel.ChannelStateEvent;
27  import org.jboss.netty.channel.ChannelUpstreamHandler;
28  import org.jboss.netty.channel.Channels;
29  import org.jboss.netty.channel.ExceptionEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
32  import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
33  
34  /**
35   * Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
36   * <p>
37   * In a stream-based transport such as TCP/IP, packets can be fragmented and
38   * reassembled during transmission even in a LAN environment.  For example,
39   * let us assume you have received three packets:
40   * <pre>
41   * +-----+-----+-----+
42   * | ABC | DEF | GHI |
43   * +-----+-----+-----+
44   * </pre>
45   * because of the packet fragmentation, a server can receive them like the
46   * following:
47   * <pre>
48   * +----+-------+---+---+
49   * | AB | CDEFG | H | I |
50   * +----+-------+---+---+
51   * </pre>
52   * <p>
53   * {@link FrameDecoder} helps you defrag the received packets into one or more
54   * meaningful <strong>frames</strong> that could be easily understood by the
55   * application logic.  In case of the example above, your {@link FrameDecoder}
56   * implementation could defrag the received packets like the following:
57   * <pre>
58   * +-----+-----+-----+
59   * | ABC | DEF | GHI |
60   * +-----+-----+-----+
61   * </pre>
62   * <p>
63   * The following code shows an example handler which decodes a frame whose
64   * 4-byte header represents the length of the frame, excluding the
65   * header.
66   * <pre>
67   * MESSAGE FORMAT
68   * ==============
69   *
70   * Offset:  0        4                   (Length + 4)
71   *          +--------+------------------------+
72   * Fields:  | Length | Actual message content |
73   *          +--------+------------------------+
74   *
75   * DECODER IMPLEMENTATION
76   * ======================
77   *
78   * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
79   *
80   *   {@code @Override}
81   *   protected Object decode({@link ChannelHandlerContext} ctx,
82   *                           {@link Channel} channel,
83   *                           {@link ChannelBuffer} buf) throws Exception {
84   *
85   *     // Make sure the length field has been received.
86   *     if (buf.readableBytes() &lt; 4) {
87   *        // The length field was not received yet - return null.
88   *        // This method will be invoked again when more packets are
89   *        // received and appended to the buffer.
90   *        return <strong>null</strong>;
91   *     }
92   *
93   *     // The length field is in the buffer.
94   *
95   *     // Mark the current buffer position before reading the length field
96   *     // because the whole frame might not be in the buffer yet.
97   *     // We will reset the buffer position to the marked position if
98   *     // there are not enough bytes in the buffer.
99   *     buf.markReaderIndex();
100  *
101  *     // Read the length field.
102  *     int length = buf.readInt();
103  *
104  *     // Make sure there are enough bytes in the buffer.
105  *     if (buf.readableBytes() &lt; length) {
106  *        // The whole frame was not received yet - return null.
107  *        // This method will be invoked again when more packets are
108  *        // received and appended to the buffer.
109  *
110  *        // Reset to the marked position to read the length field again
111  *        // next time.
112  *        buf.resetReaderIndex();
113  *
114  *        return <strong>null</strong>;
115  *     }
116  *
117  *     // There are enough bytes in the buffer. Read them.
118  *     {@link ChannelBuffer} frame = buf.readBytes(length);
119  *
120  *     // Successfully decoded a frame.  Return the decoded frame.
121  *     return <strong>frame</strong>;
122  *   }
123  * }
124  * </pre>
125  *
126  * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
127  * <p>
128  * Please note that you can return an object of a different type than
129  * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
130  * implementation.  For example, you could return a
131  * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
132  * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
133  * contains a POJO rather than a {@link ChannelBuffer}.
134  *
135  * <h3>Replacing a decoder with another decoder in a pipeline</h3>
136  * <p>
137  * If you are going to write a protocol multiplexer, you will probably want to
138  * replace a {@link FrameDecoder} (protocol detector) with another
139  * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
140  * It is not possible to achieve this simply by calling
141  * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
142  * some additional steps are required:
143  * <pre>
144  * public class FirstDecoder extends {@link FrameDecoder} {
145  *
146  *     public FirstDecoder() {
147  *         super(true); // Enable unfold
148  *     }
149  *
150  *     {@code @Override}
151  *     protected Object decode({@link ChannelHandlerContext} ctx,
152  *                             {@link Channel} channel,
153  *                             {@link ChannelBuffer} buf) {
154  *         ...
155  *         // Decode the first message
156  *         Object firstMessage = ...;
157  *
158  *         // Add the second decoder
159  *         ctx.getPipeline().addLast("second", new SecondDecoder());
160  *
161  *         // Remove the first decoder (me)
162  *         ctx.getPipeline().remove(this);
163  *
164  *         if (buf.readable()) {
165  *             // Hand off the remaining data to the second decoder
166  *             return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
167  *         } else {
168  *             // Nothing to hand off
169  *             return firstMessage;
170  *         }
171  *     }
172  * }
173  * </pre>
174  *
175  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
176  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
177  *
178  * @version $Rev:231 $, $Date:2008-06-12 16:44:50 +0900 (목, 12 6월 2008) $
179  *
180  * @apiviz.landmark
181  */
182 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
183 
184     private final boolean unfold;
185     private ChannelBuffer cumulation;
186 
187     protected FrameDecoder() {
188         this(false);
189     }
190 
191     protected FrameDecoder(boolean unfold) {
192         this.unfold = unfold;
193     }
194 
195     @Override
196     public void messageReceived(
197             ChannelHandlerContext ctx, MessageEvent e) throws Exception {
198 
199         Object m = e.getMessage();
200         if (!(m instanceof ChannelBuffer)) {
201             ctx.sendUpstream(e);
202             return;
203         }
204 
205         ChannelBuffer input = (ChannelBuffer) m;
206         if (!input.readable()) {
207             return;
208         }
209 
210         ChannelBuffer cumulation = cumulation(ctx);
211         if (cumulation.readable()) {
212             cumulation.discardReadBytes();
213             cumulation.writeBytes(input);
214             callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
215         } else {
216             callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
217             if (input.readable()) {
218                 cumulation.writeBytes(input);
219             }
220         }
221     }
222 
223     @Override
224     public void channelDisconnected(
225             ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
226         cleanup(ctx, e);
227     }
228 
229     @Override
230     public void channelClosed(
231             ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
232         cleanup(ctx, e);
233     }
234 
235     @Override
236     public void exceptionCaught(
237             ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
238         ctx.sendUpstream(e);
239     }
240 
241     /**
242      * Decodes the received packets so far into a frame.
243      *
244      * @param ctx      the context of this handler
245      * @param channel  the current channel
246      * @param buffer   the cumulative buffer of received packets so far.
247      *                 Note that the buffer might be empty, which means you
248      *                 should not make an assumption that the buffer contains
249      *                 at least one byte in your decoder implementation.
250      *
251      * @return the decoded frame if a full frame was received and decoded.
252      *         {@code null} if there's not enough data in the buffer to decode a frame.
253      */
254     protected abstract Object decode(
255             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
256 
257     /**
258      * Decodes the received data so far into a frame when the channel is
259      * disconnected.
260      *
261      * @param ctx      the context of this handler
262      * @param channel  the current channel
263      * @param buffer   the cumulative buffer of received packets so far.
264      *                 Note that the buffer might be empty, which means you
265      *                 should not make an assumption that the buffer contains
266      *                 at least one byte in your decoder implementation.
267      *
268      * @return the decoded frame if a full frame was received and decoded.
269      *         {@code null} if there's not enough data in the buffer to decode a frame.
270      */
271     protected Object decodeLast(
272             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
273         return decode(ctx, channel, buffer);
274     }
275 
276     private void callDecode(
277             ChannelHandlerContext context, Channel channel,
278             ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
279 
280         while (cumulation.readable()) {
281             int oldReaderIndex = cumulation.readerIndex();
282             Object frame = decode(context, channel, cumulation);
283             if (frame == null) {
284                 if (oldReaderIndex == cumulation.readerIndex()) {
285                     // Seems like more data is required.
286                     // Let us wait for the next notification.
287                     break;
288                 } else {
289                     // Previous data has been discarded.
290                     // Probably it is reading on.
291                     continue;
292                 }
293             } else if (oldReaderIndex == cumulation.readerIndex()) {
294                 throw new IllegalStateException(
295                         "decode() method must read at least one byte " +
296                         "if it returned a frame (caused by: " + getClass() + ")");
297             }
298 
299             unfoldAndFireMessageReceived(context, remoteAddress, frame);
300         }
301     }
302 
303     private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
304         if (unfold) {
305             if (result instanceof Object[]) {
306                 for (Object r: (Object[]) result) {
307                     Channels.fireMessageReceived(context, r, remoteAddress);
308                 }
309             } else if (result instanceof Iterable<?>) {
310                 for (Object r: (Iterable<?>) result) {
311                     Channels.fireMessageReceived(context, r, remoteAddress);
312                 }
313             } else {
314                 Channels.fireMessageReceived(context, result, remoteAddress);
315             }
316         } else {
317             Channels.fireMessageReceived(context, result, remoteAddress);
318         }
319     }
320 
321     private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
322             throws Exception {
323         try {
324             ChannelBuffer cumulation = this.cumulation;
325             if (cumulation == null) {
326                 return;
327             } else {
328                 this.cumulation = null;
329             }
330 
331             if (cumulation.readable()) {
332                 // Make sure all frames are read before notifying a closed channel.
333                 callDecode(ctx, ctx.getChannel(), cumulation, null);
334             }
335 
336             // Call decodeLast() finally.  Please note that decodeLast() is
337             // called even if there's nothing more to read from the buffer to
338             // notify a user that the connection was closed explicitly.
339             Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
340             if (partialFrame != null) {
341                 unfoldAndFireMessageReceived(ctx, null, partialFrame);
342             }
343         } finally {
344             ctx.sendUpstream(e);
345         }
346     }
347 
348     private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
349         ChannelBuffer c = cumulation;
350         if (c == null) {
351             c = ChannelBuffers.dynamicBuffer(
352                     ctx.getChannel().getConfig().getBufferFactory());
353             cumulation = c;
354         }
355         return c;
356     }
357 }