View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.InetAddress;
22  import java.net.InetSocketAddress;
23  import java.net.NetworkInterface;
24  import java.net.SocketAddress;
25  import java.nio.channels.DatagramChannel;
26  import java.util.Queue;
27  import java.util.concurrent.atomic.AtomicBoolean;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.jboss.netty.buffer.ChannelBuffer;
31  import org.jboss.netty.channel.AbstractChannel;
32  import org.jboss.netty.channel.Channel;
33  import org.jboss.netty.channel.ChannelException;
34  import org.jboss.netty.channel.ChannelFactory;
35  import org.jboss.netty.channel.ChannelFuture;
36  import org.jboss.netty.channel.ChannelPipeline;
37  import org.jboss.netty.channel.ChannelSink;
38  import org.jboss.netty.channel.MessageEvent;
39  import org.jboss.netty.channel.socket.DatagramChannelConfig;
40  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
41  import org.jboss.netty.util.internal.LinkedTransferQueue;
42  import org.jboss.netty.util.internal.ThreadLocalBoolean;
43  
44  /**
45   * Provides an NIO based {@link org.jboss.netty.channel.socket.DatagramChannel}.
46   *
47   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
48   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
49   * @author Daniel Bevenius (dbevenius@jboss.com)
50   *
51   * @version $Rev: 2202 $, $Date: 2010-02-23 16:18:58 +0900 (Tue, 23 Feb 2010) $
52   */
53  class NioDatagramChannel extends AbstractChannel
54                                  implements org.jboss.netty.channel.socket.DatagramChannel {
55  
56      /**
57       * The {@link DatagramChannelConfig}.
58       */
59      private final NioDatagramChannelConfig config;
60  
61      /**
62       * The {@link NioDatagramWorker} for this NioDatagramChannnel.
63       */
64      final NioDatagramWorker worker;
65  
66      /**
67       * The {@link DatagramChannel} that this channel uses.
68       */
69      private final java.nio.channels.DatagramChannel datagramChannel;
70  
71      /**
72       * Monitor object to synchronize access to InterestedOps.
73       */
74      final Object interestOpsLock = new Object();
75  
76      /**
77       * Monitor object for synchronizing access to the {@link WriteRequestQueue}.
78       */
79      final Object writeLock = new Object();
80  
81      /**
82       * WriteTask that performs write operations.
83       */
84      final Runnable writeTask = new WriteTask();
85  
86      /**
87       * Indicates if there is a {@link WriteTask} in the task queue.
88       */
89      final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
90  
91      /**
92       * Queue of write {@link MessageEvent}s.
93       */
94      final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
95  
96      /**
97       * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
98       * contains.
99       */
100     final AtomicInteger writeBufferSize = new AtomicInteger();
101 
102     /**
103      * Keeps track of the highWaterMark.
104      */
105     final AtomicInteger highWaterMarkCounter = new AtomicInteger();
106 
107     /**
108      * The current write {@link MessageEvent}
109      */
110     MessageEvent currentWriteEvent;
111     SendBuffer currentWriteBuffer;
112 
113     /**
114      * Boolean that indicates that write operation is in progress.
115      */
116     boolean inWriteNowLoop;
117     boolean writeSuspended;
118 
119     private volatile InetSocketAddress localAddress;
120     volatile InetSocketAddress remoteAddress;
121 
122     NioDatagramChannel(final ChannelFactory factory,
123             final ChannelPipeline pipeline, final ChannelSink sink,
124             final NioDatagramWorker worker) {
125         super(null, factory, pipeline, sink);
126         this.worker = worker;
127         datagramChannel = openNonBlockingChannel();
128         config = new DefaultNioDatagramChannelConfig(datagramChannel.socket());
129 
130         fireChannelOpen(this);
131     }
132 
133     private DatagramChannel openNonBlockingChannel() {
134         try {
135             final DatagramChannel channel = DatagramChannel.open();
136             channel.configureBlocking(false);
137             return channel;
138         } catch (final IOException e) {
139             throw new ChannelException("Failed to open a DatagramChannel.", e);
140         }
141     }
142 
143     public InetSocketAddress getLocalAddress() {
144         InetSocketAddress localAddress = this.localAddress;
145         if (localAddress == null) {
146             try {
147                 this.localAddress = localAddress =
148                     (InetSocketAddress) datagramChannel.socket().getLocalSocketAddress();
149             } catch (Throwable t) {
150                 // Sometimes fails on a closed socket in Windows.
151                 return null;
152             }
153         }
154         return localAddress;
155     }
156 
157     public InetSocketAddress getRemoteAddress() {
158         InetSocketAddress remoteAddress = this.remoteAddress;
159         if (remoteAddress == null) {
160             try {
161                 this.remoteAddress = remoteAddress =
162                     (InetSocketAddress) datagramChannel.socket().getRemoteSocketAddress();
163             } catch (Throwable t) {
164                 // Sometimes fails on a closed socket in Windows.
165                 return null;
166             }
167         }
168         return remoteAddress;
169     }
170 
171     public boolean isBound() {
172         return isOpen() && datagramChannel.socket().isBound();
173     }
174 
175     public boolean isConnected() {
176         return datagramChannel.isConnected();
177     }
178 
179     @Override
180     protected boolean setClosed() {
181         return super.setClosed();
182     }
183 
184     public NioDatagramChannelConfig getConfig() {
185         return config;
186     }
187 
188     DatagramChannel getDatagramChannel() {
189         return datagramChannel;
190     }
191 
192     @Override
193     public int getInterestOps() {
194         if (!isOpen()) {
195             return Channel.OP_WRITE;
196         }
197 
198         int interestOps = getRawInterestOps();
199         int writeBufferSize = this.writeBufferSize.get();
200         if (writeBufferSize != 0) {
201             if (highWaterMarkCounter.get() > 0) {
202                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
203                 if (writeBufferSize >= lowWaterMark) {
204                     interestOps |= Channel.OP_WRITE;
205                 } else {
206                     interestOps &= ~Channel.OP_WRITE;
207                 }
208             } else {
209                 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
210                 if (writeBufferSize >= highWaterMark) {
211                     interestOps |= Channel.OP_WRITE;
212                 } else {
213                     interestOps &= ~Channel.OP_WRITE;
214                 }
215             }
216         } else {
217             interestOps &= ~Channel.OP_WRITE;
218         }
219 
220         return interestOps;
221     }
222 
223     int getRawInterestOps() {
224         return super.getInterestOps();
225     }
226 
227     void setRawInterestOpsNow(int interestOps) {
228         super.setInterestOpsNow(interestOps);
229     }
230 
231     @Override
232     public ChannelFuture write(Object message, SocketAddress remoteAddress) {
233         if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
234             return super.write(message, null);
235         } else {
236             return super.write(message, remoteAddress);
237         }
238     }
239 
240     /**
241      * {@link WriteRequestQueue} is an extension of {@link LinkedTransferQueue}
242      * that adds support for highWaterMark checking of the write buffer size.
243      */
244     private final class WriteRequestQueue extends
245             LinkedTransferQueue<MessageEvent> {
246 
247         private static final long serialVersionUID = 5057413071460766376L;
248 
249         private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
250 
251         WriteRequestQueue() {
252             super();
253         }
254 
255         /**
256          * This method first delegates to {@link LinkedTransferQueue#offer(Object)} and
257          * adds support for keeping track of the size of the this write buffer.
258          */
259         @Override
260         public boolean offer(MessageEvent e) {
261             boolean success = super.offer(e);
262             assert success;
263 
264             int messageSize = getMessageSize(e);
265             int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
266             int highWaterMark = getConfig().getWriteBufferHighWaterMark();
267 
268             if (newWriteBufferSize >= highWaterMark) {
269                 if (newWriteBufferSize - messageSize < highWaterMark) {
270                     highWaterMarkCounter.incrementAndGet();
271                     if (!notifying.get()) {
272                         notifying.set(Boolean.TRUE);
273                         fireChannelInterestChanged(NioDatagramChannel.this);
274                         notifying.set(Boolean.FALSE);
275                     }
276                 }
277             }
278             return true;
279         }
280 
281         /**
282          * This method first delegates to {@link LinkedTransferQueue#poll()} and
283          * adds support for keeping track of the size of the this writebuffers queue.
284          */
285         @Override
286         public MessageEvent poll() {
287             MessageEvent e = super.poll();
288             if (e != null) {
289                 int messageSize = getMessageSize(e);
290                 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
291                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
292 
293                 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
294                     if (newWriteBufferSize + messageSize >= lowWaterMark) {
295                         highWaterMarkCounter.decrementAndGet();
296                         if (isBound() && !notifying.get()) {
297                             notifying.set(Boolean.TRUE);
298                             fireChannelInterestChanged(NioDatagramChannel.this);
299                             notifying.set(Boolean.FALSE);
300                         }
301                     }
302                 }
303             }
304             return e;
305         }
306 
307         private int getMessageSize(MessageEvent e) {
308             Object m = e.getMessage();
309             if (m instanceof ChannelBuffer) {
310                 return ((ChannelBuffer) m).readableBytes();
311             }
312             return 0;
313         }
314     }
315 
316     /**
317      * WriteTask is a simple runnable performs writes by delegating the {@link NioDatagramWorker}.
318      *
319      */
320     private final class WriteTask implements Runnable {
321         WriteTask() {
322             super();
323         }
324 
325         public void run() {
326             writeTaskInTaskQueue.set(false);
327             worker.writeFromTaskLoop(NioDatagramChannel.this);
328         }
329     }
330 
331     public void joinGroup(InetAddress multicastAddress) {
332         throw new UnsupportedOperationException();
333     }
334 
335     public void joinGroup(InetSocketAddress multicastAddress,
336             NetworkInterface networkInterface) {
337         throw new UnsupportedOperationException();
338     }
339 
340     public void leaveGroup(InetAddress multicastAddress) {
341         throw new UnsupportedOperationException();
342     }
343 
344     public void leaveGroup(InetSocketAddress multicastAddress,
345             NetworkInterface networkInterface) {
346         throw new UnsupportedOperationException();
347     }
348 }