View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.OutputStream;
21  import java.io.PushbackInputStream;
22  import java.net.SocketException;
23  import java.nio.channels.Channels;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.WritableByteChannel;
26  import java.util.regex.Pattern;
27  
28  import org.jboss.netty.buffer.ChannelBuffer;
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.DefaultFileRegion;
32  import org.jboss.netty.channel.FileRegion;
33  
34  /**
35   *
36   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
37   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
38   *
39   * @version $Rev: 2307 $, $Date: 2010-06-16 12:33:29 +0900 (Wed, 16 Jun 2010) $
40   *
41   */
42  class OioWorker implements Runnable {
43  
44      private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
45              "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
46  
47      private final OioSocketChannel channel;
48  
49      OioWorker(OioSocketChannel channel) {
50          this.channel = channel;
51      }
52  
53      public void run() {
54          channel.workerThread = Thread.currentThread();
55          final PushbackInputStream in = channel.getInputStream();
56  
57          boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
58  
59          while (channel.isOpen()) {
60              if (fireConnected) {
61                  fireConnected = false;
62                  fireChannelConnected(channel, channel.getRemoteAddress());
63              }
64              synchronized (channel.interestOpsLock) {
65                  while (!channel.isReadable()) {
66                      try {
67                          // notify() is not called at all.
68                          // close() and setInterestOps() calls Thread.interrupt()
69                          channel.interestOpsLock.wait();
70                      } catch (InterruptedException e) {
71                          if (!channel.isOpen()) {
72                              break;
73                          }
74                      }
75                  }
76              }
77  
78              byte[] buf;
79              int readBytes;
80              try {
81                  int bytesToRead = in.available();
82                  if (bytesToRead > 0) {
83                      buf = new byte[bytesToRead];
84                      readBytes = in.read(buf);
85                  } else {
86                      int b = in.read();
87                      if (b < 0) {
88                          break;
89                      }
90                      in.unread(b);
91                      continue;
92                  }
93              } catch (Throwable t) {
94                  if (!channel.socket.isClosed()) {
95                      fireExceptionCaught(channel, t);
96                  }
97                  break;
98              }
99  
100             fireMessageReceived(
101                     channel,
102                     channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
103         }
104 
105         // Setting the workerThread to null will prevent any channel
106         // operations from interrupting this thread from now on.
107         channel.workerThread = null;
108 
109         // Clean up.
110         close(channel, succeededFuture(channel));
111     }
112 
113     static void write(
114             OioSocketChannel channel, ChannelFuture future,
115             Object message) {
116 
117         OutputStream out = channel.getOutputStream();
118         if (out == null) {
119             Exception e = new ClosedChannelException();
120             future.setFailure(e);
121             fireExceptionCaught(channel, e);
122             return;
123         }
124 
125         try {
126             int length = 0;
127 
128             // Add support to write a FileRegion. This in fact will not give any performance gain but at least it not fail and 
129             // we did the best to emulate it
130             if (message instanceof FileRegion) {
131                 FileRegion fr = (FileRegion) message;
132                 try {
133                     synchronized (out) {
134                         WritableByteChannel  bchannel = Channels.newChannel(out);
135                         
136                         long i = 0;
137                         while ((i = fr.transferTo(bchannel, length)) > 0) {
138                             length += i;
139                             if (length >= fr.getCount()) {
140                                 break;
141                             }
142                         }
143                     }
144                 } finally {
145                     if (fr instanceof DefaultFileRegion) {
146                         if (((DefaultFileRegion)fr).releaseAfterTransfer()) {
147                             fr.releaseExternalResources();
148                         }
149                     }
150 
151                 }
152             } else {
153                 ChannelBuffer a = (ChannelBuffer) message;
154                 length = a.readableBytes();
155                 synchronized (out) {
156                     a.getBytes(a.readerIndex(), out, length);
157                 }
158             }
159 
160             fireWriteComplete(channel, length);
161             future.setSuccess();
162             
163         } catch (Throwable t) {
164             // Convert 'SocketException: Socket closed' to
165             // ClosedChannelException.
166             if (t instanceof SocketException &&
167                     SOCKET_CLOSED_MESSAGE.matcher(
168                             String.valueOf(t.getMessage())).matches()) {
169                 t = new ClosedChannelException();
170             }
171             future.setFailure(t);
172             fireExceptionCaught(channel, t);
173         }
174     }
175 
176     static void setInterestOps(
177             OioSocketChannel channel, ChannelFuture future, int interestOps) {
178 
179         // Override OP_WRITE flag - a user cannot change this flag.
180         interestOps &= ~Channel.OP_WRITE;
181         interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
182 
183         boolean changed = false;
184         try {
185             if (channel.getInterestOps() != interestOps) {
186                 if ((interestOps & Channel.OP_READ) != 0) {
187                     channel.setInterestOpsNow(Channel.OP_READ);
188                 } else {
189                     channel.setInterestOpsNow(Channel.OP_NONE);
190                 }
191                 changed = true;
192             }
193 
194             future.setSuccess();
195             if (changed) {
196                 synchronized (channel.interestOpsLock) {
197                     channel.setInterestOpsNow(interestOps);
198 
199                     // Notify the worker so it stops or continues reading.
200                     Thread currentThread = Thread.currentThread();
201                     Thread workerThread = channel.workerThread;
202                     if (workerThread != null && currentThread != workerThread) {
203                         workerThread.interrupt();
204                     }
205                 }
206 
207                 fireChannelInterestChanged(channel);
208             }
209         } catch (Throwable t) {
210             future.setFailure(t);
211             fireExceptionCaught(channel, t);
212         }
213     }
214 
215     static void close(OioSocketChannel channel, ChannelFuture future) {
216         boolean connected = channel.isConnected();
217         boolean bound = channel.isBound();
218         try {
219             channel.socket.close();
220             if (channel.setClosed()) {
221                 future.setSuccess();
222                 if (connected) {
223                     // Notify the worker so it stops reading.
224                     Thread currentThread = Thread.currentThread();
225                     Thread workerThread = channel.workerThread;
226                     if (workerThread != null && currentThread != workerThread) {
227                         workerThread.interrupt();
228                     }
229                     fireChannelDisconnected(channel);
230                 }
231                 if (bound) {
232                     fireChannelUnbound(channel);
233                 }
234                 fireChannelClosed(channel);
235             } else {
236                 future.setSuccess();
237             }
238         } catch (Throwable t) {
239             future.setFailure(t);
240             fireExceptionCaught(channel, t);
241         }
242     }
243 }