1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.nio.channels.CancelledKeyException;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.NotYetConnectedException;
27 import java.nio.channels.SelectionKey;
28 import java.nio.channels.Selector;
29 import java.nio.channels.SocketChannel;
30 import java.util.Iterator;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.locks.ReadWriteLock;
37 import java.util.concurrent.locks.ReentrantReadWriteLock;
38
39 import org.jboss.netty.buffer.ChannelBuffer;
40 import org.jboss.netty.buffer.ChannelBufferFactory;
41 import org.jboss.netty.channel.Channel;
42 import org.jboss.netty.channel.ChannelException;
43 import org.jboss.netty.channel.ChannelFuture;
44 import org.jboss.netty.channel.MessageEvent;
45 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
46 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
47 import org.jboss.netty.logging.InternalLogger;
48 import org.jboss.netty.logging.InternalLoggerFactory;
49 import org.jboss.netty.util.ThreadRenamingRunnable;
50 import org.jboss.netty.util.internal.DeadLockProofWorker;
51 import org.jboss.netty.util.internal.LinkedTransferQueue;
52
53
54
55
56
57
58
59
60
61 class NioWorker implements Runnable {
62
63 private static final InternalLogger logger =
64 InternalLoggerFactory.getInstance(NioWorker.class);
65
66 private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
67
68 static final int CLEANUP_INTERVAL = 256;
69
70 private final int bossId;
71 private final int id;
72 private final Executor executor;
73 private boolean started;
74 private volatile Thread thread;
75 volatile Selector selector;
76 private final AtomicBoolean wakenUp = new AtomicBoolean();
77 private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
78 private final Object startStopLock = new Object();
79 private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
80 private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
81 private volatile int cancelledKeys;
82
83 private final SocketReceiveBufferPool recvBufferPool = new SocketReceiveBufferPool();
84 private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
85
86 NioWorker(int bossId, int id, Executor executor) {
87 this.bossId = bossId;
88 this.id = id;
89 this.executor = executor;
90 }
91
92 void register(NioSocketChannel channel, ChannelFuture future) {
93
94 boolean server = !(channel instanceof NioClientSocketChannel);
95 Runnable registerTask = new RegisterTask(channel, future, server);
96 Selector selector;
97
98 synchronized (startStopLock) {
99
100 if (!started) {
101
102 try {
103 this.selector = selector = Selector.open();
104 } catch (Throwable t) {
105 throw new ChannelException(
106 "Failed to create a selector.", t);
107 }
108
109
110 String threadName =
111 (server ? "New I/O server worker #"
112 : "New I/O client worker #") + bossId + '-' + id;
113
114 boolean success = false;
115 try {
116 DeadLockProofWorker.start(
117 executor, new ThreadRenamingRunnable(this, threadName));
118 success = true;
119 } finally {
120 if (!success) {
121
122 try {
123 selector.close();
124 } catch (Throwable t) {
125 logger.warn("Failed to close a selector.", t);
126 }
127 this.selector = selector = null;
128
129 }
130 }
131 } else {
132
133 selector = this.selector;
134 }
135
136 assert selector != null && selector.isOpen();
137
138 started = true;
139 boolean offered = registerTaskQueue.offer(registerTask);
140 assert offered;
141 }
142
143 if (wakenUp.compareAndSet(false, true)) {
144 selector.wakeup();
145 }
146 }
147
148 public void run() {
149 thread = Thread.currentThread();
150
151 boolean shutdown = false;
152 Selector selector = this.selector;
153 for (;;) {
154 wakenUp.set(false);
155
156 if (CONSTRAINT_LEVEL != 0) {
157 selectorGuard.writeLock().lock();
158
159
160 selectorGuard.writeLock().unlock();
161 }
162
163 try {
164
165 SelectorUtil.select(selector);
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195 if (wakenUp.get()) {
196 selector.wakeup();
197 }
198
199 cancelledKeys = 0;
200 processRegisterTaskQueue();
201 processWriteTaskQueue();
202 processSelectedKeys(selector.selectedKeys());
203
204
205
206
207
208
209 if (selector.keys().isEmpty()) {
210 if (shutdown ||
211 executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
212
213 synchronized (startStopLock) {
214 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
215 started = false;
216 try {
217 selector.close();
218 } catch (IOException e) {
219 logger.warn(
220 "Failed to close a selector.", e);
221 } finally {
222 this.selector = null;
223 }
224 break;
225 } else {
226 shutdown = false;
227 }
228 }
229 } else {
230
231 shutdown = true;
232 }
233 } else {
234 shutdown = false;
235 }
236 } catch (Throwable t) {
237 logger.warn(
238 "Unexpected exception in the selector loop.", t);
239
240
241
242 try {
243 Thread.sleep(1000);
244 } catch (InterruptedException e) {
245
246 }
247 }
248 }
249 }
250
251 private void processRegisterTaskQueue() throws IOException {
252 for (;;) {
253 final Runnable task = registerTaskQueue.poll();
254 if (task == null) {
255 break;
256 }
257
258 task.run();
259 cleanUpCancelledKeys();
260 }
261 }
262
263 private void processWriteTaskQueue() throws IOException {
264 for (;;) {
265 final Runnable task = writeTaskQueue.poll();
266 if (task == null) {
267 break;
268 }
269
270 task.run();
271 cleanUpCancelledKeys();
272 }
273 }
274
275 private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
276 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
277 SelectionKey k = i.next();
278 i.remove();
279 try {
280 int readyOps = k.readyOps();
281 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
282 if (!read(k)) {
283
284 continue;
285 }
286 }
287 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
288 writeFromSelectorLoop(k);
289 }
290 } catch (CancelledKeyException e) {
291 close(k);
292 }
293
294 if (cleanUpCancelledKeys()) {
295 break;
296 }
297 }
298 }
299
300 private boolean cleanUpCancelledKeys() throws IOException {
301 if (cancelledKeys >= CLEANUP_INTERVAL) {
302 cancelledKeys = 0;
303 selector.selectNow();
304 return true;
305 }
306 return false;
307 }
308
309 private boolean read(SelectionKey k) {
310 final SocketChannel ch = (SocketChannel) k.channel();
311 final NioSocketChannel channel = (NioSocketChannel) k.attachment();
312
313 final ReceiveBufferSizePredictor predictor =
314 channel.getConfig().getReceiveBufferSizePredictor();
315 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
316
317 int ret = 0;
318 int readBytes = 0;
319 boolean failure = true;
320
321 ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
322 try {
323 while ((ret = ch.read(bb)) > 0) {
324 readBytes += ret;
325 if (!bb.hasRemaining()) {
326 break;
327 }
328 }
329 failure = false;
330 } catch (ClosedChannelException e) {
331
332 } catch (Throwable t) {
333 fireExceptionCaught(channel, t);
334 }
335
336 if (readBytes > 0) {
337 bb.flip();
338
339 final ChannelBufferFactory bufferFactory =
340 channel.getConfig().getBufferFactory();
341 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
342 buffer.setBytes(0, bb);
343 buffer.writerIndex(readBytes);
344
345 recvBufferPool.release(bb);
346
347
348 predictor.previousReceiveBufferSize(readBytes);
349
350
351 fireMessageReceived(channel, buffer);
352 } else {
353 recvBufferPool.release(bb);
354 }
355
356 if (ret < 0 || failure) {
357 k.cancel();
358 close(channel, succeededFuture(channel));
359 return false;
360 }
361
362 return true;
363 }
364
365 private void close(SelectionKey k) {
366 NioSocketChannel ch = (NioSocketChannel) k.attachment();
367 close(ch, succeededFuture(ch));
368 }
369
370 void writeFromUserCode(final NioSocketChannel channel) {
371 if (!channel.isConnected()) {
372 cleanUpWriteBuffer(channel);
373 return;
374 }
375
376 if (scheduleWriteIfNecessary(channel)) {
377 return;
378 }
379
380
381
382 if (channel.writeSuspended) {
383 return;
384 }
385
386 if (channel.inWriteNowLoop) {
387 return;
388 }
389
390 write0(channel);
391 }
392
393 void writeFromTaskLoop(final NioSocketChannel ch) {
394 if (!ch.writeSuspended) {
395 write0(ch);
396 }
397 }
398
399 void writeFromSelectorLoop(final SelectionKey k) {
400 NioSocketChannel ch = (NioSocketChannel) k.attachment();
401 ch.writeSuspended = false;
402 write0(ch);
403 }
404
405 private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) {
406 final Thread currentThread = Thread.currentThread();
407 final Thread workerThread = thread;
408 if (currentThread != workerThread) {
409 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
410 boolean offered = writeTaskQueue.offer(channel.writeTask);
411 assert offered;
412 }
413
414 if (!(channel instanceof NioAcceptedSocketChannel) ||
415 ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
416 final Selector workerSelector = selector;
417 if (workerSelector != null) {
418 if (wakenUp.compareAndSet(false, true)) {
419 workerSelector.wakeup();
420 }
421 }
422 } else {
423
424
425
426
427
428
429
430
431
432 }
433
434 return true;
435 }
436
437 return false;
438 }
439
440 private void write0(NioSocketChannel channel) {
441 boolean open = true;
442 boolean addOpWrite = false;
443 boolean removeOpWrite = false;
444
445 long writtenBytes = 0;
446
447 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
448 final SocketChannel ch = channel.socket;
449 final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
450 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
451 synchronized (channel.writeLock) {
452 channel.inWriteNowLoop = true;
453 for (;;) {
454 MessageEvent evt = channel.currentWriteEvent;
455 SendBuffer buf;
456 if (evt == null) {
457 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
458 removeOpWrite = true;
459 channel.writeSuspended = false;
460 break;
461 }
462
463 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
464 } else {
465 buf = channel.currentWriteBuffer;
466 }
467
468 ChannelFuture future = evt.getFuture();
469 try {
470 long localWrittenBytes = 0;
471 for (int i = writeSpinCount; i > 0; i --) {
472 localWrittenBytes = buf.transferTo(ch);
473 if (localWrittenBytes != 0) {
474 writtenBytes += localWrittenBytes;
475 break;
476 }
477 if (buf.finished()) {
478 break;
479 }
480 }
481
482 if (buf.finished()) {
483
484 buf.release();
485 channel.currentWriteEvent = null;
486 channel.currentWriteBuffer = null;
487 evt = null;
488 buf = null;
489 future.setSuccess();
490 } else {
491
492 addOpWrite = true;
493 channel.writeSuspended = true;
494
495 if (localWrittenBytes > 0) {
496
497 future.setProgress(
498 localWrittenBytes,
499 buf.writtenBytes(), buf.totalBytes());
500 }
501 break;
502 }
503 } catch (AsynchronousCloseException e) {
504
505 } catch (Throwable t) {
506 buf.release();
507 channel.currentWriteEvent = null;
508 channel.currentWriteBuffer = null;
509 buf = null;
510 evt = null;
511 future.setFailure(t);
512 fireExceptionCaught(channel, t);
513 if (t instanceof IOException) {
514 open = false;
515 close(channel, succeededFuture(channel));
516 }
517 }
518 }
519 channel.inWriteNowLoop = false;
520
521
522
523
524
525
526
527 if (open) {
528 if (addOpWrite) {
529 setOpWrite(channel);
530 } else if (removeOpWrite) {
531 clearOpWrite(channel);
532 }
533 }
534 }
535
536 fireWriteComplete(channel, writtenBytes);
537 }
538
539 private void setOpWrite(NioSocketChannel channel) {
540 Selector selector = this.selector;
541 SelectionKey key = channel.socket.keyFor(selector);
542 if (key == null) {
543 return;
544 }
545 if (!key.isValid()) {
546 close(key);
547 return;
548 }
549
550
551
552 synchronized (channel.interestOpsLock) {
553 int interestOps = channel.getRawInterestOps();
554 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
555 interestOps |= SelectionKey.OP_WRITE;
556 key.interestOps(interestOps);
557 channel.setRawInterestOpsNow(interestOps);
558 }
559 }
560 }
561
562 private void clearOpWrite(NioSocketChannel channel) {
563 Selector selector = this.selector;
564 SelectionKey key = channel.socket.keyFor(selector);
565 if (key == null) {
566 return;
567 }
568 if (!key.isValid()) {
569 close(key);
570 return;
571 }
572
573
574
575 synchronized (channel.interestOpsLock) {
576 int interestOps = channel.getRawInterestOps();
577 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
578 interestOps &= ~SelectionKey.OP_WRITE;
579 key.interestOps(interestOps);
580 channel.setRawInterestOpsNow(interestOps);
581 }
582 }
583 }
584
585 void close(NioSocketChannel channel, ChannelFuture future) {
586 boolean connected = channel.isConnected();
587 boolean bound = channel.isBound();
588 try {
589 channel.socket.close();
590 cancelledKeys ++;
591
592 if (channel.setClosed()) {
593 future.setSuccess();
594 if (connected) {
595 fireChannelDisconnected(channel);
596 }
597 if (bound) {
598 fireChannelUnbound(channel);
599 }
600
601 cleanUpWriteBuffer(channel);
602 fireChannelClosed(channel);
603 } else {
604 future.setSuccess();
605 }
606 } catch (Throwable t) {
607 future.setFailure(t);
608 fireExceptionCaught(channel, t);
609 }
610 }
611
612 private void cleanUpWriteBuffer(NioSocketChannel channel) {
613 Exception cause = null;
614 boolean fireExceptionCaught = false;
615
616
617 synchronized (channel.writeLock) {
618 MessageEvent evt = channel.currentWriteEvent;
619 if (evt != null) {
620
621
622 if (channel.isOpen()) {
623 cause = new NotYetConnectedException();
624 } else {
625 cause = new ClosedChannelException();
626 }
627
628 ChannelFuture future = evt.getFuture();
629 channel.currentWriteBuffer.release();
630 channel.currentWriteBuffer = null;
631 channel.currentWriteEvent = null;
632 evt = null;
633 future.setFailure(cause);
634 fireExceptionCaught = true;
635 }
636
637 Queue<MessageEvent> writeBuffer = channel.writeBuffer;
638 if (!writeBuffer.isEmpty()) {
639
640
641 if (cause == null) {
642 if (channel.isOpen()) {
643 cause = new NotYetConnectedException();
644 } else {
645 cause = new ClosedChannelException();
646 }
647 }
648
649 for (;;) {
650 evt = writeBuffer.poll();
651 if (evt == null) {
652 break;
653 }
654 evt.getFuture().setFailure(cause);
655 fireExceptionCaught = true;
656 }
657 }
658 }
659
660 if (fireExceptionCaught) {
661 fireExceptionCaught(channel, cause);
662 }
663 }
664
665 void setInterestOps(
666 NioSocketChannel channel, ChannelFuture future, int interestOps) {
667 boolean changed = false;
668 try {
669
670
671 synchronized (channel.interestOpsLock) {
672 Selector selector = this.selector;
673 SelectionKey key = channel.socket.keyFor(selector);
674
675 if (key == null || selector == null) {
676
677
678 channel.setRawInterestOpsNow(interestOps);
679 return;
680 }
681
682
683 interestOps &= ~Channel.OP_WRITE;
684 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
685
686 switch (CONSTRAINT_LEVEL) {
687 case 0:
688 if (channel.getRawInterestOps() != interestOps) {
689 key.interestOps(interestOps);
690 if (Thread.currentThread() != thread &&
691 wakenUp.compareAndSet(false, true)) {
692 selector.wakeup();
693 }
694 changed = true;
695 }
696 break;
697 case 1:
698 case 2:
699 if (channel.getRawInterestOps() != interestOps) {
700 if (Thread.currentThread() == thread) {
701 key.interestOps(interestOps);
702 changed = true;
703 } else {
704 selectorGuard.readLock().lock();
705 try {
706 if (wakenUp.compareAndSet(false, true)) {
707 selector.wakeup();
708 }
709 key.interestOps(interestOps);
710 changed = true;
711 } finally {
712 selectorGuard.readLock().unlock();
713 }
714 }
715 }
716 break;
717 default:
718 throw new Error();
719 }
720
721 if (changed) {
722 channel.setRawInterestOpsNow(interestOps);
723 }
724 }
725
726 future.setSuccess();
727 if (changed) {
728 fireChannelInterestChanged(channel);
729 }
730 } catch (CancelledKeyException e) {
731
732 ClosedChannelException cce = new ClosedChannelException();
733 future.setFailure(cce);
734 fireExceptionCaught(channel, cce);
735 } catch (Throwable t) {
736 future.setFailure(t);
737 fireExceptionCaught(channel, t);
738 }
739 }
740
741 private final class RegisterTask implements Runnable {
742 private final NioSocketChannel channel;
743 private final ChannelFuture future;
744 private final boolean server;
745
746 RegisterTask(
747 NioSocketChannel channel, ChannelFuture future, boolean server) {
748
749 this.channel = channel;
750 this.future = future;
751 this.server = server;
752 }
753
754 public void run() {
755 SocketAddress localAddress = channel.getLocalAddress();
756 SocketAddress remoteAddress = channel.getRemoteAddress();
757 if (localAddress == null || remoteAddress == null) {
758 if (future != null) {
759 future.setFailure(new ClosedChannelException());
760 }
761 close(channel, succeededFuture(channel));
762 return;
763 }
764
765 try {
766 if (server) {
767 channel.socket.configureBlocking(false);
768 }
769
770 synchronized (channel.interestOpsLock) {
771 channel.socket.register(
772 selector, channel.getRawInterestOps(), channel);
773 }
774 if (future != null) {
775 channel.setConnected();
776 future.setSuccess();
777 }
778 } catch (IOException e) {
779 if (future != null) {
780 future.setFailure(e);
781 }
782 close(channel, succeededFuture(channel));
783 if (!(e instanceof ClosedChannelException)) {
784 throw new ChannelException(
785 "Failed to register a socket to the selector.", e);
786 }
787 }
788
789 if (server || !((NioClientSocketChannel) channel).boundManually) {
790 fireChannelBound(channel, localAddress);
791 }
792 fireChannelConnected(channel, remoteAddress);
793 }
794 }
795 }