1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.ConnectException;
22 import java.net.SocketAddress;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.util.Iterator;
27 import java.util.Queue;
28 import java.util.Set;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.jboss.netty.channel.AbstractChannelSink;
35 import org.jboss.netty.channel.ChannelEvent;
36 import org.jboss.netty.channel.ChannelException;
37 import org.jboss.netty.channel.ChannelFuture;
38 import org.jboss.netty.channel.ChannelFutureListener;
39 import org.jboss.netty.channel.ChannelPipeline;
40 import org.jboss.netty.channel.ChannelState;
41 import org.jboss.netty.channel.ChannelStateEvent;
42 import org.jboss.netty.channel.MessageEvent;
43 import org.jboss.netty.logging.InternalLogger;
44 import org.jboss.netty.logging.InternalLoggerFactory;
45 import org.jboss.netty.util.ThreadRenamingRunnable;
46 import org.jboss.netty.util.internal.DeadLockProofWorker;
47 import org.jboss.netty.util.internal.LinkedTransferQueue;
48
49
50
51
52
53
54
55
56
57 class NioClientSocketPipelineSink extends AbstractChannelSink {
58
59 static final InternalLogger logger =
60 InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
61 private static final AtomicInteger nextId = new AtomicInteger();
62
63 final int id = nextId.incrementAndGet();
64 final Executor bossExecutor;
65 private final Boss boss = new Boss();
66 private final NioWorker[] workers;
67 private final AtomicInteger workerIndex = new AtomicInteger();
68
69 NioClientSocketPipelineSink(
70 Executor bossExecutor, Executor workerExecutor, int workerCount) {
71 this.bossExecutor = bossExecutor;
72 workers = new NioWorker[workerCount];
73 for (int i = 0; i < workers.length; i ++) {
74 workers[i] = new NioWorker(id, i + 1, workerExecutor);
75 }
76 }
77
78 public void eventSunk(
79 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
80 if (e instanceof ChannelStateEvent) {
81 ChannelStateEvent event = (ChannelStateEvent) e;
82 NioClientSocketChannel channel =
83 (NioClientSocketChannel) event.getChannel();
84 ChannelFuture future = event.getFuture();
85 ChannelState state = event.getState();
86 Object value = event.getValue();
87
88 switch (state) {
89 case OPEN:
90 if (Boolean.FALSE.equals(value)) {
91 channel.worker.close(channel, future);
92 }
93 break;
94 case BOUND:
95 if (value != null) {
96 bind(channel, future, (SocketAddress) value);
97 } else {
98 channel.worker.close(channel, future);
99 }
100 break;
101 case CONNECTED:
102 if (value != null) {
103 connect(channel, future, (SocketAddress) value);
104 } else {
105 channel.worker.close(channel, future);
106 }
107 break;
108 case INTEREST_OPS:
109 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
110 break;
111 }
112 } else if (e instanceof MessageEvent) {
113 MessageEvent event = (MessageEvent) e;
114 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
115 boolean offered = channel.writeBuffer.offer(event);
116 assert offered;
117 channel.worker.writeFromUserCode(channel);
118 }
119 }
120
121 private void bind(
122 NioClientSocketChannel channel, ChannelFuture future,
123 SocketAddress localAddress) {
124 try {
125 channel.socket.socket().bind(localAddress);
126 channel.boundManually = true;
127 channel.setBound();
128 future.setSuccess();
129 fireChannelBound(channel, channel.getLocalAddress());
130 } catch (Throwable t) {
131 future.setFailure(t);
132 fireExceptionCaught(channel, t);
133 }
134 }
135
136 private void connect(
137 final NioClientSocketChannel channel, final ChannelFuture cf,
138 SocketAddress remoteAddress) {
139 try {
140 if (channel.socket.connect(remoteAddress)) {
141 channel.worker.register(channel, cf);
142 } else {
143 channel.getCloseFuture().addListener(new ChannelFutureListener() {
144 public void operationComplete(ChannelFuture f)
145 throws Exception {
146 if (!cf.isDone()) {
147 cf.setFailure(new ClosedChannelException());
148 }
149 }
150 });
151 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
152 channel.connectFuture = cf;
153 boss.register(channel);
154 }
155
156 } catch (Throwable t) {
157 cf.setFailure(t);
158 fireExceptionCaught(channel, t);
159 channel.worker.close(channel, succeededFuture(channel));
160 }
161 }
162
163 NioWorker nextWorker() {
164 return workers[Math.abs(
165 workerIndex.getAndIncrement() % workers.length)];
166 }
167
168 private final class Boss implements Runnable {
169
170 volatile Selector selector;
171 private boolean started;
172 private final AtomicBoolean wakenUp = new AtomicBoolean();
173 private final Object startStopLock = new Object();
174 private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
175
176 Boss() {
177 super();
178 }
179
180 void register(NioClientSocketChannel channel) {
181 Runnable registerTask = new RegisterTask(this, channel);
182 Selector selector;
183
184 synchronized (startStopLock) {
185 if (!started) {
186
187 try {
188 this.selector = selector = Selector.open();
189 } catch (Throwable t) {
190 throw new ChannelException(
191 "Failed to create a selector.", t);
192 }
193
194
195 boolean success = false;
196 try {
197 DeadLockProofWorker.start(
198 bossExecutor,
199 new ThreadRenamingRunnable(
200 this, "New I/O client boss #" + id));
201 success = true;
202 } finally {
203 if (!success) {
204
205 try {
206 selector.close();
207 } catch (Throwable t) {
208 logger.warn("Failed to close a selector.", t);
209 }
210 this.selector = selector = null;
211
212 }
213 }
214 } else {
215
216 selector = this.selector;
217 }
218
219 assert selector != null && selector.isOpen();
220
221 started = true;
222 boolean offered = registerTaskQueue.offer(registerTask);
223 assert offered;
224 }
225
226 if (wakenUp.compareAndSet(false, true)) {
227 selector.wakeup();
228 }
229 }
230
231 public void run() {
232 boolean shutdown = false;
233 Selector selector = this.selector;
234 long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
235 for (;;) {
236 wakenUp.set(false);
237
238 try {
239 int selectedKeyCount = selector.select(500);
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269 if (wakenUp.get()) {
270 selector.wakeup();
271 }
272
273 processRegisterTaskQueue();
274
275 if (selectedKeyCount > 0) {
276 processSelectedKeys(selector.selectedKeys());
277 }
278
279
280 long currentTimeNanos = System.nanoTime();
281 if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 500 * 1000000L) {
282 lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
283 processConnectTimeout(selector.keys(), currentTimeNanos);
284 }
285
286
287
288
289
290
291 if (selector.keys().isEmpty()) {
292 if (shutdown ||
293 bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
294
295 synchronized (startStopLock) {
296 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
297 started = false;
298 try {
299 selector.close();
300 } catch (IOException e) {
301 logger.warn(
302 "Failed to close a selector.", e);
303 } finally {
304 this.selector = null;
305 }
306 break;
307 } else {
308 shutdown = false;
309 }
310 }
311 } else {
312
313 shutdown = true;
314 }
315 } else {
316 shutdown = false;
317 }
318 } catch (Throwable t) {
319 logger.warn(
320 "Unexpected exception in the selector loop.", t);
321
322
323 try {
324 Thread.sleep(1000);
325 } catch (InterruptedException e) {
326
327 }
328 }
329 }
330 }
331
332 private void processRegisterTaskQueue() {
333 for (;;) {
334 final Runnable task = registerTaskQueue.poll();
335 if (task == null) {
336 break;
337 }
338
339 task.run();
340 }
341 }
342
343 private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
344 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
345 SelectionKey k = i.next();
346 i.remove();
347
348 if (!k.isValid()) {
349 close(k);
350 continue;
351 }
352
353 if (k.isConnectable()) {
354 connect(k);
355 }
356 }
357 }
358
359 private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
360 ConnectException cause = null;
361 for (SelectionKey k: keys) {
362 if (!k.isValid()) {
363 continue;
364 }
365
366 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
367 if (ch.connectDeadlineNanos > 0 &&
368 currentTimeNanos >= ch.connectDeadlineNanos) {
369
370 if (cause == null) {
371 cause = new ConnectException("connection timed out");
372 }
373
374 ch.connectFuture.setFailure(cause);
375 fireExceptionCaught(ch, cause);
376 ch.worker.close(ch, succeededFuture(ch));
377 }
378 }
379 }
380
381 private void connect(SelectionKey k) {
382 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
383 try {
384 if (ch.socket.finishConnect()) {
385 k.cancel();
386 ch.worker.register(ch, ch.connectFuture);
387 }
388 } catch (Throwable t) {
389 ch.connectFuture.setFailure(t);
390 fireExceptionCaught(ch, t);
391 k.cancel();
392 ch.worker.close(ch, succeededFuture(ch));
393 }
394 }
395
396 private void close(SelectionKey k) {
397 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
398 ch.worker.close(ch, succeededFuture(ch));
399 }
400 }
401
402 private static final class RegisterTask implements Runnable {
403 private final Boss boss;
404 private final NioClientSocketChannel channel;
405
406 RegisterTask(Boss boss, NioClientSocketChannel channel) {
407 this.boss = boss;
408 this.channel = channel;
409 }
410
411 public void run() {
412 try {
413 channel.socket.register(
414 boss.selector, SelectionKey.OP_CONNECT, channel);
415 } catch (ClosedChannelException e) {
416 channel.worker.close(channel, succeededFuture(channel));
417 }
418
419 int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
420 if (connectTimeout > 0) {
421 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
422 }
423 }
424 }
425 }