org.jboss.netty.handler.codec.replay
Class ReplayingDecoder<T extends Enum<T>>

java.lang.Object
  extended by org.jboss.netty.channel.SimpleChannelUpstreamHandler
      extended by org.jboss.netty.handler.codec.replay.ReplayingDecoder<T>
Type Parameters:
T - the state type; use VoidEnum if state management is unused
All Implemented Interfaces:
ChannelHandler, ChannelUpstreamHandler
Direct Known Subclasses:
CompatibleObjectDecoder, HttpMessageDecoder, WebSocketFrameDecoder

public abstract class ReplayingDecoder<T extends Enum<T>>
extends SimpleChannelUpstreamHandler

A specialized variation of FrameDecoder which enables implementation of a non-blocking decoder in the blocking I/O paradigm.

The biggest difference between ReplayingDecoder and FrameDecoder is that ReplayingDecoder allows you to implement the decode() and decodeLast() methods just like all required bytes were received already, rather than checking the availability of the required bytes. For example, the following FrameDecoder implementation:

 public class IntegerHeaderFrameDecoder extends FrameDecoder {

   @Override
   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf) throws Exception {

     if (buf.readableBytes() < 4) {
        return null;
     }

     buf.markReaderIndex();
     int length = buf.readInt();

     if (buf.readableBytes() < length) {
        buf.resetReaderIndex();
        return null;
     }

     return buf.readBytes(length);
   }
 }
 
is simplified like the following with ReplayingDecoder:
 public class IntegerHeaderFrameDecoder
      extends ReplayingDecoder<VoidEnum> {

   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf,
                           VoidEnum state) throws Exception {

     return buf.readBytes(buf.readInt());
   }
 }
 

How does this work?

ReplayingDecoder passes a specialized ChannelBuffer implementation which throws an Error of certain type when there's not enough data in the buffer. In the IntegerHeaderFrameDecoder above, you just assumed that there will be 4 or more bytes in the buffer when you call buf.readInt(). If there's really 4 bytes in the buffer, it will return the integer header as you expected. Otherwise, the Error will be raised and the control will be returned to ReplayingDecoder. If ReplayingDecoder catches the Error, then it will rewind the readerIndex of the buffer back to the 'initial' position (i.e. the beginning of the buffer) and call the decode(..) method again when more data is received into the buffer.

Please note that ReplayingDecoder always throws the same cached Error instance to avoid the overhead of creating a new Error and filling its stack trace for every throw.

Limitations

At the cost of the simplicity, ReplayingDecoder enforces you a few limitations:

Improving the performance

Fortunately, the performance of a complex decoder implementation can be improved significantly with the checkpoint() method. The checkpoint() method updates the 'initial' position of the buffer so that ReplayingDecoder rewinds the readerIndex of the buffer to the last position where you called the checkpoint() method.

Calling checkpoint(T) with an Enum

Although you can just use checkpoint() method and manage the state of the decoder by yourself, the easiest way to manage the state of the decoder is to create an Enum type which represents the current state of the decoder and to call checkpoint(T) method whenever the state changes. You can have as many states as you want depending on the complexity of the message you want to decode:

 public enum MyDecoderState {
   READ_LENGTH,
   READ_CONTENT;
 }

 public class IntegerHeaderFrameDecoder
      extends ReplayingDecoder<MyDecoderState> {

   private int length;

   public IntegerHeaderFrameDecoder() {
     // Set the initial state.
     super(MyDecoderState.READ_LENGTH);
   }

   @Override
   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf,
                           MyDecoderState state) throws Exception {
     switch (state) {
     case READ_LENGTH:
       length = buf.readInt();
       checkpoint(MyDecoderState.READ_CONTENT);
     case READ_CONTENT:
       ChannelBuffer frame = buf.readBytes(length);
       checkpoint(MyDecoderState.READ_LENGTH);
       return frame;
     default:
       throw new Error("Shouldn't reach here.");
     }
   }
 }
 

Calling checkpoint() with no parameter

An alternative way to manage the decoder state is to manage it by yourself.

 public class IntegerHeaderFrameDecoder
      extends ReplayingDecoder<VoidEnum> {

   private boolean readLength;
   private int length;

   @Override
   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf,
                           VoidEnum state) throws Exception {
     if (!readLength) {
       length = buf.readInt();
       readLength = true;
       checkpoint();
     }

     if (readLength) {
       ChannelBuffer frame = buf.readBytes(length);
       readLength = false;
       checkpoint();
       return frame;
     }
   }
 }
 

Replacing a decoder with another decoder in a pipeline

