View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.SocketAddress;
21  import java.util.concurrent.Executor;
22  
23  import org.jboss.netty.channel.AbstractChannelSink;
24  import org.jboss.netty.channel.ChannelEvent;
25  import org.jboss.netty.channel.ChannelFuture;
26  import org.jboss.netty.channel.ChannelFutureListener;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelState;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  import org.jboss.netty.util.ThreadRenamingRunnable;
32  import org.jboss.netty.util.internal.DeadLockProofWorker;
33  
34  /**
35   *
36   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
37   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
38   *
39   * @version $Rev: 2341 $, $Date: 2010-07-07 13:44:23 +0900 (Wed, 07 Jul 2010) $
40   *
41   */
42  class OioDatagramPipelineSink extends AbstractChannelSink {
43  
44      private final Executor workerExecutor;
45  
46      OioDatagramPipelineSink(Executor workerExecutor) {
47          this.workerExecutor = workerExecutor;
48      }
49  
50      public void eventSunk(
51              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
52          OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
53          ChannelFuture future = e.getFuture();
54          if (e instanceof ChannelStateEvent) {
55              ChannelStateEvent stateEvent = (ChannelStateEvent) e;
56              ChannelState state = stateEvent.getState();
57              Object value = stateEvent.getValue();
58              switch (state) {
59              case OPEN:
60                  if (Boolean.FALSE.equals(value)) {
61                      OioDatagramWorker.close(channel, future);
62                  }
63                  break;
64              case BOUND:
65                  if (value != null) {
66                      bind(channel, future, (SocketAddress) value);
67                  } else {
68                      OioDatagramWorker.close(channel, future);
69                  }
70                  break;
71              case CONNECTED:
72                  if (value != null) {
73                      connect(channel, future, (SocketAddress) value);
74                  } else {
75                      OioDatagramWorker.disconnect(channel, future);
76                  }
77                  break;
78              case INTEREST_OPS:
79                  OioDatagramWorker.setInterestOps(channel, future, ((Integer) value).intValue());
80                  break;
81              }
82          } else if (e instanceof MessageEvent) {
83              MessageEvent evt = (MessageEvent) e;
84              OioDatagramWorker.write(
85                      channel, future, evt.getMessage(), evt.getRemoteAddress());
86          }
87      }
88  
89      private void bind(
90              OioDatagramChannel channel, ChannelFuture future,
91              SocketAddress localAddress) {
92          boolean bound = false;
93          boolean workerStarted = false;
94          try {
95              channel.socket.bind(localAddress);
96              bound = true;
97  
98              // Fire events
99              future.setSuccess();
100             fireChannelBound(channel, channel.getLocalAddress());
101 
102             // Start the business.
103             DeadLockProofWorker.start(
104                     workerExecutor,
105                     new ThreadRenamingRunnable(
106                             new OioDatagramWorker(channel),
107                             "Old I/O datagram worker (" + channel + ')'));
108             workerStarted = true;
109         } catch (Throwable t) {
110             future.setFailure(t);
111             fireExceptionCaught(channel, t);
112         } finally {
113             if (bound && !workerStarted) {
114                 OioDatagramWorker.close(channel, future);
115             }
116         }
117     }
118 
119     private void connect(
120             OioDatagramChannel channel, ChannelFuture future,
121             SocketAddress remoteAddress) {
122 
123         boolean bound = channel.isBound();
124         boolean connected = false;
125         boolean workerStarted = false;
126 
127         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
128 
129         // Clear the cached address so that the next getRemoteAddress() call
130         // updates the cache.
131         channel.remoteAddress = null;
132 
133         try {
134             channel.socket.connect(remoteAddress);
135             connected = true;
136 
137             // Fire events.
138             future.setSuccess();
139             if (!bound) {
140                 fireChannelBound(channel, channel.getLocalAddress());
141             }
142             fireChannelConnected(channel, channel.getRemoteAddress());
143 
144             String threadName = "Old I/O datagram worker (" + channel + ')';
145             if (!bound) {
146                 // Start the business.
147                 DeadLockProofWorker.start(
148                         workerExecutor,
149                         new ThreadRenamingRunnable(
150                                 new OioDatagramWorker(channel), threadName));
151             } else {
152                 // Worker started by bind() - just rename.
153                 Thread workerThread = channel.workerThread;
154                 if (workerThread != null) {
155                     try {
156                         workerThread.setName(threadName);
157                     } catch (SecurityException e) {
158                         // Ignore.
159                     }
160                 }
161             }
162 
163             workerStarted = true;
164         } catch (Throwable t) {
165             future.setFailure(t);
166             fireExceptionCaught(channel, t);
167         } finally {
168             if (connected && !workerStarted) {
169                 OioDatagramWorker.close(channel, future);
170             }
171         }
172     }
173 }