1 /* 2 * Copyright 2009 Red Hat, Inc. 3 * 4 * Red Hat licenses this file to you under the Apache License, version 2.0 5 * (the "License"); you may not use this file except in compliance with the 6 * License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.codec.frame; 17 18 import java.net.SocketAddress; 19 20 import org.jboss.netty.buffer.ChannelBuffer; 21 import org.jboss.netty.buffer.ChannelBuffers; 22 import org.jboss.netty.channel.Channel; 23 import org.jboss.netty.channel.ChannelHandler; 24 import org.jboss.netty.channel.ChannelHandlerContext; 25 import org.jboss.netty.channel.ChannelPipeline; 26 import org.jboss.netty.channel.ChannelStateEvent; 27 import org.jboss.netty.channel.ChannelUpstreamHandler; 28 import org.jboss.netty.channel.Channels; 29 import org.jboss.netty.channel.ExceptionEvent; 30 import org.jboss.netty.channel.MessageEvent; 31 import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 32 import org.jboss.netty.handler.codec.replay.ReplayingDecoder; 33 34 /** 35 * Decodes the received {@link ChannelBuffer}s into a meaningful frame object. 36 * <p> 37 * In a stream-based transport such as TCP/IP, packets can be fragmented and 38 * reassembled during transmission even in a LAN environment. For example, 39 * let us assume you have received three packets: 40 * <pre> 41 * +-----+-----+-----+ 42 * | ABC | DEF | GHI | 43 * +-----+-----+-----+ 44 * </pre> 45 * because of the packet fragmentation, a server can receive them like the 46 * following: 47 * <pre> 48 * +----+-------+---+---+ 49 * | AB | CDEFG | H | I | 50 * +----+-------+---+---+ 51 * </pre> 52 * <p> 53 * {@link FrameDecoder} helps you defrag the received packets into one or more 54 * meaningful <strong>frames</strong> that could be easily understood by the 55 * application logic. In case of the example above, your {@link FrameDecoder} 56 * implementation could defrag the received packets like the following: 57 * <pre> 58 * +-----+-----+-----+ 59 * | ABC | DEF | GHI | 60 * +-----+-----+-----+ 61 * </pre> 62 * <p> 63 * The following code shows an example handler which decodes a frame whose 64 * first 4 bytes header represents the length of the frame, excluding the 65 * header. 66 * <pre> 67 * MESSAGE FORMAT 68 * ============== 69 * 70 * Offset: 0 4 (Length + 4) 71 * +--------+------------------------+ 72 * Fields: | Length | Actual message content | 73 * +--------+------------------------+ 74 * 75 * DECODER IMPLEMENTATION 76 * ====================== 77 * 78 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} { 79 * 80 * {@code @Override} 81 * protected Object decode({@link ChannelHandlerContext} ctx, 82 * {@link Channel channel}, 83 * {@link ChannelBuffer} buf) throws Exception { 84 * 85 * // Make sure if the length field was received. 86 * if (buf.readableBytes() < 4) { 87 * // The length field was not received yet - return null. 88 * // This method will be invoked again when more packets are 89 * // received and appended to the buffer. 90 * return <strong>null</strong>; 91 * } 92 * 93 * // The length field is in the buffer. 94 * 95 * // Mark the current buffer position before reading the length field 96 * // because the whole frame might not be in the buffer yet. 97 * // We will reset the buffer position to the marked position if 98 * // there's not enough bytes in the buffer. 99 * buf.markReaderIndex(); 100 * 101 * // Read the length field. 102 * int length = buf.readInt(); 103 * 104 * // Make sure if there's enough bytes in the buffer. 105 * if (buf.readableBytes() < length) { 106 * // The whole bytes were not received yet - return null. 107 * // This method will be invoked again when more packets are 108 * // received and appended to the buffer. 109 * 110 * // Reset to the marked position to read the length field again 111 * // next time. 112 * buf.resetReaderIndex(); 113 * 114 * return <strong>null</strong>; 115 * } 116 * 117 * // There's enough bytes in the buffer. Read it. 118 * {@link ChannelBuffer} frame = buf.readBytes(length); 119 * 120 * // Successfully decoded a frame. Return the decoded frame. 121 * return <strong>frame</strong>; 122 * } 123 * } 124 * </pre> 125 * 126 * <h3>Returning a POJO rather than a {@link ChannelBuffer}</h3> 127 * <p> 128 * Please note that you can return an object of a different type than 129 * {@link ChannelBuffer} in your {@code decode()} and {@code decodeLast()} 130 * implementation. For example, you could return a 131 * <a href="http://en.wikipedia.org/wiki/POJO">POJO</a> so that the next 132 * {@link ChannelUpstreamHandler} receives a {@link MessageEvent} which 133 * contains a POJO rather than a {@link ChannelBuffer}. 134 * 135 * <h3>Replacing a decoder with another decoder in a pipeline</h3> 136 * <p> 137 * If you are going to write a protocol multiplexer, you will probably want to 138 * replace a {@link FrameDecoder} (protocol detector) with another 139 * {@link FrameDecoder} or {@link ReplayingDecoder} (actual protocol decoder). 140 * It is not possible to achieve this simply by calling 141 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but 142 * some additional steps are required: 143 * <pre> 144 * public class FirstDecoder extends {@link FrameDecoder} { 145 * 146 * public FirstDecoder() { 147 * super(true); // Enable unfold 148 * } 149 * 150 * {@code @Override} 151 * protected Object decode({@link ChannelHandlerContext} ctx, 152 * {@link Channel} channel, 153 * {@link ChannelBuffer} buf) { 154 * ... 155 * // Decode the first message 156 * Object firstMessage = ...; 157 * 158 * // Add the second decoder 159 * ctx.getPipeline().addLast("second", new SecondDecoder()); 160 * 161 * // Remove the first decoder (me) 162 * ctx.getPipeline().remove(this); 163 * 164 * if (buf.readable()) { 165 * // Hand off the remaining data to the second decoder 166 * return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) }; 167 * } else { 168 * // Nothing to hand off 169 * return firstMessage; 170 * } 171 * } 172 * } 173 * </pre> 174 * 175 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a> 176 * @author <a href="http://gleamynode.net/">Trustin Lee</a> 177 * 178 * @version $Rev:231 $, $Date:2008-06-12 16:44:50 +0900 (목, 12 6월 2008) $ 179 * 180 * @apiviz.landmark 181 */ 182 public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { 183 184 private final boolean unfold; 185 private ChannelBuffer cumulation; 186 187 protected FrameDecoder() { 188 this(false); 189 } 190 191 protected FrameDecoder(boolean unfold) { 192 this.unfold = unfold; 193 } 194 195 @Override 196 public void messageReceived( 197 ChannelHandlerContext ctx, MessageEvent e) throws Exception { 198 199 Object m = e.getMessage(); 200 if (!(m instanceof ChannelBuffer)) { 201 ctx.sendUpstream(e); 202 return; 203 } 204 205 ChannelBuffer input = (ChannelBuffer) m; 206 if (!input.readable()) { 207 return; 208 } 209 210 ChannelBuffer cumulation = cumulation(ctx); 211 if (cumulation.readable()) { 212 cumulation.discardReadBytes(); 213 cumulation.writeBytes(input); 214 callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); 215 } else { 216 callDecode(ctx, e.getChannel(), input, e.getRemoteAddress()); 217 if (input.readable()) { 218 cumulation.writeBytes(input); 219 } 220 } 221 } 222 223 @Override 224 public void channelDisconnected( 225 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 226 cleanup(ctx, e); 227 } 228 229 @Override 230 public void channelClosed( 231 ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 232 cleanup(ctx, e); 233 } 234 235 @Override 236 public void exceptionCaught( 237 ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 238 ctx.sendUpstream(e); 239 } 240 241 /** 242 * Decodes the received packets so far into a frame. 243 * 244 * @param ctx the context of this handler 245 * @param channel the current channel 246 * @param buffer the cumulative buffer of received packets so far. 247 * Note that the buffer might be empty, which means you 248 * should not make an assumption that the buffer contains 249 * at least one byte in your decoder implementation. 250 * 251 * @return the decoded frame if a full frame was received and decoded. 252 * {@code null} if there's not enough data in the buffer to decode a frame. 253 */ 254 protected abstract Object decode( 255 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception; 256 257 /** 258 * Decodes the received data so far into a frame when the channel is 259 * disconnected. 260 * 261 * @param ctx the context of this handler 262 * @param channel the current channel 263 * @param buffer the cumulative buffer of received packets so far. 264 * Note that the buffer might be empty, which means you 265 * should not make an assumption that the buffer contains 266 * at least one byte in your decoder implementation. 267 * 268 * @return the decoded frame if a full frame was received and decoded. 269 * {@code null} if there's not enough data in the buffer to decode a frame. 270 */ 271 protected Object decodeLast( 272 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { 273 return decode(ctx, channel, buffer); 274 } 275 276 private void callDecode( 277 ChannelHandlerContext context, Channel channel, 278 ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { 279 280 while (cumulation.readable()) { 281 int oldReaderIndex = cumulation.readerIndex(); 282 Object frame = decode(context, channel, cumulation); 283 if (frame == null) { 284 if (oldReaderIndex == cumulation.readerIndex()) { 285 // Seems like more data is required. 286 // Let us wait for the next notification. 287 break; 288 } else { 289 // Previous data has been discarded. 290 // Probably it is reading on. 291 continue; 292 } 293 } else if (oldReaderIndex == cumulation.readerIndex()) { 294 throw new IllegalStateException( 295 "decode() method must read at least one byte " + 296 "if it returned a frame (caused by: " + getClass() + ")"); 297 } 298 299 unfoldAndFireMessageReceived(context, remoteAddress, frame); 300 } 301 } 302 303 private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { 304 if (unfold) { 305 if (result instanceof Object[]) { 306 for (Object r: (Object[]) result) { 307 Channels.fireMessageReceived(context, r, remoteAddress); 308 } 309 } else if (result instanceof Iterable<?>) { 310 for (Object r: (Iterable<?>) result) { 311 Channels.fireMessageReceived(context, r, remoteAddress); 312 } 313 } else { 314 Channels.fireMessageReceived(context, result, remoteAddress); 315 } 316 } else { 317 Channels.fireMessageReceived(context, result, remoteAddress); 318 } 319 } 320 321 private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) 322 throws Exception { 323 try { 324 ChannelBuffer cumulation = this.cumulation; 325 if (cumulation == null) { 326 return; 327 } else { 328 this.cumulation = null; 329 } 330 331 if (cumulation.readable()) { 332 // Make sure all frames are read before notifying a closed channel. 333 callDecode(ctx, ctx.getChannel(), cumulation, null); 334 } 335 336 // Call decodeLast() finally. Please note that decodeLast() is 337 // called even if there's nothing more to read from the buffer to 338 // notify a user that the connection was closed explicitly. 339 Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation); 340 if (partialFrame != null) { 341 unfoldAndFireMessageReceived(ctx, null, partialFrame); 342 } 343 } finally { 344 ctx.sendUpstream(e); 345 } 346 } 347 348 private ChannelBuffer cumulation(ChannelHandlerContext ctx) { 349 ChannelBuffer c = cumulation; 350 if (c == null) { 351 c = ChannelBuffers.dynamicBuffer( 352 ctx.getChannel().getConfig().getBufferFactory()); 353 cumulation = c; 354 } 355 return c; 356 } 357 }