1 /* 2 * Copyright 2009 Red Hat, Inc. 3 * 4 * Red Hat licenses this file to you under the Apache License, version 2.0 5 * (the "License"); you may not use this file except in compliance with the 6 * License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.codec.replay; 17 18 import java.net.SocketAddress; 19 import java.util.concurrent.atomic.AtomicReference; 20 21 import org.jboss.netty.buffer.ChannelBuffer; 22 import org.jboss.netty.buffer.ChannelBufferFactory; 23 import org.jboss.netty.buffer.ChannelBuffers; 24 import org.jboss.netty.channel.Channel; 25 import org.jboss.netty.channel.ChannelHandler; 26 import org.jboss.netty.channel.ChannelHandlerContext; 27 import org.jboss.netty.channel.ChannelPipeline; 28 import org.jboss.netty.channel.ChannelStateEvent; 29 import org.jboss.netty.channel.Channels; 30 import org.jboss.netty.channel.ExceptionEvent; 31 import org.jboss.netty.channel.MessageEvent; 32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 33 import org.jboss.netty.handler.codec.frame.FrameDecoder; 34 35 /** 36 * A specialized variation of {@link FrameDecoder} which enables implementation 37 * of a non-blocking decoder in the blocking I/O paradigm. 38 * <p> 39 * The biggest difference between {@link ReplayingDecoder} and 40 * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to 41 * implement the {@code decode()} and {@code decodeLast()} methods just like 42 * all required bytes were received already, rather than checking the 43 * availability of the required bytes. For example, the following 44 * {@link FrameDecoder} implementation: 45 * <pre> 46 * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} { 47 * 48 * {@code @Override} 49 * protected Object decode({@link ChannelHandlerContext} ctx, 50 * {@link Channel} channel, 51 * {@link ChannelBuffer} buf) throws Exception { 52 * 53 * if (buf.readableBytes() < 4) { 54 * return <strong>null</strong>; 55 * } 56 * 57 * buf.markReaderIndex(); 58 * int length = buf.readInt(); 59 * 60 * if (buf.readableBytes() < length) { 61 * buf.resetReaderIndex(); 62 * return <strong>null</strong>; 63 * } 64 * 65 * return buf.readBytes(length); 66 * } 67 * } 68 * </pre> 69 * is simplified like the following with {@link ReplayingDecoder}: 70 * <pre> 71 * public class IntegerHeaderFrameDecoder 72 * extends {@link ReplayingDecoder}<{@link VoidEnum}> { 73 * 74 * protected Object decode({@link ChannelHandlerContext} ctx, 75 * {@link Channel} channel, 76 * {@link ChannelBuffer} buf, 77 * {@link VoidEnum} state) throws Exception { 78 * 79 * return buf.readBytes(buf.readInt()); 80 * } 81 * } 82 * </pre> 83 * 84 * <h3>How does this work?</h3> 85 * <p> 86 * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer} 87 * implementation which throws an {@link Error} of certain type when there's not 88 * enough data in the buffer. In the {@code IntegerHeaderFrameDecoder} above, 89 * you just assumed that there will be 4 or more bytes in the buffer when 90 * you call {@code buf.readInt()}. If there's really 4 bytes in the buffer, 91 * it will return the integer header as you expected. Otherwise, the 92 * {@link Error} will be raised and the control will be returned to 93 * {@link ReplayingDecoder}. If {@link ReplayingDecoder} catches the 94 * {@link Error}, then it will rewind the {@code readerIndex} of the buffer 95 * back to the 'initial' position (i.e. the beginning of the buffer) and call 96 * the {@code decode(..)} method again when more data is received into the 97 * buffer. 98 * <p> 99 * Please note that {@link ReplayingDecoder} always throws the same cached 100 * {@link Error} instance to avoid the overhead of creating a new {@link Error} 101 * and filling its stack trace for every throw. 102 * 103 * <h3>Limitations</h3> 104 * <p> 105 * At the cost of the simplicity, {@link ReplayingDecoder} enforces you a few 106 * limitations: 107 * <ul> 108 * <li>Some buffer operations are prohibited.</li> 109 * <li>Performance can be worse if the network is slow and the message 110 * format is complicated unlike the example above. In this case, your 111 * decoder might have to decode the same part of the message over and over 112 * again.</li> 113 * <li>You must keep in mind that {@code decode(..)} method can be called many 114 * times to decode a single message. For example, the following code will 115 * not work: 116 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> { 117 * 118 * private final Queue<Integer> values = new LinkedList<Integer>(); 119 * 120 * {@code @Override} 121 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception { 122 * 123 * // A message contains 2 integers. 124 * values.offer(buffer.readInt()); 125 * values.offer(buffer.readInt()); 126 * 127 * // This assertion will fail intermittently since values.offer() 128 * // can be called more than two times! 129 * assert values.size() == 2; 130 * return values.poll() + values.poll(); 131 * } 132 * }</pre> 133 * The correct implementation looks like the following, and you can also 134 * utilize the 'checkpoint' feature which is explained in detail in the 135 * next section. 136 * <pre> public class MyDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> { 137 * 138 * private final Queue<Integer> values = new LinkedList<Integer>(); 139 * 140 * {@code @Override} 141 * public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception { 142 * 143 * // Revert the state of the variable that might have been changed 144 * // since the last partial decode. 145 * values.clear(); 146 * 147 * // A message contains 2 integers. 148 * values.offer(buffer.readInt()); 149 * values.offer(buffer.readInt()); 150 * 151 * // Now we know this assertion will never fail. 152 * assert values.size() == 2; 153 * return values.poll() + values.poll(); 154 * } 155 * }</pre> 156 * </li> 157 * </ul> 158 * 159 * <h3>Improving the performance</h3> 160 * <p> 161 * Fortunately, the performance of a complex decoder implementation can be 162 * improved significantly with the {@code checkpoint()} method. The 163 * {@code checkpoint()} method updates the 'initial' position of the buffer so 164 * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer 165 * to the last position where you called the {@code checkpoint()} method. 166 * 167 * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4> 168 * <p> 169 * Although you can just use {@code checkpoint()} method and manage the state 170 * of the decoder by yourself, the easiest way to manage the state of the 171 * decoder is to create an {@link Enum} type which represents the current state 172 * of the decoder and to call {@code checkpoint(T)} method whenever the state 173 * changes. You can have as many states as you want depending on the 174 * complexity of the message you want to decode: 175 * 176 * <pre> 177 * public enum MyDecoderState { 178 * READ_LENGTH, 179 * READ_CONTENT; 180 * } 181 * 182 * public class IntegerHeaderFrameDecoder 183 * extends {@link ReplayingDecoder}<<strong>MyDecoderState</strong>> { 184 * 185 * private int length; 186 * 187 * public IntegerHeaderFrameDecoder() { 188 * // Set the initial state. 189 * <strong>super(MyDecoderState.READ_LENGTH);</strong> 190 * } 191 * 192 * {@code @Override} 193 * protected Object decode({@link ChannelHandlerContext} ctx, 194 * {@link Channel} channel, 195 * {@link ChannelBuffer} buf, 196 * <b>MyDecoderState</b> state) throws Exception { 197 * switch (state) { 198 * case READ_LENGTH: 199 * length = buf.readInt(); 200 * <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong> 201 * case READ_CONTENT: 202 * ChannelBuffer frame = buf.readBytes(length); 203 * <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong> 204 * return frame; 205 * default: 206 * throw new Error("Shouldn't reach here."); 207 * } 208 * } 209 * } 210 * </pre> 211 * 212 * <h4>Calling {@code checkpoint()} with no parameter</h4> 213 * <p> 214 * An alternative way to manage the decoder state is to manage it by yourself. 215 * <pre> 216 * public class IntegerHeaderFrameDecoder 217 * extends {@link ReplayingDecoder}<<strong>{@link VoidEnum}</strong>> { 218 * 219 * <strong>private boolean readLength;</strong> 220 * private int length; 221 * 222 * {@code @Override} 223 * protected Object decode({@link ChannelHandlerContext} ctx, 224 * {@link Channel} channel, 225 * {@link ChannelBuffer} buf, 226 * {@link VoidEnum} state) throws Exception { 227 * if (!readLength) { 228 * length = buf.readInt(); 229 * <strong>readLength = true;</strong> 230 * <strong>checkpoint();</strong> 231 * } 232 * 233 * if (readLength) { 234 * ChannelBuffer frame = buf.readBytes(length); 235 * <strong>readLength = false;</strong> 236 * <strong>checkpoint();</strong> 237 * return frame; 238 * } 239 * } 240 * } 241 * </pre> 242 * 243 * <h3>Replacing a decoder with another decoder in a pipeline</h3> 244 * <p> 245 * If you are going to write a protocol multiplexer, you will probably want to 246 * replace a {@link ReplayingDecoder} (protocol detector) with another 247 * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder). 248 * It is not possible to achieve this simply by calling 249 * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but 250 * some additional steps are required: 251 * <pre> 252 * public class FirstDecoder extends {@link ReplayingDecoder}<{@link VoidEnum}> { 253 * 254 * public FirstDecoder() { 255 * super(true); // Enable unfold 256 * } 257 * 258 * {@code @Override} 259 * protected Object decode({@link ChannelHandlerContext} ctx, 260 * {@link Channel} ch, 261 * {@link ChannelBuffer} buf, 262 * {@link VoidEnum} state) { 263 * ... 264 * // Decode the first message 265 * Object firstMessage = ...; 266 * 267 * // Add the second decoder 268 * ctx.getPipeline().addLast("second", new SecondDecoder()); 269 * 270 * // Remove the first decoder (me) 271 * ctx.getPipeline().remove(this); 272 * 273 * if (buf.readable()) { 274 * // Hand off the remaining data to the second decoder 275 * return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) }; 276 * } else { 277 * // Nothing to hand off 278 * return firstMessage; 279 * } 280 * } 281 * </pre> 282 * 283 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a> 284 * @author <a href="http://gleamynode.net/">Trustin Lee</a> 285 * 286 * @version $Rev: 2380 $, $Date: 2010-11-09 14:35:24 +0900 (Tue, 09 Nov 2010) $ 287 * 288 * @param <T> 289 * the state type; use {@link VoidEnum} if state management is unused 290 * 291 * @apiviz.landmark 292 * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws 293 */ 294 public abstract class ReplayingDecoder<T extends Enum<T>> 295 extends SimpleChannelUpstreamHandler { 296 297 298 private final AtomicReference<ChannelBuffer> cumulation = 299 new AtomicReference<ChannelBuffer>(); 300 private final boolean unfold; 301 private ReplayingDecoderBuffer replayable; 302 private T state; 303 private int checkpoint; 304 305 /** 306 * Creates a new instance with no initial state (i.e: {@code null}). 307 */ 308 protected ReplayingDecoder() { 309 this(null); 310 } 311 312 protected ReplayingDecoder(boolean unfold) { 313 this(null, unfold); 314 } 315 316 /** 317 * Creates a new instance with the specified initial state. 318 */ 319 protected ReplayingDecoder(T initialState) { 320 this(initialState, false); 321 } 322 323 protected ReplayingDecoder(T initialState, boolean unfold) { 324 this.state = initialState; 325 this.unfold = unfold; 326 } 327 328 /** 329 * Stores the internal cumulative buffer's reader position. 330 */ 331 protected void checkpoint() { 332 ChannelBuffer cumulation = this.cumulation.get(); 333 if (cumulation != null) { 334 checkpoint = cumulation.readerIndex(); 335 } else { 336 checkpoint = -1; // buffer not available (already cleaned up) 337 } 338 } 339 340 /** 341 * Stores the internal cumulative buffer's reader position and updates 342 * the current decoder state. 343 */ 344 protected void checkpoint(T state) { 345 checkpoint(); 346 setState(state); 347 } 348 349 /** 350 * Returns the current state of this decoder. 351 * @return the current state of this decoder 352 */ 353 protected T getState() { 354 return state; 355 } 356 357 /** 358 * Sets the current state of this decoder. 359 * @return the old state of this decoder 360 */ 361 protected T setState(T newState) { 362 T oldState = state; 363 state = newState; 364 return oldState; 365 } 366 367 /** 368 * Returns the actual number of readable bytes in the internal cumulative 369 * buffer of this decoder. You usually do not need to rely on this value 370 * to write a decoder. Use it only when you muse use it at your own risk. 371 * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. 372 */ 373 protected int actualReadableBytes() { 374 return internalBuffer().readableBytes(); 375 } 376 377 /** 378 * Returns the internal cumulative buffer of this decoder. You usually 379 * do not need to access the internal buffer directly to write a decoder. 380 * Use it only when you must use it at your own risk. 381 */ 382 protected ChannelBuffer internalBuffer() { 383 ChannelBuffer buf = cumulation.get(); 384 if (buf == null) { 385 return ChannelBuffers.EMPTY_BUFFER; 386 } 387 return buf; 388 } 389 390 /** 391 * Decodes the received packets so far into a frame. 392 * 393 * @param ctx the context of this handler 394 * @param channel the current channel 395 * @param buffer the cumulative buffer of received packets so far. 396 * Note that the buffer might be empty, which means you 397 * should not make an assumption that the buffer contains 398 * at least one byte in your decoder implementation. 399 * @param state the current decoder state ({@code null} if unused) 400 * 401 * @return the decoded frame 402 */ 403 protected abstract Object decode(ChannelHandlerContext ctx, 404 Channel channel, ChannelBuffer buffer, T state) throws Exception; 405 406 /** 407 * Decodes the received data so far into a frame when the channel is 408 * disconnected. 409 * 410 * @param ctx the context of this handler 411 * @param channel the current channel 412 * @param buffer the cumulative buffer of received packets so far. 413 * Note that the buffer might be empty, which means you 414 * should not make an assumption that the buffer contains 415 * at least one byte in your decoder implementation. 416 * @param state the current decoder state ({@code null} if unused) 417 * 418 * @return the decoded frame 419 */ 420 protected Object decodeLast( 421 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception { 422 return decode(ctx, channel, buffer, state); 423 } 424 425 @Override 426 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 427 throws Exception { 428 429 Object m = e.getMessage(); 430 if (!(m instanceof ChannelBuffer)) { 431 ctx.sendUpstream(e); 432 return; 433 } 434 435 ChannelBuffer input = (ChannelBuffer) m; 436 if (!input.readable()) { 437 return; 438 } 439 440 ChannelBuffer cumulation = cumulation(ctx); 441 cumulation.discardReadBytes(); 442 cumulation.writeBytes(input); 443 callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress()); 444 } 445 446 @Override 447 public void channelDisconnected(ChannelHandlerContext ctx, 448 ChannelStateEvent e) throws Exception { 449 cleanup(ctx, e); 450 } 451 452 @Override 453 public void channelClosed(ChannelHandlerContext ctx, 454 ChannelStateEvent e) throws Exception { 455 cleanup(ctx, e); 456 } 457 458 @Override 459 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 460 throws Exception { 461 ctx.sendUpstream(e); 462 } 463 464 private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception { 465 while (cumulation.readable()) { 466 int oldReaderIndex = checkpoint = cumulation.readerIndex(); 467 Object result = null; 468 T oldState = state; 469 try { 470 result = decode(context, channel, replayable, state); 471 if (result == null) { 472 if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { 473 throw new IllegalStateException( 474 "null cannot be returned if no data is consumed and state didn't change."); 475 } else { 476 // Previous data has been discarded or caused state transition. 477 // Probably it is reading on. 478 continue; 479 } 480 } 481 } catch (ReplayError replay) { 482 // Return to the checkpoint (or oldPosition) and retry. 483 int checkpoint = this.checkpoint; 484 if (checkpoint >= 0) { 485 cumulation.readerIndex(checkpoint); 486 } else { 487 // Called by cleanup() - no need to maintain the readerIndex 488 // anymore because the buffer has been released already. 489 } 490 } 491 492 if (result == null) { 493 // Seems like more data is required. 494 // Let us wait for the next notification. 495 break; 496 } 497 498 if (oldReaderIndex == cumulation.readerIndex() && oldState == state) { 499 throw new IllegalStateException( 500 "decode() method must consume at least one byte " + 501 "if it returned a decoded message (caused by: " + 502 getClass() + ")"); 503 } 504 505 // A successful decode 506 unfoldAndFireMessageReceived(context, result, remoteAddress); 507 } 508 } 509 510 private void unfoldAndFireMessageReceived( 511 ChannelHandlerContext context, Object result, SocketAddress remoteAddress) { 512 if (unfold) { 513 if (result instanceof Object[]) { 514 for (Object r: (Object[]) result) { 515 Channels.fireMessageReceived(context, r, remoteAddress); 516 } 517 } else if (result instanceof Iterable<?>) { 518 for (Object r: (Iterable<?>) result) { 519 Channels.fireMessageReceived(context, r, remoteAddress); 520 } 521 } else { 522 Channels.fireMessageReceived(context, result, remoteAddress); 523 } 524 } else { 525 Channels.fireMessageReceived(context, result, remoteAddress); 526 } 527 } 528 529 private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) 530 throws Exception { 531 try { 532 ChannelBuffer cumulation = this.cumulation.getAndSet(null); 533 if (cumulation == null) { 534 return; 535 } 536 537 replayable.terminate(); 538 539 if (cumulation.readable()) { 540 // Make sure all data was read before notifying a closed channel. 541 callDecode(ctx, e.getChannel(), cumulation, null); 542 } 543 544 // Call decodeLast() finally. Please note that decodeLast() is 545 // called even if there's nothing more to read from the buffer to 546 // notify a user that the connection was closed explicitly. 547 Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state); 548 if (partiallyDecoded != null) { 549 unfoldAndFireMessageReceived(ctx, partiallyDecoded, null); 550 } 551 } catch (ReplayError replay) { 552 // Ignore 553 } finally { 554 ctx.sendUpstream(e); 555 } 556 } 557 558 private ChannelBuffer cumulation(ChannelHandlerContext ctx) { 559 ChannelBuffer buf = cumulation.get(); 560 if (buf == null) { 561 ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); 562 buf = new UnsafeDynamicChannelBuffer(factory); 563 if (cumulation.compareAndSet(null, buf)) { 564 replayable = new ReplayingDecoderBuffer(buf); 565 } else { 566 buf = cumulation.get(); 567 } 568 } 569 return buf; 570 } 571 }