View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.replay;
17  
18  import java.net.SocketAddress;
19  import java.util.concurrent.atomic.AtomicReference;
20  
21  import org.jboss.netty.buffer.ChannelBuffer;
22  import org.jboss.netty.buffer.ChannelBufferFactory;
23  import org.jboss.netty.buffer.ChannelBuffers;
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelHandler;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelStateEvent;
29  import org.jboss.netty.channel.Channels;
30  import org.jboss.netty.channel.ExceptionEvent;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33  import org.jboss.netty.handler.codec.frame.FrameDecoder;
34  
35  /**
36   * A specialized variation of {@link FrameDecoder} which enables implementation
37   * of a non-blocking decoder in the blocking I/O paradigm.
38   * <p>
39   * The biggest difference between {@link ReplayingDecoder} and
40   * {@link FrameDecoder} is that {@link ReplayingDecoder} allows you to
41   * implement the {@code decode()} and {@code decodeLast()} methods as if
42   * all required bytes had already been received, rather than checking the
43   * availability of the required bytes.  For example, the following
44   * {@link FrameDecoder} implementation:
45   * <pre>
46   * public class IntegerHeaderFrameDecoder extends {@link FrameDecoder} {
47   *
48   *   {@code @Override}
49   *   protected Object decode({@link ChannelHandlerContext} ctx,
50   *                           {@link Channel} channel,
51   *                           {@link ChannelBuffer} buf) throws Exception {
52   *
53   *     if (buf.readableBytes() &lt; 4) {
54   *        return <strong>null</strong>;
55   *     }
56   *
57   *     buf.markReaderIndex();
58   *     int length = buf.readInt();
59   *
60   *     if (buf.readableBytes() &lt; length) {
61   *        buf.resetReaderIndex();
62   *        return <strong>null</strong>;
63   *     }
64   *
65   *     return buf.readBytes(length);
66   *   }
67   * }
68   * </pre>
69   * is simplified like the following with {@link ReplayingDecoder}:
70   * <pre>
71   * public class IntegerHeaderFrameDecoder
72   *      extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
73   *
74   *   protected Object decode({@link ChannelHandlerContext} ctx,
75   *                           {@link Channel} channel,
76   *                           {@link ChannelBuffer} buf,
77   *                           {@link VoidEnum} state) throws Exception {
78   *
79   *     return buf.readBytes(buf.readInt());
80   *   }
81   * }
82   * </pre>
83   *
84   * <h3>How does this work?</h3>
85   * <p>
86   * {@link ReplayingDecoder} passes a specialized {@link ChannelBuffer}
87   * implementation which throws an {@link Error} of certain type when there's not
88   * enough data in the buffer.  In the {@code IntegerHeaderFrameDecoder} above,
89   * you just assumed that there will be 4 or more bytes in the buffer when
90   * you call {@code buf.readInt()}.  If there really are 4 bytes in the buffer,
91   * it will return the integer header as you expected.  Otherwise, the
92   * {@link Error} will be raised and the control will be returned to
93   * {@link ReplayingDecoder}.  If {@link ReplayingDecoder} catches the
94   * {@link Error}, then it will rewind the {@code readerIndex} of the buffer
95   * back to the 'initial' position (i.e. the beginning of the buffer) and call
96   * the {@code decode(..)} method again when more data is received into the
97   * buffer.
98   * <p>
99   * Please note that {@link ReplayingDecoder} always throws the same cached
100  * {@link Error} instance to avoid the overhead of creating a new {@link Error}
101  * and filling its stack trace for every throw.
102  *
103  * <h3>Limitations</h3>
104  * <p>
105  * In exchange for this simplicity, {@link ReplayingDecoder} imposes a few
106  * limitations:
107  * <ul>
108  * <li>Some buffer operations are prohibited.</li>
109  * <li>Performance can be worse if the network is slow and the message
110  *     format is complicated unlike the example above.  In this case, your
111  *     decoder might have to decode the same part of the message over and over
112  *     again.</li>
113  * <li>You must keep in mind that {@code decode(..)} method can be called many
114  *     times to decode a single message.  For example, the following code will
115  *     not work:
116  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
117  *
118  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
119  *
120  *   {@code @Override}
121  *   public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
122  *
123  *     // A message contains 2 integers.
124  *     values.offer(buffer.readInt());
125  *     values.offer(buffer.readInt());
126  *
127  *     // This assertion will fail intermittently since values.offer()
128  *     // can be called more than two times!
129  *     assert values.size() == 2;
130  *     return values.poll() + values.poll();
131  *   }
132  * }</pre>
133  *      The correct implementation looks like the following, and you can also
134  *      utilize the 'checkpoint' feature which is explained in detail in the
135  *      next section.
136  * <pre> public class MyDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
137  *
138  *   private final Queue&lt;Integer&gt; values = new LinkedList&lt;Integer&gt;();
139  *
140  *   {@code @Override}
141  *   public Object decode(.., {@link ChannelBuffer} buffer, ..) throws Exception {
142  *
143  *     // Revert the state of the variable that might have been changed
144  *     // since the last partial decode.
145  *     values.clear();
146  *
147  *     // A message contains 2 integers.
148  *     values.offer(buffer.readInt());
149  *     values.offer(buffer.readInt());
150  *
151  *     // Now we know this assertion will never fail.
152  *     assert values.size() == 2;
153  *     return values.poll() + values.poll();
154  *   }
155  * }</pre>
156  *     </li>
157  * </ul>
158  *
159  * <h3>Improving the performance</h3>
160  * <p>
161  * Fortunately, the performance of a complex decoder implementation can be
162  * improved significantly with the {@code checkpoint()} method.  The
163  * {@code checkpoint()} method updates the 'initial' position of the buffer so
164  * that {@link ReplayingDecoder} rewinds the {@code readerIndex} of the buffer
165  * to the last position where you called the {@code checkpoint()} method.
166  *
167  * <h4>Calling {@code checkpoint(T)} with an {@link Enum}</h4>
168  * <p>
169  * Although you can just use {@code checkpoint()} method and manage the state
170  * of the decoder by yourself, the easiest way to manage the state of the
171  * decoder is to create an {@link Enum} type which represents the current state
172  * of the decoder and to call {@code checkpoint(T)} method whenever the state
173  * changes.  You can have as many states as you want depending on the
174  * complexity of the message you want to decode:
175  *
176  * <pre>
177  * public enum MyDecoderState {
178  *   READ_LENGTH,
179  *   READ_CONTENT;
180  * }
181  *
182  * public class IntegerHeaderFrameDecoder
183  *      extends {@link ReplayingDecoder}&lt;<strong>MyDecoderState</strong>&gt; {
184  *
185  *   private int length;
186  *
187  *   public IntegerHeaderFrameDecoder() {
188  *     // Set the initial state.
189  *     <strong>super(MyDecoderState.READ_LENGTH);</strong>
190  *   }
191  *
192  *   {@code @Override}
193  *   protected Object decode({@link ChannelHandlerContext} ctx,
194  *                           {@link Channel} channel,
195  *                           {@link ChannelBuffer} buf,
196  *                           <b>MyDecoderState</b> state) throws Exception {
197  *     switch (state) {
198  *     case READ_LENGTH:
199  *       length = buf.readInt();
200  *       <strong>checkpoint(MyDecoderState.READ_CONTENT);</strong>
201  *     case READ_CONTENT:
202  *       ChannelBuffer frame = buf.readBytes(length);
203  *       <strong>checkpoint(MyDecoderState.READ_LENGTH);</strong>
204  *       return frame;
205  *     default:
206  *       throw new Error("Shouldn't reach here.");
207  *     }
208  *   }
209  * }
210  * </pre>
211  *
212  * <h4>Calling {@code checkpoint()} with no parameter</h4>
213  * <p>
214  * An alternative way to manage the decoder state is to manage it by yourself.
215  * <pre>
216  * public class IntegerHeaderFrameDecoder
217  *      extends {@link ReplayingDecoder}&lt;<strong>{@link VoidEnum}</strong>&gt; {
218  *
219  *   <strong>private boolean readLength;</strong>
220  *   private int length;
221  *
222  *   {@code @Override}
223  *   protected Object decode({@link ChannelHandlerContext} ctx,
224  *                           {@link Channel} channel,
225  *                           {@link ChannelBuffer} buf,
226  *                           {@link VoidEnum} state) throws Exception {
227  *     if (!readLength) {
228  *       length = buf.readInt();
229  *       <strong>readLength = true;</strong>
230  *       <strong>checkpoint();</strong>
231  *     }
232  *
233  *     if (readLength) {
234  *       ChannelBuffer frame = buf.readBytes(length);
235  *       <strong>readLength = false;</strong>
236  *       <strong>checkpoint();</strong>
237  *       return frame;
238  *     }
239  *   }
240  * }
241  * </pre>
242  *
243  * <h3>Replacing a decoder with another decoder in a pipeline</h3>
244  * <p>
245  * If you are going to write a protocol multiplexer, you will probably want to
246  * replace a {@link ReplayingDecoder} (protocol detector) with another
247  * {@link ReplayingDecoder} or {@link FrameDecoder} (actual protocol decoder).
248  * It is not possible to achieve this simply by calling
249  * {@link ChannelPipeline#replace(ChannelHandler, String, ChannelHandler)}, but
250  * some additional steps are required:
251  * <pre>
252  * public class FirstDecoder extends {@link ReplayingDecoder}&lt;{@link VoidEnum}&gt; {
253  *
254  *     public FirstDecoder() {
255  *         super(true); // Enable unfold
256  *     }
257  *
258  *     {@code @Override}
259  *     protected Object decode({@link ChannelHandlerContext} ctx,
260  *                             {@link Channel} ch,
261  *                             {@link ChannelBuffer} buf,
262  *                             {@link VoidEnum} state) {
263  *         ...
264  *         // Decode the first message
265  *         Object firstMessage = ...;
266  *
267  *         // Add the second decoder
268  *         ctx.getPipeline().addLast("second", new SecondDecoder());
269  *
270  *         // Remove the first decoder (me)
271  *         ctx.getPipeline().remove(this);
272  *
273  *         if (buf.readable()) {
274  *             // Hand off the remaining data to the second decoder
275  *             return new Object[] { firstMessage, buf.readBytes(<b>super.actualReadableBytes()</b>) };
276  *         } else {
277  *             // Nothing to hand off
278  *             return firstMessage;
279  *         }
280  *     }
281  * </pre>
282  *
283  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
284  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
285  *
286  * @version $Rev: 2380 $, $Date: 2010-11-09 14:35:24 +0900 (Tue, 09 Nov 2010) $
287  *
288  * @param <T>
289  *        the state type; use {@link VoidEnum} if state management is unused
290  *
291  * @apiviz.landmark
292  * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws
293  */
294 public abstract class ReplayingDecoder<T extends Enum<T>>
295         extends SimpleChannelUpstreamHandler {
296 
297 
298     private final AtomicReference<ChannelBuffer> cumulation =
299         new AtomicReference<ChannelBuffer>();
300     private final boolean unfold;
301     private ReplayingDecoderBuffer replayable;
302     private T state;
303     private int checkpoint;
304 
305     /**
306      * Creates a new instance with no initial state (i.e: {@code null}).
307      */
308     protected ReplayingDecoder() {
309         this(null);
310     }
311 
312     protected ReplayingDecoder(boolean unfold) {
313         this(null, unfold);
314     }
315 
316     /**
317      * Creates a new instance with the specified initial state.
318      */
319     protected ReplayingDecoder(T initialState) {
320         this(initialState, false);
321     }
322 
323     protected ReplayingDecoder(T initialState, boolean unfold) {
324         this.state = initialState;
325         this.unfold = unfold;
326     }
327 
328     /**
329      * Stores the internal cumulative buffer's reader position.
330      */
331     protected void checkpoint() {
332         ChannelBuffer cumulation = this.cumulation.get();
333         if (cumulation != null) {
334             checkpoint = cumulation.readerIndex();
335         } else {
336             checkpoint = -1; // buffer not available (already cleaned up)
337         }
338     }
339 
340     /**
341      * Stores the internal cumulative buffer's reader position and updates
342      * the current decoder state.
343      */
344     protected void checkpoint(T state) {
345         checkpoint();
346         setState(state);
347     }
348 
349     /**
350      * Returns the current state of this decoder.
351      * @return the current state of this decoder
352      */
353     protected T getState() {
354         return state;
355     }
356 
357     /**
358      * Sets the current state of this decoder.
359      * @return the old state of this decoder
360      */
361     protected T setState(T newState) {
362         T oldState = state;
363         state = newState;
364         return oldState;
365     }
366 
367     /**
368      * Returns the actual number of readable bytes in the internal cumulative
369      * buffer of this decoder.  You usually do not need to rely on this value
370      * to write a decoder.  Use it only when you muse use it at your own risk.
371      * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}.
372      */
373     protected int actualReadableBytes() {
374         return internalBuffer().readableBytes();
375     }
376 
377     /**
378      * Returns the internal cumulative buffer of this decoder.  You usually
379      * do not need to access the internal buffer directly to write a decoder.
380      * Use it only when you must use it at your own risk.
381      */
382     protected ChannelBuffer internalBuffer() {
383         ChannelBuffer buf = cumulation.get();
384         if (buf == null) {
385             return ChannelBuffers.EMPTY_BUFFER;
386         }
387         return buf;
388     }
389 
390     /**
391      * Decodes the received packets so far into a frame.
392      *
393      * @param ctx      the context of this handler
394      * @param channel  the current channel
395      * @param buffer   the cumulative buffer of received packets so far.
396      *                 Note that the buffer might be empty, which means you
397      *                 should not make an assumption that the buffer contains
398      *                 at least one byte in your decoder implementation.
399      * @param state    the current decoder state ({@code null} if unused)
400      *
401      * @return the decoded frame
402      */
403     protected abstract Object decode(ChannelHandlerContext ctx,
404             Channel channel, ChannelBuffer buffer, T state) throws Exception;
405 
406     /**
407      * Decodes the received data so far into a frame when the channel is
408      * disconnected.
409      *
410      * @param ctx      the context of this handler
411      * @param channel  the current channel
412      * @param buffer   the cumulative buffer of received packets so far.
413      *                 Note that the buffer might be empty, which means you
414      *                 should not make an assumption that the buffer contains
415      *                 at least one byte in your decoder implementation.
416      * @param state    the current decoder state ({@code null} if unused)
417      *
418      * @return the decoded frame
419      */
420     protected Object decodeLast(
421             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, T state) throws Exception {
422         return decode(ctx, channel, buffer, state);
423     }
424 
425     @Override
426     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
427             throws Exception {
428 
429         Object m = e.getMessage();
430         if (!(m instanceof ChannelBuffer)) {
431             ctx.sendUpstream(e);
432             return;
433         }
434 
435         ChannelBuffer input = (ChannelBuffer) m;
436         if (!input.readable()) {
437             return;
438         }
439 
440         ChannelBuffer cumulation = cumulation(ctx);
441         cumulation.discardReadBytes();
442         cumulation.writeBytes(input);
443         callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
444     }
445 
446     @Override
447     public void channelDisconnected(ChannelHandlerContext ctx,
448             ChannelStateEvent e) throws Exception {
449         cleanup(ctx, e);
450     }
451 
452     @Override
453     public void channelClosed(ChannelHandlerContext ctx,
454             ChannelStateEvent e) throws Exception {
455         cleanup(ctx, e);
456     }
457 
458     @Override
459     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
460             throws Exception {
461         ctx.sendUpstream(e);
462     }
463 
464     private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
465         while (cumulation.readable()) {
466             int oldReaderIndex = checkpoint = cumulation.readerIndex();
467             Object result = null;
468             T oldState = state;
469             try {
470                 result = decode(context, channel, replayable, state);
471                 if (result == null) {
472                     if (oldReaderIndex == cumulation.readerIndex() && oldState == state) {
473                         throw new IllegalStateException(
474                                 "null cannot be returned if no data is consumed and state didn't change.");
475                     } else {
476                         // Previous data has been discarded or caused state transition.
477                         // Probably it is reading on.
478                         continue;
479                     }
480                 }
481             } catch (ReplayError replay) {
482                 // Return to the checkpoint (or oldPosition) and retry.
483                 int checkpoint = this.checkpoint;
484                 if (checkpoint >= 0) {
485                     cumulation.readerIndex(checkpoint);
486                 } else {
487                     // Called by cleanup() - no need to maintain the readerIndex
488                     // anymore because the buffer has been released already.
489                 }
490             }
491 
492             if (result == null) {
493                 // Seems like more data is required.
494                 // Let us wait for the next notification.
495                 break;
496             }
497 
498             if (oldReaderIndex == cumulation.readerIndex() && oldState == state) {
499                 throw new IllegalStateException(
500                         "decode() method must consume at least one byte " +
501                         "if it returned a decoded message (caused by: " +
502                         getClass() + ")");
503             }
504 
505             // A successful decode
506             unfoldAndFireMessageReceived(context, result, remoteAddress);
507         }
508     }
509 
510     private void unfoldAndFireMessageReceived(
511             ChannelHandlerContext context, Object result, SocketAddress remoteAddress) {
512         if (unfold) {
513             if (result instanceof Object[]) {
514                 for (Object r: (Object[]) result) {
515                     Channels.fireMessageReceived(context, r, remoteAddress);
516                 }
517             } else if (result instanceof Iterable<?>) {
518                 for (Object r: (Iterable<?>) result) {
519                     Channels.fireMessageReceived(context, r, remoteAddress);
520                 }
521             } else {
522                 Channels.fireMessageReceived(context, result, remoteAddress);
523             }
524         } else {
525             Channels.fireMessageReceived(context, result, remoteAddress);
526         }
527     }
528 
529     private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e)
530             throws Exception {
531         try {
532             ChannelBuffer cumulation = this.cumulation.getAndSet(null);
533             if (cumulation == null) {
534                 return;
535             }
536 
537             replayable.terminate();
538 
539             if (cumulation.readable()) {
540                 // Make sure all data was read before notifying a closed channel.
541                 callDecode(ctx, e.getChannel(), cumulation, null);
542             }
543 
544             // Call decodeLast() finally.  Please note that decodeLast() is
545             // called even if there's nothing more to read from the buffer to
546             // notify a user that the connection was closed explicitly.
547             Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state);
548             if (partiallyDecoded != null) {
549                 unfoldAndFireMessageReceived(ctx, partiallyDecoded, null);
550             }
551         } catch (ReplayError replay) {
552             // Ignore
553         } finally {
554             ctx.sendUpstream(e);
555         }
556     }
557 
558     private ChannelBuffer cumulation(ChannelHandlerContext ctx) {
559         ChannelBuffer buf = cumulation.get();
560         if (buf == null) {
561             ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
562             buf = new UnsafeDynamicChannelBuffer(factory);
563             if (cumulation.compareAndSet(null, buf)) {
564                 replayable = new ReplayingDecoderBuffer(buf);
565             } else {
566                 buf = cumulation.get();
567             }
568         }
569         return buf;
570     }
571 }