1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.factorial;
17
18 import java.math.BigInteger;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.logging.Level;
22 import java.util.logging.Logger;
23
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.ChannelFutureListener;
28 import org.jboss.netty.channel.ChannelHandlerContext;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.ExceptionEvent;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33
34
35
36
37
38
39
40
41
42
43
44
45
46 public class FactorialClientHandler extends SimpleChannelUpstreamHandler {
47
48 private static final Logger logger = Logger.getLogger(
49 FactorialClientHandler.class.getName());
50
51
52 private int i = 1;
53 private int receivedMessages = 0;
54 private final int count;
55 final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();
56
57 public FactorialClientHandler(int count) {
58 this.count = count;
59 }
60
61 public BigInteger getFactorial() {
62 boolean interrupted = false;
63 for (;;) {
64 try {
65 BigInteger factorial = answer.take();
66 if (interrupted) {
67 Thread.currentThread().interrupt();
68 }
69 return factorial;
70 } catch (InterruptedException e) {
71 interrupted = true;
72 }
73 }
74 }
75
76 @Override
77 public void handleUpstream(
78 ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
79 if (e instanceof ChannelStateEvent) {
80 logger.info(e.toString());
81 }
82 super.handleUpstream(ctx, e);
83 }
84
85 @Override
86 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
87 sendNumbers(e);
88 }
89
90 @Override
91 public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
92 sendNumbers(e);
93 }
94
95 @Override
96 public void messageReceived(
97 ChannelHandlerContext ctx, final MessageEvent e) {
98 receivedMessages ++;
99 if (receivedMessages == count) {
100
101 e.getChannel().close().addListener(new ChannelFutureListener() {
102 public void operationComplete(ChannelFuture future) {
103 boolean offered = answer.offer((BigInteger) e.getMessage());
104 assert offered;
105 }
106 });
107 }
108 }
109
110 @Override
111 public void exceptionCaught(
112 ChannelHandlerContext ctx, ExceptionEvent e) {
113 logger.log(
114 Level.WARNING,
115 "Unexpected exception from downstream.",
116 e.getCause());
117 e.getChannel().close();
118 }
119
120 private void sendNumbers(ChannelStateEvent e) {
121 Channel channel = e.getChannel();
122 while (channel.isWritable()) {
123 if (i <= count) {
124 channel.write(Integer.valueOf(i));
125 i ++;
126 } else {
127 break;
128 }
129 }
130 }
131 }