View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.queue;
17  
18  import java.util.ArrayList;
19  import java.util.List;
20  import java.util.Queue;
21  import java.util.concurrent.BlockingQueue;
22  
23  import org.jboss.netty.buffer.ChannelBuffer;
24  import org.jboss.netty.buffer.ChannelBuffers;
25  import org.jboss.netty.channel.Channel;
26  import org.jboss.netty.channel.ChannelConfig;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelFutureListener;
29  import org.jboss.netty.channel.ChannelHandlerContext;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.Channels;
32  import org.jboss.netty.channel.MessageEvent;
33  import org.jboss.netty.channel.SimpleChannelHandler;
34  import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
35  import org.jboss.netty.util.HashedWheelTimer;
36  import org.jboss.netty.util.internal.LinkedTransferQueue;
37  
38  /**
39   * Emulates buffered write operation.  This handler stores all write requests
40   * into an unbounded {@link Queue} and flushes them to the downstream when
41   * {@link #flush()} method is called.
42   * <p>
43   * Here is an example that demonstrates the usage:
44   * <pre>
45   * BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
46   * ChannelPipeline p = ...;
47   * p.addFirst("buffer", bufferedWriter);
48   *
49   * ...
50   *
51   * Channel ch = ...;
52   *
53   * // msg1, 2, and 3 are stored in the queue of bufferedWriter.
54   * ch.write(msg1);
55   * ch.write(msg2);
56   * ch.write(msg3);
57   *
58   * // and will be flushed on request.
59   * bufferedWriter.flush();
60   * </pre>
61   *
62   * <h3>Auto-flush</h3>
63   * The write request queue is automatically flushed when the associated
64   * {@link Channel} is disconnected or closed.  However, it does not flush the
65   * queue otherwise.  It means you have to call {@link #flush()} before the size
66   * of the queue increases too much.  You can implement your own auto-flush
67   * strategy by extending this handler:
68   * <pre>
69   * public class AutoFlusher extends {@link BufferedWriteHandler} {
70   *
71   *     private final AtomicLong bufferSize = new AtomicLong();
72   *
73   *     {@literal @Override}
74   *     public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) throws Exception {
75   *         super.writeRequested(ctx, e);
76   *
77   *         {@link ChannelBuffer} data = ({@link ChannelBuffer}) e.getMessage();
78   *         long newBufferSize = bufferSize.addAndGet(data.readableBytes());
79   *
80   *         // Flush the queue if it gets larger than 8KiB.
81   *         if (newBufferSize > 8192) {
82   *             flush();
83   *             bufferSize.set(0);
84   *         }
85   *     }
86   * }
87   * </pre>
88   *
89   * <h3>Consolidate on flush</h3>
90   *
91   * If there are two or more write requests in the queue and all their message
92   * type is {@link ChannelBuffer}, they can be merged into a single write request
93   * to save the number of system calls.
94   * <pre>
95   * BEFORE consolidation:            AFTER consolidation:
96   * +-------+-------+-------+        +-------------+
97   * | Req C | Req B | Req A |------\\| Request ABC |
98   * | "789" | "456" | "123" |------//| "123456789" |
99   * +-------+-------+-------+        +-------------+
100  * </pre>
101  * This feature is disabled by default.  You can override the default when you
102  * create this handler or call {@link #flush(boolean)}.  If you specified
103  * {@code true} when you call the constructor, calling {@link #flush()} will
104  * always consolidate the queue.  Otherwise, you have to call
105  * {@link #flush(boolean)} with {@code true} to enable this feature for each
106  * flush.
107  * <p>
108  * The disadvantage of consolidation is that the {@link ChannelFuture} and its
109  * {@link ChannelFutureListener}s associated with the original write requests
110  * might be notified later than when they are actually written out.  They will
111  * always be notified when the consolidated write request is fully written.
112  * <p>
113  * The following example implements the consolidation strategy that reduces
114  * the number of write requests based on the writability of a channel:
115  * <pre>
116  * public class ConsolidatingAutoFlusher extends {@link BufferedWriteHandler} {
117  *
118  *     public ConsolidatingAutoFlusher() {
119  *         // Enable consolidation by default.
120  *         super(true);
121  *     }
122  *
123  *     {@literal @Override}
124  *     public void channelOpen({@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
125  *         {@link ChannelConfig} cfg = e.getChannel().getConfig();
126  *         if (cfg instanceof {@link NioSocketChannelConfig}) {
127  *             // Lower the watermark to increase the chance of consolidation.
128  *             (({@link NioSocketChannelConfig}) cfg).setWriteBufferLowWaterMark(0);
129  *         }
130  *         super.channelOpen(ctx, e);
131  *     }
132  *
133  *     {@literal @Override}
134  *     public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) throws Exception {
135  *         super.writeRequested(ctx, e);
136  *         if (e.getChannel().isWritable()) {
137  *             flush();
138  *         }
139  *     }
140  *
141  *     {@literal @Override}
142  *     public void channelInterestChanged({@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
143  *         if (e.getChannel().isWritable()) {
144  *             flush();
145  *         }
146  *     }
147  * }
148  * </pre>
149  *
150  * <h3>Prioritized Writes</h3>
151  *
152  * You can implement prioritized writes by specifying an unbounded priority
153  * queue in the constructor of this handler.  It will be required to design
154  * the proper strategy to determine how often {@link #flush()} should be called.
155  * For example, you could call {@link #flush()} periodically, using
156  * {@link HashedWheelTimer} every second.
157  *
158  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
159  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
160  * @version $Rev: 2243 $, $Date: 2010-04-16 14:01:55 +0900 (Fri, 16 Apr 2010) $
161  *
162  * @apiviz.landmark
163  */
164 public class BufferedWriteHandler extends SimpleChannelHandler {
165 
166     private final Queue<MessageEvent> queue;
167     private final boolean consolidateOnFlush;
168     private volatile ChannelHandlerContext ctx;
169 
170     /**
171      * Creates a new instance with the default unbounded {@link BlockingQueue}
172      * implementation and without buffer consolidation.
173      */
174     public BufferedWriteHandler() {
175         this(false);
176     }
177 
178     /**
179      * Creates a new instance with the specified thread-safe unbounded
180      * {@link Queue} and without buffer consolidation.  Please note that
181      * specifying a bounded {@link Queue} or a thread-unsafe {@link Queue} will
182      * result in an unspecified behavior.
183      */
184     public BufferedWriteHandler(Queue<MessageEvent> queue) {
185         this(queue, false);
186     }
187 
188     /**
189      * Creates a new instance with the default unbounded {@link BlockingQueue}
190      * implementation.
191      *
192      * @param consolidateOnFlush
193      *        {@code true} if and only if the buffered write requests are merged
194      *        into a single write request on {@link #flush()}
195      */
196     public BufferedWriteHandler(boolean consolidateOnFlush) {
197         this(new LinkedTransferQueue<MessageEvent>(), consolidateOnFlush);
198     }
199 
200     /**
201      * Creates a new instance with the specified thread-safe unbounded
202      * {@link Queue}.  Please note that specifying a bounded {@link Queue} or
203      * a thread-unsafe {@link Queue} will result in an unspecified behavior.
204      *
205      * @param consolidateOnFlush
206      *        {@code true} if and only if the buffered write requests are merged
207      *        into a single write request on {@link #flush()}
208      */
209     public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210         if (queue == null) {
211             throw new NullPointerException("queue");
212         }
213         this.queue = queue;
214         this.consolidateOnFlush = consolidateOnFlush;
215     }
216 
217     public boolean isConsolidateOnFlush() {
218         return consolidateOnFlush;
219     }
220 
221     /**
222      * Returns the queue which stores the write requests.  The default
223      * implementation returns the queue which was specified in the constructor.
224      */
225     protected Queue<MessageEvent> getQueue() {
226         return queue;
227     }
228 
229     /**
230      * Sends the queued write requests to the downstream.
231      */
232     public void flush() {
233         flush(consolidateOnFlush);
234     }
235 
236     /**
237      * Sends the queued write requests to the downstream.
238      *
239      * @param consolidateOnFlush
240      *        {@code true} if and only if the buffered write requests are merged
241      *        into a single write request
242      */
243     public void flush(boolean consolidateOnFlush) {
244         final ChannelHandlerContext ctx = this.ctx;
245         if (ctx == null) {
246             // No write request was made.
247             return;
248         }
249 
250         final Queue<MessageEvent> queue = getQueue();
251         if (consolidateOnFlush) {
252             if (queue.isEmpty()) {
253                 return;
254             }
255 
256             List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
257             synchronized (this) {
258                 for (;;) {
259                     MessageEvent e = queue.poll();
260                     if (e == null) {
261                         break;
262                     }
263                     if (!(e.getMessage() instanceof ChannelBuffer)) {
264                         if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
265                             pendingWrites = new ArrayList<MessageEvent>();
266                         }
267                         ctx.sendDownstream(e);
268                     } else {
269                         pendingWrites.add(e);
270                     }
271                 }
272                 consolidatedWrite(pendingWrites);
273             }
274         } else {
275             synchronized (this) {
276                 for (;;) {
277                     MessageEvent e = queue.poll();
278                     if (e == null) {
279                         break;
280                     }
281                     ctx.sendDownstream(e);
282                 }
283             }
284         }
285     }
286 
287     private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
288         final int size = pendingWrites.size();
289         if (size == 1) {
290             ctx.sendDownstream(pendingWrites.remove(0));
291             return pendingWrites;
292         } else if (size == 0) {
293             return pendingWrites;
294         }
295 
296         ChannelBuffer[] data = new ChannelBuffer[size];
297         for (int i = 0; i < data.length; i ++) {
298             data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
299         }
300 
301         ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
302         ChannelFuture future = Channels.future(ctx.getChannel());
303         future.addListener(new ChannelFutureListener() {
304             public void operationComplete(ChannelFuture future)
305                     throws Exception {
306                 if (future.isSuccess()) {
307                     for (MessageEvent e: pendingWrites) {
308                         e.getFuture().setSuccess();
309                     }
310                 } else {
311                     Throwable cause = future.getCause();
312                     for (MessageEvent e: pendingWrites) {
313                         e.getFuture().setFailure(cause);
314                     }
315                 }
316             }
317         });
318 
319         Channels.write(ctx, future, composite);
320         return null;
321     }
322 
323     /**
324      * Stores all write requests to the queue so that they are actually written
325      * on {@link #flush()}.
326      */
327     @Override
328     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
329             throws Exception {
330         if (this.ctx == null) {
331             this.ctx = ctx;
332         } else {
333             assert this.ctx == ctx;
334         }
335 
336         getQueue().add(e);
337     }
338 
339     @Override
340     public void disconnectRequested(ChannelHandlerContext ctx,
341             ChannelStateEvent e) throws Exception {
342         try {
343             flush(consolidateOnFlush);
344         } finally {
345             ctx.sendDownstream(e);
346         }
347     }
348 
349     @Override
350     public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
351             throws Exception {
352         try {
353             flush(consolidateOnFlush);
354         } finally {
355             ctx.sendDownstream(e);
356         }
357     }
358 }