View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.InetSocketAddress;
21  import java.net.SocketAddress;
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.atomic.AtomicInteger;
24  
25  import org.jboss.netty.channel.AbstractChannelSink;
26  import org.jboss.netty.channel.ChannelEvent;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelFutureListener;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.ChannelState;
31  import org.jboss.netty.channel.ChannelStateEvent;
32  import org.jboss.netty.channel.MessageEvent;
33  
34  /**
35   * Receives downstream events from a {@link ChannelPipeline}.  It contains
36   * an array of I/O workers.
37   *
38   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
39   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
40   * @author Daniel Bevenius (dbevenius@jboss.com)
41   *
42   * @version $Rev: 2340 $, $Date: 2010-07-07 13:37:10 +0900 (Wed, 07 Jul 2010) $
43   */
44  class NioDatagramPipelineSink extends AbstractChannelSink {
45  
46      private static final AtomicInteger nextId = new AtomicInteger();
47  
48      private final int id = nextId.incrementAndGet();
49      private final NioDatagramWorker[] workers;
50      private final AtomicInteger workerIndex = new AtomicInteger();
51  
52      /**
53       * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s specified in workerCount.
54       * The {@link NioDatagramWorker}s take care of reading and writing for the {@link NioDatagramChannel}.
55       *
56       * @param workerExecutor
57       *        the {@link Executor} that will run the {@link NioDatagramWorker}s
58       *        for this sink
59       * @param workerCount
60       *        the number of {@link NioDatagramWorker}s for this sink
61       */
62      NioDatagramPipelineSink(final Executor workerExecutor, final int workerCount) {
63          workers = new NioDatagramWorker[workerCount];
64          for (int i = 0; i < workers.length; i ++) {
65              workers[i] = new NioDatagramWorker(id, i + 1, workerExecutor);
66          }
67      }
68  
69      /**
70       * Handle downstream event.
71       *
72       * @param pipeline the {@link ChannelPipeline} that passes down the
73       *                 downstream event.
74       * @param e The downstream event.
75       */
76      public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
77              throws Exception {
78          final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel();
79          final ChannelFuture future = e.getFuture();
80          if (e instanceof ChannelStateEvent) {
81              final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
82              final ChannelState state = stateEvent.getState();
83              final Object value = stateEvent.getValue();
84              switch (state) {
85              case OPEN:
86                  if (Boolean.FALSE.equals(value)) {
87                      channel.worker.close(channel, future);
88                  }
89                  break;
90              case BOUND:
91                  if (value != null) {
92                      bind(channel, future, (InetSocketAddress) value);
93                  } else {
94                      channel.worker.close(channel, future);
95                  }
96                  break;
97              case CONNECTED:
98                  if (value != null) {
99                      connect(channel, future, (InetSocketAddress) value);
100                 } else {
101                     NioDatagramWorker.disconnect(channel, future);
102                 }
103                 break;
104             case INTEREST_OPS:
105                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
106                 break;
107             }
108         } else if (e instanceof MessageEvent) {
109             final MessageEvent event = (MessageEvent) e;
110             final boolean offered = channel.writeBufferQueue.offer(event);
111             assert offered;
112             channel.worker.writeFromUserCode(channel);
113         }
114     }
115 
116     private void close(NioDatagramChannel channel, ChannelFuture future) {
117         try {
118             channel.getDatagramChannel().socket().close();
119             if (channel.setClosed()) {
120                 future.setSuccess();
121                 if (channel.isBound()) {
122                     fireChannelUnbound(channel);
123                 }
124                 fireChannelClosed(channel);
125             } else {
126                 future.setSuccess();
127             }
128         } catch (final Throwable t) {
129             future.setFailure(t);
130             fireExceptionCaught(channel, t);
131         }
132     }
133 
134     /**
135      * Will bind the DatagramSocket to the passed-in address.
136      * Every call bind will spawn a new thread using the that basically in turn
137      */
138     private void bind(final NioDatagramChannel channel,
139             final ChannelFuture future, final InetSocketAddress address) {
140         boolean bound = false;
141         boolean started = false;
142         try {
143             // First bind the DatagramSocket the specified port.
144             channel.getDatagramChannel().socket().bind(address);
145             bound = true;
146 
147             future.setSuccess();
148             fireChannelBound(channel, address);
149 
150             channel.worker.register(channel, null);
151             started = true;
152         } catch (final Throwable t) {
153             future.setFailure(t);
154             fireExceptionCaught(channel, t);
155         } finally {
156             if (!started && bound) {
157                 close(channel, future);
158             }
159         }
160     }
161 
162     private void connect(
163             NioDatagramChannel channel, ChannelFuture future,
164             SocketAddress remoteAddress) {
165 
166         boolean bound = channel.isBound();
167         boolean connected = false;
168         boolean workerStarted = false;
169 
170         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
171 
172         // Clear the cached address so that the next getRemoteAddress() call
173         // updates the cache.
174         channel.remoteAddress = null;
175 
176         try {
177             channel.getDatagramChannel().connect(remoteAddress);
178             connected = true;
179 
180             // Fire events.
181             future.setSuccess();
182             if (!bound) {
183                 fireChannelBound(channel, channel.getLocalAddress());
184             }
185             fireChannelConnected(channel, channel.getRemoteAddress());
186 
187             if (!bound) {
188                 channel.worker.register(channel, future);
189             }
190 
191             workerStarted = true;
192         } catch (Throwable t) {
193             future.setFailure(t);
194             fireExceptionCaught(channel, t);
195         } finally {
196             if (connected && !workerStarted) {
197                 channel.worker.close(channel, future);
198             }
199         }
200     }
201 
202     NioDatagramWorker nextWorker() {
203         return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
204     }
205 
206 }