View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.net.SocketTimeoutException;
23  import java.nio.channels.CancelledKeyException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.ClosedSelectorException;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.jboss.netty.channel.AbstractChannelSink;
33  import org.jboss.netty.channel.Channel;
34  import org.jboss.netty.channel.ChannelEvent;
35  import org.jboss.netty.channel.ChannelFuture;
36  import org.jboss.netty.channel.ChannelPipeline;
37  import org.jboss.netty.channel.ChannelState;
38  import org.jboss.netty.channel.ChannelStateEvent;
39  import org.jboss.netty.channel.MessageEvent;
40  import org.jboss.netty.logging.InternalLogger;
41  import org.jboss.netty.logging.InternalLoggerFactory;
42  import org.jboss.netty.util.ThreadRenamingRunnable;
43  import org.jboss.netty.util.internal.DeadLockProofWorker;
44  
45  /**
46   *
47   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
48   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
49   *
50   * @version $Rev: 2352 $, $Date: 2010-08-26 12:13:14 +0900 (Thu, 26 Aug 2010) $
51   *
52   */
53  class NioServerSocketPipelineSink extends AbstractChannelSink {
54  
55      static final InternalLogger logger =
56          InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
57      private static final AtomicInteger nextId = new AtomicInteger();
58  
59      private final int id = nextId.incrementAndGet();
60      private final NioWorker[] workers;
61      private final AtomicInteger workerIndex = new AtomicInteger();
62  
63      NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {
64          workers = new NioWorker[workerCount];
65          for (int i = 0; i < workers.length; i ++) {
66              workers[i] = new NioWorker(id, i + 1, workerExecutor);
67          }
68      }
69  
70      public void eventSunk(
71              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
72          Channel channel = e.getChannel();
73          if (channel instanceof NioServerSocketChannel) {
74              handleServerSocket(e);
75          } else if (channel instanceof NioSocketChannel) {
76              handleAcceptedSocket(e);
77          }
78      }
79  
80      private void handleServerSocket(ChannelEvent e) {
81          if (!(e instanceof ChannelStateEvent)) {
82              return;
83          }
84  
85          ChannelStateEvent event = (ChannelStateEvent) e;
86          NioServerSocketChannel channel =
87              (NioServerSocketChannel) event.getChannel();
88          ChannelFuture future = event.getFuture();
89          ChannelState state = event.getState();
90          Object value = event.getValue();
91  
92          switch (state) {
93          case OPEN:
94              if (Boolean.FALSE.equals(value)) {
95                  close(channel, future);
96              }
97              break;
98          case BOUND:
99              if (value != null) {
100                 bind(channel, future, (SocketAddress) value);
101             } else {
102                 close(channel, future);
103             }
104             break;
105         }
106     }
107 
108     private void handleAcceptedSocket(ChannelEvent e) {
109         if (e instanceof ChannelStateEvent) {
110             ChannelStateEvent event = (ChannelStateEvent) e;
111             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
112             ChannelFuture future = event.getFuture();
113             ChannelState state = event.getState();
114             Object value = event.getValue();
115 
116             switch (state) {
117             case OPEN:
118                 if (Boolean.FALSE.equals(value)) {
119                     channel.worker.close(channel, future);
120                 }
121                 break;
122             case BOUND:
123             case CONNECTED:
124                 if (value == null) {
125                     channel.worker.close(channel, future);
126                 }
127                 break;
128             case INTEREST_OPS:
129                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
130                 break;
131             }
132         } else if (e instanceof MessageEvent) {
133             MessageEvent event = (MessageEvent) e;
134             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
135             boolean offered = channel.writeBuffer.offer(event);
136             assert offered;
137             channel.worker.writeFromUserCode(channel);
138         }
139     }
140 
141     private void bind(
142             NioServerSocketChannel channel, ChannelFuture future,
143             SocketAddress localAddress) {
144 
145         boolean bound = false;
146         boolean bossStarted = false;
147         try {
148             channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
149             bound = true;
150 
151             future.setSuccess();
152             fireChannelBound(channel, channel.getLocalAddress());
153 
154             Executor bossExecutor =
155                 ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
156             DeadLockProofWorker.start(
157                     bossExecutor,
158                     new ThreadRenamingRunnable(
159                             new Boss(channel),
160                             "New I/O server boss #" + id + " (" + channel + ')'));
161             bossStarted = true;
162         } catch (Throwable t) {
163             future.setFailure(t);
164             fireExceptionCaught(channel, t);
165         } finally {
166             if (!bossStarted && bound) {
167                 close(channel, future);
168             }
169         }
170     }
171 
172     private void close(NioServerSocketChannel channel, ChannelFuture future) {
173         boolean bound = channel.isBound();
174         try {
175             if (channel.socket.isOpen()) {
176                 channel.socket.close();
177                 Selector selector = channel.selector;
178                 if (selector != null) {
179                     selector.wakeup();
180                 }
181             }
182 
183             // Make sure the boss thread is not running so that that the future
184             // is notified after a new connection cannot be accepted anymore.
185             // See NETTY-256 for more information.
186             channel.shutdownLock.lock();
187             try {
188                 if (channel.setClosed()) {
189                     future.setSuccess();
190                     if (bound) {
191                         fireChannelUnbound(channel);
192                     }
193                     fireChannelClosed(channel);
194                 } else {
195                     future.setSuccess();
196                 }
197             } finally {
198                 channel.shutdownLock.unlock();
199             }
200         } catch (Throwable t) {
201             future.setFailure(t);
202             fireExceptionCaught(channel, t);
203         }
204     }
205 
206     NioWorker nextWorker() {
207         return workers[Math.abs(
208                 workerIndex.getAndIncrement() % workers.length)];
209     }
210 
211     private final class Boss implements Runnable {
212         private final Selector selector;
213         private final NioServerSocketChannel channel;
214 
215         Boss(NioServerSocketChannel channel) throws IOException {
216             this.channel = channel;
217 
218             selector = Selector.open();
219 
220             boolean registered = false;
221             try {
222                 channel.socket.register(selector, SelectionKey.OP_ACCEPT);
223                 registered = true;
224             } finally {
225                 if (!registered) {
226                     closeSelector();
227                 }
228             }
229 
230             channel.selector = selector;
231         }
232 
233         public void run() {
234             final Thread currentThread = Thread.currentThread();
235 
236             channel.shutdownLock.lock();
237             try {
238                 for (;;) {
239                     try {
240                         if (selector.select(1000) > 0) {
241                             selector.selectedKeys().clear();
242                         }
243 
244                         SocketChannel acceptedSocket = channel.socket.accept();
245                         if (acceptedSocket != null) {
246                             registerAcceptedChannel(acceptedSocket, currentThread);
247                         }
248                     } catch (SocketTimeoutException e) {
249                         // Thrown every second to get ClosedChannelException
250                         // raised.
251                     } catch (CancelledKeyException e) {
252                         // Raised by accept() when the server socket was closed.
253                     } catch (ClosedSelectorException e) {
254                         // Raised by accept() when the server socket was closed.
255                     } catch (ClosedChannelException e) {
256                         // Closed as requested.
257                         break;
258                     } catch (Throwable e) {
259                         logger.warn(
260                                 "Failed to accept a connection.", e);
261                         try {
262                             Thread.sleep(1000);
263                         } catch (InterruptedException e1) {
264                             // Ignore
265                         }
266                     }
267                 }
268             } finally {
269                 channel.shutdownLock.unlock();
270                 closeSelector();
271             }
272         }
273 
274         private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
275             try {
276                 ChannelPipeline pipeline =
277                     channel.getConfig().getPipelineFactory().getPipeline();
278                 NioWorker worker = nextWorker();
279                 worker.register(new NioAcceptedSocketChannel(
280                         channel.getFactory(), pipeline, channel,
281                         NioServerSocketPipelineSink.this, acceptedSocket,
282                         worker, currentThread), null);
283             } catch (Exception e) {
284                 logger.warn(
285                         "Failed to initialize an accepted socket.", e);
286                 try {
287                     acceptedSocket.close();
288                 } catch (IOException e2) {
289                     logger.warn(
290                             "Failed to close a partially accepted socket.",
291                             e2);
292                 }
293             }
294         }
295 
296         private void closeSelector() {
297             channel.selector = null;
298             try {
299                 selector.close();
300             } catch (Exception e) {
301                 logger.warn("Failed to close a selector.", e);
302             }
303         }
304     }
305 }