View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.InetSocketAddress;
21  import java.net.SocketAddress;
22  import java.nio.channels.SocketChannel;
23  import java.util.Queue;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicInteger;
26  
27  import org.jboss.netty.buffer.ChannelBuffer;
28  import org.jboss.netty.channel.AbstractChannel;
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelFactory;
31  import org.jboss.netty.channel.ChannelFuture;
32  import org.jboss.netty.channel.ChannelPipeline;
33  import org.jboss.netty.channel.ChannelSink;
34  import org.jboss.netty.channel.MessageEvent;
35  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
36  import org.jboss.netty.util.internal.LinkedTransferQueue;
37  import org.jboss.netty.util.internal.ThreadLocalBoolean;
38  
39  /**
40   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
41   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
42   *
43   * @version $Rev: 2202 $, $Date: 2010-02-23 16:18:58 +0900 (Tue, 23 Feb 2010) $
44   *
45   */
46  class NioSocketChannel extends AbstractChannel
47                                  implements org.jboss.netty.channel.socket.SocketChannel {
48  
49      private static final int ST_OPEN = 0;
50      private static final int ST_BOUND = 1;
51      private static final int ST_CONNECTED = 2;
52      private static final int ST_CLOSED = -1;
53      volatile int state = ST_OPEN;
54  
55      final SocketChannel socket;
56      final NioWorker worker;
57      private final NioSocketChannelConfig config;
58      private volatile InetSocketAddress localAddress;
59      private volatile InetSocketAddress remoteAddress;
60  
61      final Object interestOpsLock = new Object();
62      final Object writeLock = new Object();
63  
64      final Runnable writeTask = new WriteTask();
65      final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
66  
67      final Queue<MessageEvent> writeBuffer = new WriteRequestQueue();
68      final AtomicInteger writeBufferSize = new AtomicInteger();
69      final AtomicInteger highWaterMarkCounter = new AtomicInteger();
70      boolean inWriteNowLoop;
71      boolean writeSuspended;
72  
73      MessageEvent currentWriteEvent;
74      SendBuffer currentWriteBuffer;
75  
76      public NioSocketChannel(
77              Channel parent, ChannelFactory factory,
78              ChannelPipeline pipeline, ChannelSink sink,
79              SocketChannel socket, NioWorker worker) {
80          super(parent, factory, pipeline, sink);
81  
82          this.socket = socket;
83          this.worker = worker;
84          config = new DefaultNioSocketChannelConfig(socket.socket());
85      }
86  
87      public NioSocketChannelConfig getConfig() {
88          return config;
89      }
90  
91      public InetSocketAddress getLocalAddress() {
92          InetSocketAddress localAddress = this.localAddress;
93          if (localAddress == null) {
94              try {
95                  this.localAddress = localAddress =
96                      (InetSocketAddress) socket.socket().getLocalSocketAddress();
97              } catch (Throwable t) {
98                  // Sometimes fails on a closed socket in Windows.
99                  return null;
100             }
101         }
102         return localAddress;
103     }
104 
105     public InetSocketAddress getRemoteAddress() {
106         InetSocketAddress remoteAddress = this.remoteAddress;
107         if (remoteAddress == null) {
108             try {
109                 this.remoteAddress = remoteAddress =
110                     (InetSocketAddress) socket.socket().getRemoteSocketAddress();
111             } catch (Throwable t) {
112                 // Sometimes fails on a closed socket in Windows.
113                 return null;
114             }
115         }
116         return remoteAddress;
117     }
118 
119     @Override
120     public boolean isOpen() {
121         return state >= ST_OPEN;
122     }
123 
124     public boolean isBound() {
125         return state >= ST_BOUND;
126     }
127 
128     public boolean isConnected() {
129         return state == ST_CONNECTED;
130     }
131 
132     final void setBound() {
133         assert state == ST_OPEN : "Invalid state: " + state;
134         state = ST_BOUND;
135     }
136 
137     final void setConnected() {
138         if (state != ST_CLOSED) {
139             state = ST_CONNECTED;
140         }
141     }
142 
143     @Override
144     protected boolean setClosed() {
145         state = ST_CLOSED;
146         return super.setClosed();
147     }
148 
149     @Override
150     public int getInterestOps() {
151         if (!isOpen()) {
152             return Channel.OP_WRITE;
153         }
154 
155         int interestOps = getRawInterestOps();
156         int writeBufferSize = this.writeBufferSize.get();
157         if (writeBufferSize != 0) {
158             if (highWaterMarkCounter.get() > 0) {
159                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
160                 if (writeBufferSize >= lowWaterMark) {
161                     interestOps |= Channel.OP_WRITE;
162                 } else {
163                     interestOps &= ~Channel.OP_WRITE;
164                 }
165             } else {
166                 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
167                 if (writeBufferSize >= highWaterMark) {
168                     interestOps |= Channel.OP_WRITE;
169                 } else {
170                     interestOps &= ~Channel.OP_WRITE;
171                 }
172             }
173         } else {
174             interestOps &= ~Channel.OP_WRITE;
175         }
176 
177         return interestOps;
178     }
179 
180     int getRawInterestOps() {
181         return super.getInterestOps();
182     }
183 
184     void setRawInterestOpsNow(int interestOps) {
185         super.setInterestOpsNow(interestOps);
186     }
187 
188     @Override
189     public ChannelFuture write(Object message, SocketAddress remoteAddress) {
190         if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
191             return super.write(message, null);
192         } else {
193             return getUnsupportedOperationFuture();
194         }
195     }
196 
197     private final class WriteRequestQueue extends LinkedTransferQueue<MessageEvent> {
198 
199         private static final long serialVersionUID = -246694024103520626L;
200 
201         private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
202 
203         WriteRequestQueue() {
204             super();
205         }
206 
207         @Override
208         public boolean offer(MessageEvent e) {
209             boolean success = super.offer(e);
210             assert success;
211 
212             int messageSize = getMessageSize(e);
213             int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
214             int highWaterMark = getConfig().getWriteBufferHighWaterMark();
215 
216             if (newWriteBufferSize >= highWaterMark) {
217                 if (newWriteBufferSize - messageSize < highWaterMark) {
218                     highWaterMarkCounter.incrementAndGet();
219                     if (!notifying.get()) {
220                         notifying.set(Boolean.TRUE);
221                         fireChannelInterestChanged(NioSocketChannel.this);
222                         notifying.set(Boolean.FALSE);
223                     }
224                 }
225             }
226             return true;
227         }
228 
229         @Override
230         public MessageEvent poll() {
231             MessageEvent e = super.poll();
232             if (e != null) {
233                 int messageSize = getMessageSize(e);
234                 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
235                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
236 
237                 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
238                     if (newWriteBufferSize + messageSize >= lowWaterMark) {
239                         highWaterMarkCounter.decrementAndGet();
240                         if (isConnected() && !notifying.get()) {
241                             notifying.set(Boolean.TRUE);
242                             fireChannelInterestChanged(NioSocketChannel.this);
243                             notifying.set(Boolean.FALSE);
244                         }
245                     }
246                 }
247             }
248             return e;
249         }
250 
251         private int getMessageSize(MessageEvent e) {
252             Object m = e.getMessage();
253             if (m instanceof ChannelBuffer) {
254                 return ((ChannelBuffer) m).readableBytes();
255             }
256             return 0;
257         }
258     }
259 
260     private final class WriteTask implements Runnable {
261 
262         WriteTask() {
263             super();
264         }
265 
266         public void run() {
267             writeTaskInTaskQueue.set(false);
268             worker.writeFromTaskLoop(NioSocketChannel.this);
269         }
270     }
271 }