1 /*
2 * Copyright 2009 Red Hat, Inc.
3 *
4 * Red Hat licenses this file to you under the Apache License, version 2.0
5 * (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.codec.frame;
17
18 import java.net.SocketAddress;
19
20 import org.jboss.netty.buffer.ChannelBuffer;
21 import org.jboss.netty.buffer.ChannelBuffers;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelHandler;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelPipeline;
26 import org.jboss.netty.channel.ChannelStateEvent;
27 import org.jboss.netty.channel.ChannelUpstreamHandler;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.ExceptionEvent;
30 import org.jboss.netty.channel.MessageEvent;
31 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
32 import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
33
34 /**
35 * Decodes the received {@link ChannelBuffer}s into a meaningful frame object.
36 * <p>
37 * In a stream-based transport such as TCP/IP, packets can be fragmented and
38 * reassembled during transmission even in a LAN environment. For example,
39 * let us assume you have received three packets:
40 * <pre>
41 * +-----+-----+-----+
42 * | ABC | DEF | GHI |
43 * +-----+-----+-----+
44 * </pre>
45 * Because of packet fragmentation, a server can receive them like the
46 * following:
47 * <pre>
48 * +----+-------+---+---+
49 * | AB | CDEFG | H | I |
50 * +----+-------+---+---+
51 * </pre>
52 * <p>
53 * {@link FrameDecoder} helps you defrag the received packets into one or more
54 * meaningful <strong>frames</strong> that could be easily understood by the
55 * application logic. In case of the example above, your {@link FrameDecoder}
56 * implementation could defrag the received packets like the following:
57 * <pre>
58 * +-----+-----+-----+
59 * | ABC | DEF | GHI |
60 * +-----+-----+-----+
61 * </pre>
62 * <p>
63 * The following code shows an example handler which decodes a frame whose
64 * 4-byte header represents the length of the frame, excluding the
65 * header.
66 * <pre>
67 * MESSAGE FORMAT
68 * ==============
69 *
70 * Offset: 0 4 (Length + 4)
71 * +--------+------------------------+
72 * Fields: | Length | Actual message content |
73 * +--------+------------------------+
74 *
75 * DECODER IMPLEMENTATION
76 * ======================
77 *
78 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
79 *
80 * {@code @Override}
81 * protected Object decode({@link ChannelHandlerContext} ctx,
82 * {@link Channel} channel,
83 * {@link ChannelBuffer} buf) throws Exception {
84 *
85 * // Make sure the length field has been received.
86 * if (buf.readableBytes() < 4) {
87 * // The length field was not received yet - return null.
88 * // This method will be invoked again when more packets are
89 * // received and appended to the buffer.
90 * return <strong>null</strong>;
91 * }
92 *
93 * // The length field is in the buffer.
94 *
95 * // Mark the current buffer position before reading the length field
96 * // because the whole frame might not be in the buffer yet.
97 * // We will reset the buffer position to the marked position if
98 * // there are not enough bytes in the buffer.
99 * buf.markReaderIndex();
100 *
101 * // Read the length field.
102 * int length = buf.readInt();
103 *
104 * // Make sure there are enough bytes in the buffer.
105 * if (buf.readableBytes() < length) {
106 * // The whole frame has not been received yet - return null.
107 * // This method will be invoked again when more packets are
108 * // received and appended to the buffer.
109 *
110 * // Reset to the marked position to read the length field again
111 * // next time.
112 * buf.resetReaderIndex();
113 *
114 * return <strong>null</strong>;
115 * }
116 *
117 * // There are enough bytes in the buffer. Read them.
118 * {@link ChannelBuffer} frame = buf.readBytes(length);
119 *
120 * // Successfully decoded a frame. Return the decoded frame.
121 * return <strong>frame</strong>;
122 * }
123 * }
124 * </pre>
125 *
126 * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3>
127 * <p>
128 * Please note that you can return an object of a different type than
129 * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()}
130 * implementation. For example, you could return a
131 * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next
132 * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which
133 * contains a POJO rather than a {@link ChannelBuffer}.
134 *
135 * <h3>Replacing a decoder with another decoder in a pipeline</h3>
136 * <p>
137 * If you are going to write a protocol multiplexer, you will probably want to
138 * replace a {@link FrameDecoder} (protocol detector) with another
139 * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder).
140 * It is not possible to achieve this simply by calling
141 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
142 * some additional steps are required:
143 * <pre>
144 * public class FirstDecoder extends {@link FrameDecoder} {
145 *
146 * public FirstDecoder() {
147 * super(true); // Enable unfold
148 * }
149 *
150 * {@code @Override}
151 * protected Object decode({@link ChannelHandlerContext} ctx,
152 * {@link Channel} channel,
153 * {@link ChannelBuffer} buf) {
154 * ...
155 * // Decode the first message
156 * Object firstMessage = ...;
157 *
158 * // Add the second decoder
159 * ctx.getPipeline().addLast("second", new SecondDecoder());
160 *
161 * // Remove the first decoder (me)
162 * ctx.getPipeline().remove(this);
163 *
164 * if (buf.readable()) {
165 * // Hand off the remaining data to the second decoder
166 * return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };
167 * } else {
168 * // Nothing to hand off
169 * return firstMessage;
170 * }
171 * }
172 * }
173 * </pre>
174 *
175 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
176 * @author <a href="http://gleamynode.net/">Trustin Lee</a>
177 *
178 * @version $Rev:231 $, $Date:2008-06-12 16:44:50 +0900 (목, 12 6월 2008) $
179 *
180 * @apiviz.landmark
181 */
182 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler {
183
184 private final boolean unfold;
185 private ChannelBuffer cumulation;
186
187 protected FrameDecoder() {
188 this(false);
189 }
190
191 protected FrameDecoder(boolean unfold) {
192 this.unfold = unfold;
193 }
194
195 @Override
196 public void messageReceived(
197 ChannelHandlerContext ctx, MessageEvent e) throws Exception {
198
199 Object m = e.getMessage();
200 if (!(m instanceof ChannelBuffer)) {
201 ctx.sendUpstream(e);
202 return;
203 }
204
205 ChannelBuffer input = (ChannelBuffer) m;
206 if (!input.readable()) {
207 return;
208 }
209
210 ChannelBuffer cumulation = cumulation(ctx);
211 if (cumulation.readable()) {
212 cumulation.discardReadBytes();
213 cumulation.writeBytes(input);
214 callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
215 } else {
216 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
217 if (input.readable()) {
218 cumulation.writeBytes(input);
219 }
220 }
221 }
222
223 @Override
224 public void channelDisconnected(
225 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
226 cleanup(ctx, e);
227 }
228
229 @Override
230 public void channelClosed(
231 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
232 cleanup(ctx, e);
233 }
234
235 @Override
236 public void exceptionCaught(
237 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
238 ctx.sendUpstream(e);
239 }
240
241 /**
242 * Decodes the received packets so far into a frame.
243 *
244 * @param ctx the context of this handler
245 * @param channel the current channel
246 * @param buffer the cumulative buffer of received packets so far.
247 * Note that the buffer might be empty, which means you
248 * should not make an assumption that the buffer contains
249 * at least one byte in your decoder implementation.
250 *
251 * @return the decoded frame if a full frame was received and decoded.
252 * {@code null} if there's not enough data in the buffer to decode a frame.
253 */
254 protected abstract Object decode(
255 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception;
256
257 /**
258 * Decodes the received data so far into a frame when the channel is
259 * disconnected.
260 *
261 * @param ctx the context of this handler
262 * @param channel the current channel
263 * @param buffer the cumulative buffer of received packets so far.
264 * Note that the buffer might be empty, which means you
265 * should not make an assumption that the buffer contains
266 * at least one byte in your decoder implementation.
267 *
268 * @return the decoded frame if a full frame was received and decoded.
269 * {@code null} if there's not enough data in the buffer to decode a frame.
270 */
271 protected Object decodeLast(
272 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
273 return decode(ctx, channel, buffer);
274 }
275
276 private void callDecode(
277 ChannelHandlerContext context, Channel channel,
278 ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
279
280 while (cumulation.readable()) {
281 int oldReaderIndex = cumulation.readerIndex();
282 Object frame = decode(context, channel, cumulation);
283 if (frame == null) {
284 if (oldReaderIndex == cumulation.readerIndex()) {
285 // Seems like more data is required.
286 // Let us wait for the next notification.
287 break;
288 } else {
289 // Previous data has been discarded.
290 // Probably it is reading on.
291 continue;
292 }
293 } else if (oldReaderIndex == cumulation.readerIndex()) {
294 throw new IllegalStateException(
295 "decode() method must read at least one byte " +
296 "if it returned a frame (caused by: " + getClass() + ")");
297 }
298
299 unfoldAndFireMessageReceived(context, remoteAddress, frame);
300 }
301 }
302
303 private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
304 if (unfold) {
305 if (result instanceof Object[]) {
306 for (Object r: (Object[]) result) {
307 Channels.fireMessageReceived(context, r, remoteAddress);
308 }
309 } else if (result instanceof Iterable<?>) {
310 for (Object r: (Iterable<?>) result) {
311 Channels.fireMessageReceived(context, r, remoteAddress);
312 }
313 } else {
314 Channels.fireMessageReceived(context, result, remoteAddress);
315 }
316 } else {
317 Channels.fireMessageReceived(context, result, remoteAddress);
318 }
319 }
320
321 private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
322 throws Exception {
323 try {
324 ChannelBuffer cumulation = this.cumulation;
325 if (cumulation == null) {
326 return;
327 } else {
328 this.cumulation = null;
329 }
330
331 if (cumulation.readable()) {
332 // Make sure all frames are read before notifying a closed channel.
333 callDecode(ctx, ctx.getChannel(), cumulation, null);
334 }
335
336 // Call decodeLast() finally. Please note that decodeLast() is
337 // called even if there's nothing more to read from the buffer to
338 // notify a user that the connection was closed explicitly.
339 Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
340 if (partialFrame != null) {
341 unfoldAndFireMessageReceived(ctx, null, partialFrame);
342 }
343 } finally {
344 ctx.sendUpstream(e);
345 }
346 }
347
348 private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
349 ChannelBuffer c = cumulation;
350 if (c == null) {
351 c = ChannelBuffers.dynamicBuffer(
352 ctx.getChannel().getConfig().getBufferFactory());
353 cumulation = c;
354 }
355 return c;
356 }
357 }