1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.http;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.InetSocketAddress;
21 import java.net.SocketAddress;
22 import java.nio.channels.NotYetConnectedException;
23
24 import javax.net.ssl.SSLContext;
25 import javax.net.ssl.SSLEngine;
26
27 import org.jboss.netty.buffer.ChannelBuffer;
28 import org.jboss.netty.buffer.ChannelBuffers;
29 import org.jboss.netty.channel.AbstractChannel;
30 import org.jboss.netty.channel.ChannelException;
31 import org.jboss.netty.channel.ChannelFactory;
32 import org.jboss.netty.channel.ChannelFuture;
33 import org.jboss.netty.channel.ChannelFutureListener;
34 import org.jboss.netty.channel.ChannelHandlerContext;
35 import org.jboss.netty.channel.ChannelPipeline;
36 import org.jboss.netty.channel.ChannelSink;
37 import org.jboss.netty.channel.ChannelStateEvent;
38 import org.jboss.netty.channel.DefaultChannelPipeline;
39 import org.jboss.netty.channel.ExceptionEvent;
40 import org.jboss.netty.channel.MessageEvent;
41 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
42 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
43 import org.jboss.netty.channel.socket.SocketChannel;
44 import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
45 import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
46 import org.jboss.netty.handler.codec.http.HttpChunk;
47 import org.jboss.netty.handler.codec.http.HttpHeaders;
48 import org.jboss.netty.handler.codec.http.HttpMethod;
49 import org.jboss.netty.handler.codec.http.HttpRequest;
50 import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
51 import org.jboss.netty.handler.codec.http.HttpResponse;
52 import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
53 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
54 import org.jboss.netty.handler.codec.http.HttpVersion;
55 import org.jboss.netty.handler.ssl.SslHandler;
56
57
58
59
60
61
62
63 class HttpTunnelingClientSocketChannel extends AbstractChannel
64 implements org.jboss.netty.channel.socket.SocketChannel {
65
66 final HttpTunnelingSocketChannelConfig config;
67
68 volatile boolean requestHeaderWritten;
69
70 final Object interestOpsLock = new Object();
71
72 final SocketChannel realChannel;
73
74 private final HttpTunnelingClientSocketChannel.ServletChannelHandler handler = new ServletChannelHandler();
75
76 HttpTunnelingClientSocketChannel(
77 ChannelFactory factory,
78 ChannelPipeline pipeline,
79 ChannelSink sink, ClientSocketChannelFactory clientSocketChannelFactory) {
80
81 super(null, factory, pipeline, sink);
82
83 config = new HttpTunnelingSocketChannelConfig(this);
84 DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
85 channelPipeline.addLast("decoder", new HttpResponseDecoder());
86 channelPipeline.addLast("encoder", new HttpRequestEncoder());
87 channelPipeline.addLast("handler", handler);
88 realChannel = clientSocketChannelFactory.newChannel(channelPipeline);
89
90 fireChannelOpen(this);
91 }
92
93 public HttpTunnelingSocketChannelConfig getConfig() {
94 return config;
95 }
96
97 public InetSocketAddress getLocalAddress() {
98 return realChannel.getLocalAddress();
99 }
100
101 public InetSocketAddress getRemoteAddress() {
102 return realChannel.getRemoteAddress();
103 }
104
105 public boolean isBound() {
106 return realChannel.isBound();
107 }
108
109 public boolean isConnected() {
110 return realChannel.isConnected();
111 }
112
113 @Override
114 public int getInterestOps() {
115 return realChannel.getInterestOps();
116 }
117
118 @Override
119 public boolean isWritable() {
120 return realChannel.isWritable();
121 }
122
123 @Override
124 protected boolean setClosed() {
125 return super.setClosed();
126 }
127
128 @Override
129 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
130 if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
131 return super.write(message, null);
132 }
133 else {
134 return getUnsupportedOperationFuture();
135 }
136 }
137
138 void bindReal(final SocketAddress localAddress, final ChannelFuture future) {
139 realChannel.bind(localAddress).addListener(new ChannelFutureListener() {
140 public void operationComplete(ChannelFuture f) {
141 if (f.isSuccess()) {
142 future.setSuccess();
143 } else {
144 future.setFailure(f.getCause());
145 }
146 }
147 });
148 }
149
150 void connectReal(final SocketAddress remoteAddress, final ChannelFuture future) {
151 final SocketChannel virtualChannel = this;
152 realChannel.connect(remoteAddress).addListener(new ChannelFutureListener() {
153 public void operationComplete(ChannelFuture f) {
154 final String serverName = config.getServerName();
155 final int serverPort = ((InetSocketAddress) remoteAddress).getPort();
156 final String serverPath = config.getServerPath();
157
158 if (f.isSuccess()) {
159
160 SSLContext sslContext = config.getSslContext();
161 ChannelFuture sslHandshakeFuture = null;
162 if (sslContext != null) {
163
164 SSLEngine engine;
165 if (serverName != null) {
166 engine = sslContext.createSSLEngine(serverName, serverPort);
167 } else {
168 engine = sslContext.createSSLEngine();
169 }
170
171
172 engine.setUseClientMode(true);
173 engine.setEnableSessionCreation(config.isEnableSslSessionCreation());
174 String[] enabledCipherSuites = config.getEnabledSslCipherSuites();
175 if (enabledCipherSuites != null) {
176 engine.setEnabledCipherSuites(enabledCipherSuites);
177 }
178 String[] enabledProtocols = config.getEnabledSslProtocols();
179 if (enabledProtocols != null) {
180 engine.setEnabledProtocols(enabledProtocols);
181 }
182
183 SslHandler sslHandler = new SslHandler(engine);
184 realChannel.getPipeline().addFirst("ssl", sslHandler);
185 sslHandshakeFuture = sslHandler.handshake();
186 }
187
188
189 final HttpRequest req = new DefaultHttpRequest(
190 HttpVersion.HTTP_1_1, HttpMethod.POST, serverPath);
191 if (serverName != null) {
192 req.setHeader(HttpHeaders.Names.HOST, serverName);
193 }
194 req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
195 req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
196 req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
197 req.setHeader(HttpHeaders.Names.USER_AGENT, HttpTunnelingClientSocketChannel.class.getName());
198
199 if (sslHandshakeFuture == null) {
200 realChannel.write(req);
201 requestHeaderWritten = true;
202 future.setSuccess();
203 fireChannelConnected(virtualChannel, remoteAddress);
204 } else {
205 sslHandshakeFuture.addListener(new ChannelFutureListener() {
206 public void operationComplete(ChannelFuture f) {
207 if (f.isSuccess()) {
208 realChannel.write(req);
209 requestHeaderWritten = true;
210 future.setSuccess();
211 fireChannelConnected(virtualChannel, remoteAddress);
212 } else {
213 future.setFailure(f.getCause());
214 fireExceptionCaught(virtualChannel, f.getCause());
215 }
216 }
217 });
218 }
219 } else {
220 future.setFailure(f.getCause());
221 fireExceptionCaught(virtualChannel, f.getCause());
222 }
223 }
224 });
225 }
226
227 void writeReal(final ChannelBuffer a, final ChannelFuture future) {
228 if (!requestHeaderWritten) {
229 throw new NotYetConnectedException();
230 }
231
232 final int size = a.readableBytes();
233 final ChannelFuture f;
234
235 if (size == 0) {
236 f = realChannel.write(ChannelBuffers.EMPTY_BUFFER);
237 } else {
238 f = realChannel.write(new DefaultHttpChunk(a));
239 }
240
241 f.addListener(new ChannelFutureListener() {
242 public void operationComplete(ChannelFuture f) {
243 if (f.isSuccess()) {
244 future.setSuccess();
245 if (size != 0) {
246 fireWriteComplete(HttpTunnelingClientSocketChannel.this, size);
247 }
248 } else {
249 future.setFailure(f.getCause());
250 }
251 }
252 });
253 }
254
255 private ChannelFuture writeLastChunk() {
256 if (!requestHeaderWritten) {
257 return failedFuture(this, new NotYetConnectedException());
258 } else {
259 return realChannel.write(HttpChunk.LAST_CHUNK);
260 }
261 }
262
263 void setInterestOpsReal(final int interestOps, final ChannelFuture future) {
264 realChannel.setInterestOps(interestOps).addListener(new ChannelFutureListener() {
265 public void operationComplete(ChannelFuture f) {
266 if (f.isSuccess()) {
267 future.setSuccess();
268 } else {
269 future.setFailure(f.getCause());
270 }
271 }
272 });
273 }
274
275 void disconnectReal(final ChannelFuture future) {
276 writeLastChunk().addListener(new ChannelFutureListener() {
277 public void operationComplete(ChannelFuture f) {
278 realChannel.disconnect().addListener(new ChannelFutureListener() {
279 public void operationComplete(ChannelFuture f) {
280 if (f.isSuccess()) {
281 future.setSuccess();
282 } else {
283 future.setFailure(f.getCause());
284 }
285 }
286 });
287 }
288 });
289 }
290
291 void unbindReal(final ChannelFuture future) {
292 writeLastChunk().addListener(new ChannelFutureListener() {
293 public void operationComplete(ChannelFuture f) {
294 realChannel.unbind().addListener(new ChannelFutureListener() {
295 public void operationComplete(ChannelFuture f) {
296 if (f.isSuccess()) {
297 future.setSuccess();
298 } else {
299 future.setFailure(f.getCause());
300 }
301 }
302 });
303 }
304 });
305 }
306
307 void closeReal(final ChannelFuture future) {
308 writeLastChunk().addListener(new ChannelFutureListener() {
309 public void operationComplete(ChannelFuture f) {
310 realChannel.close().addListener(new ChannelFutureListener() {
311 public void operationComplete(ChannelFuture f) {
312
313
314
315
316
317 if (f.isSuccess()) {
318 future.setSuccess();
319 } else {
320 future.setFailure(f.getCause());
321 }
322
323
324 setClosed();
325 }
326 });
327 }
328 });
329 }
330
331 final class ServletChannelHandler extends SimpleChannelUpstreamHandler {
332
333 private volatile boolean readingChunks;
334 final SocketChannel virtualChannel = HttpTunnelingClientSocketChannel.this;
335
336 @Override
337 public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e)
338 throws Exception {
339 fireChannelBound(virtualChannel, (SocketAddress) e.getValue());
340 }
341
342 @Override
343 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
344 if (!readingChunks) {
345 HttpResponse res = (HttpResponse) e.getMessage();
346 if (res.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
347 throw new ChannelException("Unexpected HTTP response status: " + res.getStatus());
348 }
349
350 if (res.isChunked()) {
351 readingChunks = true;
352 } else {
353 ChannelBuffer content = res.getContent();
354 if (content.readable()) {
355 fireMessageReceived(HttpTunnelingClientSocketChannel.this, content);
356 }
357
358 closeReal(succeededFuture(virtualChannel));
359 }
360 } else {
361 HttpChunk chunk = (HttpChunk) e.getMessage();
362 if (!chunk.isLast()) {
363 fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent());
364 } else {
365 readingChunks = false;
366
367 closeReal(succeededFuture(virtualChannel));
368 }
369 }
370 }
371
372 @Override
373 public void channelInterestChanged(ChannelHandlerContext ctx,
374 ChannelStateEvent e) throws Exception {
375 fireChannelInterestChanged(virtualChannel);
376 }
377
378 @Override
379 public void channelDisconnected(ChannelHandlerContext ctx,
380 ChannelStateEvent e) throws Exception {
381 fireChannelDisconnected(virtualChannel);
382 }
383
384 @Override
385 public void channelUnbound(ChannelHandlerContext ctx,
386 ChannelStateEvent e) throws Exception {
387 fireChannelUnbound(virtualChannel);
388 }
389
390 @Override
391 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
392 throws Exception {
393 fireChannelClosed(virtualChannel);
394 }
395
396 @Override
397 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
398 fireExceptionCaught(virtualChannel, e.getCause());
399 realChannel.close();
400 }
401 }
402 }