1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.InterruptedIOException;
21 import java.net.DatagramPacket;
22 import java.net.MulticastSocket;
23 import java.net.SocketAddress;
24 import java.nio.ByteBuffer;
25
26 import org.jboss.netty.buffer.ChannelBuffer;
27 import org.jboss.netty.channel.Channel;
28 import org.jboss.netty.channel.ChannelFuture;
29 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
30
31
32
33
34
35
36
37
38
39 class OioDatagramWorker implements Runnable {
40
41 private final OioDatagramChannel channel;
42
43 OioDatagramWorker(OioDatagramChannel channel) {
44 this.channel = channel;
45 }
46
47 public void run() {
48 channel.workerThread = Thread.currentThread();
49 final MulticastSocket socket = channel.socket;
50
51 while (channel.isOpen()) {
52 synchronized (channel.interestOpsLock) {
53 while (!channel.isReadable()) {
54 try {
55
56
57 channel.interestOpsLock.wait();
58 } catch (InterruptedException e) {
59 if (!channel.isOpen()) {
60 break;
61 }
62 }
63 }
64 }
65
66 ReceiveBufferSizePredictor predictor =
67 channel.getConfig().getReceiveBufferSizePredictor();
68
69 byte[] buf = new byte[predictor.nextReceiveBufferSize()];
70 DatagramPacket packet = new DatagramPacket(buf, buf.length);
71 try {
72 socket.receive(packet);
73 } catch (InterruptedIOException e) {
74
75
76 continue;
77 } catch (Throwable t) {
78 if (!channel.socket.isClosed()) {
79 fireExceptionCaught(channel, t);
80 }
81 break;
82 }
83
84 fireMessageReceived(
85 channel,
86 channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
87 packet.getSocketAddress());
88 }
89
90
91
92 channel.workerThread = null;
93
94
95 close(channel, succeededFuture(channel));
96 }
97
98 static void write(
99 OioDatagramChannel channel, ChannelFuture future,
100 Object message, SocketAddress remoteAddress) {
101 try {
102 ChannelBuffer buf = (ChannelBuffer) message;
103 int offset = buf.readerIndex();
104 int length = buf.readableBytes();
105 ByteBuffer nioBuf = buf.toByteBuffer();
106 DatagramPacket packet;
107 if (nioBuf.hasArray()) {
108
109 packet = new DatagramPacket(
110 nioBuf.array(), nioBuf.arrayOffset() + offset, length);
111 } else {
112
113 byte[] arrayBuf = new byte[length];
114 buf.getBytes(0, arrayBuf);
115 packet = new DatagramPacket(arrayBuf, length);
116 }
117
118 if (remoteAddress != null) {
119 packet.setSocketAddress(remoteAddress);
120 }
121 channel.socket.send(packet);
122 fireWriteComplete(channel, length);
123 future.setSuccess();
124 } catch (Throwable t) {
125 future.setFailure(t);
126 fireExceptionCaught(channel, t);
127 }
128 }
129
130 static void setInterestOps(
131 OioDatagramChannel channel, ChannelFuture future, int interestOps) {
132
133
134 interestOps &= ~Channel.OP_WRITE;
135 interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
136
137 boolean changed = false;
138 try {
139 if (channel.getInterestOps() != interestOps) {
140 if ((interestOps & Channel.OP_READ) != 0) {
141 channel.setInterestOpsNow(Channel.OP_READ);
142 } else {
143 channel.setInterestOpsNow(Channel.OP_NONE);
144 }
145 changed = true;
146 }
147
148 future.setSuccess();
149 if (changed) {
150 synchronized (channel.interestOpsLock) {
151 channel.setInterestOpsNow(interestOps);
152
153
154 Thread currentThread = Thread.currentThread();
155 Thread workerThread = channel.workerThread;
156 if (workerThread != null && currentThread != workerThread) {
157 workerThread.interrupt();
158 }
159 }
160
161 fireChannelInterestChanged(channel);
162 }
163 } catch (Throwable t) {
164 future.setFailure(t);
165 fireExceptionCaught(channel, t);
166 }
167 }
168
169 static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
170 boolean connected = channel.isConnected();
171 try {
172 channel.socket.disconnect();
173 future.setSuccess();
174 if (connected) {
175
176 Thread workerThread = channel.workerThread;
177 if (workerThread != null) {
178 try {
179 workerThread.setName(
180 "Old I/O datagram worker (" + channel + ')');
181 } catch (SecurityException e) {
182
183 }
184 }
185
186
187 fireChannelDisconnected(channel);
188 }
189 } catch (Throwable t) {
190 future.setFailure(t);
191 fireExceptionCaught(channel, t);
192 }
193 }
194
195 static void close(OioDatagramChannel channel, ChannelFuture future) {
196 boolean connected = channel.isConnected();
197 boolean bound = channel.isBound();
198 try {
199 channel.socket.close();
200 if (channel.setClosed()) {
201 future.setSuccess();
202 if (connected) {
203
204 Thread currentThread = Thread.currentThread();
205 Thread workerThread = channel.workerThread;
206 if (workerThread != null && currentThread != workerThread) {
207 workerThread.interrupt();
208 }
209 fireChannelDisconnected(channel);
210 }
211 if (bound) {
212 fireChannelUnbound(channel);
213 }
214 fireChannelClosed(channel);
215 } else {
216 future.setSuccess();
217 }
218 } catch (Throwable t) {
219 future.setFailure(t);
220 fireExceptionCaught(channel, t);
221 }
222 }
223 }