1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.proxy;
17
18 import java.net.InetSocketAddress;
19
20 import org.jboss.netty.bootstrap.ClientBootstrap;
21 import org.jboss.netty.buffer.ChannelBuffer;
22 import org.jboss.netty.buffer.ChannelBuffers;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.ExceptionEvent;
29 import org.jboss.netty.channel.MessageEvent;
30 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
31 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
32
33
34
35
36
37
38 public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
39
40 private final ClientSocketChannelFactory cf;
41 private final String remoteHost;
42 private final int remotePort;
43
44
45
46
47 final Object trafficLock = new Object();
48
49 private volatile Channel outboundChannel;
50
51 public HexDumpProxyInboundHandler(
52 ClientSocketChannelFactory cf, String remoteHost, int remotePort) {
53 this.cf = cf;
54 this.remoteHost = remoteHost;
55 this.remotePort = remotePort;
56 }
57
58 @Override
59 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
60 throws Exception {
61
62 final Channel inboundChannel = e.getChannel();
63 inboundChannel.setReadable(false);
64
65
66 ClientBootstrap cb = new ClientBootstrap(cf);
67 cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
68 ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
69
70 outboundChannel = f.getChannel();
71 f.addListener(new ChannelFutureListener() {
72 public void operationComplete(ChannelFuture future) throws Exception {
73 if (future.isSuccess()) {
74
75
76 inboundChannel.setReadable(true);
77 } else {
78
79 inboundChannel.close();
80 }
81 }
82 });
83 }
84
85 @Override
86 public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
87 throws Exception {
88 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
89
90 synchronized (trafficLock) {
91 outboundChannel.write(msg);
92
93
94 if (!outboundChannel.isWritable()) {
95 e.getChannel().setReadable(false);
96 }
97 }
98 }
99
100 @Override
101 public void channelInterestChanged(ChannelHandlerContext ctx,
102 ChannelStateEvent e) throws Exception {
103
104
105 synchronized (trafficLock) {
106 if (e.getChannel().isWritable()) {
107 outboundChannel.setReadable(true);
108 }
109 }
110 }
111
112 @Override
113 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
114 throws Exception {
115 if (outboundChannel != null) {
116 closeOnFlush(outboundChannel);
117 }
118 }
119
120 @Override
121 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
122 throws Exception {
123 e.getCause().printStackTrace();
124 closeOnFlush(e.getChannel());
125 }
126
127 private class OutboundHandler extends SimpleChannelUpstreamHandler {
128
129 private final Channel inboundChannel;
130
131 OutboundHandler(Channel inboundChannel) {
132 this.inboundChannel = inboundChannel;
133 }
134
135 @Override
136 public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
137 throws Exception {
138 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
139
140 synchronized (trafficLock) {
141 inboundChannel.write(msg);
142
143
144 if (!inboundChannel.isWritable()) {
145 e.getChannel().setReadable(false);
146 }
147 }
148 }
149
150 @Override
151 public void channelInterestChanged(ChannelHandlerContext ctx,
152 ChannelStateEvent e) throws Exception {
153
154
155 synchronized (trafficLock) {
156 if (e.getChannel().isWritable()) {
157 inboundChannel.setReadable(true);
158 }
159 }
160 }
161
162 @Override
163 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
164 throws Exception {
165 closeOnFlush(inboundChannel);
166 }
167
168 @Override
169 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
170 throws Exception {
171 e.getCause().printStackTrace();
172 closeOnFlush(e.getChannel());
173 }
174 }
175
176
177
178
179 static void closeOnFlush(Channel ch) {
180 if (ch.isConnected()) {
181 ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
182 }
183 }
184 }