View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.PushbackInputStream;
21  import java.net.SocketAddress;
22  import java.util.concurrent.Executor;
23  
24  import org.jboss.netty.channel.AbstractChannelSink;
25  import org.jboss.netty.channel.ChannelEvent;
26  import org.jboss.netty.channel.ChannelFuture;
27  import org.jboss.netty.channel.ChannelFutureListener;
28  import org.jboss.netty.channel.ChannelPipeline;
29  import org.jboss.netty.channel.ChannelState;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.util.ThreadRenamingRunnable;
33  import org.jboss.netty.util.internal.DeadLockProofWorker;
34  
35  /**
36   *
37   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
38   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
39   *
40   * @version $Rev: 2341 $, $Date: 2010-07-07 13:44:23 +0900 (Wed, 07 Jul 2010) $
41   *
42   */
43  class OioClientSocketPipelineSink extends AbstractChannelSink {
44  
45      private final Executor workerExecutor;
46  
47      OioClientSocketPipelineSink(Executor workerExecutor) {
48          this.workerExecutor = workerExecutor;
49      }
50  
51      public void eventSunk(
52              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
53          OioClientSocketChannel channel = (OioClientSocketChannel) e.getChannel();
54          ChannelFuture future = e.getFuture();
55          if (e instanceof ChannelStateEvent) {
56              ChannelStateEvent stateEvent = (ChannelStateEvent) e;
57              ChannelState state = stateEvent.getState();
58              Object value = stateEvent.getValue();
59              switch (state) {
60              case OPEN:
61                  if (Boolean.FALSE.equals(value)) {
62                      OioWorker.close(channel, future);
63                  }
64                  break;
65              case BOUND:
66                  if (value != null) {
67                      bind(channel, future, (SocketAddress) value);
68                  } else {
69                      OioWorker.close(channel, future);
70                  }
71                  break;
72              case CONNECTED:
73                  if (value != null) {
74                      connect(channel, future, (SocketAddress) value);
75                  } else {
76                      OioWorker.close(channel, future);
77                  }
78                  break;
79              case INTEREST_OPS:
80                  OioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
81                  break;
82              }
83          } else if (e instanceof MessageEvent) {
84              OioWorker.write(
85                      channel, future,
86                      ((MessageEvent) e).getMessage());
87          }
88      }
89  
90      private void bind(
91              OioClientSocketChannel channel, ChannelFuture future,
92              SocketAddress localAddress) {
93          try {
94              channel.socket.bind(localAddress);
95              future.setSuccess();
96              fireChannelBound(channel, channel.getLocalAddress());
97          } catch (Throwable t) {
98              future.setFailure(t);
99              fireExceptionCaught(channel, t);
100         }
101     }
102 
103     private void connect(
104             OioClientSocketChannel channel, ChannelFuture future,
105             SocketAddress remoteAddress) {
106 
107         boolean bound = channel.isBound();
108         boolean connected = false;
109         boolean workerStarted = false;
110 
111         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
112 
113         try {
114             channel.socket.connect(
115                     remoteAddress, channel.getConfig().getConnectTimeoutMillis());
116             connected = true;
117 
118             // Obtain I/O stream.
119             channel.in = new PushbackInputStream(channel.socket.getInputStream(), 1);
120             channel.out = channel.socket.getOutputStream();
121 
122             // Fire events.
123             future.setSuccess();
124             if (!bound) {
125                 fireChannelBound(channel, channel.getLocalAddress());
126             }
127             fireChannelConnected(channel, channel.getRemoteAddress());
128 
129             // Start the business.
130             DeadLockProofWorker.start(
131                     workerExecutor,
132                     new ThreadRenamingRunnable(
133                             new OioWorker(channel),
134                             "Old I/O client worker (" + channel + ')'));
135             workerStarted = true;
136         } catch (Throwable t) {
137             future.setFailure(t);
138             fireExceptionCaught(channel, t);
139         } finally {
140             if (connected && !workerStarted) {
141                 OioWorker.close(channel, future);
142             }
143         }
144     }
145 }