1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.InetSocketAddress;
21 import java.net.SocketAddress;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.jboss.netty.channel.AbstractChannelSink;
26 import org.jboss.netty.channel.ChannelEvent;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelFutureListener;
29 import org.jboss.netty.channel.ChannelPipeline;
30 import org.jboss.netty.channel.ChannelState;
31 import org.jboss.netty.channel.ChannelStateEvent;
32 import org.jboss.netty.channel.MessageEvent;
33
34
35
36
37
38
39
40
41
42
43
44 class NioDatagramPipelineSink extends AbstractChannelSink {
45
46 private static final AtomicInteger nextId = new AtomicInteger();
47
48 private final int id = nextId.incrementAndGet();
49 private final NioDatagramWorker[] workers;
50 private final AtomicInteger workerIndex = new AtomicInteger();
51
52
53
54
55
56
57
58
59
60
61
62 NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) {
63 workers = new NioDatagramWorker[workerCount];
64 for (int i = 0; i < workers.length; i ++) {
65 workers[i] = new NioDatagramWorker(id, i + 1, workerExecutor);
66 }
67 }
68
69
70
71
72
73
74
75
76 public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
77 throws Exception {
78 final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel();
79 final ChannelFuture future = e.getFuture();
80 if (e instanceof ChannelStateEvent) {
81 final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
82 final ChannelState state = stateEvent.getState();
83 final Object value = stateEvent.getValue();
84 switch (state) {
85 case OPEN:
86 if (Boolean.FALSE.equals(value)) {
87 channel.worker.close(channel, future);
88 }
89 break;
90 case BOUND:
91 if (value != null) {
92 bind(channel, future, (InetSocketAddress) value);
93 } else {
94 channel.worker.close(channel, future);
95 }
96 break;
97 case CONNECTED:
98 if (value != null) {
99 connect(channel, future, (InetSocketAddress) value);
100 } else {
101 NioDatagramWorker.disconnect(channel, future);
102 }
103 break;
104 case INTEREST_OPS:
105 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
106 break;
107 }
108 } else if (e instanceof MessageEvent) {
109 final MessageEvent event = (MessageEvent) e;
110 final boolean offered = channel.writeBufferQueue.offer(event);
111 assert offered;
112 channel.worker.writeFromUserCode(channel);
113 }
114 }
115
116 private void close(NioDatagramChannel channel, ChannelFuture future) {
117 try {
118 channel.getDatagramChannel().socket().close();
119 if (channel.setClosed()) {
120 future.setSuccess();
121 if (channel.isBound()) {
122 fireChannelUnbound(channel);
123 }
124 fireChannelClosed(channel);
125 } else {
126 future.setSuccess();
127 }
128 } catch (final Throwable t) {
129 future.setFailure(t);
130 fireExceptionCaught(channel, t);
131 }
132 }
133
134
135
136
137
138 private void bind(final NioDatagramChannel channel,
139 final ChannelFuture future, final InetSocketAddress address) {
140 boolean bound = false;
141 boolean started = false;
142 try {
143
144 channel.getDatagramChannel().socket().bind(address);
145 bound = true;
146
147 future.setSuccess();
148 fireChannelBound(channel, address);
149
150 channel.worker.register(channel, null);
151 started = true;
152 } catch (final Throwable t) {
153 future.setFailure(t);
154 fireExceptionCaught(channel, t);
155 } finally {
156 if (!started && bound) {
157 close(channel, future);
158 }
159 }
160 }
161
162 private void connect(
163 NioDatagramChannel channel, ChannelFuture future,
164 SocketAddress remoteAddress) {
165
166 boolean bound = channel.isBound();
167 boolean connected = false;
168 boolean workerStarted = false;
169
170 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
171
172
173
174 channel.remoteAddress = null;
175
176 try {
177 channel.getDatagramChannel().connect(remoteAddress);
178 connected = true;
179
180
181 future.setSuccess();
182 if (!bound) {
183 fireChannelBound(channel, channel.getLocalAddress());
184 }
185 fireChannelConnected(channel, channel.getRemoteAddress());
186
187 if (!bound) {
188 channel.worker.register(channel, future);
189 }
190
191 workerStarted = true;
192 } catch (Throwable t) {
193 future.setFailure(t);
194 fireExceptionCaught(channel, t);
195 } finally {
196 if (connected && !workerStarted) {
197 channel.worker.close(channel, future);
198 }
199 }
200 }
201
202 NioDatagramWorker nextWorker() {
203 return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
204 }
205
206 }