java.lang.Object
  org.jboss.netty.channel.SimpleChannelHandler
    org.jboss.netty.handler.queue.BufferedWriteHandler

public class BufferedWriteHandler extends SimpleChannelHandler
Emulates a buffered write operation. This handler stores all write requests in an unbounded Queue and flushes them downstream when the flush() method is called.
Here is an example that demonstrates the usage:

    BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
    ChannelPipeline p = ...;
    p.addFirst("buffer", bufferedWriter);

    ...

    Channel ch = ...;

    // msg1, 2, and 3 are stored in the queue of bufferedWriter.
    ch.write(msg1);
    ch.write(msg2);
    ch.write(msg3);

    // and will be flushed on request.
    bufferedWriter.flush();
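The queue-then-flush behavior described above can be modeled without Netty. The following is a minimal sketch using only the JDK; BufferingWriterSketch, its downstream list, and pending() are hypothetical names introduced for illustration and are not part of the Netty API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for illustration only; this is not a Netty class.
class BufferingWriterSketch {
    // Unbounded, thread-safe queue holding the pending write requests.
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    // Stands in for the downstream side of the pipeline.
    final List<String> downstream = new ArrayList<>();

    // Analogous to a write request: the message is only queued.
    void write(String msg) {
        queue.add(msg);
    }

    // Analogous to flush(): drains the queue in FIFO order.
    void flush() {
        String msg;
        while ((msg = queue.poll()) != null) {
            downstream.add(msg);
        }
    }

    // Number of write requests still waiting to be flushed.
    int pending() {
        return queue.size();
    }
}
```

In this model nothing reaches downstream until flush() is called, and the messages arrive in the order they were written.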
The queue is flushed automatically when the associated Channel is disconnected or closed. However, it is not flushed otherwise, so you have to call flush() before the queue grows too large. You can implement your own auto-flush strategy by extending this handler:
    public class AutoFlusher extends BufferedWriteHandler {

        private final AtomicLong bufferSize = new AtomicLong();

        @Override
        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            super.writeRequested(ctx, e);

            ChannelBuffer data = (ChannelBuffer) e.getMessage();
            // addAndGet() returns a long, so the running total is kept as a long.
            long newBufferSize = bufferSize.addAndGet(data.readableBytes());

            // Flush the queue if it gets larger than 8KiB.
            if (newBufferSize > 8192) {
                flush();
                bufferSize.set(0);
            }
        }
    }
If the queue holds two or more write requests and every message is a ChannelBuffer, they can be merged into a single write request to reduce the number of system calls.
    BEFORE consolidation:            AFTER consolidation:
    +-------+-------+-------+        +-------------+
    | Req C | Req B | Req A |------\\| Request ABC |
    | "789" | "456" | "123" |------//| "123456789" |
    +-------+-------+-------+        +-------------+

This feature is disabled by default. You can override the default when you create this handler or when you call flush(boolean). If you specified true when you called the constructor, calling flush() will always consolidate the queue. Otherwise, you have to call flush(boolean) with true to enable this feature for each flush.
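To make the merge step concrete, here is a minimal sketch that consolidates queued payloads into one buffer. It uses java.nio.ByteBuffer as a stand-in for ChannelBuffer and copies the bytes, which is an assumption made for simplicity; ConsolidationSketch, consolidate, and demo are hypothetical names, and the handler itself may combine buffers differently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical illustration; ByteBuffer stands in for Netty's ChannelBuffer.
class ConsolidationSketch {

    // Merges every queued payload, oldest first, into one buffer
    // and leaves the queue empty.
    static ByteBuffer consolidate(Queue<ByteBuffer> pending) {
        int total = 0;
        for (ByteBuffer b : pending) {
            total += b.remaining();
        }
        ByteBuffer merged = ByteBuffer.allocate(total);
        ByteBuffer next;
        while ((next = pending.poll()) != null) {
            merged.put(next);
        }
        merged.flip(); // switch the merged buffer from writing to reading
        return merged;
    }

    // Reproduces the diagram: requests A, B, and C become one request.
    static String demo() {
        Queue<ByteBuffer> pending = new ArrayDeque<>();
        for (String s : new String[] {"123", "456", "789"}) {
            pending.add(ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII)));
        }
        ByteBuffer merged = consolidate(pending);
        return StandardCharsets.US_ASCII.decode(merged).toString();
    }
}
```

One consolidated buffer means one write request reaches the transport instead of three, which is where the saving in system calls comes from.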
The disadvantage of consolidation is that the ChannelFuture and its ChannelFutureListeners associated with the original write requests might be notified later than when they are actually written out. They will always be notified when the consolidated write request is fully written.
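The notification delay can be illustrated with plain JDK futures. This is a sketch only: CompletableFuture stands in for ChannelFuture, and ConsolidatedFutureSketch, write, and flushConsolidated are hypothetical names, not Netty API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; CompletableFuture stands in for ChannelFuture.
class ConsolidatedFutureSketch {
    // One future per original write request that has not been flushed yet.
    private final List<CompletableFuture<Void>> originals = new ArrayList<>();

    // Registers a write request and returns its future.
    CompletableFuture<Void> write() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        originals.add(f);
        return f;
    }

    // Ties every pending original future to the single consolidated write:
    // none of them completes until the consolidated write completes.
    void flushConsolidated(CompletableFuture<Void> consolidatedWrite) {
        List<CompletableFuture<Void>> batch = new ArrayList<>(originals);
        originals.clear();
        consolidatedWrite.whenComplete((ok, err) -> {
            for (CompletableFuture<Void> f : batch) {
                if (err == null) {
                    f.complete(null);
                } else {
                    f.completeExceptionally(err);
                }
            }
        });
    }
}
```

Even if the bytes of the first request leave the socket early, its future in this model stays incomplete until the whole consolidated request is done.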
The following example implements the consolidation strategy that reduces the number of write requests based on the writability of a channel:
    public class ConsolidatingAutoFlusher extends BufferedWriteHandler {

        public ConsolidatingAutoFlusher() {
            // Enable consolidation by default.
            super(true);
        }

        @Override
        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            ChannelConfig cfg = e.getChannel().getConfig();
            if (cfg instanceof NioSocketChannelConfig) {
                // Lower the watermark to increase the chance of consolidation.
                ((NioSocketChannelConfig) cfg).setWriteBufferLowWaterMark(0);
            }
            super.channelOpen(ctx, e);
        }

        @Override
        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            super.writeRequested(ctx, e);
            if (e.getChannel().isWritable()) {
                flush();
            }
        }

        @Override
        public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            if (e.getChannel().isWritable()) {
                flush();
            }
        }
    }
You need to design a strategy that determines how often flush() should be called. For example, you could call flush() periodically, every second, using a HashedWheelTimer.
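A periodic flush can be sketched with the JDK alone. The example below substitutes ScheduledExecutorService for HashedWheelTimer, which is an assumption made so the sketch needs no Netty classes; PeriodicFlushSketch, start, and stop are hypothetical names, and flushAction would typically be something like bufferedWriter::flush.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; a JDK scheduler stands in for HashedWheelTimer.
class PeriodicFlushSketch {
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Runs flushAction repeatedly with the given period.
    // If flushAction throws, the scheduler suppresses later runs,
    // so the action should handle its own exceptions.
    ScheduledFuture<?> start(Runnable flushAction, long period, TimeUnit unit) {
        return timer.scheduleAtFixedRate(flushAction, period, period, unit);
    }

    // Stops the timer thread; call this when the channel goes away.
    void stop() {
        timer.shutdownNow();
    }
}
```

For the one-second interval mentioned above, the call would look like `start(bufferedWriter::flush, 1, TimeUnit.SECONDS)`.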
Nested Class Summary |
---|
Nested classes/interfaces inherited from interface org.jboss.netty.channel.ChannelHandler |
---|
ChannelHandler.Sharable |
Constructor Summary | |
---|---|
BufferedWriteHandler() | Creates a new instance with the default unbounded BlockingQueue implementation and without buffer consolidation. |
BufferedWriteHandler(boolean consolidateOnFlush) | Creates a new instance with the default unbounded BlockingQueue implementation. |
BufferedWriteHandler(Queue<MessageEvent> queue) | Creates a new instance with the specified thread-safe unbounded Queue and without buffer consolidation. |
BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) | Creates a new instance with the specified thread-safe unbounded Queue. |
Method Summary | | |
---|---|---|
void | closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e) | Invoked when Channel.close() was called. |
void | disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) | Invoked when Channel.disconnect() was called. |
void | flush() | Sends the queued write requests to the downstream. |
void | flush(boolean consolidateOnFlush) | Sends the queued write requests to the downstream. |
protected Queue<MessageEvent> | getQueue() | Returns the queue which stores the write requests. |
boolean | isConsolidateOnFlush() | |
void | writeRequested(ChannelHandlerContext ctx, MessageEvent e) | Stores all write requests to the queue so that they are actually written on flush(). |
Methods inherited from class org.jboss.netty.channel.SimpleChannelHandler |
---|
bindRequested, channelBound, channelClosed, channelConnected, channelDisconnected, channelInterestChanged, channelOpen, channelUnbound, childChannelClosed, childChannelOpen, connectRequested, exceptionCaught, handleDownstream, handleUpstream, messageReceived, setInterestOpsRequested, unbindRequested, writeComplete |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Constructor Detail |
---|
public BufferedWriteHandler()
Creates a new instance with the default unbounded BlockingQueue implementation and without buffer consolidation.
public BufferedWriteHandler(Queue<MessageEvent> queue)
Creates a new instance with the specified thread-safe unbounded Queue and without buffer consolidation. Please note that specifying a bounded Queue or a thread-unsafe Queue will result in unspecified behavior.
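The warning about bounded queues can be made concrete with a small experiment. The sketch below only shows how JDK queues behave when offered more elements than they can hold; how the handler itself inserts into the queue is not stated here, so treat the link to its internals as an assumption. QueueChoiceSketch and accepted are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of bounded versus unbounded queue behavior.
class QueueChoiceSketch {

    // Offers `count` messages and returns how many the queue accepted.
    static int accepted(Queue<String> queue, int count) {
        int ok = 0;
        for (int i = 0; i < count; i++) {
            if (queue.offer("msg" + i)) {
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        // A bounded queue with capacity 2 rejects the remaining offers.
        System.out.println(accepted(new ArrayBlockingQueue<String>(2), 5));
        // An unbounded thread-safe queue accepts every offer.
        System.out.println(accepted(new ConcurrentLinkedQueue<String>(), 5));
    }
}
```

A rejected insertion would mean a write request that is never queued, which is one plausible reason the constructor asks for an unbounded, thread-safe Queue.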
public BufferedWriteHandler(boolean consolidateOnFlush)
Creates a new instance with the default unbounded BlockingQueue implementation.
Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request on flush()
public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush)
Creates a new instance with the specified thread-safe unbounded Queue. Please note that specifying a bounded Queue or a thread-unsafe Queue will result in unspecified behavior.
Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request on flush()
Method Detail |
---|
public boolean isConsolidateOnFlush()
protected Queue<MessageEvent> getQueue()
Returns the queue which stores the write requests.
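public void flush()
Sends the queued write requests to the downstream.
public void flush(boolean consolidateOnFlush)
Sends the queued write requests to the downstream.
Parameters:
consolidateOnFlush - true if and only if the buffered write requests are merged into a single write request
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception
Stores all write requests to the queue so that they are actually written on flush().
Overrides:
writeRequested in class SimpleChannelHandler
Throws:
Exception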
public void disconnectRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
Description copied from class: SimpleChannelHandler
Invoked when Channel.disconnect() was called.
Overrides:
disconnectRequested in class SimpleChannelHandler
Throws:
Exception
public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
Description copied from class: SimpleChannelHandler
Invoked when Channel.close() was called.
Overrides:
closeRequested in class SimpleChannelHandler
Throws:
Exception