View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import org.jboss.netty.channel.AbstractChannelSink;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelEvent;
23  import org.jboss.netty.channel.ChannelException;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelPipeline;
26  import org.jboss.netty.channel.ChannelState;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.MessageEvent;
29  
30  /**
31   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
32   * @author Andy Taylor (andy.taylor@jboss.org)
33   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
34   * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
35   */
36  final class LocalServerChannelSink extends AbstractChannelSink {
37  
38      LocalServerChannelSink() {
39          super();
40      }
41  
42      public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
43          Channel channel = e.getChannel();
44          if (channel instanceof DefaultLocalServerChannel) {
45              handleServerChannel(e);
46          }
47          else if (channel instanceof DefaultLocalChannel) {
48              handleAcceptedChannel(e);
49          }
50      }
51  
52      private void handleServerChannel(ChannelEvent e) {
53          if (!(e instanceof ChannelStateEvent)) {
54              return;
55          }
56  
57          ChannelStateEvent event = (ChannelStateEvent) e;
58          DefaultLocalServerChannel channel =
59                (DefaultLocalServerChannel) event.getChannel();
60          ChannelFuture future = event.getFuture();
61          ChannelState state = event.getState();
62          Object value = event.getValue();
63          switch (state) {
64          case OPEN:
65              if (Boolean.FALSE.equals(value)) {
66                  close(channel, future);
67              }
68              break;
69          case BOUND:
70              if (value != null) {
71                  bind(channel, future, (LocalAddress) value);
72              } else {
73                  close(channel, future);
74              }
75              break;
76          }
77      }
78  
79      private void handleAcceptedChannel(ChannelEvent e) {
80          if (e instanceof ChannelStateEvent) {
81              ChannelStateEvent event = (ChannelStateEvent) e;
82              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
83              ChannelFuture future = event.getFuture();
84              ChannelState state = event.getState();
85              Object value = event.getValue();
86  
87              switch (state) {
88              case OPEN:
89                  if (Boolean.FALSE.equals(value)) {
90                      channel.closeNow(future);
91                  }
92                  break;
93              case BOUND:
94              case CONNECTED:
95                  if (value == null) {
96                      channel.closeNow(future);
97                  }
98                  break;
99              case INTEREST_OPS:
100                 // Unsupported - discard silently.
101                 future.setSuccess();
102                 break;
103             }
104         } else if (e instanceof MessageEvent) {
105             MessageEvent event = (MessageEvent) e;
106             DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
107             boolean offered = channel.writeBuffer.offer(event);
108             assert offered;
109             channel.flushWriteBuffer();
110         }
111     }
112 
113     private void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
114         try {
115             if (!LocalChannelRegistry.register(localAddress, channel)) {
116                 throw new ChannelException("address already in use: " + localAddress);
117             }
118             if (!channel.bound.compareAndSet(false, true)) {
119                 throw new ChannelException("already bound");
120             }
121 
122             channel.localAddress = localAddress;
123             future.setSuccess();
124             fireChannelBound(channel, localAddress);
125         } catch (Throwable t) {
126             LocalChannelRegistry.unregister(localAddress);
127             future.setFailure(t);
128             fireExceptionCaught(channel, t);
129         }
130     }
131 
132     private void close(DefaultLocalServerChannel channel, ChannelFuture future) {
133         try {
134             if (channel.setClosed()) {
135                 future.setSuccess();
136                 LocalAddress localAddress = channel.localAddress;
137                 if (channel.bound.compareAndSet(true, false)) {
138                     channel.localAddress = null;
139                     LocalChannelRegistry.unregister(localAddress);
140                     fireChannelUnbound(channel);
141                 }
142                 fireChannelClosed(channel);
143             } else {
144                 future.setSuccess();
145             }
146         } catch (Throwable t) {
147             future.setFailure(t);
148             fireExceptionCaught(channel, t);
149         }
150     }
151 }