View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.http;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.InetSocketAddress;
21  import java.net.SocketAddress;
22  import java.nio.channels.NotYetConnectedException;
23  
24  import javax.net.ssl.SSLContext;
25  import javax.net.ssl.SSLEngine;
26  
27  import org.jboss.netty.buffer.ChannelBuffer;
28  import org.jboss.netty.buffer.ChannelBuffers;
29  import org.jboss.netty.channel.AbstractChannel;
30  import org.jboss.netty.channel.ChannelException;
31  import org.jboss.netty.channel.ChannelFactory;
32  import org.jboss.netty.channel.ChannelFuture;
33  import org.jboss.netty.channel.ChannelFutureListener;
34  import org.jboss.netty.channel.ChannelHandlerContext;
35  import org.jboss.netty.channel.ChannelPipeline;
36  import org.jboss.netty.channel.ChannelSink;
37  import org.jboss.netty.channel.ChannelStateEvent;
38  import org.jboss.netty.channel.DefaultChannelPipeline;
39  import org.jboss.netty.channel.ExceptionEvent;
40  import org.jboss.netty.channel.MessageEvent;
41  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
42  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
43  import org.jboss.netty.channel.socket.SocketChannel;
44  import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
45  import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
46  import org.jboss.netty.handler.codec.http.HttpChunk;
47  import org.jboss.netty.handler.codec.http.HttpHeaders;
48  import org.jboss.netty.handler.codec.http.HttpMethod;
49  import org.jboss.netty.handler.codec.http.HttpRequest;
50  import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
51  import org.jboss.netty.handler.codec.http.HttpResponse;
52  import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
53  import org.jboss.netty.handler.codec.http.HttpResponseStatus;
54  import org.jboss.netty.handler.codec.http.HttpVersion;
55  import org.jboss.netty.handler.ssl.SslHandler;
56  
57  /**
58   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
59   * @author Andy Taylor (andy.taylor@jboss.org)
60   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
61   * @version $Rev: 2285 $, $Date: 2010-05-27 21:02:49 +0900 (Thu, 27 May 2010) $
62   */
63  class HttpTunnelingClientSocketChannel extends AbstractChannel
64          implements org.jboss.netty.channel.socket.SocketChannel {
65  
66      final HttpTunnelingSocketChannelConfig config;
67  
68      volatile boolean requestHeaderWritten;
69  
70      final Object interestOpsLock = new Object();
71  
72      final SocketChannel realChannel;
73  
74      private final HttpTunnelingClientSocketChannel.ServletChannelHandler handler = new ServletChannelHandler();
75  
76      HttpTunnelingClientSocketChannel(
77              ChannelFactory factory,
78              ChannelPipeline pipeline,
79              ChannelSink sink, ClientSocketChannelFactory clientSocketChannelFactory) {
80  
81          super(null, factory, pipeline, sink);
82  
83          config = new HttpTunnelingSocketChannelConfig(this);
84          DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
85          channelPipeline.addLast("decoder", new HttpResponseDecoder());
86          channelPipeline.addLast("encoder", new HttpRequestEncoder());
87          channelPipeline.addLast("handler", handler);
88          realChannel = clientSocketChannelFactory.newChannel(channelPipeline);
89  
90          fireChannelOpen(this);
91      }
92  
93      public HttpTunnelingSocketChannelConfig getConfig() {
94          return config;
95      }
96  
97      public InetSocketAddress getLocalAddress() {
98          return realChannel.getLocalAddress();
99      }
100 
101     public InetSocketAddress getRemoteAddress() {
102         return realChannel.getRemoteAddress();
103     }
104 
105     public boolean isBound() {
106         return realChannel.isBound();
107     }
108 
109     public boolean isConnected() {
110         return realChannel.isConnected();
111     }
112 
113     @Override
114     public int getInterestOps() {
115         return realChannel.getInterestOps();
116     }
117 
118     @Override
119     public boolean isWritable() {
120         return realChannel.isWritable();
121     }
122 
123     @Override
124     protected boolean setClosed() {
125         return super.setClosed();
126     }
127 
128     @Override
129     public ChannelFuture write(Object message, SocketAddress remoteAddress) {
130         if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
131             return super.write(message, null);
132         }
133         else {
134             return getUnsupportedOperationFuture();
135         }
136     }
137 
138     void bindReal(final SocketAddress localAddress, final ChannelFuture future) {
139         realChannel.bind(localAddress).addListener(new ChannelFutureListener() {
140             public void operationComplete(ChannelFuture f) {
141                 if (f.isSuccess()) {
142                     future.setSuccess();
143                 } else {
144                     future.setFailure(f.getCause());
145                 }
146             }
147         });
148     }
149 
150     void connectReal(final SocketAddress remoteAddress, final ChannelFuture future) {
151         final SocketChannel virtualChannel = this;
152         realChannel.connect(remoteAddress).addListener(new ChannelFutureListener() {
153             public void operationComplete(ChannelFuture f) {
154                 final String serverName = config.getServerName();
155                 final int serverPort = ((InetSocketAddress) remoteAddress).getPort();
156                 final String serverPath = config.getServerPath();
157 
158                 if (f.isSuccess()) {
159                     // Configure SSL
160                     SSLContext sslContext = config.getSslContext();
161                     ChannelFuture sslHandshakeFuture = null;
162                     if (sslContext != null) {
163                         // Create a new SSLEngine from the specified SSLContext.
164                         SSLEngine engine;
165                         if (serverName != null) {
166                             engine = sslContext.createSSLEngine(serverName, serverPort);
167                         } else {
168                             engine = sslContext.createSSLEngine();
169                         }
170 
171                         // Configure the SSLEngine.
172                         engine.setUseClientMode(true);
173                         engine.setEnableSessionCreation(config.isEnableSslSessionCreation());
174                         String[] enabledCipherSuites = config.getEnabledSslCipherSuites();
175                         if (enabledCipherSuites != null) {
176                             engine.setEnabledCipherSuites(enabledCipherSuites);
177                         }
178                         String[] enabledProtocols = config.getEnabledSslProtocols();
179                         if (enabledProtocols != null) {
180                             engine.setEnabledProtocols(enabledProtocols);
181                         }
182 
183                         SslHandler sslHandler = new SslHandler(engine);
184                         realChannel.getPipeline().addFirst("ssl", sslHandler);
185                         sslHandshakeFuture = sslHandler.handshake();
186                     }
187 
188                     // Send the HTTP request.
189                     final HttpRequest req = new DefaultHttpRequest(
190                             HttpVersion.HTTP_1_1, HttpMethod.POST, serverPath);
191                     if (serverName != null) {
192                         req.setHeader(HttpHeaders.Names.HOST, serverName);
193                     }
194                     req.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
195                     req.setHeader(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
196                     req.setHeader(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
197                     req.setHeader(HttpHeaders.Names.USER_AGENT, HttpTunnelingClientSocketChannel.class.getName());
198 
199                     if (sslHandshakeFuture == null) {
200                         realChannel.write(req);
201                         requestHeaderWritten = true;
202                         future.setSuccess();
203                         fireChannelConnected(virtualChannel, remoteAddress);
204                     } else {
205                         sslHandshakeFuture.addListener(new ChannelFutureListener() {
206                             public void operationComplete(ChannelFuture f) {
207                                 if (f.isSuccess()) {
208                                     realChannel.write(req);
209                                     requestHeaderWritten = true;
210                                     future.setSuccess();
211                                     fireChannelConnected(virtualChannel, remoteAddress);
212                                 } else {
213                                     future.setFailure(f.getCause());
214                                     fireExceptionCaught(virtualChannel, f.getCause());
215                                 }
216                             }
217                         });
218                     }
219                 } else {
220                     future.setFailure(f.getCause());
221                     fireExceptionCaught(virtualChannel, f.getCause());
222                 }
223             }
224         });
225     }
226 
227     void writeReal(final ChannelBuffer a, final ChannelFuture future) {
228         if (!requestHeaderWritten) {
229             throw new NotYetConnectedException();
230         }
231 
232         final int size = a.readableBytes();
233         final ChannelFuture f;
234 
235         if (size == 0) {
236             f = realChannel.write(ChannelBuffers.EMPTY_BUFFER);
237         } else {
238             f = realChannel.write(new DefaultHttpChunk(a));
239         }
240 
241         f.addListener(new ChannelFutureListener() {
242             public void operationComplete(ChannelFuture f) {
243                 if (f.isSuccess()) {
244                     future.setSuccess();
245                     if (size != 0) {
246                         fireWriteComplete(HttpTunnelingClientSocketChannel.this, size);
247                     }
248                 } else {
249                     future.setFailure(f.getCause());
250                 }
251             }
252         });
253     }
254 
255     private ChannelFuture writeLastChunk() {
256         if (!requestHeaderWritten) {
257             return failedFuture(this, new NotYetConnectedException());
258         } else {
259             return realChannel.write(HttpChunk.LAST_CHUNK);
260         }
261     }
262 
263     void setInterestOpsReal(final int interestOps, final ChannelFuture future) {
264         realChannel.setInterestOps(interestOps).addListener(new ChannelFutureListener() {
265             public void operationComplete(ChannelFuture f) {
266                 if (f.isSuccess()) {
267                     future.setSuccess();
268                 } else {
269                     future.setFailure(f.getCause());
270                 }
271             }
272         });
273     }
274 
275     void disconnectReal(final ChannelFuture future) {
276         writeLastChunk().addListener(new ChannelFutureListener() {
277             public void operationComplete(ChannelFuture f) {
278                 realChannel.disconnect().addListener(new ChannelFutureListener() {
279                     public void operationComplete(ChannelFuture f) {
280                         if (f.isSuccess()) {
281                             future.setSuccess();
282                         } else {
283                             future.setFailure(f.getCause());
284                         }
285                     }
286                 });
287             }
288         });
289     }
290 
291     void unbindReal(final ChannelFuture future) {
292         writeLastChunk().addListener(new ChannelFutureListener() {
293             public void operationComplete(ChannelFuture f) {
294                 realChannel.unbind().addListener(new ChannelFutureListener() {
295                     public void operationComplete(ChannelFuture f) {
296                         if (f.isSuccess()) {
297                             future.setSuccess();
298                         } else {
299                             future.setFailure(f.getCause());
300                         }
301                     }
302                 });
303             }
304         });
305     }
306 
307     void closeReal(final ChannelFuture future) {
308         writeLastChunk().addListener(new ChannelFutureListener() {
309             public void operationComplete(ChannelFuture f) {
310                 realChannel.close().addListener(new ChannelFutureListener() {
311                     public void operationComplete(ChannelFuture f) {
312                         // Note: If 'future' refers to the closeFuture,
313                         // setSuccess() and setFailure() do nothing.
314                         // AbstractChannel.setClosed() should be called instead.
315                         // (See AbstractChannel.ChannelCloseFuture)
316 
317                         if (f.isSuccess()) {
318                             future.setSuccess();
319                         } else {
320                             future.setFailure(f.getCause());
321                         }
322 
323                         // Notify the closeFuture.
324                         setClosed();
325                     }
326                 });
327             }
328         });
329     }
330 
331     final class ServletChannelHandler extends SimpleChannelUpstreamHandler {
332 
333         private volatile boolean readingChunks;
334         final SocketChannel virtualChannel = HttpTunnelingClientSocketChannel.this;
335 
336         @Override
337         public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e)
338                 throws Exception {
339             fireChannelBound(virtualChannel, (SocketAddress) e.getValue());
340         }
341 
342         @Override
343         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
344             if (!readingChunks) {
345                 HttpResponse res = (HttpResponse) e.getMessage();
346                 if (res.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
347                     throw new ChannelException("Unexpected HTTP response status: " + res.getStatus());
348                 }
349 
350                 if (res.isChunked()) {
351                     readingChunks = true;
352                 } else {
353                     ChannelBuffer content = res.getContent();
354                     if (content.readable()) {
355                         fireMessageReceived(HttpTunnelingClientSocketChannel.this, content);
356                     }
357                     // Reached to the end of response - close the request.
358                     closeReal(succeededFuture(virtualChannel));
359                 }
360             } else {
361                 HttpChunk chunk = (HttpChunk) e.getMessage();
362                 if (!chunk.isLast()) {
363                     fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent());
364                 } else {
365                     readingChunks = false;
366                     // Reached to the end of response - close the request.
367                     closeReal(succeededFuture(virtualChannel));
368                 }
369             }
370         }
371 
372         @Override
373         public void channelInterestChanged(ChannelHandlerContext ctx,
374                 ChannelStateEvent e) throws Exception {
375             fireChannelInterestChanged(virtualChannel);
376         }
377 
378         @Override
379         public void channelDisconnected(ChannelHandlerContext ctx,
380                 ChannelStateEvent e) throws Exception {
381             fireChannelDisconnected(virtualChannel);
382         }
383 
384         @Override
385         public void channelUnbound(ChannelHandlerContext ctx,
386                 ChannelStateEvent e) throws Exception {
387             fireChannelUnbound(virtualChannel);
388         }
389 
390         @Override
391         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
392                 throws Exception {
393             fireChannelClosed(virtualChannel);
394         }
395 
396         @Override
397         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
398             fireExceptionCaught(virtualChannel, e.getCause());
399             realChannel.close();
400         }
401     }
402 }