1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.InetAddress;
22 import java.net.InetSocketAddress;
23 import java.net.NetworkInterface;
24 import java.net.SocketAddress;
25 import java.nio.channels.DatagramChannel;
26 import java.util.Queue;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.jboss.netty.buffer.ChannelBuffer;
31 import org.jboss.netty.channel.AbstractChannel;
32 import org.jboss.netty.channel.Channel;
33 import org.jboss.netty.channel.ChannelException;
34 import org.jboss.netty.channel.ChannelFactory;
35 import org.jboss.netty.channel.ChannelFuture;
36 import org.jboss.netty.channel.ChannelPipeline;
37 import org.jboss.netty.channel.ChannelSink;
38 import org.jboss.netty.channel.MessageEvent;
39 import org.jboss.netty.channel.socket.DatagramChannelConfig;
40 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
41 import org.jboss.netty.util.internal.LinkedTransferQueue;
42 import org.jboss.netty.util.internal.ThreadLocalBoolean;
43
44
45
46
47
48
49
50
51
52
53 class NioDatagramChannel extends AbstractChannel
54 implements org.jboss.netty.channel.socket.DatagramChannel {
55
56
57
58
/** Channel configuration, created in the constructor from the underlying datagram socket. */
private final NioDatagramChannelConfig config;

/**
 * Worker supplied to the constructor. {@code WriteTask.run()} passes this
 * channel to its {@code writeFromTaskLoop} method.
 */
final NioDatagramWorker worker;

/**
 * Underlying JDK datagram channel, opened and switched to non-blocking mode
 * by {@code openNonBlockingChannel()}.
 */
private final java.nio.channels.DatagramChannel datagramChannel;

/**
 * Monitor object. Not referenced elsewhere in this class.
 * NOTE(review): presumably used by the worker to guard interest-ops changes -- confirm.
 */
final Object interestOpsLock = new Object();

/**
 * Monitor object. Not referenced elsewhere in this class.
 * NOTE(review): presumably used by the worker to serialise writes -- confirm.
 */
final Object writeLock = new Object();

/**
 * Reusable task that sets {@link #writeTaskInTaskQueue} to {@code false} and
 * then calls {@code worker.writeFromTaskLoop(this)}.
 */
final Runnable writeTask = new WriteTask();

/**
 * Reset to {@code false} each time {@link #writeTask} runs.
 * NOTE(review): presumably set to true by the worker when it queues the task -- confirm.
 */
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();

/**
 * Queue of pending write requests. Its {@code offer}/{@code poll} overrides
 * keep {@link #writeBufferSize} and {@link #highWaterMarkCounter} up to date.
 */
final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();

/**
 * Running byte count for {@link #writeBufferQueue}: {@code offer} adds and
 * {@code poll} subtracts the readable bytes of each {@code ChannelBuffer}
 * message (messages of any other type count as 0).
 */
final AtomicInteger writeBufferSize = new AtomicInteger();

/**
 * Incremented when an {@code offer} lifts {@link #writeBufferSize} to or above
 * the high water mark, decremented when a {@code poll} drops it below the low
 * water mark. Read by {@link #getInterestOps()} to pick the active threshold.
 */
final AtomicInteger highWaterMarkCounter = new AtomicInteger();

// Neither field is read or written in this class.
// NOTE(review): presumably the in-progress write state kept by the worker -- confirm.
MessageEvent currentWriteEvent;
SendBuffer currentWriteBuffer;

// Neither flag is read or written in this class.
// NOTE(review): presumably write-loop state kept by the worker -- confirm.
boolean inWriteNowLoop;
boolean writeSuspended;

/** Local address cache, filled lazily by {@link #getLocalAddress()}. */
private volatile InetSocketAddress localAddress;
/**
 * Remote address cache, filled lazily by {@link #getRemoteAddress()}.
 * Package-visible, so it may also be assigned from outside this class.
 */
volatile InetSocketAddress remoteAddress;
121
/**
 * Creates a datagram channel backed by a freshly opened, non-blocking JDK
 * {@code DatagramChannel} and announces it with a "channel open" event.
 *
 * @param factory  the factory passed through to the superclass
 * @param pipeline the pipeline passed through to the superclass
 * @param sink     the sink passed through to the superclass
 * @param worker   the worker stored in {@link #worker}
 * @throws ChannelException if the underlying datagram channel cannot be opened
 */
NioDatagramChannel(
        final ChannelFactory factory,
        final ChannelPipeline pipeline,
        final ChannelSink sink,
        final NioDatagramWorker worker) {
    // First superclass argument is always null here
    // (NOTE(review): presumably the parent channel slot -- confirm).
    super(null, factory, pipeline, sink);

    this.worker = worker;
    this.datagramChannel = openNonBlockingChannel();
    this.config = new DefaultNioDatagramChannelConfig(this.datagramChannel.socket());

    // Fired last so that every field above is assigned before the event goes out.
    fireChannelOpen(this);
}
132
133 private DatagramChannel openNonBlockingChannel() {
134 try {
135 final DatagramChannel channel = DatagramChannel.open();
136 channel.configureBlocking(false);
137 return channel;
138 } catch (final IOException e) {
139 throw new ChannelException("Failed to open a DatagramChannel.", e);
140 }
141 }
142
143 public InetSocketAddress getLocalAddress() {
144 InetSocketAddress localAddress = this.localAddress;
145 if (localAddress == null) {
146 try {
147 this.localAddress = localAddress =
148 (InetSocketAddress) datagramChannel.socket().getLocalSocketAddress();
149 } catch (Throwable t) {
150
151 return null;
152 }
153 }
154 return localAddress;
155 }
156
157 public InetSocketAddress getRemoteAddress() {
158 InetSocketAddress remoteAddress = this.remoteAddress;
159 if (remoteAddress == null) {
160 try {
161 this.remoteAddress = remoteAddress =
162 (InetSocketAddress) datagramChannel.socket().getRemoteSocketAddress();
163 } catch (Throwable t) {
164
165 return null;
166 }
167 }
168 return remoteAddress;
169 }
170
171 public boolean isBound() {
172 return isOpen() && datagramChannel.socket().isBound();
173 }
174
/**
 * Reports whether the underlying JDK datagram channel is connected.
 * Unlike {@link #isBound()}, this does not check {@code isOpen()} first.
 *
 * @return the value of {@code isConnected()} on the wrapped channel
 */
public boolean isConnected() {
    return datagramChannel.isConnected();
}
178
/**
 * Delegates to {@code super.setClosed()} without adding any behaviour.
 * NOTE(review): presumably overridden only so that this protected method
 * becomes callable from other classes in this package -- confirm.
 *
 * @return whatever {@code super.setClosed()} returns
 */
@Override
protected boolean setClosed() {
    return super.setClosed();
}
183
/**
 * Returns the configuration object created for this channel in the constructor.
 *
 * @return this channel's configuration
 */
public NioDatagramChannelConfig getConfig() {
    return config;
}
187
/**
 * Gives package-level access to the wrapped JDK datagram channel.
 *
 * @return the underlying {@code java.nio.channels.DatagramChannel}
 */
DatagramChannel getDatagramChannel() {
    return datagramChannel;
}
191
192 @Override
193 public int getInterestOps() {
194 if (!isOpen()) {
195 return Channel.OP_WRITE;
196 }
197
198 int interestOps = getRawInterestOps();
199 int writeBufferSize = this.writeBufferSize.get();
200 if (writeBufferSize != 0) {
201 if (highWaterMarkCounter.get() > 0) {
202 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
203 if (writeBufferSize >= lowWaterMark) {
204 interestOps |= Channel.OP_WRITE;
205 } else {
206 interestOps &= ~Channel.OP_WRITE;
207 }
208 } else {
209 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
210 if (writeBufferSize >= highWaterMark) {
211 interestOps |= Channel.OP_WRITE;
212 } else {
213 interestOps &= ~Channel.OP_WRITE;
214 }
215 }
216 } else {
217 interestOps &= ~Channel.OP_WRITE;
218 }
219
220 return interestOps;
221 }
222
/**
 * Returns the interest ops exactly as stored by the superclass, bypassing the
 * {@code OP_WRITE} recomputation done in {@link #getInterestOps()}.
 *
 * @return the value of {@code super.getInterestOps()}
 */
int getRawInterestOps() {
    return super.getInterestOps();
}
226
/**
 * Forwards the given value to {@code super.setInterestOpsNow(int)}, giving
 * package-level callers access to that superclass method.
 *
 * @param interestOps the interest ops value to pass to the superclass
 */
void setRawInterestOpsNow(int interestOps) {
    super.setInterestOpsNow(interestOps);
}
230
231 @Override
232 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
233 if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
234 return super.write(message, null);
235 } else {
236 return super.write(message, remoteAddress);
237 }
238 }
239
240
241
242
243
244 private final class WriteRequestQueue extends
245 LinkedTransferQueue<MessageEvent> {
246
247 private static final long serialVersionUID = 5057413071460766376L;
248
249 private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
250
251 WriteRequestQueue() {
252 super();
253 }
254
255
256
257
258
259 @Override
260 public boolean offer(MessageEvent e) {
261 boolean success = super.offer(e);
262 assert success;
263
264 int messageSize = getMessageSize(e);
265 int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
266 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
267
268 if (newWriteBufferSize >= highWaterMark) {
269 if (newWriteBufferSize - messageSize < highWaterMark) {
270 highWaterMarkCounter.incrementAndGet();
271 if (!notifying.get()) {
272 notifying.set(Boolean.TRUE);
273 fireChannelInterestChanged(NioDatagramChannel.this);
274 notifying.set(Boolean.FALSE);
275 }
276 }
277 }
278 return true;
279 }
280
281
282
283
284
285 @Override
286 public MessageEvent poll() {
287 MessageEvent e = super.poll();
288 if (e != null) {
289 int messageSize = getMessageSize(e);
290 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
291 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
292
293 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
294 if (newWriteBufferSize + messageSize >= lowWaterMark) {
295 highWaterMarkCounter.decrementAndGet();
296 if (isBound() && !notifying.get()) {
297 notifying.set(Boolean.TRUE);
298 fireChannelInterestChanged(NioDatagramChannel.this);
299 notifying.set(Boolean.FALSE);
300 }
301 }
302 }
303 }
304 return e;
305 }
306
307 private int getMessageSize(MessageEvent e) {
308 Object m = e.getMessage();
309 if (m instanceof ChannelBuffer) {
310 return ((ChannelBuffer) m).readableBytes();
311 }
312 return 0;
313 }
314 }
315
316
317
318
319
320 private final class WriteTask implements Runnable {
321 WriteTask() {
322 super();
323 }
324
325 public void run() {
326 writeTaskInTaskQueue.set(false);
327 worker.writeFromTaskLoop(NioDatagramChannel.this);
328 }
329 }
330
/**
 * Not supported by this channel implementation.
 *
 * @param multicastAddress unused
 * @throws UnsupportedOperationException always
 */
public void joinGroup(InetAddress multicastAddress) {
    throw new UnsupportedOperationException();
}
334
/**
 * Not supported by this channel implementation.
 *
 * @param multicastAddress unused
 * @param networkInterface unused
 * @throws UnsupportedOperationException always
 */
public void joinGroup(InetSocketAddress multicastAddress,
        NetworkInterface networkInterface) {
    throw new UnsupportedOperationException();
}
339
/**
 * Not supported by this channel implementation.
 *
 * @param multicastAddress unused
 * @throws UnsupportedOperationException always
 */
public void leaveGroup(InetAddress multicastAddress) {
    throw new UnsupportedOperationException();
}
343
/**
 * Not supported by this channel implementation.
 *
 * @param multicastAddress unused
 * @param networkInterface unused
 * @throws UnsupportedOperationException always
 */
public void leaveGroup(InetSocketAddress multicastAddress,
        NetworkInterface networkInterface) {
    throw new UnsupportedOperationException();
}
348 }