1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.Socket;
22 import java.net.SocketAddress;
23 import java.net.SocketTimeoutException;
24 import java.util.concurrent.Executor;
25
26 import org.jboss.netty.channel.AbstractChannelSink;
27 import org.jboss.netty.channel.Channel;
28 import org.jboss.netty.channel.ChannelEvent;
29 import org.jboss.netty.channel.ChannelFuture;
30 import org.jboss.netty.channel.ChannelPipeline;
31 import org.jboss.netty.channel.ChannelState;
32 import org.jboss.netty.channel.ChannelStateEvent;
33 import org.jboss.netty.channel.MessageEvent;
34 import org.jboss.netty.logging.InternalLogger;
35 import org.jboss.netty.logging.InternalLoggerFactory;
36 import org.jboss.netty.util.ThreadRenamingRunnable;
37 import org.jboss.netty.util.internal.DeadLockProofWorker;
38
39
40
41
42
43
44
45
46
47 class OioServerSocketPipelineSink extends AbstractChannelSink {
48
49 static final InternalLogger logger =
50 InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
51
52 final Executor workerExecutor;
53
54 OioServerSocketPipelineSink(Executor workerExecutor) {
55 this.workerExecutor = workerExecutor;
56 }
57
58 public void eventSunk(
59 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
60 Channel channel = e.getChannel();
61 if (channel instanceof OioServerSocketChannel) {
62 handleServerSocket(e);
63 } else if (channel instanceof OioAcceptedSocketChannel) {
64 handleAcceptedSocket(e);
65 }
66 }
67
68 private void handleServerSocket(ChannelEvent e) {
69 if (!(e instanceof ChannelStateEvent)) {
70 return;
71 }
72
73 ChannelStateEvent event = (ChannelStateEvent) e;
74 OioServerSocketChannel channel =
75 (OioServerSocketChannel) event.getChannel();
76 ChannelFuture future = event.getFuture();
77 ChannelState state = event.getState();
78 Object value = event.getValue();
79
80 switch (state) {
81 case OPEN:
82 if (Boolean.FALSE.equals(value)) {
83 close(channel, future);
84 }
85 break;
86 case BOUND:
87 if (value != null) {
88 bind(channel, future, (SocketAddress) value);
89 } else {
90 close(channel, future);
91 }
92 break;
93 }
94 }
95
96 private void handleAcceptedSocket(ChannelEvent e) {
97 if (e instanceof ChannelStateEvent) {
98 ChannelStateEvent event = (ChannelStateEvent) e;
99 OioAcceptedSocketChannel channel =
100 (OioAcceptedSocketChannel) event.getChannel();
101 ChannelFuture future = event.getFuture();
102 ChannelState state = event.getState();
103 Object value = event.getValue();
104
105 switch (state) {
106 case OPEN:
107 if (Boolean.FALSE.equals(value)) {
108 OioWorker.close(channel, future);
109 }
110 break;
111 case BOUND:
112 case CONNECTED:
113 if (value == null) {
114 OioWorker.close(channel, future);
115 }
116 break;
117 case INTEREST_OPS:
118 OioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
119 break;
120 }
121 } else if (e instanceof MessageEvent) {
122 MessageEvent event = (MessageEvent) e;
123 OioSocketChannel channel = (OioSocketChannel) event.getChannel();
124 ChannelFuture future = event.getFuture();
125 Object message = event.getMessage();
126 OioWorker.write(channel, future, message);
127 }
128 }
129
130 private void bind(
131 OioServerSocketChannel channel, ChannelFuture future,
132 SocketAddress localAddress) {
133
134 boolean bound = false;
135 boolean bossStarted = false;
136 try {
137 channel.socket.bind(localAddress, channel.getConfig().getBacklog());
138 bound = true;
139
140 future.setSuccess();
141 localAddress = channel.getLocalAddress();
142 fireChannelBound(channel, localAddress);
143
144 Executor bossExecutor =
145 ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
146 DeadLockProofWorker.start(
147 bossExecutor,
148 new ThreadRenamingRunnable(
149 new Boss(channel),
150 "Old I/O server boss (" + channel + ')'));
151 bossStarted = true;
152 } catch (Throwable t) {
153 future.setFailure(t);
154 fireExceptionCaught(channel, t);
155 } finally {
156 if (!bossStarted && bound) {
157 close(channel, future);
158 }
159 }
160 }
161
162 private void close(OioServerSocketChannel channel, ChannelFuture future) {
163 boolean bound = channel.isBound();
164 try {
165 channel.socket.close();
166
167
168
169
170 channel.shutdownLock.lock();
171 try {
172 if (channel.setClosed()) {
173 future.setSuccess();
174 if (bound) {
175 fireChannelUnbound(channel);
176 }
177 fireChannelClosed(channel);
178 } else {
179 future.setSuccess();
180 }
181 } finally {
182 channel.shutdownLock.unlock();
183 }
184 } catch (Throwable t) {
185 future.setFailure(t);
186 fireExceptionCaught(channel, t);
187 }
188 }
189
190 private final class Boss implements Runnable {
191 private final OioServerSocketChannel channel;
192
193 Boss(OioServerSocketChannel channel) {
194 this.channel = channel;
195 }
196
197 public void run() {
198 channel.shutdownLock.lock();
199 try {
200 while (channel.isBound()) {
201 try {
202 Socket acceptedSocket = channel.socket.accept();
203 try {
204 ChannelPipeline pipeline =
205 channel.getConfig().getPipelineFactory().getPipeline();
206 final OioAcceptedSocketChannel acceptedChannel =
207 new OioAcceptedSocketChannel(
208 channel,
209 channel.getFactory(),
210 pipeline,
211 OioServerSocketPipelineSink.this,
212 acceptedSocket);
213 DeadLockProofWorker.start(
214 workerExecutor,
215 new ThreadRenamingRunnable(
216 new OioWorker(acceptedChannel),
217 "Old I/O server worker (parentId: " +
218 channel.getId() + ", " + channel + ')'));
219 } catch (Exception e) {
220 logger.warn(
221 "Failed to initialize an accepted socket.", e);
222 try {
223 acceptedSocket.close();
224 } catch (IOException e2) {
225 logger.warn(
226 "Failed to close a partially accepted socket.",
227 e2);
228 }
229 }
230 } catch (SocketTimeoutException e) {
231
232 } catch (Throwable e) {
233
234
235 if (!channel.socket.isBound() || channel.socket.isClosed()) {
236 break;
237 }
238
239 logger.warn(
240 "Failed to accept a connection.", e);
241 try {
242 Thread.sleep(1000);
243 } catch (InterruptedException e1) {
244
245 }
246 }
247 }
248 } finally {
249 channel.shutdownLock.unlock();
250 }
251 }
252 }
253 }