View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.stream;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.nio.channels.ClosedChannelException;
21  import java.util.Queue;
22  
23  import org.jboss.netty.buffer.ChannelBuffers;
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelDownstreamHandler;
26  import org.jboss.netty.channel.ChannelEvent;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelFutureListener;
29  import org.jboss.netty.channel.ChannelHandler;
30  import org.jboss.netty.channel.ChannelHandlerContext;
31  import org.jboss.netty.channel.ChannelPipeline;
32  import org.jboss.netty.channel.ChannelStateEvent;
33  import org.jboss.netty.channel.ChannelUpstreamHandler;
34  import org.jboss.netty.channel.Channels;
35  import org.jboss.netty.channel.MessageEvent;
36  import org.jboss.netty.logging.InternalLogger;
37  import org.jboss.netty.logging.InternalLoggerFactory;
38  import org.jboss.netty.util.internal.LinkedTransferQueue;
39  
40  /**
41   * A {@link ChannelHandler} that adds support for writing a large data stream
42   * asynchronously neither spending a lot of memory nor getting
43   * {@link java.lang.OutOfMemoryError}.  Large data streaming such as file
44   * transfer requires complicated state management in a {@link ChannelHandler}
45   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
46   * so that you can send a large data stream without difficulties.
47   * <p>
48   * To use {@link ChunkedWriteHandler} in your application, you have to insert
49   * a new {@link ChunkedWriteHandler} instance:
50   * <pre>
51   * {@link ChannelPipeline} p = ...;
52   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
53   * p.addLast("handler", new MyHandler());
54   * </pre>
55   * Once inserted, you can write a {@link ChunkedInput} so that the
56   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
57   * stream chunk by chunk and write the fetched chunk downstream:
58   * <pre>
59   * {@link Channel} ch = ...;
60   * ch.write(new {@link ChunkedFile}(new File("video.mkv")));
61   * </pre>
62   *
63   * <h3>Sending a stream which generates a chunk intermittently</h3>
64   *
65   * Some {@link ChunkedInput} implementations generate a chunk only on a
66   * certain event or timing.  Such an implementation often returns {@code null}
67   * from {@link ChunkedInput#nextChunk()}, which suspends the transfer
68   * indefinitely.  To resume the transfer when a new chunk is available, you
69   * have to call {@link #resumeTransfer()}.
70   *
71   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
72   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
73   * @version $Rev: 2233 $, $Date: 2010-04-06 17:43:49 +0900 (Tue, 06 Apr 2010) $
74   *
75   * @apiviz.landmark
76   * @apiviz.has org.jboss.netty.handler.stream.ChunkedInput oneway - - reads from
77   */
78  public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
79  
80      private static final InternalLogger logger =
81          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
82  
83      private final Queue<MessageEvent> queue =
84          new LinkedTransferQueue<MessageEvent>();
85  
86      private ChannelHandlerContext ctx;
87      private MessageEvent currentEvent;
88  
89      /**
90       * Creates a new instance.
91       */
92      public ChunkedWriteHandler() {
93          super();
94      }
95  
96      /**
97       * Continues to fetch the chunks from the input.
98       */
99      public void resumeTransfer() {
100         ChannelHandlerContext ctx = this.ctx;
101         if (ctx == null) {
102             return;
103         }
104 
105         try {
106             flush(ctx);
107         } catch (Exception e) {
108             logger.warn("Unexpected exception while sending chunks.", e);
109         }
110     }
111 
112     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
113             throws Exception {
114         if (!(e instanceof MessageEvent)) {
115             ctx.sendDownstream(e);
116             return;
117         }
118 
119         boolean offered = queue.offer((MessageEvent) e);
120         assert offered;
121 
122         final Channel channel = ctx.getChannel();
123         if (channel.isWritable()) {
124             this.ctx = ctx;
125             flush(ctx);
126         } else if (!channel.isConnected()) {
127             this.ctx = ctx;
128             discard(ctx);
129         }
130     }
131 
132     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
133             throws Exception {
134         if (e instanceof ChannelStateEvent) {
135             ChannelStateEvent cse = (ChannelStateEvent) e;
136             switch (cse.getState()) {
137             case INTEREST_OPS:
138                 // Continue writing when the channel becomes writable.
139                 flush(ctx);
140                 break;
141             case OPEN:
142                 if (!Boolean.TRUE.equals(cse.getValue())) {
143                     // Fail all pending writes
144                     discard(ctx);
145                 }
146                 break;
147             }
148         }
149         ctx.sendUpstream(e);
150     }
151 
152     private void discard(ChannelHandlerContext ctx) {
153         ClosedChannelException cause = null;
154         boolean fireExceptionCaught = false;
155            
156         for (;;) {
157             MessageEvent currentEvent = this.currentEvent;
158                 
159             if (this.currentEvent == null) { 
160                 currentEvent = queue.poll(); 
161             } else {
162                 this.currentEvent = null; 
163             }
164 
165             if (currentEvent == null) { 
166                 break; 
167             }
168             
169               
170             Object m = currentEvent.getMessage();
171             if (m instanceof ChunkedInput) {
172                 closeInput((ChunkedInput) m);
173             }
174 
175             // Trigger a ClosedChannelException
176             if (cause == null) {
177                 cause = new ClosedChannelException();
178             }
179             currentEvent.getFuture().setFailure(cause);
180             fireExceptionCaught = true;
181 
182             currentEvent = null;
183         }
184         
185 
186         if (fireExceptionCaught) {
187             Channels.fireExceptionCaught(ctx.getChannel(), cause);
188         }
189     }
190 
191     private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
192         final Channel channel = ctx.getChannel();
193         if (!channel.isConnected()) {
194             discard(ctx);
195         }
196 
197         while (channel.isWritable()) {
198             if (currentEvent == null) {
199                 currentEvent = queue.poll();
200             }
201 
202             if (currentEvent == null) {
203                 break;
204             }
205 
206             if (currentEvent.getFuture().isDone()) {
207                 // Skip the current request because the previous partial write
208                 // attempt for the current request has been failed.
209                 currentEvent = null;
210             } else {
211                 final MessageEvent currentEvent = this.currentEvent;
212                 Object m = currentEvent.getMessage();
213                 if (m instanceof ChunkedInput) {
214                     ChunkedInput chunks = (ChunkedInput) m;
215                     Object chunk;
216                     boolean endOfInput;
217                     boolean suspend;
218                     try {
219                         chunk = chunks.nextChunk();
220                         endOfInput = chunks.isEndOfInput();
221                         if (chunk == null) {
222                             chunk = ChannelBuffers.EMPTY_BUFFER;
223                             // No need to suspend when reached at the end.
224                             suspend = !endOfInput;
225                         } else {
226                             suspend = false;
227                         }
228                     } catch (Throwable t) {
229                         this.currentEvent = null;
230 
231                         currentEvent.getFuture().setFailure(t);
232                         fireExceptionCaught(ctx, t);
233 
234                         closeInput(chunks);
235                         break;
236                     }
237 
238                     if (suspend) {
239                         // ChunkedInput.nextChunk() returned null and it has
240                         // not reached at the end of input.  Let's wait until
241                         // more chunks arrive.  Nothing to write or notify.
242                         break;
243                     } else {
244                         ChannelFuture writeFuture;
245                         if (endOfInput) {
246                             this.currentEvent = null;
247                             closeInput(chunks);
248                             writeFuture = currentEvent.getFuture();
249                         } else {
250                             writeFuture = future(channel);
251                             writeFuture.addListener(new ChannelFutureListener() {
252                                 public void operationComplete(ChannelFuture future)
253                                         throws Exception {
254                                     if (!future.isSuccess()) {
255                                         currentEvent.getFuture().setFailure(future.getCause());
256                                         closeInput((ChunkedInput) currentEvent.getMessage());
257                                     }
258                                 }
259                             });
260                         }
261 
262                         Channels.write(
263                                 ctx, writeFuture, chunk,
264                                 currentEvent.getRemoteAddress());
265                     }
266                 } else {
267                     this.currentEvent = null;
268                     ctx.sendDownstream(currentEvent);
269                 }
270             }
271 
272             if (!channel.isConnected()) {
273                 discard(ctx);
274                 break;
275             }
276         }
277     }
278 
279     static void closeInput(ChunkedInput chunks) {
280         try {
281             chunks.close();
282         } catch (Throwable t) {
283             logger.warn("Failed to close a chunked input.", t);
284         }
285     }
286 }