View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.Socket;
22  import java.net.SocketAddress;
23  import java.net.SocketTimeoutException;
24  import java.util.concurrent.Executor;
25  
26  import org.jboss.netty.channel.AbstractChannelSink;
27  import org.jboss.netty.channel.Channel;
28  import org.jboss.netty.channel.ChannelEvent;
29  import org.jboss.netty.channel.ChannelFuture;
30  import org.jboss.netty.channel.ChannelPipeline;
31  import org.jboss.netty.channel.ChannelState;
32  import org.jboss.netty.channel.ChannelStateEvent;
33  import org.jboss.netty.channel.MessageEvent;
34  import org.jboss.netty.logging.InternalLogger;
35  import org.jboss.netty.logging.InternalLoggerFactory;
36  import org.jboss.netty.util.ThreadRenamingRunnable;
37  import org.jboss.netty.util.internal.DeadLockProofWorker;
38  
39  /**
40   *
41   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
42   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
43   *
44   * @version $Rev: 2352 $, $Date: 2010-08-26 12:13:14 +0900 (Thu, 26 Aug 2010) $
45   *
46   */
47  class OioServerSocketPipelineSink extends AbstractChannelSink {
48  
49      static final InternalLogger logger =
50          InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
51  
52      final Executor workerExecutor;
53  
54      OioServerSocketPipelineSink(Executor workerExecutor) {
55          this.workerExecutor = workerExecutor;
56      }
57  
58      public void eventSunk(
59              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
60          Channel channel = e.getChannel();
61          if (channel instanceof OioServerSocketChannel) {
62              handleServerSocket(e);
63          } else if (channel instanceof OioAcceptedSocketChannel) {
64              handleAcceptedSocket(e);
65          }
66      }
67  
68      private void handleServerSocket(ChannelEvent e) {
69          if (!(e instanceof ChannelStateEvent)) {
70              return;
71          }
72  
73          ChannelStateEvent event = (ChannelStateEvent) e;
74          OioServerSocketChannel channel =
75              (OioServerSocketChannel) event.getChannel();
76          ChannelFuture future = event.getFuture();
77          ChannelState state = event.getState();
78          Object value = event.getValue();
79  
80          switch (state) {
81          case OPEN:
82              if (Boolean.FALSE.equals(value)) {
83                  close(channel, future);
84              }
85              break;
86          case BOUND:
87              if (value != null) {
88                  bind(channel, future, (SocketAddress) value);
89              } else {
90                  close(channel, future);
91              }
92              break;
93          }
94      }
95  
96      private void handleAcceptedSocket(ChannelEvent e) {
97          if (e instanceof ChannelStateEvent) {
98              ChannelStateEvent event = (ChannelStateEvent) e;
99              OioAcceptedSocketChannel channel =
100                 (OioAcceptedSocketChannel) event.getChannel();
101             ChannelFuture future = event.getFuture();
102             ChannelState state = event.getState();
103             Object value = event.getValue();
104 
105             switch (state) {
106             case OPEN:
107                 if (Boolean.FALSE.equals(value)) {
108                     OioWorker.close(channel, future);
109                 }
110                 break;
111             case BOUND:
112             case CONNECTED:
113                 if (value == null) {
114                     OioWorker.close(channel, future);
115                 }
116                 break;
117             case INTEREST_OPS:
118                 OioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
119                 break;
120             }
121         } else if (e instanceof MessageEvent) {
122             MessageEvent event = (MessageEvent) e;
123             OioSocketChannel channel = (OioSocketChannel) event.getChannel();
124             ChannelFuture future = event.getFuture();
125             Object message = event.getMessage();
126             OioWorker.write(channel, future, message);
127         }
128     }
129 
130     private void bind(
131             OioServerSocketChannel channel, ChannelFuture future,
132             SocketAddress localAddress) {
133 
134         boolean bound = false;
135         boolean bossStarted = false;
136         try {
137             channel.socket.bind(localAddress, channel.getConfig().getBacklog());
138             bound = true;
139 
140             future.setSuccess();
141             localAddress = channel.getLocalAddress();
142             fireChannelBound(channel, localAddress);
143 
144             Executor bossExecutor =
145                 ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
146             DeadLockProofWorker.start(
147                     bossExecutor,
148                     new ThreadRenamingRunnable(
149                             new Boss(channel),
150                             "Old I/O server boss (" + channel + ')'));
151             bossStarted = true;
152         } catch (Throwable t) {
153             future.setFailure(t);
154             fireExceptionCaught(channel, t);
155         } finally {
156             if (!bossStarted && bound) {
157                 close(channel, future);
158             }
159         }
160     }
161 
162     private void close(OioServerSocketChannel channel, ChannelFuture future) {
163         boolean bound = channel.isBound();
164         try {
165             channel.socket.close();
166 
167             // Make sure the boss thread is not running so that that the future
168             // is notified after a new connection cannot be accepted anymore.
169             // See NETTY-256 for more information.
170             channel.shutdownLock.lock();
171             try {
172                 if (channel.setClosed()) {
173                     future.setSuccess();
174                     if (bound) {
175                         fireChannelUnbound(channel);
176                     }
177                     fireChannelClosed(channel);
178                 } else {
179                     future.setSuccess();
180                 }
181             } finally {
182                 channel.shutdownLock.unlock();
183             }
184         } catch (Throwable t) {
185             future.setFailure(t);
186             fireExceptionCaught(channel, t);
187         }
188     }
189 
190     private final class Boss implements Runnable {
191         private final OioServerSocketChannel channel;
192 
193         Boss(OioServerSocketChannel channel) {
194             this.channel = channel;
195         }
196 
197         public void run() {
198             channel.shutdownLock.lock();
199             try {
200                 while (channel.isBound()) {
201                     try {
202                         Socket acceptedSocket = channel.socket.accept();
203                         try {
204                             ChannelPipeline pipeline =
205                                 channel.getConfig().getPipelineFactory().getPipeline();
206                             final OioAcceptedSocketChannel acceptedChannel =
207                                 new OioAcceptedSocketChannel(
208                                         channel,
209                                         channel.getFactory(),
210                                         pipeline,
211                                         OioServerSocketPipelineSink.this,
212                                         acceptedSocket);
213                             DeadLockProofWorker.start(
214                                     workerExecutor,
215                                     new ThreadRenamingRunnable(
216                                             new OioWorker(acceptedChannel),
217                                             "Old I/O server worker (parentId: " +
218                                             channel.getId() + ", " + channel + ')'));
219                         } catch (Exception e) {
220                             logger.warn(
221                                     "Failed to initialize an accepted socket.", e);
222                             try {
223                                 acceptedSocket.close();
224                             } catch (IOException e2) {
225                                 logger.warn(
226                                         "Failed to close a partially accepted socket.",
227                                         e2);
228                             }
229                         }
230                     } catch (SocketTimeoutException e) {
231                         // Thrown every second to stop when requested.
232                     } catch (Throwable e) {
233                         // Do not log the exception if the server socket was closed
234                         // by a user.
235                         if (!channel.socket.isBound() || channel.socket.isClosed()) {
236                             break;
237                         }
238 
239                         logger.warn(
240                                 "Failed to accept a connection.", e);
241                         try {
242                             Thread.sleep(1000);
243                         } catch (InterruptedException e1) {
244                             // Ignore
245                         }
246                     }
247                 }
248             } finally {
249                 channel.shutdownLock.unlock();
250             }
251         }
252     }
253 }