java.lang.Object
  org.jboss.netty.channel.SimpleChannelUpstreamHandler
    org.jboss.netty.handler.codec.replay.ReplayingDecoder<T>

Type Parameters:
T - the state type; use VoidEnum if state management is unused

@ChannelPipelineCoverage(value="one")
public abstract class ReplayingDecoder<T extends Enum<T>>

A specialized variation of FrameDecoder which enables implementation of a non-blocking decoder in the blocking I/O paradigm.

The biggest difference between ReplayingDecoder and FrameDecoder is that ReplayingDecoder allows you to implement the decode() and decodeLast() methods as if all required bytes had already been received, rather than checking the availability of the required bytes. For example, the following FrameDecoder implementation:

    public class IntegerHeaderFrameDecoder extends FrameDecoder {

        protected Object decode(ChannelHandlerContext ctx, Channel channel,
                                ChannelBuffer buf) throws Exception {
            // Wait until the 4-byte length header is available.
            if (buf.readableBytes() < 4) {
                return null;
            }

            buf.markReaderIndex();
            int length = buf.readInt();

            // Wait until the whole frame body is available.
            if (buf.readableBytes() < length) {
                buf.resetReaderIndex();
                return null;
            }

            return buf.readBytes(length);
        }
    }

can be simplified to the following with ReplayingDecoder:

    public class IntegerHeaderFrameDecoder extends ReplayingDecoder<VoidEnum> {

        protected Object decode(ChannelHandlerContext ctx, Channel channel,
                                ChannelBuffer buf, VoidEnum state) throws Exception {
            return buf.readBytes(buf.readInt());
        }
    }

ReplayingDecoder passes a specialized ChannelBuffer implementation which throws an Error of a certain type when there is not enough data in the buffer. In the IntegerHeaderFrameDecoder above, you simply assumed that there would be 4 or more bytes in the buffer when you called buf.readInt(). If there really are 4 bytes in the buffer, it returns the integer header as you expected. Otherwise, the Error is raised and control returns to ReplayingDecoder. When ReplayingDecoder catches the Error, it rewinds the readerIndex of the buffer back to the 'initial' position (i.e. the beginning of the buffer) and calls the decode(..) method again when more data is received into the buffer.
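
The replay loop described above can be sketched without Netty. The following is a minimal illustration of the idea, not Netty's actual implementation: ReplaySketch, Replay, ReplayingBuffer, and feed are hypothetical names, and java.nio.ByteBuffer stands in for ChannelBuffer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the replay mechanism; not Netty code. */
final class ReplaySketch {

    /** Signal raised when a read needs more bytes than are buffered. */
    static final class Replay extends Error {
        Replay() { super("not enough data"); }
    }

    /** Wraps a ByteBuffer and raises Replay instead of underflowing. */
    static final class ReplayingBuffer {
        private final ByteBuffer in;

        ReplayingBuffer(ByteBuffer in) { this.in = in; }

        int readInt() {
            if (in.remaining() < 4) throw new Replay();
            return in.getInt();
        }

        byte[] readBytes(int n) {
            if (in.remaining() < n) throw new Replay();
            byte[] out = new byte[n];
            in.get(out);
            return out;
        }
    }

    // Bytes received so far that have not been consumed by a complete frame.
    private byte[] pending = new byte[0];

    /** Written as if every required byte were already available. */
    static byte[] decode(ReplayingBuffer buf) {
        return buf.readBytes(buf.readInt());
    }

    /** Appends newly received bytes and returns every frame decodable so far. */
    List<byte[]> feed(byte[] chunk) {
        byte[] all = new byte[pending.length + chunk.length];
        System.arraycopy(pending, 0, all, 0, pending.length);
        System.arraycopy(chunk, 0, all, pending.length, chunk.length);

        ByteBuffer buf = ByteBuffer.wrap(all);
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            int initial = buf.position();      // the 'initial' position
            try {
                frames.add(decode(new ReplayingBuffer(buf)));
            } catch (Replay signal) {
                buf.position(initial);         // rewind; retry on the next feed
                break;
            }
        }
        pending = new byte[buf.remaining()];
        buf.get(pending);
        return frames;
    }
}
```

Feeding a frame in several fragments yields nothing until the last byte arrives, at which point decode(..) runs from the start of the frame once more and succeeds.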

Please note that the overhead of throwing an Error is minimal, unlike throwing a new Exception in the ordinary way. ReplayingDecoder reuses the same Error instance so that it does not need to fill in its stack trace, which accounts for most of the Exception initialization time.
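
One way to picture a reusable signal is a single preallocated Error whose stack trace is never filled in. The sketch below is an assumption about how such a signal could be built on Java 7 or later, using the protected Error(String, Throwable, boolean, boolean) constructor; the names ReusedErrorSketch and ReplaySignal are hypothetical, and the source only states that the same instance is reused.

```java
/** Hypothetical illustration of a shared, stack-trace-free signal. */
final class ReusedErrorSketch {

    static final class ReplaySignal extends Error {
        // Created once; every throw site reuses this instance.
        static final ReplaySignal INSTANCE = new ReplaySignal();

        private ReplaySignal() {
            // message, cause, enableSuppression, writableStackTrace
            super("not enough data", null, false, false);
        }
    }

    /** Throws and catches the shared signal, returning what was caught. */
    static Error trigger() {
        try {
            throw ReplaySignal.INSTANCE;
        } catch (ReplaySignal signal) {
            return signal;
        }
    }
}
```

Because writableStackTrace is false, no stack walk happens when the instance is constructed or thrown, and getStackTrace() returns an empty array.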

In exchange for this simplicity, ReplayingDecoder imposes a few limitations:

Keep in mind that the decode(..) method can be called many times to decode a single message. For example, the following code will not work:

    public class MyDecoder extends ReplayingDecoder<VoidEnum> {

        private final Queue<Integer> values = new LinkedList<Integer>();

        public Object decode(.., ChannelBuffer buffer, ..) throws Exception {

            // A message contains 2 integers.
            values.offer(buffer.readInt());
            values.offer(buffer.readInt());

            // This assertion will fail intermittently since values.offer()
            // can be called more than two times!
            assert values.size() == 2;
            return values.poll() + values.poll();
        }
    }

The correct implementation looks like the following. You can also use the 'checkpoint' feature, which is explained in detail in the next section.

    public class MyDecoder extends ReplayingDecoder<VoidEnum> {

        private final Queue<Integer> values = new LinkedList<Integer>();

        public Object decode(.., ChannelBuffer buffer, ..) throws Exception {

            // Revert the state of the variable that might have been changed
            // since the last partial decode.
            values.clear();

            // A message contains 2 integers.
            values.offer(buffer.readInt());
            values.offer(buffer.readInt());

            // Now we know this assertion will never fail.
            assert values.size() == 2;
            return values.poll() + values.poll();
        }
    }

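
To see why the field must be reset, the replay can be simulated in plain Java. This is a hypothetical sketch, not Netty code: ReplayStateSketch, Replay, and sizeAfterReplay are illustrative names, and an int array stands in for the integers that have arrived so far.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical simulation of a decode call that is replayed once. */
final class ReplayStateSketch {

    /** Signal meaning "not enough data yet". */
    static final class Replay extends Error {}

    private final Queue<Integer> values = new ArrayDeque<>();
    private final boolean clearFirst;

    ReplayStateSketch(boolean clearFirst) {
        this.clearFirst = clearFirst;
    }

    /** Reads two integers; returns the queue size seen after both reads. */
    int decode(int[] available) {
        if (clearFirst) {
            values.clear();            // undo any earlier partial decode
        }
        for (int i = 0; i < 2; i++) {
            if (i >= available.length) {
                throw new Replay();    // second integer has not arrived yet
            }
            values.offer(available[i]);
        }
        int sizeSeen = values.size();
        values.clear();
        return sizeSeen;
    }

    /** First attempt sees one integer and is replayed; second sees both. */
    static int sizeAfterReplay(boolean clearFirst) {
        ReplayStateSketch decoder = new ReplayStateSketch(clearFirst);
        try {
            decoder.decode(new int[] {7});
        } catch (Replay expected) {
            // The partial decode already left one value in the queue.
        }
        return decoder.decode(new int[] {7, 8});
    }
}
```

Without the reset the replayed call observes three queued values, and with the reset it observes exactly two.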
Fortunately, the performance of a complex decoder implementation can be improved significantly with the checkpoint() method. The checkpoint() method updates the 'initial' position of the buffer so that ReplayingDecoder rewinds the readerIndex of the buffer to the last position where you called the checkpoint() method.
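
The effect of a checkpoint on the rewind position can be sketched in plain Java. This is a simplified model under stated assumptions, not Netty's implementation: CheckpointSketch, Replay, feed, and headerReads are hypothetical names, and a growing byte array stands in for the cumulative buffer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical model of a decoder that rewinds to its last checkpoint. */
final class CheckpointSketch {

    /** Signal meaning "not enough data yet". */
    static final class Replay extends Error {}

    private byte[] data = new byte[0];
    private int readerIndex;
    private int checkpoint;      // position a replay rewinds to
    private int length = -1;     // -1 means the header has not been read
    int headerReads;             // how many times the header was parsed

    private void checkpoint() {
        checkpoint = readerIndex;
    }

    private int readInt() {
        if (data.length - readerIndex < 4) throw new Replay();
        int value = ByteBuffer.wrap(data, readerIndex, 4).getInt();
        readerIndex += 4;
        return value;
    }

    private byte[] readBytes(int n) {
        if (data.length - readerIndex < n) throw new Replay();
        byte[] out = Arrays.copyOfRange(data, readerIndex, readerIndex + n);
        readerIndex += n;
        return out;
    }

    private byte[] decode() {
        if (length < 0) {
            length = readInt();
            headerReads++;
            checkpoint();        // a replay now resumes after the header
        }
        byte[] frame = readBytes(length);
        length = -1;
        checkpoint();            // the next frame starts here
        return frame;
    }

    /** Appends data and tries to decode one frame; null if incomplete. */
    byte[] feed(byte[] chunk) {
        int oldLength = data.length;
        data = Arrays.copyOf(data, oldLength + chunk.length);
        System.arraycopy(chunk, 0, data, oldLength, chunk.length);
        try {
            return decode();
        } catch (Replay signal) {
            readerIndex = checkpoint;   // rewind to the last checkpoint
            return null;
        }
    }
}
```

When the body of a frame arrives in several fragments, the header is parsed only once because each replay restarts after it rather than at the beginning of the frame.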

checkpoint(T) with an Enum

Although you can just use the checkpoint() method and manage the state of the decoder yourself, the easiest way to manage the state of the decoder is to create an Enum type which represents the current state of the decoder and to call the checkpoint(T) method whenever the state changes. You can have as many states as you want, depending on the complexity of the message you want to decode:

    public enum MyDecoderState {
        READ_LENGTH,
        READ_CONTENT;
    }

    public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {

        private int length;

        public IntegerHeaderFrameDecoder() {
            // Set the initial state.
            super(MyDecoderState.READ_LENGTH);
        }

        protected Object decode(ChannelHandlerContext ctx, Channel channel,
                                ChannelBuffer buf, MyDecoderState state) throws Exception {
            switch (state) {
            case READ_LENGTH:
                length = buf.readInt();
                checkpoint(MyDecoderState.READ_CONTENT);
                // Intentional fall-through: try to read the content right away.
            case READ_CONTENT:
                ChannelBuffer frame = buf.readBytes(length);
                checkpoint(MyDecoderState.READ_LENGTH);
                return frame;
            default:
                throw new Error("Shouldn't reach here.");
            }
        }
    }

checkpoint() with no parameter

An alternative way to manage the decoder state is to manage it yourself.

    public class IntegerHeaderFrameDecoder extends ReplayingDecoder<VoidEnum> {

        private boolean readLength;
        private int length;

        protected Object decode(ChannelHandlerContext ctx, Channel channel,
                                ChannelBuffer buf, VoidEnum state) throws Exception {
            if (!readLength) {
                length = buf.readInt();
                readLength = true;
                checkpoint();
            }

            // The length is known at this point; read the frame content.
            ChannelBuffer frame = buf.readBytes(length);
            readLength = false;
            checkpoint();
            return frame;
        }
    }

If you are going to write a protocol multiplexer, you will probably want to replace a ReplayingDecoder (protocol detector) with another ReplayingDecoder or FrameDecoder (actual protocol decoder). This cannot be achieved simply by calling ChannelPipeline.replace(ChannelHandler, String, ChannelHandler); some additional steps are required:

    public class FirstDecoder extends ReplayingDecoder<VoidEnum> {

        public FirstDecoder() {
            super(true); // Enable unfold
        }

        protected Object decode(ChannelHandlerContext ctx, Channel ch,
                                ChannelBuffer buf, VoidEnum state) {
            ...
            // Decode the first message
            Object firstMessage = ...;

            // Add the second decoder
            ctx.getPipeline().addLast("second", new SecondDecoder());

            // Remove the first decoder (me)
            ctx.getPipeline().remove(this);

            if (buf.readable()) {
                // Hand off the remaining data to the second decoder
                return new Object[] { firstMessage, buf.readBytes(super.actualReadableBytes()) };
            } else {
                // Nothing to hand off
                return firstMessage;
            }
        }
    }

Constructor Summary

| Modifier | Constructor | Description |
|---|---|---|
| protected | ReplayingDecoder() | Creates a new instance with no initial state (i.e. null). |
| protected | ReplayingDecoder(boolean unfold) | |
| protected | ReplayingDecoder(T initialState) | Creates a new instance with the specified initial state. |
| protected | ReplayingDecoder(T initialState, boolean unfold) | |

Method Summary

| Modifier and Type | Method | Description |
|---|---|---|
| protected int | actualReadableBytes() | Returns the actual number of readable bytes in the cumulative buffer of this decoder. |
| void | channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) | Invoked when a Channel was closed and all its related resources were released. |
| void | channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) | Invoked when a Channel was disconnected from its remote peer. |
| protected void | checkpoint() | Stores the internal cumulative buffer's reader position. |
| protected void | checkpoint(T state) | Stores the internal cumulative buffer's reader position and updates the current decoder state. |
| protected abstract Object | decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) | Decodes the received packets so far into a frame. |
| protected Object | decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) | Decodes the received data so far into a frame when the channel is disconnected. |
| void | exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) | Invoked when an exception was raised by an I/O thread or a ChannelHandler. |
| protected T | getState() | Returns the current state of this decoder. |
| void | messageReceived(ChannelHandlerContext ctx, MessageEvent e) | Invoked when a message object (e.g. ChannelBuffer) was received from a remote peer. |
| protected T | setState(T newState) | Sets the current state of this decoder. |

Methods inherited from class org.jboss.netty.channel.SimpleChannelUpstreamHandler
channelBound, channelConnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, handleUpstream, writeComplete

Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Constructor Detail

protected ReplayingDecoder()
Creates a new instance with no initial state (i.e. null).

protected ReplayingDecoder(boolean unfold)

protected ReplayingDecoder(T initialState)
Creates a new instance with the specified initial state.

protected ReplayingDecoder(T initialState, boolean unfold)

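Method Detail

protected void checkpoint()
Stores the internal cumulative buffer's reader position.

protected void checkpoint(T state)
Stores the internal cumulative buffer's reader position and updates the current decoder state.

protected T getState()
Returns the current state of this decoder.

protected T setState(T newState)
Sets the current state of this decoder.

protected int actualReadableBytes()
Returns the actual number of readable bytes in the cumulative buffer of this decoder.
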
protected abstract Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception
Decodes the received packets so far into a frame.

Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, so your decoder implementation should not assume that the buffer contains at least one byte.
state - the current decoder state (null if unused)
Throws:
Exception

protected Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception
Decodes the received data so far into a frame when the channel is disconnected.

Parameters:
ctx - the context of this handler
channel - the current channel
buffer - the cumulative buffer of received packets so far. Note that the buffer might be empty, so your decoder implementation should not assume that the buffer contains at least one byte.
state - the current decoder state (null if unused)
Throws:
Exception

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when a message object (e.g. ChannelBuffer) was received from a remote peer.
Overrides:
messageReceived in class SimpleChannelUpstreamHandler
Throws:
Exception

public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when a Channel was disconnected from its remote peer.
Overrides:
channelDisconnected in class SimpleChannelUpstreamHandler
Throws:
Exception

public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when a Channel was closed and all its related resources were released.
Overrides:
channelClosed in class SimpleChannelUpstreamHandler
Throws:
Exception

public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
Description copied from class: SimpleChannelUpstreamHandler
Invoked when an exception was raised by an I/O thread or a ChannelHandler.
Overrides:
exceptionCaught in class SimpleChannelUpstreamHandler
Throws:
Exception