1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.discard;
17
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.logging.Level;
20 import java.util.logging.Logger;
21
22 import org.jboss.netty.buffer.ChannelBuffer;
23 import org.jboss.netty.buffer.ChannelBuffers;
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelState;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.ExceptionEvent;
30 import org.jboss.netty.channel.MessageEvent;
31 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
32 import org.jboss.netty.channel.WriteCompletionEvent;
33
34
35
36
37
38
39
40
41
42 public class DiscardClientHandler extends SimpleChannelUpstreamHandler {
43
44 private static final Logger logger = Logger.getLogger(
45 DiscardClientHandler.class.getName());
46
47 private final AtomicLong transferredBytes = new AtomicLong();
48 private final byte[] content;
49
50 public DiscardClientHandler(int messageSize) {
51 if (messageSize <= 0) {
52 throw new IllegalArgumentException(
53 "messageSize: " + messageSize);
54 }
55 content = new byte[messageSize];
56 }
57
58 public long getTransferredBytes() {
59 return transferredBytes.get();
60 }
61
62 @Override
63 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
64 if (e instanceof ChannelStateEvent) {
65 if (((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
66 logger.info(e.toString());
67 }
68 }
69
70
71 super.handleUpstream(ctx, e);
72 }
73
74 @Override
75 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
76
77 generateTraffic(e);
78 }
79
80 @Override
81 public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
82
83 generateTraffic(e);
84 }
85
86 @Override
87 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
88
89 }
90
91 @Override
92 public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {
93 transferredBytes.addAndGet(e.getWrittenAmount());
94 }
95
96 @Override
97 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
98
99 logger.log(
100 Level.WARNING,
101 "Unexpected exception from downstream.",
102 e.getCause());
103 e.getChannel().close();
104 }
105
106 private void generateTraffic(ChannelStateEvent e) {
107
108
109
110
111 Channel channel = e.getChannel();
112 while (channel.isWritable()) {
113 ChannelBuffer m = nextMessage();
114 if (m == null) {
115 break;
116 }
117 channel.write(m);
118 }
119 }
120
121 private ChannelBuffer nextMessage() {
122 return ChannelBuffers.wrappedBuffer(content);
123 }
124 }