View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.InterruptedIOException;
21  import java.net.DatagramPacket;
22  import java.net.MulticastSocket;
23  import java.net.SocketAddress;
24  import java.nio.ByteBuffer;
25  
26  import org.jboss.netty.buffer.ChannelBuffer;
27  import org.jboss.netty.channel.Channel;
28  import org.jboss.netty.channel.ChannelFuture;
29  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
30  
31  /**
32   *
33   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
34   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
35   *
36   * @version $Rev: 2341 $, $Date: 2010-07-07 13:44:23 +0900 (Wed, 07 Jul 2010) $
37   *
38   */
39  class OioDatagramWorker implements Runnable {
40  
41      private final OioDatagramChannel channel;
42  
43      OioDatagramWorker(OioDatagramChannel channel) {
44          this.channel = channel;
45      }
46  
47      public void run() {
48          channel.workerThread = Thread.currentThread();
49          final MulticastSocket socket = channel.socket;
50  
51          while (channel.isOpen()) {
52              synchronized (channel.interestOpsLock) {
53                  while (!channel.isReadable()) {
54                      try {
55                          // notify() is not called at all.
56                          // close() and setInterestOps() calls Thread.interrupt()
57                          channel.interestOpsLock.wait();
58                      } catch (InterruptedException e) {
59                          if (!channel.isOpen()) {
60                              break;
61                          }
62                      }
63                  }
64              }
65  
66              ReceiveBufferSizePredictor predictor =
67                  channel.getConfig().getReceiveBufferSizePredictor();
68  
69              byte[] buf = new byte[predictor.nextReceiveBufferSize()];
70              DatagramPacket packet = new DatagramPacket(buf, buf.length);
71              try {
72                  socket.receive(packet);
73              } catch (InterruptedIOException e) {
74                  // Can happen on interruption.
75                  // Keep receiving unless the channel is closed.
76                  continue;
77              } catch (Throwable t) {
78                  if (!channel.socket.isClosed()) {
79                      fireExceptionCaught(channel, t);
80                  }
81                  break;
82              }
83  
84              fireMessageReceived(
85                      channel,
86                      channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
87                      packet.getSocketAddress());
88          }
89  
90          // Setting the workerThread to null will prevent any channel
91          // operations from interrupting this thread from now on.
92          channel.workerThread = null;
93  
94          // Clean up.
95          close(channel, succeededFuture(channel));
96      }
97  
98      static void write(
99              OioDatagramChannel channel, ChannelFuture future,
100             Object message, SocketAddress remoteAddress) {
101         try {
102             ChannelBuffer buf = (ChannelBuffer) message;
103             int offset = buf.readerIndex();
104             int length = buf.readableBytes();
105             ByteBuffer nioBuf = buf.toByteBuffer();
106             DatagramPacket packet;
107             if (nioBuf.hasArray()) {
108                 // Avoid copy if the buffer is backed by an array.
109                 packet = new DatagramPacket(
110                         nioBuf.array(), nioBuf.arrayOffset() + offset, length);
111             } else {
112                 // Otherwise it will be expensive.
113                 byte[] arrayBuf = new byte[length];
114                 buf.getBytes(0, arrayBuf);
115                 packet = new DatagramPacket(arrayBuf, length);
116             }
117 
118             if (remoteAddress != null) {
119                 packet.setSocketAddress(remoteAddress);
120             }
121             channel.socket.send(packet);
122             fireWriteComplete(channel, length);
123             future.setSuccess();
124         } catch (Throwable t) {
125             future.setFailure(t);
126             fireExceptionCaught(channel, t);
127         }
128     }
129 
130     static void setInterestOps(
131             OioDatagramChannel channel, ChannelFuture future, int interestOps) {
132 
133         // Override OP_WRITE flag - a user cannot change this flag.
134         interestOps &= ~Channel.OP_WRITE;
135         interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
136 
137         boolean changed = false;
138         try {
139             if (channel.getInterestOps() != interestOps) {
140                 if ((interestOps & Channel.OP_READ) != 0) {
141                     channel.setInterestOpsNow(Channel.OP_READ);
142                 } else {
143                     channel.setInterestOpsNow(Channel.OP_NONE);
144                 }
145                 changed = true;
146             }
147 
148             future.setSuccess();
149             if (changed) {
150                 synchronized (channel.interestOpsLock) {
151                     channel.setInterestOpsNow(interestOps);
152 
153                     // Notify the worker so it stops or continues reading.
154                     Thread currentThread = Thread.currentThread();
155                     Thread workerThread = channel.workerThread;
156                     if (workerThread != null && currentThread != workerThread) {
157                         workerThread.interrupt();
158                     }
159                 }
160 
161                 fireChannelInterestChanged(channel);
162             }
163         } catch (Throwable t) {
164             future.setFailure(t);
165             fireExceptionCaught(channel, t);
166         }
167     }
168 
169     static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
170         boolean connected = channel.isConnected();
171         try {
172             channel.socket.disconnect();
173             future.setSuccess();
174             if (connected) {
175                 // Update the worker's thread name to reflect the state change.
176                 Thread workerThread = channel.workerThread;
177                 if (workerThread != null) {
178                     try {
179                         workerThread.setName(
180                                 "Old I/O datagram worker (" + channel + ')');
181                     } catch (SecurityException e) {
182                         // Ignore.
183                     }
184                 }
185 
186                 // Notify.
187                 fireChannelDisconnected(channel);
188             }
189         } catch (Throwable t) {
190             future.setFailure(t);
191             fireExceptionCaught(channel, t);
192         }
193     }
194 
195     static void close(OioDatagramChannel channel, ChannelFuture future) {
196         boolean connected = channel.isConnected();
197         boolean bound = channel.isBound();
198         try {
199             channel.socket.close();
200             if (channel.setClosed()) {
201                 future.setSuccess();
202                 if (connected) {
203                     // Notify the worker so it stops reading.
204                     Thread currentThread = Thread.currentThread();
205                     Thread workerThread = channel.workerThread;
206                     if (workerThread != null && currentThread != workerThread) {
207                         workerThread.interrupt();
208                     }
209                     fireChannelDisconnected(channel);
210                 }
211                 if (bound) {
212                     fireChannelUnbound(channel);
213                 }
214                 fireChannelClosed(channel);
215             } else {
216                 future.setSuccess();
217             }
218         } catch (Throwable t) {
219             future.setFailure(t);
220             fireExceptionCaught(channel, t);
221         }
222     }
223 }