1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.nio.channels.CancelledKeyException;
25 import java.nio.channels.ClosedChannelException;
26 import java.nio.channels.DatagramChannel;
27 import java.nio.channels.NotYetBoundException;
28 import java.nio.channels.SelectionKey;
29 import java.nio.channels.Selector;
30 import java.util.Iterator;
31 import java.util.Queue;
32 import java.util.Set;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.locks.ReadWriteLock;
37 import java.util.concurrent.locks.ReentrantReadWriteLock;
38
39 import org.jboss.netty.buffer.ChannelBufferFactory;
40 import org.jboss.netty.channel.Channel;
41 import org.jboss.netty.channel.ChannelException;
42 import org.jboss.netty.channel.ChannelFuture;
43 import org.jboss.netty.channel.MessageEvent;
44 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
45 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
46 import org.jboss.netty.logging.InternalLogger;
47 import org.jboss.netty.logging.InternalLoggerFactory;
48 import org.jboss.netty.util.ThreadRenamingRunnable;
49 import org.jboss.netty.util.internal.LinkedTransferQueue;
50
51
52
53
54
55
56
57
58
59
60
61 class NioDatagramWorker implements Runnable {
62
63
64
65 private static final InternalLogger logger = InternalLoggerFactory
66 .getInstance(NioDatagramWorker.class);
67
68
69
70
71 private final int id;
72
73
74
75
76 private final int bossId;
77
78
79
80
81
82 private final Executor executor;
83
84
85
86
87 private boolean started;
88
89
90
91
92
93 private volatile Thread thread;
94
95
96
97
98 volatile Selector selector;
99
100
101
102
103
104
105
106 private final AtomicBoolean wakenUp = new AtomicBoolean();
107
108
109
110
111 private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
112
113
114
115
116 private final Object startStopLock = new Object();
117
118
119
120
121 private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
122
123
124
125
126 private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
127
128 private volatile int cancelledKeys;
129
130 private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
131
132
133
134
135
136
137
138
139
/**
 * Creates a worker. No selector is opened and no thread is started here; both
 * happen lazily on the first call to {@code register(..)}.
 *
 * @param bossId   id of the owning boss, used in the worker thread name
 * @param id       id of this worker, used in the worker thread name
 * @param executor executor that will run this worker's selector loop
 */
NioDatagramWorker(final int bossId, final int id, final Executor executor) {
    this.executor = executor;
    this.id = id;
    this.bossId = bossId;
}
145
146
147
148
149
150
151
152
153 void register(final NioDatagramChannel channel, final ChannelFuture future) {
154 final Runnable channelRegTask = new ChannelRegistionTask(channel,
155 future);
156 Selector selector;
157
158 synchronized (startStopLock) {
159 if (!started) {
160
161 try {
162 this.selector = selector = Selector.open();
163 } catch (final Throwable t) {
164 throw new ChannelException("Failed to create a selector.",
165 t);
166 }
167
168 boolean success = false;
169 try {
170
171 executor.execute(new ThreadRenamingRunnable(this,
172 "New I/O datagram worker #" + bossId + "'-'" + id));
173 success = true;
174 } finally {
175 if (!success) {
176 try {
177
178 selector.close();
179 } catch (final Throwable t) {
180 logger.warn("Failed to close a selector.", t);
181 }
182 this.selector = selector = null;
183
184 }
185 }
186 } else {
187
188 selector = this.selector;
189 }
190 assert selector != null && selector.isOpen();
191
192 started = true;
193
194
195 boolean offered = registerTaskQueue.offer(channelRegTask);
196 assert offered;
197 }
198
199 if (wakenUp.compareAndSet(false, true)) {
200 selector.wakeup();
201 }
202 }
203
204
205
206
207 public void run() {
208
209 thread = Thread.currentThread();
210
211 final Selector selector = this.selector;
212 boolean shutdown = false;
213
214 for (;;) {
215 wakenUp.set(false);
216
217 if (NioProviderMetadata.CONSTRAINT_LEVEL != 0) {
218 selectorGuard.writeLock().lock();
219
220 selectorGuard.writeLock().unlock();
221 }
222
223 try {
224 SelectorUtil.select(selector);
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 if (wakenUp.get()) {
255 selector.wakeup();
256 }
257
258 cancelledKeys = 0;
259 processRegisterTaskQueue();
260 processWriteTaskQueue();
261 processSelectedKeys(selector.selectedKeys());
262
263
264
265
266
267
268
269 if (selector.keys().isEmpty()) {
270 if (shutdown || executor instanceof ExecutorService &&
271 ((ExecutorService) executor).isShutdown()) {
272 synchronized (startStopLock) {
273 if (registerTaskQueue.isEmpty() &&
274 selector.keys().isEmpty()) {
275 started = false;
276 try {
277 selector.close();
278 } catch (IOException e) {
279 logger.warn("Failed to close a selector.",
280 e);
281 } finally {
282 this.selector = null;
283 }
284 break;
285 } else {
286 shutdown = false;
287 }
288 }
289 } else {
290
291 shutdown = true;
292 }
293 } else {
294 shutdown = false;
295 }
296 } catch (Throwable t) {
297 logger.warn("Unexpected exception in the selector loop.", t);
298
299
300
301 try {
302 Thread.sleep(1000);
303 } catch (InterruptedException e) {
304
305 }
306 }
307 }
308 }
309
310
311
312
313
314 private void processRegisterTaskQueue() throws IOException {
315 for (;;) {
316 final Runnable task = registerTaskQueue.poll();
317 if (task == null) {
318 break;
319 }
320
321 task.run();
322 cleanUpCancelledKeys();
323 }
324 }
325
326
327
328
329 private void processWriteTaskQueue() throws IOException {
330 for (;;) {
331 final Runnable task = writeTaskQueue.poll();
332 if (task == null) {
333 break;
334 }
335
336 task.run();
337 cleanUpCancelledKeys();
338 }
339 }
340
341 private void processSelectedKeys(final Set<SelectionKey> selectedKeys) throws IOException {
342 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
343 SelectionKey k = i.next();
344 i.remove();
345 try {
346 int readyOps = k.readyOps();
347 if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
348 if (!read(k)) {
349
350 continue;
351 }
352 }
353 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
354 writeFromSelectorLoop(k);
355 }
356 } catch (CancelledKeyException e) {
357 close(k);
358 }
359
360 if (cleanUpCancelledKeys()) {
361 break;
362 }
363 }
364 }
365
366 private boolean cleanUpCancelledKeys() throws IOException {
367 if (cancelledKeys >= NioWorker.CLEANUP_INTERVAL) {
368 cancelledKeys = 0;
369 selector.selectNow();
370 return true;
371 }
372 return false;
373 }
374
375
376
377
378
379
380
381
382 private boolean read(final SelectionKey key) {
383 final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
384 ReceiveBufferSizePredictor predictor =
385 channel.getConfig().getReceiveBufferSizePredictor();
386 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
387 final DatagramChannel nioChannel = (DatagramChannel) key.channel();
388
389
390
391
392
393 final ByteBuffer byteBuffer = ByteBuffer.allocate(
394 predictor.nextReceiveBufferSize()).order(bufferFactory.getDefaultOrder());
395
396 boolean failure = true;
397 SocketAddress remoteAddress = null;
398 try {
399
400
401 remoteAddress = nioChannel.receive(byteBuffer);
402 failure = false;
403 } catch (ClosedChannelException e) {
404
405 } catch (Throwable t) {
406 fireExceptionCaught(channel, t);
407 }
408
409 if (remoteAddress != null) {
410
411 byteBuffer.flip();
412
413 int readBytes = byteBuffer.remaining();
414 if (readBytes > 0) {
415
416 predictor.previousReceiveBufferSize(readBytes);
417
418
419 fireMessageReceived(
420 channel, bufferFactory.getBuffer(byteBuffer), remoteAddress);
421 }
422 }
423
424 if (failure) {
425 key.cancel();
426 close(channel, succeededFuture(channel));
427 return false;
428 }
429
430 return true;
431 }
432
433 private void close(SelectionKey k) {
434 final NioDatagramChannel ch = (NioDatagramChannel) k.attachment();
435 close(ch, succeededFuture(ch));
436 }
437
438 void writeFromUserCode(final NioDatagramChannel channel) {
439
440
441
442
443
444 if (!channel.isBound()) {
445 cleanUpWriteBuffer(channel);
446 return;
447 }
448
449 if (scheduleWriteIfNecessary(channel)) {
450 return;
451 }
452
453
454
455 if (channel.writeSuspended) {
456 return;
457 }
458
459 if (channel.inWriteNowLoop) {
460 return;
461 }
462
463 write0(channel);
464 }
465
466 void writeFromTaskLoop(final NioDatagramChannel ch) {
467 if (!ch.writeSuspended) {
468 write0(ch);
469 }
470 }
471
472 void writeFromSelectorLoop(final SelectionKey k) {
473 NioDatagramChannel ch = (NioDatagramChannel) k.attachment();
474 ch.writeSuspended = false;
475 write0(ch);
476 }
477
478 private boolean scheduleWriteIfNecessary(final NioDatagramChannel channel) {
479 final Thread workerThread = thread;
480 if (workerThread == null || Thread.currentThread() != workerThread) {
481 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
482
483 boolean offered = writeTaskQueue.offer(channel.writeTask);
484 assert offered;
485 }
486
487 final Selector selector = this.selector;
488 if (selector != null) {
489 if (wakenUp.compareAndSet(false, true)) {
490 selector.wakeup();
491 }
492 }
493 return true;
494 }
495
496 return false;
497 }
498
499 private void write0(final NioDatagramChannel channel) {
500
501 boolean addOpWrite = false;
502 boolean removeOpWrite = false;
503
504 long writtenBytes = 0;
505
506 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
507 final DatagramChannel ch = channel.getDatagramChannel();
508 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
509 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
510 synchronized (channel.writeLock) {
511
512 channel.inWriteNowLoop = true;
513
514
515 for (;;) {
516 MessageEvent evt = channel.currentWriteEvent;
517 SendBuffer buf;
518 if (evt == null) {
519 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
520 removeOpWrite = true;
521 channel.writeSuspended = false;
522 break;
523 }
524
525 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
526 } else {
527 buf = channel.currentWriteBuffer;
528 }
529
530 try {
531 long localWrittenBytes = 0;
532 SocketAddress raddr = evt.getRemoteAddress();
533 if (raddr == null) {
534 for (int i = writeSpinCount; i > 0; i --) {
535 localWrittenBytes = buf.transferTo(ch);
536 if (localWrittenBytes != 0) {
537 writtenBytes += localWrittenBytes;
538 break;
539 }
540 if (buf.finished()) {
541 break;
542 }
543 }
544 } else {
545 for (int i = writeSpinCount; i > 0; i --) {
546 localWrittenBytes = buf.transferTo(ch, raddr);
547 if (localWrittenBytes != 0) {
548 writtenBytes += localWrittenBytes;
549 break;
550 }
551 if (buf.finished()) {
552 break;
553 }
554 }
555 }
556
557 if (localWrittenBytes > 0 || buf.finished()) {
558
559 buf.release();
560 ChannelFuture future = evt.getFuture();
561 channel.currentWriteEvent = null;
562 channel.currentWriteBuffer = null;
563 evt = null;
564 buf = null;
565 future.setSuccess();
566 } else {
567
568 addOpWrite = true;
569 channel.writeSuspended = true;
570 break;
571 }
572 } catch (final AsynchronousCloseException e) {
573
574 } catch (final Throwable t) {
575 buf.release();
576 ChannelFuture future = evt.getFuture();
577 channel.currentWriteEvent = null;
578 channel.currentWriteBuffer = null;
579 buf = null;
580 evt = null;
581 future.setFailure(t);
582 fireExceptionCaught(channel, t);
583 }
584 }
585 channel.inWriteNowLoop = false;
586
587
588
589
590
591
592
593 if (addOpWrite) {
594 setOpWrite(channel);
595 } else if (removeOpWrite) {
596 clearOpWrite(channel);
597 }
598 }
599
600 fireWriteComplete(channel, writtenBytes);
601 }
602
603 private void setOpWrite(final NioDatagramChannel channel) {
604 Selector selector = this.selector;
605 SelectionKey key = channel.getDatagramChannel().keyFor(selector);
606 if (key == null) {
607 return;
608 }
609 if (!key.isValid()) {
610 close(key);
611 return;
612 }
613
614
615
616 synchronized (channel.interestOpsLock) {
617 int interestOps = channel.getRawInterestOps();
618 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
619 interestOps |= SelectionKey.OP_WRITE;
620 key.interestOps(interestOps);
621 channel.setRawInterestOpsNow(interestOps);
622 }
623 }
624 }
625
626 private void clearOpWrite(NioDatagramChannel channel) {
627 Selector selector = this.selector;
628 SelectionKey key = channel.getDatagramChannel().keyFor(selector);
629 if (key == null) {
630 return;
631 }
632 if (!key.isValid()) {
633 close(key);
634 return;
635 }
636
637
638
639 synchronized (channel.interestOpsLock) {
640 int interestOps = channel.getRawInterestOps();
641 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
642 interestOps &= ~SelectionKey.OP_WRITE;
643 key.interestOps(interestOps);
644 channel.setRawInterestOpsNow(interestOps);
645 }
646 }
647 }
648
649 static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
650 boolean connected = channel.isConnected();
651 try {
652 channel.getDatagramChannel().disconnect();
653 future.setSuccess();
654 if (connected) {
655 fireChannelDisconnected(channel);
656 }
657 } catch (Throwable t) {
658 future.setFailure(t);
659 fireExceptionCaught(channel, t);
660 }
661 }
662
663 void close(final NioDatagramChannel channel,
664 final ChannelFuture future) {
665 boolean connected = channel.isConnected();
666 boolean bound = channel.isBound();
667 try {
668 channel.getDatagramChannel().close();
669 cancelledKeys ++;
670
671 if (channel.setClosed()) {
672 future.setSuccess();
673 if (connected) {
674 fireChannelDisconnected(channel);
675 }
676 if (bound) {
677 fireChannelUnbound(channel);
678 }
679
680 cleanUpWriteBuffer(channel);
681 fireChannelClosed(channel);
682 } else {
683 future.setSuccess();
684 }
685 } catch (Throwable t) {
686 future.setFailure(t);
687 fireExceptionCaught(channel, t);
688 }
689 }
690
691 private void cleanUpWriteBuffer(final NioDatagramChannel channel) {
692 Exception cause = null;
693 boolean fireExceptionCaught = false;
694
695
696 synchronized (channel.writeLock) {
697 MessageEvent evt = channel.currentWriteEvent;
698 if (evt != null) {
699
700
701 if (channel.isOpen()) {
702 cause = new NotYetBoundException();
703 } else {
704 cause = new ClosedChannelException();
705 }
706
707 ChannelFuture future = evt.getFuture();
708 channel.currentWriteBuffer.release();
709 channel.currentWriteBuffer = null;
710 channel.currentWriteEvent = null;
711 evt = null;
712 future.setFailure(cause);
713 fireExceptionCaught = true;
714 }
715
716 Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
717 if (!writeBuffer.isEmpty()) {
718
719
720 if (cause == null) {
721 if (channel.isOpen()) {
722 cause = new NotYetBoundException();
723 } else {
724 cause = new ClosedChannelException();
725 }
726 }
727
728 for (;;) {
729 evt = writeBuffer.poll();
730 if (evt == null) {
731 break;
732 }
733 evt.getFuture().setFailure(cause);
734 fireExceptionCaught = true;
735 }
736 }
737 }
738
739 if (fireExceptionCaught) {
740 fireExceptionCaught(channel, cause);
741 }
742 }
743
744 void setInterestOps(final NioDatagramChannel channel,
745 ChannelFuture future, int interestOps) {
746
747 boolean changed = false;
748 try {
749
750
751 synchronized (channel.interestOpsLock) {
752 final Selector selector = this.selector;
753 final SelectionKey key = channel.getDatagramChannel().keyFor(selector);
754
755 if (key == null || selector == null) {
756
757
758 channel.setRawInterestOpsNow(interestOps);
759 return;
760 }
761
762
763 interestOps &= ~Channel.OP_WRITE;
764 interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
765
766 switch (NioProviderMetadata.CONSTRAINT_LEVEL) {
767 case 0:
768 if (channel.getRawInterestOps() != interestOps) {
769
770 key.interestOps(interestOps);
771
772
773
774 if (Thread.currentThread() != thread &&
775 wakenUp.compareAndSet(false, true)) {
776 selector.wakeup();
777 }
778 changed = true;
779 }
780 break;
781 case 1:
782 case 2:
783 if (channel.getRawInterestOps() != interestOps) {
784 if (Thread.currentThread() == thread) {
785
786
787 key.interestOps(interestOps);
788 changed = true;
789 } else {
790
791
792 selectorGuard.readLock().lock();
793 try {
794 if (wakenUp.compareAndSet(false, true)) {
795 selector.wakeup();
796 }
797 key.interestOps(interestOps);
798 changed = true;
799 } finally {
800 selectorGuard.readLock().unlock();
801 }
802 }
803 }
804 break;
805 default:
806 throw new Error();
807 }
808 if (changed) {
809 channel.setRawInterestOpsNow(interestOps);
810 }
811 }
812
813 future.setSuccess();
814 if (changed) {
815 fireChannelInterestChanged(channel);
816 }
817 } catch (final CancelledKeyException e) {
818
819 ClosedChannelException cce = new ClosedChannelException();
820 future.setFailure(cce);
821 fireExceptionCaught(channel, cce);
822 } catch (final Throwable t) {
823 future.setFailure(t);
824 fireExceptionCaught(channel, t);
825 }
826 }
827
828
829
830
831
832 private final class ChannelRegistionTask implements Runnable {
833 private final NioDatagramChannel channel;
834
835 private final ChannelFuture future;
836
837 ChannelRegistionTask(final NioDatagramChannel channel,
838 final ChannelFuture future) {
839 this.channel = channel;
840 this.future = future;
841 }
842
843
844
845
846
847
848 public void run() {
849 final SocketAddress localAddress = channel.getLocalAddress();
850 if (localAddress == null) {
851 if (future != null) {
852 future.setFailure(new ClosedChannelException());
853 }
854 close(channel, succeededFuture(channel));
855 return;
856 }
857
858 try {
859 synchronized (channel.interestOpsLock) {
860 channel.getDatagramChannel().register(
861 selector, channel.getRawInterestOps(), channel);
862 }
863 if (future != null) {
864 future.setSuccess();
865 }
866 } catch (final ClosedChannelException e) {
867 if (future != null) {
868 future.setFailure(e);
869 }
870 close(channel, succeededFuture(channel));
871 throw new ChannelException(
872 "Failed to register a socket to the selector.", e);
873 }
874 }
875 }
876 }