View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.ConnectException;
22  
23  import org.jboss.netty.channel.AbstractChannelSink;
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelEvent;
26  import org.jboss.netty.channel.ChannelException;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelPipeline;
29  import org.jboss.netty.channel.ChannelState;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.logging.InternalLogger;
33  import org.jboss.netty.logging.InternalLoggerFactory;
34  
35  /**
36   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
37   * @author Andy Taylor (andy.taylor@jboss.org)
38   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
39   * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
40   */
41  final class LocalClientChannelSink extends AbstractChannelSink {
42  
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
44  
45      LocalClientChannelSink() {
46          super();
47      }
48  
49      public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
50          if (e instanceof ChannelStateEvent) {
51              ChannelStateEvent event = (ChannelStateEvent) e;
52  
53              DefaultLocalChannel channel =
54                    (DefaultLocalChannel) event.getChannel();
55              ChannelFuture future = event.getFuture();
56              ChannelState state = event.getState();
57              Object value = event.getValue();
58              switch (state) {
59              case OPEN:
60                  if (Boolean.FALSE.equals(value)) {
61                      channel.closeNow(future);
62                  }
63                  break;
64              case BOUND:
65                  if (value != null) {
66                      bind(channel, future, (LocalAddress) value);
67                  } else {
68                      channel.closeNow(future);
69                  }
70                  break;
71              case CONNECTED:
72                  if (value != null) {
73                      connect(channel, future, (LocalAddress) value);
74                  } else {
75                      channel.closeNow(future);
76                  }
77                  break;
78              case INTEREST_OPS:
79                  // Unsupported - discard silently.
80                  future.setSuccess();
81                  break;
82              }
83          }
84          else if (e instanceof MessageEvent) {
85              MessageEvent event = (MessageEvent) e;
86              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
87              boolean offered = channel.writeBuffer.offer(event);
88              assert offered;
89              channel.flushWriteBuffer();
90          }
91      }
92  
93      private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
94          try {
95              if (!LocalChannelRegistry.register(localAddress, channel)) {
96                  throw new ChannelException("address already in use: " + localAddress);
97              }
98  
99              channel.setBound();
100             channel.localAddress = localAddress;
101             future.setSuccess();
102             fireChannelBound(channel, localAddress);
103         } catch (Throwable t) {
104             LocalChannelRegistry.unregister(localAddress);
105             future.setFailure(t);
106             fireExceptionCaught(channel, t);
107         }
108     }
109 
110     private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
111         Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
112         if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
113             future.setFailure(new ConnectException("connection refused"));
114             return;
115         }
116 
117         DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
118         ChannelPipeline pipeline;
119         try {
120             pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
121         } catch (Exception e) {
122             future.setFailure(e);
123             fireExceptionCaught(channel, e);
124             logger.warn(
125                     "Failed to initialize an accepted socket.", e);
126             return;
127         }
128 
129         future.setSuccess();
130         DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
131                 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
132         channel.pairedChannel = acceptedChannel;
133 
134         bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
135         channel.remoteAddress = serverChannel.getLocalAddress();
136         channel.setConnected();
137         fireChannelConnected(channel, serverChannel.getLocalAddress());
138 
139         acceptedChannel.localAddress = serverChannel.getLocalAddress();
140         try {
141             acceptedChannel.setBound();
142         } catch (IOException e) {
143             throw new Error(e);
144         }
145         fireChannelBound(acceptedChannel, channel.getRemoteAddress());
146         acceptedChannel.remoteAddress = channel.getLocalAddress();
147         acceptedChannel.setConnected();
148         fireChannelConnected(acceptedChannel, channel.getLocalAddress());
149 
150         // Flush something that was written in channelBound / channelConnected
151         channel.flushWriteBuffer();
152         acceptedChannel.flushWriteBuffer();
153     }
154 }