If you are going to write a protocol multiplexer, you will probably want to replace a ReplayingDecoder (protocol detector) with another ReplayingDecoder or FrameDecoder (actual protocol decoder). It is not possible to achieve this simply by calling ChannelPipeline.replace(ChannelHandler, String, ChannelHandler), but some additional steps are required:

 public class FirstDecoder extends ReplayingDecoder<VoidEnum> {

     public FirstDecoder() {
         super(true); // Enable unfold
     }

     @Override
     protected Object decode(ChannelHandlerContext ctx,
                             Channel ch,
                             ChannelBuffer buf,
                             VoidEnum state) {
         ...
         // Decode the first message
         Object firstMessage = ...;

         // Add the second decoder
         ctx.getPipeline().addLast("second", new SecondDecoder());

         // Remove the first decoder (me)
         ctx.getPipeline().remove(this);

         if (buf.readable()) {
             // Hand off the remaining data to the second decoder
             return new Object[] { firstMessage, buf.readBytes(super.actualReadableBytes()) };
         } else {
             // Nothing to hand off
             return firstMessage;
         }
     }
 

Version:
$Rev: 2380 $, $Date: 2010-11-09 14:35:24 +0900 (Tue, 09 Nov 2010) $
Author:
The Netty Project, Trustin Lee

Nested Class Summary
 
Nested classes/interfaces inherited from interface org.jboss.netty.channel.ChannelHandler
ChannelHandler.Sharable
 
Constructor Summary
protected ReplayingDecoder()
          Creates a new instance with no initial state (i.e: null).
protected ReplayingDecoder(boolean unfold)
           
protected ReplayingDecoder(T initialState)
          Creates a new instance with the specified initial state.
protected ReplayingDecoder(T initialState, boolean unfold)
           
 
Method Summary
protected  int actualReadableBytes()
          Returns the actual number of readable bytes in the internal cumulative buffer of this decoder.
 void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
          Invoked when a Channel was closed and all its related resources were released.
 void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
          Invoked when a Channel was disconnected from its remote peer.
protected  void checkpoint()
          Stores the internal cumulative buffer's reader position.
protected  void checkpoint(T state)
          Stores the internal cumulative buffer's reader position and updates the current decoder state.
protected abstract  Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state)
          Decodes the received packets so far into a frame.
protected  Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state)
          Decodes the received data so far into a frame when the channel is disconnected.
 void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
          Invoked when an exception was raised by an I/O thread or a ChannelHandler.
protected  T getState()
          Returns the current state of this decoder.
protected  ChannelBuffer internalBuffer()
          Returns the internal cumulative buffer of this decoder.
 void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
          Invoked when a message object (e.g: ChannelBuffer) was received from a remote peer.
protected  T setState(T newState)
          Sets the current state of this decoder.
 
Methods inherited from class org.jboss.netty.channel.SimpleChannelUpstreamHandler
channelBound, channelConnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, handleUpstream, writeComplete
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

ReplayingDecoder

protected ReplayingDecoder()
Creates a new instance with no initial state (i.e: null).


ReplayingDecoder

protected ReplayingDecoder(boolean unfold)

ReplayingDecoder

protected ReplayingDecoder(T initialState)
Creates a new instance with the specified initial state.


ReplayingDecoder

protected ReplayingDecoder(T initialState,
                           boolean unfold)
Method Detail

checkpoint

protected void checkpoint()
Stores the internal cumulative buffer's reader position.


checkpoint

protected void checkpoint(T state)
Stores the internal cumulative buffer's reader position and updates the current decoder state.


getState

protected T getState()
Returns the current state of this decoder.

Returns:
the current state of this decoder

setState

protected T setState(T newState)
Sets the current state of this decoder.

Returns:
the old state of this decoder

actualReadableBytes

protected int actualReadableBytes()
Returns the actual number of readable bytes in the internal cumulative buffer of this decoder. You usually do not need to rely on this value to write a decoder. Use it only when you muse use it at your own risk. This method is a shortcut to internalBuffer().readableBytes().


internalBuffer

protected ChannelBuffer internalBuffer()
Returns the internal cumulative buffer of this decoder. You usually do not need to access the internal buffer directly to write a decoder. Use it only when you must use it at your own risk.


decode

protected abstract Object decode(ChannelHandlerContext ctx,
                                 Channel channel,
                                 ChannelBuffer buffer,
                                 T state)
                          throws Exception
Decodes the received packets so far into a frame.

Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, which means you should not make an assumption that the buffer contains at least one byte in your decoder implementation.
state - the current decoder state (null if unused)
Returns:
the decoded frame
Throws:
Exception

decodeLast

protected Object decodeLast(ChannelHandlerContext ctx,
                            Channel channel,
                            ChannelBuffer buffer,
                            T state)
                     throws Exception
Decodes the received data so far into a frame when the channel is disconnected.

Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, which means you should not make an assumption that the buffer contains at least one byte in your decoder implementation.
state - the current decoder state (null if unused)
Returns:
the decoded frame
Throws:
Exception

messageReceived

public void messageReceived(ChannelHandlerContext ctx,
                            MessageEvent e)
                     throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when a message object (e.g: ChannelBuffer) was received from a remote peer.

Overrides:
messageReceived in class SimpleChannelUpstreamHandler
Throws:
Exception

channelDisconnected

public void channelDisconnected(ChannelHandlerContext ctx,
                                ChannelStateEvent e)
                         throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when a Channel was disconnected from its remote peer.

Overrides:
channelDisconnected in class SimpleChannelUpstreamHandler
Throws:
Exception

channelClosed

public void channelClosed(ChannelHandlerContext ctx,
                          ChannelStateEvent e)
                   throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when a Channel was closed and all its related resources were released.

Overrides:
channelClosed in class SimpleChannelUpstreamHandler
Throws:
Exception

exceptionCaught

public void exceptionCaught(ChannelHandlerContext ctx,
                            ExceptionEvent e)
                     throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when an exception was raised by an I/O thread or a ChannelHandler.

Overrides:
exceptionCaught in class SimpleChannelUpstreamHandler
Throws:
Exception


Copyright © 2008-2011 JBoss, a division of Red Hat, Inc.. All Rights Reserved.