View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.ConnectException;
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.util.Iterator;
27  import java.util.Queue;
28  import java.util.Set;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.jboss.netty.channel.AbstractChannelSink;
35  import org.jboss.netty.channel.ChannelEvent;
36  import org.jboss.netty.channel.ChannelException;
37  import org.jboss.netty.channel.ChannelFuture;
38  import org.jboss.netty.channel.ChannelFutureListener;
39  import org.jboss.netty.channel.ChannelPipeline;
40  import org.jboss.netty.channel.ChannelState;
41  import org.jboss.netty.channel.ChannelStateEvent;
42  import org.jboss.netty.channel.MessageEvent;
43  import org.jboss.netty.logging.InternalLogger;
44  import org.jboss.netty.logging.InternalLoggerFactory;
45  import org.jboss.netty.util.ThreadRenamingRunnable;
46  import org.jboss.netty.util.internal.DeadLockProofWorker;
47  import org.jboss.netty.util.internal.LinkedTransferQueue;
48  
49  /**
50   *
51   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
52   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
53   *
54   * @version $Rev: 2144 $, $Date: 2010-02-09 12:41:12 +0900 (Tue, 09 Feb 2010) $
55   *
56   */
57  class NioClientSocketPipelineSink extends AbstractChannelSink {
58  
59      static final InternalLogger logger =
60          InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
61      private static final AtomicInteger nextId = new AtomicInteger();
62  
63      final int id = nextId.incrementAndGet();
64      final Executor bossExecutor;
65      private final Boss boss = new Boss();
66      private final NioWorker[] workers;
67      private final AtomicInteger workerIndex = new AtomicInteger();
68  
69      NioClientSocketPipelineSink(
70              Executor bossExecutor, Executor workerExecutor, int workerCount) {
71          this.bossExecutor = bossExecutor;
72          workers = new NioWorker[workerCount];
73          for (int i = 0; i < workers.length; i ++) {
74              workers[i] = new NioWorker(id, i + 1, workerExecutor);
75          }
76      }
77  
78      public void eventSunk(
79              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
80          if (e instanceof ChannelStateEvent) {
81              ChannelStateEvent event = (ChannelStateEvent) e;
82              NioClientSocketChannel channel =
83                  (NioClientSocketChannel) event.getChannel();
84              ChannelFuture future = event.getFuture();
85              ChannelState state = event.getState();
86              Object value = event.getValue();
87  
88              switch (state) {
89              case OPEN:
90                  if (Boolean.FALSE.equals(value)) {
91                      channel.worker.close(channel, future);
92                  }
93                  break;
94              case BOUND:
95                  if (value != null) {
96                      bind(channel, future, (SocketAddress) value);
97                  } else {
98                      channel.worker.close(channel, future);
99                  }
100                 break;
101             case CONNECTED:
102                 if (value != null) {
103                     connect(channel, future, (SocketAddress) value);
104                 } else {
105                     channel.worker.close(channel, future);
106                 }
107                 break;
108             case INTEREST_OPS:
109                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
110                 break;
111             }
112         } else if (e instanceof MessageEvent) {
113             MessageEvent event = (MessageEvent) e;
114             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
115             boolean offered = channel.writeBuffer.offer(event);
116             assert offered;
117             channel.worker.writeFromUserCode(channel);
118         }
119     }
120 
121     private void bind(
122             NioClientSocketChannel channel, ChannelFuture future,
123             SocketAddress localAddress) {
124         try {
125             channel.socket.socket().bind(localAddress);
126             channel.boundManually = true;
127             channel.setBound();
128             future.setSuccess();
129             fireChannelBound(channel, channel.getLocalAddress());
130         } catch (Throwable t) {
131             future.setFailure(t);
132             fireExceptionCaught(channel, t);
133         }
134     }
135 
136     private void connect(
137             final NioClientSocketChannel channel, final ChannelFuture cf,
138             SocketAddress remoteAddress) {
139         try {
140             if (channel.socket.connect(remoteAddress)) {
141                 channel.worker.register(channel, cf);
142             } else {
143                 channel.getCloseFuture().addListener(new ChannelFutureListener() {
144                     public void operationComplete(ChannelFuture f)
145                             throws Exception {
146                         if (!cf.isDone()) {
147                             cf.setFailure(new ClosedChannelException());
148                         }
149                     }
150                 });
151                 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
152                 channel.connectFuture = cf;
153                 boss.register(channel);
154             }
155 
156         } catch (Throwable t) {
157             cf.setFailure(t);
158             fireExceptionCaught(channel, t);
159             channel.worker.close(channel, succeededFuture(channel));
160         }
161     }
162 
163     NioWorker nextWorker() {
164         return workers[Math.abs(
165                 workerIndex.getAndIncrement() % workers.length)];
166     }
167 
168     private final class Boss implements Runnable {
169 
170         volatile Selector selector;
171         private boolean started;
172         private final AtomicBoolean wakenUp = new AtomicBoolean();
173         private final Object startStopLock = new Object();
174         private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
175 
176         Boss() {
177             super();
178         }
179 
180         void register(NioClientSocketChannel channel) {
181             Runnable registerTask = new RegisterTask(this, channel);
182             Selector selector;
183 
184             synchronized (startStopLock) {
185                 if (!started) {
186                     // Open a selector if this worker didn't start yet.
187                     try {
188                         this.selector = selector =  Selector.open();
189                     } catch (Throwable t) {
190                         throw new ChannelException(
191                                 "Failed to create a selector.", t);
192                     }
193 
194                     // Start the worker thread with the new Selector.
195                     boolean success = false;
196                     try {
197                         DeadLockProofWorker.start(
198                                 bossExecutor,
199                                 new ThreadRenamingRunnable(
200                                         this, "New I/O client boss #" + id));
201                         success = true;
202                     } finally {
203                         if (!success) {
204                             // Release the Selector if the execution fails.
205                             try {
206                                 selector.close();
207                             } catch (Throwable t) {
208                                 logger.warn("Failed to close a selector.", t);
209                             }
210                             this.selector = selector = null;
211                             // The method will return to the caller at this point.
212                         }
213                     }
214                 } else {
215                     // Use the existing selector if this worker has been started.
216                     selector = this.selector;
217                 }
218 
219                 assert selector != null && selector.isOpen();
220 
221                 started = true;
222                 boolean offered = registerTaskQueue.offer(registerTask);
223                 assert offered;
224             }
225 
226             if (wakenUp.compareAndSet(false, true)) {
227                 selector.wakeup();
228             }
229         }
230 
231         public void run() {
232             boolean shutdown = false;
233             Selector selector = this.selector;
234             long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
235             for (;;) {
236                 wakenUp.set(false);
237 
238                 try {
239                     int selectedKeyCount = selector.select(500);
240 
241                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
242                     // before calling 'selector.wakeup()' to reduce the wake-up
243                     // overhead. (Selector.wakeup() is an expensive operation.)
244                     //
245                     // However, there is a race condition in this approach.
246                     // The race condition is triggered when 'wakenUp' is set to
247                     // true too early.
248                     //
249                     // 'wakenUp' is set to true too early if:
250                     // 1) Selector is waken up between 'wakenUp.set(false)' and
251                     //    'selector.select(...)'. (BAD)
252                     // 2) Selector is waken up between 'selector.select(...)' and
253                     //    'if (wakenUp.get()) { ... }'. (OK)
254                     //
255                     // In the first case, 'wakenUp' is set to true and the
256                     // following 'selector.select(...)' will wake up immediately.
257                     // Until 'wakenUp' is set to false again in the next round,
258                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
259                     // any attempt to wake up the Selector will fail, too, causing
260                     // the following 'selector.select(...)' call to block
261                     // unnecessarily.
262                     //
263                     // To fix this problem, we wake up the selector again if wakenUp
264                     // is true immediately after selector.select(...).
265                     // It is inefficient in that it wakes up the selector for both
266                     // the first case (BAD - wake-up required) and the second case
267                     // (OK - no wake-up required).
268 
269                     if (wakenUp.get()) {
270                         selector.wakeup();
271                     }
272 
273                     processRegisterTaskQueue();
274 
275                     if (selectedKeyCount > 0) {
276                         processSelectedKeys(selector.selectedKeys());
277                     }
278 
279                     // Handle connection timeout every 0.5 seconds approximately.
280                     long currentTimeNanos = System.nanoTime();
281                     if (currentTimeNanos - lastConnectTimeoutCheckTimeNanos >= 500 * 1000000L) {
282                         lastConnectTimeoutCheckTimeNanos = currentTimeNanos;
283                         processConnectTimeout(selector.keys(), currentTimeNanos);
284                     }
285 
286                     // Exit the loop when there's nothing to handle.
287                     // The shutdown flag is used to delay the shutdown of this
288                     // loop to avoid excessive Selector creation when
289                     // connection attempts are made in a one-by-one manner
290                     // instead of concurrent manner.
291                     if (selector.keys().isEmpty()) {
292                         if (shutdown ||
293                             bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
294 
295                             synchronized (startStopLock) {
296                                 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
297                                     started = false;
298                                     try {
299                                         selector.close();
300                                     } catch (IOException e) {
301                                         logger.warn(
302                                                 "Failed to close a selector.", e);
303                                     } finally {
304                                         this.selector = null;
305                                     }
306                                     break;
307                                 } else {
308                                     shutdown = false;
309                                 }
310                             }
311                         } else {
312                             // Give one more second.
313                             shutdown = true;
314                         }
315                     } else {
316                         shutdown = false;
317                     }
318                 } catch (Throwable t) {
319                     logger.warn(
320                             "Unexpected exception in the selector loop.", t);
321 
322                     // Prevent possible consecutive immediate failures.
323                     try {
324                         Thread.sleep(1000);
325                     } catch (InterruptedException e) {
326                         // Ignore.
327                     }
328                 }
329             }
330         }
331 
332         private void processRegisterTaskQueue() {
333             for (;;) {
334                 final Runnable task = registerTaskQueue.poll();
335                 if (task == null) {
336                     break;
337                 }
338 
339                 task.run();
340             }
341         }
342 
343         private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
344             for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
345                 SelectionKey k = i.next();
346                 i.remove();
347 
348                 if (!k.isValid()) {
349                     close(k);
350                     continue;
351                 }
352 
353                 if (k.isConnectable()) {
354                     connect(k);
355                 }
356             }
357         }
358 
359         private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
360             ConnectException cause = null;
361             for (SelectionKey k: keys) {
362                 if (!k.isValid()) {
363                     continue;
364                 }
365 
366                 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
367                 if (ch.connectDeadlineNanos > 0 &&
368                         currentTimeNanos >= ch.connectDeadlineNanos) {
369 
370                     if (cause == null) {
371                         cause = new ConnectException("connection timed out");
372                     }
373 
374                     ch.connectFuture.setFailure(cause);
375                     fireExceptionCaught(ch, cause);
376                     ch.worker.close(ch, succeededFuture(ch));
377                 }
378             }
379         }
380 
381         private void connect(SelectionKey k) {
382             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
383             try {
384                 if (ch.socket.finishConnect()) {
385                     k.cancel();
386                     ch.worker.register(ch, ch.connectFuture);
387                 }
388             } catch (Throwable t) {
389                 ch.connectFuture.setFailure(t);
390                 fireExceptionCaught(ch, t);
391                 k.cancel(); // Some JDK implementations run into an infinite loop without this.
392                 ch.worker.close(ch, succeededFuture(ch));
393             }
394         }
395 
396         private void close(SelectionKey k) {
397             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
398             ch.worker.close(ch, succeededFuture(ch));
399         }
400     }
401 
402     private static final class RegisterTask implements Runnable {
403         private final Boss boss;
404         private final NioClientSocketChannel channel;
405 
406         RegisterTask(Boss boss, NioClientSocketChannel channel) {
407             this.boss = boss;
408             this.channel = channel;
409         }
410 
411         public void run() {
412             try {
413                 channel.socket.register(
414                         boss.selector, SelectionKey.OP_CONNECT, channel);
415             } catch (ClosedChannelException e) {
416                 channel.worker.close(channel, succeededFuture(channel));
417             }
418 
419             int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
420             if (connectTimeout > 0) {
421                 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
422             }
423         }
424     }
425 }