1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.net.SocketTimeoutException;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.ClosedSelectorException;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.jboss.netty.channel.AbstractChannelSink;
33 import org.jboss.netty.channel.Channel;
34 import org.jboss.netty.channel.ChannelEvent;
35 import org.jboss.netty.channel.ChannelFuture;
36 import org.jboss.netty.channel.ChannelPipeline;
37 import org.jboss.netty.channel.ChannelState;
38 import org.jboss.netty.channel.ChannelStateEvent;
39 import org.jboss.netty.channel.MessageEvent;
40 import org.jboss.netty.logging.InternalLogger;
41 import org.jboss.netty.logging.InternalLoggerFactory;
42 import org.jboss.netty.util.ThreadRenamingRunnable;
43 import org.jboss.netty.util.internal.DeadLockProofWorker;
44
45
46
47
48
49
50
51
52
53 class NioServerSocketPipelineSink extends AbstractChannelSink {
54
55 static final InternalLogger logger =
56 InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
57 private static final AtomicInteger nextId = new AtomicInteger();
58
59 private final int id = nextId.incrementAndGet();
60 private final NioWorker[] workers;
61 private final AtomicInteger workerIndex = new AtomicInteger();
62
63 NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
64 workers = new NioWorker[workerCount];
65 for (int i = 0; i < workers.length; i ++) {
66 workers[i] = new NioWorker(id, i + 1, workerExecutor);
67 }
68 }
69
70 public void eventSunk(
71 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
72 Channel channel = e.getChannel();
73 if (channel instanceof NioServerSocketChannel) {
74 handleServerSocket(e);
75 } else if (channel instanceof NioSocketChannel) {
76 handleAcceptedSocket(e);
77 }
78 }
79
80 private void handleServerSocket(ChannelEvent e) {
81 if (!(e instanceof ChannelStateEvent)) {
82 return;
83 }
84
85 ChannelStateEvent event = (ChannelStateEvent) e;
86 NioServerSocketChannel channel =
87 (NioServerSocketChannel) event.getChannel();
88 ChannelFuture future = event.getFuture();
89 ChannelState state = event.getState();
90 Object value = event.getValue();
91
92 switch (state) {
93 case OPEN:
94 if (Boolean.FALSE.equals(value)) {
95 close(channel, future);
96 }
97 break;
98 case BOUND:
99 if (value != null) {
100 bind(channel, future, (SocketAddress) value);
101 } else {
102 close(channel, future);
103 }
104 break;
105 }
106 }
107
108 private void handleAcceptedSocket(ChannelEvent e) {
109 if (e instanceof ChannelStateEvent) {
110 ChannelStateEvent event = (ChannelStateEvent) e;
111 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
112 ChannelFuture future = event.getFuture();
113 ChannelState state = event.getState();
114 Object value = event.getValue();
115
116 switch (state) {
117 case OPEN:
118 if (Boolean.FALSE.equals(value)) {
119 channel.worker.close(channel, future);
120 }
121 break;
122 case BOUND:
123 case CONNECTED:
124 if (value == null) {
125 channel.worker.close(channel, future);
126 }
127 break;
128 case INTEREST_OPS:
129 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
130 break;
131 }
132 } else if (e instanceof MessageEvent) {
133 MessageEvent event = (MessageEvent) e;
134 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
135 boolean offered = channel.writeBuffer.offer(event);
136 assert offered;
137 channel.worker.writeFromUserCode(channel);
138 }
139 }
140
141 private void bind(
142 NioServerSocketChannel channel, ChannelFuture future,
143 SocketAddress localAddress) {
144
145 boolean bound = false;
146 boolean bossStarted = false;
147 try {
148 channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
149 bound = true;
150
151 future.setSuccess();
152 fireChannelBound(channel, channel.getLocalAddress());
153
154 Executor bossExecutor =
155 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
156 DeadLockProofWorker.start(
157 bossExecutor,
158 new ThreadRenamingRunnable(
159 new Boss(channel),
160 "New I/O server boss #" + id + " (" + channel + ')'));
161 bossStarted = true;
162 } catch (Throwable t) {
163 future.setFailure(t);
164 fireExceptionCaught(channel, t);
165 } finally {
166 if (!bossStarted && bound) {
167 close(channel, future);
168 }
169 }
170 }
171
172 private void close(NioServerSocketChannel channel, ChannelFuture future) {
173 boolean bound = channel.isBound();
174 try {
175 if (channel.socket.isOpen()) {
176 channel.socket.close();
177 Selector selector = channel.selector;
178 if (selector != null) {
179 selector.wakeup();
180 }
181 }
182
183
184
185
186 channel.shutdownLock.lock();
187 try {
188 if (channel.setClosed()) {
189 future.setSuccess();
190 if (bound) {
191 fireChannelUnbound(channel);
192 }
193 fireChannelClosed(channel);
194 } else {
195 future.setSuccess();
196 }
197 } finally {
198 channel.shutdownLock.unlock();
199 }
200 } catch (Throwable t) {
201 future.setFailure(t);
202 fireExceptionCaught(channel, t);
203 }
204 }
205
206 NioWorker nextWorker() {
207 return workers[Math.abs(
208 workerIndex.getAndIncrement() % workers.length)];
209 }
210
211 private final class Boss implements Runnable {
212 private final Selector selector;
213 private final NioServerSocketChannel channel;
214
215 Boss(NioServerSocketChannel channel) throws IOException {
216 this.channel = channel;
217
218 selector = Selector.open();
219
220 boolean registered = false;
221 try {
222 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
223 registered = true;
224 } finally {
225 if (!registered) {
226 closeSelector();
227 }
228 }
229
230 channel.selector = selector;
231 }
232
233 public void run() {
234 final Thread currentThread = Thread.currentThread();
235
236 channel.shutdownLock.lock();
237 try {
238 for (;;) {
239 try {
240 if (selector.select(1000) > 0) {
241 selector.selectedKeys().clear();
242 }
243
244 SocketChannel acceptedSocket = channel.socket.accept();
245 if (acceptedSocket != null) {
246 registerAcceptedChannel(acceptedSocket, currentThread);
247 }
248 } catch (SocketTimeoutException e) {
249
250
251 } catch (CancelledKeyException e) {
252
253 } catch (ClosedSelectorException e) {
254
255 } catch (ClosedChannelException e) {
256
257 break;
258 } catch (Throwable e) {
259 logger.warn(
260 "Failed to accept a connection.", e);
261 try {
262 Thread.sleep(1000);
263 } catch (InterruptedException e1) {
264
265 }
266 }
267 }
268 } finally {
269 channel.shutdownLock.unlock();
270 closeSelector();
271 }
272 }
273
274 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
275 try {
276 ChannelPipeline pipeline =
277 channel.getConfig().getPipelineFactory().getPipeline();
278 NioWorker worker = nextWorker();
279 worker.register(new NioAcceptedSocketChannel(
280 channel.getFactory(), pipeline, channel,
281 NioServerSocketPipelineSink.this, acceptedSocket,
282 worker, currentThread), null);
283 } catch (Exception e) {
284 logger.warn(
285 "Failed to initialize an accepted socket.", e);
286 try {
287 acceptedSocket.close();
288 } catch (IOException e2) {
289 logger.warn(
290 "Failed to close a partially accepted socket.",
291 e2);
292 }
293 }
294 }
295
296 private void closeSelector() {
297 channel.selector = null;
298 try {
299 selector.close();
300 } catch (Exception e) {
301 logger.warn("Failed to close a selector.", e);
302 }
303 }
304 }
305 }