1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.nio.channels.ClosedChannelException;
21 import java.util.Queue;
22
23 import org.jboss.netty.buffer.ChannelBuffers;
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelDownstreamHandler;
26 import org.jboss.netty.channel.ChannelEvent;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelFutureListener;
29 import org.jboss.netty.channel.ChannelHandler;
30 import org.jboss.netty.channel.ChannelHandlerContext;
31 import org.jboss.netty.channel.ChannelPipeline;
32 import org.jboss.netty.channel.ChannelStateEvent;
33 import org.jboss.netty.channel.ChannelUpstreamHandler;
34 import org.jboss.netty.channel.Channels;
35 import org.jboss.netty.channel.MessageEvent;
36 import org.jboss.netty.logging.InternalLogger;
37 import org.jboss.netty.logging.InternalLoggerFactory;
38 import org.jboss.netty.util.internal.LinkedTransferQueue;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
79
80 private static final InternalLogger logger =
81 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
82
83 private final Queue<MessageEvent> queue =
84 new LinkedTransferQueue<MessageEvent>();
85
86 private ChannelHandlerContext ctx;
87 private MessageEvent currentEvent;
88
89
90
91
92 public ChunkedWriteHandler() {
93 super();
94 }
95
96
97
98
99 public void resumeTransfer() {
100 ChannelHandlerContext ctx = this.ctx;
101 if (ctx == null) {
102 return;
103 }
104
105 try {
106 flush(ctx);
107 } catch (Exception e) {
108 logger.warn("Unexpected exception while sending chunks.", e);
109 }
110 }
111
112 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
113 throws Exception {
114 if (!(e instanceof MessageEvent)) {
115 ctx.sendDownstream(e);
116 return;
117 }
118
119 boolean offered = queue.offer((MessageEvent) e);
120 assert offered;
121
122 final Channel channel = ctx.getChannel();
123 if (channel.isWritable()) {
124 this.ctx = ctx;
125 flush(ctx);
126 } else if (!channel.isConnected()) {
127 this.ctx = ctx;
128 discard(ctx);
129 }
130 }
131
132 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
133 throws Exception {
134 if (e instanceof ChannelStateEvent) {
135 ChannelStateEvent cse = (ChannelStateEvent) e;
136 switch (cse.getState()) {
137 case INTEREST_OPS:
138
139 flush(ctx);
140 break;
141 case OPEN:
142 if (!Boolean.TRUE.equals(cse.getValue())) {
143
144 discard(ctx);
145 }
146 break;
147 }
148 }
149 ctx.sendUpstream(e);
150 }
151
152 private void discard(ChannelHandlerContext ctx) {
153 ClosedChannelException cause = null;
154 boolean fireExceptionCaught = false;
155
156 for (;;) {
157 MessageEvent currentEvent = this.currentEvent;
158
159 if (this.currentEvent == null) {
160 currentEvent = queue.poll();
161 } else {
162 this.currentEvent = null;
163 }
164
165 if (currentEvent == null) {
166 break;
167 }
168
169
170 Object m = currentEvent.getMessage();
171 if (m instanceof ChunkedInput) {
172 closeInput((ChunkedInput) m);
173 }
174
175
176 if (cause == null) {
177 cause = new ClosedChannelException();
178 }
179 currentEvent.getFuture().setFailure(cause);
180 fireExceptionCaught = true;
181
182 currentEvent = null;
183 }
184
185
186 if (fireExceptionCaught) {
187 Channels.fireExceptionCaught(ctx.getChannel(), cause);
188 }
189 }
190
191 private synchronized void flush(ChannelHandlerContext ctx) throws Exception {
192 final Channel channel = ctx.getChannel();
193 if (!channel.isConnected()) {
194 discard(ctx);
195 }
196
197 while (channel.isWritable()) {
198 if (currentEvent == null) {
199 currentEvent = queue.poll();
200 }
201
202 if (currentEvent == null) {
203 break;
204 }
205
206 if (currentEvent.getFuture().isDone()) {
207
208
209 currentEvent = null;
210 } else {
211 final MessageEvent currentEvent = this.currentEvent;
212 Object m = currentEvent.getMessage();
213 if (m instanceof ChunkedInput) {
214 ChunkedInput chunks = (ChunkedInput) m;
215 Object chunk;
216 boolean endOfInput;
217 boolean suspend;
218 try {
219 chunk = chunks.nextChunk();
220 endOfInput = chunks.isEndOfInput();
221 if (chunk == null) {
222 chunk = ChannelBuffers.EMPTY_BUFFER;
223
224 suspend = !endOfInput;
225 } else {
226 suspend = false;
227 }
228 } catch (Throwable t) {
229 this.currentEvent = null;
230
231 currentEvent.getFuture().setFailure(t);
232 fireExceptionCaught(ctx, t);
233
234 closeInput(chunks);
235 break;
236 }
237
238 if (suspend) {
239
240
241
242 break;
243 } else {
244 ChannelFuture writeFuture;
245 if (endOfInput) {
246 this.currentEvent = null;
247 closeInput(chunks);
248 writeFuture = currentEvent.getFuture();
249 } else {
250 writeFuture = future(channel);
251 writeFuture.addListener(new ChannelFutureListener() {
252 public void operationComplete(ChannelFuture future)
253 throws Exception {
254 if (!future.isSuccess()) {
255 currentEvent.getFuture().setFailure(future.getCause());
256 closeInput((ChunkedInput) currentEvent.getMessage());
257 }
258 }
259 });
260 }
261
262 Channels.write(
263 ctx, writeFuture, chunk,
264 currentEvent.getRemoteAddress());
265 }
266 } else {
267 this.currentEvent = null;
268 ctx.sendDownstream(currentEvent);
269 }
270 }
271
272 if (!channel.isConnected()) {
273 discard(ctx);
274 break;
275 }
276 }
277 }
278
279 static void closeInput(ChunkedInput chunks) {
280 try {
281 chunks.close();
282 } catch (Throwable t) {
283 logger.warn("Failed to close a chunked input.", t);
284 }
285 }
286 }