1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.OutputStream;
21 import java.io.PushbackInputStream;
22 import java.net.SocketException;
23 import java.nio.channels.Channels;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.WritableByteChannel;
26 import java.util.regex.Pattern;
27
28 import org.jboss.netty.buffer.ChannelBuffer;
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.DefaultFileRegion;
32 import org.jboss.netty.channel.FileRegion;
33
34
35
36
37
38
39
40
41
42 class OioWorker implements Runnable {
43
44 private static final Pattern SOCKET_CLOSED_MESSAGE = Pattern.compile(
45 "^.*(?:Socket.*closed).*$", Pattern.CASE_INSENSITIVE);
46
47 private final OioSocketChannel channel;
48
49 OioWorker(OioSocketChannel channel) {
50 this.channel = channel;
51 }
52
53 public void run() {
54 channel.workerThread = Thread.currentThread();
55 final PushbackInputStream in = channel.getInputStream();
56
57 boolean fireConnected = channel instanceof OioAcceptedSocketChannel;
58
59 while (channel.isOpen()) {
60 if (fireConnected) {
61 fireConnected = false;
62 fireChannelConnected(channel, channel.getRemoteAddress());
63 }
64 synchronized (channel.interestOpsLock) {
65 while (!channel.isReadable()) {
66 try {
67
68
69 channel.interestOpsLock.wait();
70 } catch (InterruptedException e) {
71 if (!channel.isOpen()) {
72 break;
73 }
74 }
75 }
76 }
77
78 byte[] buf;
79 int readBytes;
80 try {
81 int bytesToRead = in.available();
82 if (bytesToRead > 0) {
83 buf = new byte[bytesToRead];
84 readBytes = in.read(buf);
85 } else {
86 int b = in.read();
87 if (b < 0) {
88 break;
89 }
90 in.unread(b);
91 continue;
92 }
93 } catch (Throwable t) {
94 if (!channel.socket.isClosed()) {
95 fireExceptionCaught(channel, t);
96 }
97 break;
98 }
99
100 fireMessageReceived(
101 channel,
102 channel.getConfig().getBufferFactory().getBuffer(buf, 0, readBytes));
103 }
104
105
106
107 channel.workerThread = null;
108
109
110 close(channel, succeededFuture(channel));
111 }
112
113 static void write(
114 OioSocketChannel channel, ChannelFuture future,
115 Object message) {
116
117 OutputStream out = channel.getOutputStream();
118 if (out == null) {
119 Exception e = new ClosedChannelException();
120 future.setFailure(e);
121 fireExceptionCaught(channel, e);
122 return;
123 }
124
125 try {
126 int length = 0;
127
128
129
130 if (message instanceof FileRegion) {
131 FileRegion fr = (FileRegion) message;
132 try {
133 synchronized (out) {
134 WritableByteChannel bchannel = Channels.newChannel(out);
135
136 long i = 0;
137 while ((i = fr.transferTo(bchannel, length)) > 0) {
138 length += i;
139 if (length >= fr.getCount()) {
140 break;
141 }
142 }
143 }
144 } finally {
145 if (fr instanceof DefaultFileRegion) {
146 if (((DefaultFileRegion)fr).releaseAfterTransfer()) {
147 fr.releaseExternalResources();
148 }
149 }
150
151 }
152 } else {
153 ChannelBuffer a = (ChannelBuffer) message;
154 length = a.readableBytes();
155 synchronized (out) {
156 a.getBytes(a.readerIndex(), out, length);
157 }
158 }
159
160 fireWriteComplete(channel, length);
161 future.setSuccess();
162
163 } catch (Throwable t) {
164
165
166 if (t instanceof SocketException &&
167 SOCKET_CLOSED_MESSAGE.matcher(
168 String.valueOf(t.getMessage())).matches()) {
169 t = new ClosedChannelException();
170 }
171 future.setFailure(t);
172 fireExceptionCaught(channel, t);
173 }
174 }
175
176 static void setInterestOps(
177 OioSocketChannel channel, ChannelFuture future, int interestOps) {
178
179
180 interestOps &= ~Channel.OP_WRITE;
181 interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
182
183 boolean changed = false;
184 try {
185 if (channel.getInterestOps() != interestOps) {
186 if ((interestOps & Channel.OP_READ) != 0) {
187 channel.setInterestOpsNow(Channel.OP_READ);
188 } else {
189 channel.setInterestOpsNow(Channel.OP_NONE);
190 }
191 changed = true;
192 }
193
194 future.setSuccess();
195 if (changed) {
196 synchronized (channel.interestOpsLock) {
197 channel.setInterestOpsNow(interestOps);
198
199
200 Thread currentThread = Thread.currentThread();
201 Thread workerThread = channel.workerThread;
202 if (workerThread != null && currentThread != workerThread) {
203 workerThread.interrupt();
204 }
205 }
206
207 fireChannelInterestChanged(channel);
208 }
209 } catch (Throwable t) {
210 future.setFailure(t);
211 fireExceptionCaught(channel, t);
212 }
213 }
214
215 static void close(OioSocketChannel channel, ChannelFuture future) {
216 boolean connected = channel.isConnected();
217 boolean bound = channel.isBound();
218 try {
219 channel.socket.close();
220 if (channel.setClosed()) {
221 future.setSuccess();
222 if (connected) {
223
224 Thread currentThread = Thread.currentThread();
225 Thread workerThread = channel.workerThread;
226 if (workerThread != null && currentThread != workerThread) {
227 workerThread.interrupt();
228 }
229 fireChannelDisconnected(channel);
230 }
231 if (bound) {
232 fireChannelUnbound(channel);
233 }
234 fireChannelClosed(channel);
235 } else {
236 future.setSuccess();
237 }
238 } catch (Throwable t) {
239 future.setFailure(t);
240 fireExceptionCaught(channel, t);
241 }
242 }
243 }