View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.nio.channels.ClosedChannelException;
21  import java.nio.channels.NotYetConnectedException;
22  import java.util.Queue;
23  import java.util.concurrent.atomic.AtomicInteger;
24  
25  import org.jboss.netty.channel.AbstractChannel;
26  import org.jboss.netty.channel.ChannelConfig;
27  import org.jboss.netty.channel.ChannelException;
28  import org.jboss.netty.channel.ChannelFactory;
29  import org.jboss.netty.channel.ChannelFuture;
30  import org.jboss.netty.channel.ChannelFutureListener;
31  import org.jboss.netty.channel.ChannelPipeline;
32  import org.jboss.netty.channel.ChannelSink;
33  import org.jboss.netty.channel.DefaultChannelConfig;
34  import org.jboss.netty.channel.MessageEvent;
35  import org.jboss.netty.util.internal.LinkedTransferQueue;
36  import org.jboss.netty.util.internal.ThreadLocalBoolean;
37  
38  /**
39   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
40   * @author Andy Taylor (andy.taylor@jboss.org)
41   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
42   * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
43   */
44  final class DefaultLocalChannel extends AbstractChannel implements LocalChannel {
45  
46      // TODO Move the state management up to AbstractChannel to remove duplication.
47      private static final int ST_OPEN = 0;
48      private static final int ST_BOUND = 1;
49      private static final int ST_CONNECTED = 2;
50      private static final int ST_CLOSED = -1;
51      final AtomicInteger state = new AtomicInteger(ST_OPEN);
52  
53      private final ChannelConfig config;
54      private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
55  
56      final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
57  
58      volatile DefaultLocalChannel pairedChannel;
59      volatile LocalAddress localAddress;
60      volatile LocalAddress remoteAddress;
61  
62      DefaultLocalChannel(LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, DefaultLocalChannel pairedChannel) {
63          super(parent, factory, pipeline, sink);
64          this.pairedChannel = pairedChannel;
65          config = new DefaultChannelConfig();
66  
67          // TODO Move the state variable to AbstractChannel so that we don't need
68          //      to add many listeners.
69          getCloseFuture().addListener(new ChannelFutureListener() {
70              public void operationComplete(ChannelFuture future) throws Exception {
71                  state.set(ST_CLOSED);
72              }
73          });
74  
75          fireChannelOpen(this);
76      }
77  
78      public ChannelConfig getConfig() {
79          return config;
80      }
81  
82      @Override
83      public boolean isOpen() {
84          return state.get() >= ST_OPEN;
85      }
86  
87      public boolean isBound() {
88          return state.get() >= ST_BOUND;
89      }
90  
91      public boolean isConnected() {
92          return state.get() == ST_CONNECTED;
93      }
94  
95      final void setBound() throws ClosedChannelException {
96          if (!state.compareAndSet(ST_OPEN, ST_BOUND)) {
97              switch (state.get()) {
98              case ST_CLOSED:
99                  throw new ClosedChannelException();
100             default:
101                 throw new ChannelException("already bound");
102             }
103         }
104     }
105 
106     final void setConnected() {
107         if (state.get() != ST_CLOSED) {
108             state.set(ST_CONNECTED);
109         }
110     }
111 
112     @Override
113     protected boolean setClosed() {
114         return super.setClosed();
115     }
116 
117     public LocalAddress getLocalAddress() {
118         return localAddress;
119     }
120 
121     public LocalAddress getRemoteAddress() {
122         return remoteAddress;
123     }
124 
125     void closeNow(ChannelFuture future) {
126         LocalAddress localAddress = this.localAddress;
127         try {
128             // Close the self.
129             if (!setClosed()) {
130                 return;
131             }
132 
133             DefaultLocalChannel pairedChannel = this.pairedChannel;
134             if (pairedChannel != null) {
135                 this.pairedChannel = null;
136                 fireChannelDisconnected(this);
137                 fireChannelUnbound(this);
138             }
139             fireChannelClosed(this);
140 
141             // Close the peer.
142             if (pairedChannel == null || !pairedChannel.setClosed()) {
143                 return;
144             }
145 
146             DefaultLocalChannel me = pairedChannel.pairedChannel;
147             if (me != null) {
148                 pairedChannel.pairedChannel = null;
149                 fireChannelDisconnected(pairedChannel);
150                 fireChannelUnbound(pairedChannel);
151             }
152             fireChannelClosed(pairedChannel);
153         } finally {
154             future.setSuccess();
155             if (localAddress != null && getParent() == null) {
156                 LocalChannelRegistry.unregister(localAddress);
157             }
158         }
159     }
160 
161     void flushWriteBuffer() {
162         DefaultLocalChannel pairedChannel = this.pairedChannel;
163         if (pairedChannel != null) {
164             if (pairedChannel.isConnected()){
165                 // Channel is open and connected and channelConnected event has
166                 // been fired.
167                 if (!delivering.get()) {
168                     delivering.set(true);
169                     try {
170                         for (;;) {
171                             MessageEvent e = writeBuffer.poll();
172                             if(e == null) {
173                                 break;
174                             }
175 
176                             e.getFuture().setSuccess();
177                             fireMessageReceived(pairedChannel, e.getMessage());
178                             fireWriteComplete(this, 1);
179                         }
180                     } finally {
181                         delivering.set(false);
182                     }
183                 }
184             } else {
185                 // Channel is open and connected but channelConnected event has
186                 // not been fired yet.
187             }
188         } else {
189             // Channel is closed or not connected yet - notify as failures.
190             Exception cause;
191             if (isOpen()) {
192                 cause = new NotYetConnectedException();
193             } else {
194                 cause = new ClosedChannelException();
195             }
196 
197             for (;;) {
198                 MessageEvent e = writeBuffer.poll();
199                 if(e == null) {
200                     break;
201                 }
202 
203                 e.getFuture().setFailure(cause);
204                 fireExceptionCaught(this, cause);
205             }
206         }
207     }
208 }