1 /*
2 * Copyright 2009 Red Hat, Inc.
3 *
4 * Red Hat licenses this file to you under the Apache License, version 2.0
5 * (the "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.queue;
17
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.Queue;
21 import java.util.concurrent.BlockingQueue;
22
23 import org.jboss.netty.buffer.ChannelBuffer;
24 import org.jboss.netty.buffer.ChannelBuffers;
25 import org.jboss.netty.channel.Channel;
26 import org.jboss.netty.channel.ChannelConfig;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelFutureListener;
29 import org.jboss.netty.channel.ChannelHandlerContext;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.Channels;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelHandler;
34 import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
35 import org.jboss.netty.util.HashedWheelTimer;
36 import org.jboss.netty.util.internal.LinkedTransferQueue;
37
38 /**
39 * Emulates buffered write operation. This handler stores all write requests
40 * into an unbounded {@link Queue} and flushes them to the downstream when
41 * {@link #flush()} method is called.
42 * <p>
43 * Here is an example that demonstrates the usage:
44 * <pre>
45 * BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
46 * ChannelPipeline p = ...;
47 * p.addFirst("buffer", bufferedWriter);
48 *
49 * ...
50 *
51 * Channel ch = ...;
52 *
53 * // msg1, 2, and 3 are stored in the queue of bufferedWriter.
54 * ch.write(msg1);
55 * ch.write(msg2);
56 * ch.write(msg3);
57 *
58 * // and will be flushed on request.
59 * bufferedWriter.flush();
60 * </pre>
61 *
62 * <h3>Auto-flush</h3>
63 * The write request queue is automatically flushed when the associated
64 * {@link Channel} is disconnected or closed. However, it does not flush the
65 * queue otherwise. It means you have to call {@link #flush()} before the size
66 * of the queue increases too much. You can implement your own auto-flush
67 * strategy by extending this handler:
68 * <pre>
69 * public class AutoFlusher extends {@link BufferedWriteHandler} {
70 *
71 * private final AtomicLong bufferSize = new AtomicLong();
72 *
73 * {@literal @Override}
74 * public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) throws Exception {
75 * super.writeRequested(ctx, e);
76 *
77 * {@link ChannelBuffer} data = ({@link ChannelBuffer}) e.getMessage();
78 * long newBufferSize = bufferSize.addAndGet(data.readableBytes());
79 *
80 * // Flush the queue if it gets larger than 8KiB.
81 * if (newBufferSize > 8192) {
82 * flush();
83 * bufferSize.set(0);
84 * }
85 * }
86 * }
87 * </pre>
88 *
89 * <h3>Consolidate on flush</h3>
90 *
91 * If there are two or more write requests in the queue and all of their
92 * messages are {@link ChannelBuffer}s, they can be merged into a single write
93 * request to reduce the number of system calls.
94 * <pre>
95 * BEFORE consolidation: AFTER consolidation:
96 * +-------+-------+-------+ +-------------+
97 * | Req C | Req B | Req A |------\\| Request ABC |
98 * | "789" | "456" | "123" |------//| "123456789" |
99 * +-------+-------+-------+ +-------------+
100 * </pre>
101 * This feature is disabled by default. You can override the default when you
102 * create this handler or call {@link #flush(boolean)}. If you specified
103 * {@code true} when you call the constructor, calling {@link #flush()} will
104 * always consolidate the queue. Otherwise, you have to call
105 * {@link #flush(boolean)} with {@code true} to enable this feature for each
106 * flush.
107 * <p>
108 * The disadvantage of consolidation is that the {@link ChannelFuture} and its
109 * {@link ChannelFutureListener}s associated with the original write requests
110 * might be notified later than when they are actually written out. They will
111 * always be notified when the consolidated write request is fully written.
112 * <p>
113 * The following example implements the consolidation strategy that reduces
114 * the number of write requests based on the writability of a channel:
115 * <pre>
116 * public class ConsolidatingAutoFlusher extends {@link BufferedWriteHandler} {
117 *
118 * public ConsolidatingAutoFlusher() {
119 * // Enable consolidation by default.
120 * super(true);
121 * }
122 *
123 * {@literal @Override}
124 * public void channelOpen({@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
125 * {@link ChannelConfig} cfg = e.getChannel().getConfig();
126 * if (cfg instanceof {@link NioSocketChannelConfig}) {
127 * // Lower the watermark to increase the chance of consolidation.
128 * (({@link NioSocketChannelConfig}) cfg).setWriteBufferLowWaterMark(0);
129 * }
130 * super.channelOpen(ctx, e);
131 * }
132 *
133 * {@literal @Override}
134 * public void writeRequested({@link ChannelHandlerContext} ctx, {@link MessageEvent} e) throws Exception {
135 * super.writeRequested(ctx, e);
136 * if (e.getChannel().isWritable()) {
137 * flush();
138 * }
139 * }
140 *
141 * {@literal @Override}
142 * public void channelInterestChanged({@link ChannelHandlerContext} ctx, {@link ChannelStateEvent} e) throws Exception {
143 * if (e.getChannel().isWritable()) {
144 * flush();
145 * }
146 * }
147 * }
148 * </pre>
149 *
150 * <h3>Prioritized Writes</h3>
151 *
152 * You can implement prioritized writes by specifying an unbounded priority
153 * queue in the constructor of this handler. You will need to design a
154 * proper strategy to determine how often {@link #flush()} should be called.
155 * For example, you could call {@link #flush()} periodically, using
156 * {@link HashedWheelTimer} every second.
157 *
158 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
159 * @author <a href="http://gleamynode.net/">Trustin Lee</a>
160 * @version $Rev: 2243 $, $Date: 2010-04-16 14:01:55 +0900 (Fri, 16 Apr 2010) $
161 *
162 * @apiviz.landmark
163 */
164 public class BufferedWriteHandler extends SimpleChannelHandler {
165
166 private final Queue<MessageEvent> queue;
167 private final boolean consolidateOnFlush;
168 private volatile ChannelHandlerContext ctx;
169
170 /**
171 * Creates a new instance with the default unbounded {@link BlockingQueue}
172 * implementation and without buffer consolidation.
173 */
174 public BufferedWriteHandler() {
175 this(false);
176 }
177
178 /**
179 * Creates a new instance with the specified thread-safe unbounded
180 * {@link Queue} and without buffer consolidation. Please note that
181 * specifying a bounded {@link Queue} or a thread-unsafe {@link Queue} will
182 * result in an unspecified behavior.
183 */
184 public BufferedWriteHandler(Queue<MessageEvent> queue) {
185 this(queue, false);
186 }
187
188 /**
189 * Creates a new instance with the default unbounded {@link BlockingQueue}
190 * implementation.
191 *
192 * @param consolidateOnFlush
193 * {@code true} if and only if the buffered write requests are merged
194 * into a single write request on {@link #flush()}
195 */
196 public BufferedWriteHandler(boolean consolidateOnFlush) {
197 this(new LinkedTransferQueue<MessageEvent>(), consolidateOnFlush);
198 }
199
200 /**
201 * Creates a new instance with the specified thread-safe unbounded
202 * {@link Queue}. Please note that specifying a bounded {@link Queue} or
203 * a thread-unsafe {@link Queue} will result in an unspecified behavior.
204 *
205 * @param consolidateOnFlush
206 * {@code true} if and only if the buffered write requests are merged
207 * into a single write request on {@link #flush()}
208 */
209 public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210 if (queue == null) {
211 throw new NullPointerException("queue");
212 }
213 this.queue = queue;
214 this.consolidateOnFlush = consolidateOnFlush;
215 }
216
217 public boolean isConsolidateOnFlush() {
218 return consolidateOnFlush;
219 }
220
221 /**
222 * Returns the queue which stores the write requests. The default
223 * implementation returns the queue which was specified in the constructor.
224 */
225 protected Queue<MessageEvent> getQueue() {
226 return queue;
227 }
228
229 /**
230 * Sends the queued write requests to the downstream.
231 */
232 public void flush() {
233 flush(consolidateOnFlush);
234 }
235
236 /**
237 * Sends the queued write requests to the downstream.
238 *
239 * @param consolidateOnFlush
240 * {@code true} if and only if the buffered write requests are merged
241 * into a single write request
242 */
243 public void flush(boolean consolidateOnFlush) {
244 final ChannelHandlerContext ctx = this.ctx;
245 if (ctx == null) {
246 // No write request was made.
247 return;
248 }
249
250 final Queue<MessageEvent> queue = getQueue();
251 if (consolidateOnFlush) {
252 if (queue.isEmpty()) {
253 return;
254 }
255
256 List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
257 synchronized (this) {
258 for (;;) {
259 MessageEvent e = queue.poll();
260 if (e == null) {
261 break;
262 }
263 if (!(e.getMessage() instanceof ChannelBuffer)) {
264 if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
265 pendingWrites = new ArrayList<MessageEvent>();
266 }
267 ctx.sendDownstream(e);
268 } else {
269 pendingWrites.add(e);
270 }
271 }
272 consolidatedWrite(pendingWrites);
273 }
274 } else {
275 synchronized (this) {
276 for (;;) {
277 MessageEvent e = queue.poll();
278 if (e == null) {
279 break;
280 }
281 ctx.sendDownstream(e);
282 }
283 }
284 }
285 }
286
287 private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
288 final int size = pendingWrites.size();
289 if (size == 1) {
290 ctx.sendDownstream(pendingWrites.remove(0));
291 return pendingWrites;
292 } else if (size == 0) {
293 return pendingWrites;
294 }
295
296 ChannelBuffer[] data = new ChannelBuffer[size];
297 for (int i = 0; i < data.length; i ++) {
298 data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
299 }
300
301 ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
302 ChannelFuture future = Channels.future(ctx.getChannel());
303 future.addListener(new ChannelFutureListener() {
304 public void operationComplete(ChannelFuture future)
305 throws Exception {
306 if (future.isSuccess()) {
307 for (MessageEvent e: pendingWrites) {
308 e.getFuture().setSuccess();
309 }
310 } else {
311 Throwable cause = future.getCause();
312 for (MessageEvent e: pendingWrites) {
313 e.getFuture().setFailure(cause);
314 }
315 }
316 }
317 });
318
319 Channels.write(ctx, future, composite);
320 return null;
321 }
322
323 /**
324 * Stores all write requests to the queue so that they are actually written
325 * on {@link #flush()}.
326 */
327 @Override
328 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
329 throws Exception {
330 if (this.ctx == null) {
331 this.ctx = ctx;
332 } else {
333 assert this.ctx == ctx;
334 }
335
336 getQueue().add(e);
337 }
338
339 @Override
340 public void disconnectRequested(ChannelHandlerContext ctx,
341 ChannelStateEvent e) throws Exception {
342 try {
343 flush(consolidateOnFlush);
344 } finally {
345 ctx.sendDownstream(e);
346 }
347 }
348
349 @Override
350 public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
351 throws Exception {
352 try {
353 flush(consolidateOnFlush);
354 } finally {
355 ctx.sendDownstream(e);
356 }
357 }
358 }