org.jboss.netty.handler.queue
Class BufferedWriteHandler

java.lang.Object
  extended by org.jboss.netty.channel.SimpleChannelHandler
      extended by org.jboss.netty.handler.queue.BufferedWriteHandler
All Implemented Interfaces:
ChannelDownstreamHandler, ChannelHandler, ChannelUpstreamHandler

public class BufferedWriteHandler
extends SimpleChannelHandler

Emulates a buffered write operation. This handler stores all write requests in an unbounded Queue and flushes them downstream when flush() is called.

Here is an example that demonstrates the usage:

 BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
 ChannelPipeline p = ...;
 p.addFirst("buffer", bufferedWriter);

 ...

 Channel ch = ...;

 // msg1, 2, and 3 are stored in the queue of bufferedWriter.
 ch.write(msg1);
 ch.write(msg2);
 ch.write(msg3);

 // and will be flushed on request.
 bufferedWriter.flush();
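
The store-then-flush behavior can be pictured with a small plain-Java model. This is only a sketch: BufferedWriterModel and its methods are hypothetical names, strings stand in for write requests, and a list stands in for the downstream. It does not use or reproduce the Netty implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the buffering behavior; this is not the Netty class.
class BufferedWriterModel {
    // Unbounded, thread-safe queue, matching the kind of queue the handler expects.
    private final Queue<String> queue = new ConcurrentLinkedQueue<String>();
    // Stands in for the downstream; records what has actually been written.
    private final List<String> downstream = new ArrayList<String>();

    // A write is only stored; nothing reaches the downstream yet.
    void write(String msg) {
        queue.add(msg);
    }

    // Drains the queue in insertion order and hands each request downstream.
    void flush() {
        String msg;
        while ((msg = queue.poll()) != null) {
            downstream.add(msg);
        }
    }

    List<String> written() {
        return downstream;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        BufferedWriterModel m = new BufferedWriterModel();
        m.write("msg1");
        m.write("msg2");
        m.write("msg3");
        System.out.println(m.written()); // prints []
        m.flush();
        System.out.println(m.written()); // prints [msg1, msg2, msg3]
    }
}
```

Until flush() runs, the three requests exist only in the queue, which is why an unflushed handler keeps consuming memory.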
 

Auto-flush

The write request queue is flushed automatically when the associated Channel is disconnected or closed, but at no other time. You therefore have to call flush() before the queue grows too large. You can implement your own auto-flush strategy by extending this handler:
 public class AutoFlusher extends BufferedWriteHandler {

     private final AtomicLong bufferSize = new AtomicLong();

     @Override
     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         super.writeRequested(ctx, e);

         // Assumes every message written through this handler is a ChannelBuffer.
         ChannelBuffer data = (ChannelBuffer) e.getMessage();
         long newBufferSize = bufferSize.addAndGet(data.readableBytes());

         // Flush the queue if it gets larger than 8KiB.
         if (newBufferSize > 8192) {
             flush();
             bufferSize.set(0);
         }
     }
 }
 

Consolidate on flush

If there are two or more write requests in the queue and all of their messages are ChannelBuffers, they can be merged into a single write request to reduce the number of system calls.
 BEFORE consolidation:            AFTER consolidation:
 +-------+-------+-------+        +-------------+
 | Req C | Req B | Req A |------\\| Request ABC |
 | "789" | "456" | "123" |------//| "123456789" |
 +-------+-------+-------+        +-------------+
 
This feature is disabled by default. You can override the default when you create this handler or when you call flush(boolean). If you pass true to the constructor, flush() always consolidates the queue. Otherwise, you have to call flush(true) to enable consolidation for each flush.
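
The interaction between the constructor flag and the per-call argument can be sketched in plain Java. The class below is a hypothetical, single-threaded model (ConsolidationModel is not a Netty type); byte arrays stand in for ChannelBuffers and a list stands in for the downstream.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of consolidate-on-flush; this is not the Netty implementation.
class ConsolidationModel {
    private final Queue<byte[]> queue = new ArrayDeque<byte[]>();
    private final boolean consolidateOnFlush;
    // Each element is one write request as it would reach the downstream.
    final List<byte[]> downstream = new ArrayList<byte[]>();

    ConsolidationModel(boolean consolidateOnFlush) {
        this.consolidateOnFlush = consolidateOnFlush;
    }

    void write(String s) {
        queue.add(s.getBytes(StandardCharsets.US_ASCII));
    }

    // Uses the default chosen at construction time.
    void flush() {
        flush(consolidateOnFlush);
    }

    // Overrides the default for this one flush.
    void flush(boolean consolidate) {
        byte[] chunk;
        if (consolidate && queue.size() > 1) {
            // Merge every queued request into a single downstream write.
            ByteArrayOutputStream merged = new ByteArrayOutputStream();
            while ((chunk = queue.poll()) != null) {
                merged.write(chunk, 0, chunk.length);
            }
            downstream.add(merged.toByteArray());
        } else {
            // Pass each request downstream unchanged.
            while ((chunk = queue.poll()) != null) {
                downstream.add(chunk);
            }
        }
    }

    public static void main(String[] args) {
        ConsolidationModel m = new ConsolidationModel(true);
        m.write("123");
        m.write("456");
        m.write("789");
        m.flush();
        System.out.println(m.downstream.size()); // prints 1
        System.out.println(new String(m.downstream.get(0), StandardCharsets.US_ASCII)); // prints 123456789
    }
}
```

With the flag set to false, the same three writes would reach the downstream as three separate requests unless flush(true) is called explicitly.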

The disadvantage of consolidation is that the ChannelFutures and ChannelFutureListeners associated with the original write requests might be notified later than the moment their data is actually written out. They are always notified once the consolidated write request has been fully written.
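
The delayed notification can be illustrated with java.util.concurrent.CompletableFuture standing in for ChannelFuture. This is a hypothetical sketch that assumes Java 8 or later; ConsolidatedFutureModel and consolidate are illustrative names, and the code does not show how the handler itself links the futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model: one merged future drives the futures of the original requests.
class ConsolidatedFutureModel {

    // Returns the future of the consolidated write. Completing it, normally or
    // exceptionally, completes every original future in the same way.
    static CompletableFuture<Void> consolidate(final List<CompletableFuture<Void>> originals) {
        CompletableFuture<Void> merged = new CompletableFuture<Void>();
        merged.whenComplete((ok, err) -> {
            for (CompletableFuture<Void> f : originals) {
                if (err != null) {
                    f.completeExceptionally(err);
                } else {
                    f.complete(null);
                }
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> originals = new ArrayList<CompletableFuture<Void>>();
        for (int i = 0; i < 3; i++) {
            originals.add(new CompletableFuture<Void>());
        }
        CompletableFuture<Void> merged = consolidate(originals);
        System.out.println(originals.get(0).isDone()); // prints false
        merged.complete(null);
        System.out.println(originals.get(0).isDone()); // prints true
    }
}
```

In this model, the listener on request A cannot fire when A's bytes alone have left the buffer; it fires only after the whole merged request completes.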

The following example implements a consolidation strategy that reduces the number of write requests based on the writability of a channel:

 public class ConsolidatingAutoFlusher extends BufferedWriteHandler {

     public ConsolidatingAutoFlusher() {
         // Enable consolidation by default.
         super(true);
     }

     @Override
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         ChannelConfig cfg = e.getChannel().getConfig();
         if (cfg instanceof NioSocketChannelConfig) {
             // Lower the watermark to increase the chance of consolidation.
             ((NioSocketChannelConfig) cfg).setWriteBufferLowWaterMark(0);
         }
         super.channelOpen(ctx, e);
     }

     @Override
     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         super.writeRequested(ctx, e);
         if (e.getChannel().isWritable()) {
             flush();
         }
     }

     @Override
     public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         if (e.getChannel().isWritable()) {
             flush();
         }
         // Keep propagating the event to the handlers upstream.
         super.channelInterestChanged(ctx, e);
     }
 }
 

Prioritized Writes

You can implement prioritized writes by passing an unbounded priority queue to the constructor of this handler. You also need to design a strategy that determines how often flush() is called. For example, you could call flush() once per second using HashedWheelTimer.
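
A plain-Java sketch of the idea follows. It is a hypothetical model, not Netty code: PrioritizedWriterModel and Request are illustrative names, a lower number is assumed to mean higher priority, and a ScheduledExecutorService stands in for HashedWheelTimer. With the real handler, the analogous step would be to pass a thread-safe priority queue of MessageEvent to BufferedWriteHandler(Queue<MessageEvent>) with a comparator of your own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of prioritized, periodically flushed writes.
class PrioritizedWriterModel {

    static final class Request {
        final int priority; // assumption: lower value is flushed first
        final String payload;

        Request(int priority, String payload) {
            this.priority = priority;
            this.payload = payload;
        }
    }

    // Unbounded and thread-safe; orders requests by priority, not arrival time.
    private final PriorityBlockingQueue<Request> queue =
            new PriorityBlockingQueue<Request>(11,
                    (Request a, Request b) -> Integer.compare(a.priority, b.priority));

    // Stands in for the downstream.
    private final List<String> downstream =
            Collections.synchronizedList(new ArrayList<String>());

    void write(int priority, String payload) {
        queue.add(new Request(priority, payload));
    }

    // Drains the queue; poll() always returns the most urgent pending request.
    synchronized void flush() {
        Request r;
        while ((r = queue.poll()) != null) {
            downstream.add(r.payload);
        }
    }

    // Schedules flush() at a fixed rate on the given executor.
    ScheduledFuture<?> flushEvery(ScheduledExecutorService scheduler, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(this::flush, period, period, unit);
    }

    List<String> written() {
        return downstream;
    }
}
```

Calling flushEvery(Executors.newSingleThreadScheduledExecutor(), 1, TimeUnit.SECONDS) would flush once per second. Note that PriorityBlockingQueue makes no ordering guarantee among requests of equal priority, so a tie-breaking field such as a sequence number may be needed if arrival order matters.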

Version:
$Rev: 2243 $, $Date: 2010-04-16 14:01:55 +0900 (Fri, 16 Apr 2010) $
Author:
The Netty Project, Trustin Lee

Nested Class Summary
 
Nested classes/interfaces inherited from interface org.jboss.netty.channel.ChannelHandler
ChannelHandler.Sharable
 
Constructor Summary
BufferedWriteHandler()
          Creates a new instance with the default unbounded BlockingQueue implementation and without buffer consolidation.
BufferedWriteHandler(boolean consolidateOnFlush)
          Creates a new instance with the default unbounded BlockingQueue implementation.
BufferedWriteHandler(Queue<MessageEvent> queue)
          Creates a new instance with the specified thread-safe unbounded Queue and without buffer consolidation.
BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush)
          Creates a new instance with the specified thread-safe unbounded Queue.
 
Method Summary
 void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
          Invoked when Channel.close() was called.
 void disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
          Invoked when Channel.disconnect() was called.
 void flush()
          Sends the queued write requests to the downstream.
 void flush(boolean consolidateOnFlush)
          Sends the queued write requests to the downstream.
protected  Queue<MessageEvent> getQueue()
          Returns the queue which stores the write requests.
 boolean isConsolidateOnFlush()
           
 void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
          Stores all write requests to the queue so that they are actually written on flush().
 
Methods inherited from class org.jboss.netty.channel.SimpleChannelHandler
bindRequested, channelBound, channelClosed, channelConnected, channelDisconnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, connectRequested, exceptionCaught, handleDownstream, handleUpstream, messageReceived, setInterestOpsRequested, unbindRequested, writeComplete
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

BufferedWriteHandler

public BufferedWriteHandler()
Creates a new instance with the default unbounded BlockingQueue implementation and without buffer consolidation.


BufferedWriteHandler

public BufferedWriteHandler(Queue<MessageEvent> queue)
Creates a new instance with the specified thread-safe unbounded Queue and without buffer consolidation. Please note that specifying a bounded Queue or a thread-unsafe Queue results in unspecified behavior.
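
One way a bounded queue can go wrong is easy to see with the standard collections alone. The sketch below is a plain-Java illustration (BoundedQueueDemo and offerAll are hypothetical names) and makes no claim about what this handler actually does when its queue is full; that behavior is unspecified.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of bounded versus unbounded queues.
class BoundedQueueDemo {

    // Returns how many of the given requests the queue accepted.
    static int offerAll(Queue<String> queue, String... requests) {
        int accepted = 0;
        for (String r : requests) {
            // offer() returns false instead of storing when a bounded queue is full.
            if (queue.offer(r)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Bounded: the third request is rejected.
        System.out.println(offerAll(new ArrayBlockingQueue<String>(2), "A", "B", "C")); // prints 2
        // Unbounded and thread-safe: every request is stored.
        System.out.println(offerAll(new ConcurrentLinkedQueue<String>(), "A", "B", "C")); // prints 3
    }
}
```

A rejected or failed insertion means a write request that its caller believes is pending may never be stored, which is why an unbounded, thread-safe queue is required.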


BufferedWriteHandler

public BufferedWriteHandler(boolean consolidateOnFlush)
Creates a new instance with the default unbounded BlockingQueue implementation.

Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request on flush()

BufferedWriteHandler

public BufferedWriteHandler(Queue<MessageEvent> queue,
                            boolean consolidateOnFlush)
Creates a new instance with the specified thread-safe unbounded Queue. Please note that specifying a bounded Queue or a thread-unsafe Queue results in unspecified behavior.

Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request on flush()
Method Detail

isConsolidateOnFlush

public boolean isConsolidateOnFlush()

getQueue

protected Queue<MessageEvent> getQueue()
Returns the queue which stores the write requests. The default implementation returns the queue which was specified in the constructor.


flush

public void flush()
Sends the queued write requests to the downstream.


flush

public void flush(boolean consolidateOnFlush)
Sends the queued write requests to the downstream.

Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request

writeRequested

public void writeRequested(ChannelHandlerContext ctx,
                           MessageEvent e)
                    throws Exception
Stores all write requests to the queue so that they are actually written on flush().

Overrides:
writeRequested in class SimpleChannelHandler
Throws:
Exception

disconnectRequested

public void disconnectRequested(ChannelHandlerContext ctx,
                                ChannelStateEvent e)
                         throws Exception
Description copied from class: SimpleChannelHandler
Invoked when Channel.disconnect() was called.

Overrides:
disconnectRequested in class SimpleChannelHandler
Throws:
Exception

closeRequested

public void closeRequested(ChannelHandlerContext ctx,
                           ChannelStateEvent e)
                    throws Exception
Description copied from class: SimpleChannelHandler
Invoked when Channel.close() was called.

Overrides:
closeRequested in class SimpleChannelHandler
Throws:
Exception


Copyright © 2008-2011 JBoss, a division of Red Hat, Inc. All Rights Reserved.