1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.InetSocketAddress;
21 import java.net.SocketAddress;
22 import java.nio.channels.SocketChannel;
23 import java.util.Queue;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.jboss.netty.buffer.ChannelBuffer;
28 import org.jboss.netty.channel.AbstractChannel;
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelFactory;
31 import org.jboss.netty.channel.ChannelFuture;
32 import org.jboss.netty.channel.ChannelPipeline;
33 import org.jboss.netty.channel.ChannelSink;
34 import org.jboss.netty.channel.MessageEvent;
35 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
36 import org.jboss.netty.util.internal.LinkedTransferQueue;
37 import org.jboss.netty.util.internal.ThreadLocalBoolean;
38
39
40
41
42
43
44
45
46 class NioSocketChannel extends AbstractChannel
47 implements org.jboss.netty.channel.socket.SocketChannel {
48
49 private static final int ST_OPEN = 0;
50 private static final int ST_BOUND = 1;
51 private static final int ST_CONNECTED = 2;
52 private static final int ST_CLOSED = -1;
53 volatile int state = ST_OPEN;
54
55 final SocketChannel socket;
56 final NioWorker worker;
57 private final NioSocketChannelConfig config;
58 private volatile InetSocketAddress localAddress;
59 private volatile InetSocketAddress remoteAddress;
60
61 final Object interestOpsLock = new Object();
62 final Object writeLock = new Object();
63
64 final Runnable writeTask = new WriteTask();
65 final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
66
67 final Queue<MessageEvent> writeBuffer = new WriteRequestQueue();
68 final AtomicInteger writeBufferSize = new AtomicInteger();
69 final AtomicInteger highWaterMarkCounter = new AtomicInteger();
70 boolean inWriteNowLoop;
71 boolean writeSuspended;
72
73 MessageEvent currentWriteEvent;
74 SendBuffer currentWriteBuffer;
75
76 public NioSocketChannel(
77 Channel parent, ChannelFactory factory,
78 ChannelPipeline pipeline, ChannelSink sink,
79 SocketChannel socket, NioWorker worker) {
80 super(parent, factory, pipeline, sink);
81
82 this.socket = socket;
83 this.worker = worker;
84 config = new DefaultNioSocketChannelConfig(socket.socket());
85 }
86
87 public NioSocketChannelConfig getConfig() {
88 return config;
89 }
90
91 public InetSocketAddress getLocalAddress() {
92 InetSocketAddress localAddress = this.localAddress;
93 if (localAddress == null) {
94 try {
95 this.localAddress = localAddress =
96 (InetSocketAddress) socket.socket().getLocalSocketAddress();
97 } catch (Throwable t) {
98
99 return null;
100 }
101 }
102 return localAddress;
103 }
104
105 public InetSocketAddress getRemoteAddress() {
106 InetSocketAddress remoteAddress = this.remoteAddress;
107 if (remoteAddress == null) {
108 try {
109 this.remoteAddress = remoteAddress =
110 (InetSocketAddress) socket.socket().getRemoteSocketAddress();
111 } catch (Throwable t) {
112
113 return null;
114 }
115 }
116 return remoteAddress;
117 }
118
119 @Override
120 public boolean isOpen() {
121 return state >= ST_OPEN;
122 }
123
124 public boolean isBound() {
125 return state >= ST_BOUND;
126 }
127
128 public boolean isConnected() {
129 return state == ST_CONNECTED;
130 }
131
132 final void setBound() {
133 assert state == ST_OPEN : "Invalid state: " + state;
134 state = ST_BOUND;
135 }
136
137 final void setConnected() {
138 if (state != ST_CLOSED) {
139 state = ST_CONNECTED;
140 }
141 }
142
143 @Override
144 protected boolean setClosed() {
145 state = ST_CLOSED;
146 return super.setClosed();
147 }
148
149 @Override
150 public int getInterestOps() {
151 if (!isOpen()) {
152 return Channel.OP_WRITE;
153 }
154
155 int interestOps = getRawInterestOps();
156 int writeBufferSize = this.writeBufferSize.get();
157 if (writeBufferSize != 0) {
158 if (highWaterMarkCounter.get() > 0) {
159 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
160 if (writeBufferSize >= lowWaterMark) {
161 interestOps |= Channel.OP_WRITE;
162 } else {
163 interestOps &= ~Channel.OP_WRITE;
164 }
165 } else {
166 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
167 if (writeBufferSize >= highWaterMark) {
168 interestOps |= Channel.OP_WRITE;
169 } else {
170 interestOps &= ~Channel.OP_WRITE;
171 }
172 }
173 } else {
174 interestOps &= ~Channel.OP_WRITE;
175 }
176
177 return interestOps;
178 }
179
180 int getRawInterestOps() {
181 return super.getInterestOps();
182 }
183
184 void setRawInterestOpsNow(int interestOps) {
185 super.setInterestOpsNow(interestOps);
186 }
187
188 @Override
189 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
190 if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
191 return super.write(message, null);
192 } else {
193 return getUnsupportedOperationFuture();
194 }
195 }
196
197 private final class WriteRequestQueue extends LinkedTransferQueue<MessageEvent> {
198
199 private static final long serialVersionUID = -246694024103520626L;
200
201 private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
202
203 WriteRequestQueue() {
204 super();
205 }
206
207 @Override
208 public boolean offer(MessageEvent e) {
209 boolean success = super.offer(e);
210 assert success;
211
212 int messageSize = getMessageSize(e);
213 int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
214 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
215
216 if (newWriteBufferSize >= highWaterMark) {
217 if (newWriteBufferSize - messageSize < highWaterMark) {
218 highWaterMarkCounter.incrementAndGet();
219 if (!notifying.get()) {
220 notifying.set(Boolean.TRUE);
221 fireChannelInterestChanged(NioSocketChannel.this);
222 notifying.set(Boolean.FALSE);
223 }
224 }
225 }
226 return true;
227 }
228
229 @Override
230 public MessageEvent poll() {
231 MessageEvent e = super.poll();
232 if (e != null) {
233 int messageSize = getMessageSize(e);
234 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
235 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
236
237 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
238 if (newWriteBufferSize + messageSize >= lowWaterMark) {
239 highWaterMarkCounter.decrementAndGet();
240 if (isConnected() && !notifying.get()) {
241 notifying.set(Boolean.TRUE);
242 fireChannelInterestChanged(NioSocketChannel.this);
243 notifying.set(Boolean.FALSE);
244 }
245 }
246 }
247 }
248 return e;
249 }
250
251 private int getMessageSize(MessageEvent e) {
252 Object m = e.getMessage();
253 if (m instanceof ChannelBuffer) {
254 return ((ChannelBuffer) m).readableBytes();
255 }
256 return 0;
257 }
258 }
259
260 private final class WriteTask implements Runnable {
261
262 WriteTask() {
263 super();
264 }
265
266 public void run() {
267 writeTaskInTaskQueue.set(false);
268 worker.writeFromTaskLoop(NioSocketChannel.this);
269 }
270 }
271 }