View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.example.proxy;
17  
18  import java.net.InetSocketAddress;
19  
20  import org.jboss.netty.bootstrap.ClientBootstrap;
21  import org.jboss.netty.buffer.ChannelBuffer;
22  import org.jboss.netty.buffer.ChannelBuffers;
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.ExceptionEvent;
29  import org.jboss.netty.channel.MessageEvent;
30  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
31  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
32  
33  /**
34   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
35   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
36   * @version $Rev: 2376 $, $Date: 2010-10-25 03:24:20 +0900 (Mon, 25 Oct 2010) $
37   */
38  public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
39  
40      private final ClientSocketChannelFactory cf;
41      private final String remoteHost;
42      private final int remotePort;
43  
44      // This lock guards against the race condition that overrides the
45      // OP_READ flag incorrectly.
46      // See the related discussion: http://markmail.org/message/x7jc6mqx6ripynqf
47      final Object trafficLock = new Object();
48  
49      private volatile Channel outboundChannel;
50  
51      public HexDumpProxyInboundHandler(
52              ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
53          this.cf = cf;
54          this.remoteHost = remoteHost;
55          this.remotePort = remotePort;
56      }
57  
58      @Override
59      public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
60              throws Exception {
61          // Suspend incoming traffic until connected to the remote host.
62          final Channel inboundChannel = e.getChannel();
63          inboundChannel.setReadable(false);
64  
65          // Start the connection attempt.
66          ClientBootstrap cb = new ClientBootstrap(cf);
67          cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
68          ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
69  
70          outboundChannel = f.getChannel();
71          f.addListener(new ChannelFutureListener() {
72              public void operationComplete(ChannelFuture future) throws Exception {
73                  if (future.isSuccess()) {
74                      // Connection attempt succeeded:
75                      // Begin to accept incoming traffic.
76                      inboundChannel.setReadable(true);
77                  } else {
78                      // Close the connection if the connection attempt has failed.
79                      inboundChannel.close();
80                  }
81              }
82          });
83      }
84  
85      @Override
86      public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
87              throws Exception {
88          ChannelBuffer msg = (ChannelBuffer) e.getMessage();
89          //System.out.println(">>> " + ChannelBuffers.hexDump(msg));
90          synchronized (trafficLock) {
91              outboundChannel.write(msg);
92              // If outboundChannel is saturated, do not read until notified in
93              // OutboundHandler.channelInterestChanged().
94              if (!outboundChannel.isWritable()) {
95                  e.getChannel().setReadable(false);
96              }
97          }
98      }
99  
100     @Override
101     public void channelInterestChanged(ChannelHandlerContext ctx,
102             ChannelStateEvent e) throws Exception {
103         // If inboundChannel is not saturated anymore, continue accepting
104         // the incoming traffic from the outboundChannel.
105         synchronized (trafficLock) {
106             if (e.getChannel().isWritable()) {
107                 outboundChannel.setReadable(true);
108             }
109         }
110     }
111 
112     @Override
113     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
114             throws Exception {
115         if (outboundChannel != null) {
116             closeOnFlush(outboundChannel);
117         }
118     }
119 
120     @Override
121     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
122             throws Exception {
123         e.getCause().printStackTrace();
124         closeOnFlush(e.getChannel());
125     }
126 
127     private class OutboundHandler extends SimpleChannelUpstreamHandler {
128 
129         private final Channel inboundChannel;
130 
131         OutboundHandler(Channel inboundChannel) {
132             this.inboundChannel = inboundChannel;
133         }
134 
135         @Override
136         public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
137                 throws Exception {
138             ChannelBuffer msg = (ChannelBuffer) e.getMessage();
139             //System.out.println("<<< " + ChannelBuffers.hexDump(msg));
140             synchronized (trafficLock) {
141                 inboundChannel.write(msg);
142                 // If inboundChannel is saturated, do not read until notified in
143                 // HexDumpProxyInboundHandler.channelInterestChanged().
144                 if (!inboundChannel.isWritable()) {
145                     e.getChannel().setReadable(false);
146                 }
147             }
148         }
149 
150         @Override
151         public void channelInterestChanged(ChannelHandlerContext ctx,
152                 ChannelStateEvent e) throws Exception {
153             // If outboundChannel is not saturated anymore, continue accepting
154             // the incoming traffic from the inboundChannel.
155             synchronized (trafficLock) {
156                 if (e.getChannel().isWritable()) {
157                     inboundChannel.setReadable(true);
158                 }
159             }
160         }
161 
162         @Override
163         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
164                 throws Exception {
165             closeOnFlush(inboundChannel);
166         }
167 
168         @Override
169         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
170                 throws Exception {
171             e.getCause().printStackTrace();
172             closeOnFlush(e.getChannel());
173         }
174     }
175 
176     /**
177      * Closes the specified channel after all queued write requests are flushed.
178      */
179     static void closeOnFlush(Channel ch) {
180         if (ch.isConnected()) {
181             ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
182         }
183     }
184 }