1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.PushbackInputStream;
21 import java.net.SocketAddress;
22 import java.util.concurrent.Executor;
23
24 import org.jboss.netty.channel.AbstractChannelSink;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelFuture;
27 import org.jboss.netty.channel.ChannelFutureListener;
28 import org.jboss.netty.channel.ChannelPipeline;
29 import org.jboss.netty.channel.ChannelState;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.util.ThreadRenamingRunnable;
33 import org.jboss.netty.util.internal.DeadLockProofWorker;
34
35
36
37
38
39
40
41
42
43 class OioClientSocketPipelineSink extends AbstractChannelSink {
44
45 private final Executor workerExecutor;
46
47 OioClientSocketPipelineSink(Executor workerExecutor) {
48 this.workerExecutor = workerExecutor;
49 }
50
51 public void eventSunk(
52 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
53 OioClientSocketChannel channel = (OioClientSocketChannel) e.getChannel();
54 ChannelFuture future = e.getFuture();
55 if (e instanceof ChannelStateEvent) {
56 ChannelStateEvent stateEvent = (ChannelStateEvent) e;
57 ChannelState state = stateEvent.getState();
58 Object value = stateEvent.getValue();
59 switch (state) {
60 case OPEN:
61 if (Boolean.FALSE.equals(value)) {
62 OioWorker.close(channel, future);
63 }
64 break;
65 case BOUND:
66 if (value != null) {
67 bind(channel, future, (SocketAddress) value);
68 } else {
69 OioWorker.close(channel, future);
70 }
71 break;
72 case CONNECTED:
73 if (value != null) {
74 connect(channel, future, (SocketAddress) value);
75 } else {
76 OioWorker.close(channel, future);
77 }
78 break;
79 case INTEREST_OPS:
80 OioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
81 break;
82 }
83 } else if (e instanceof MessageEvent) {
84 OioWorker.write(
85 channel, future,
86 ((MessageEvent) e).getMessage());
87 }
88 }
89
90 private void bind(
91 OioClientSocketChannel channel, ChannelFuture future,
92 SocketAddress localAddress) {
93 try {
94 channel.socket.bind(localAddress);
95 future.setSuccess();
96 fireChannelBound(channel, channel.getLocalAddress());
97 } catch (Throwable t) {
98 future.setFailure(t);
99 fireExceptionCaught(channel, t);
100 }
101 }
102
103 private void connect(
104 OioClientSocketChannel channel, ChannelFuture future,
105 SocketAddress remoteAddress) {
106
107 boolean bound = channel.isBound();
108 boolean connected = false;
109 boolean workerStarted = false;
110
111 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
112
113 try {
114 channel.socket.connect(
115 remoteAddress, channel.getConfig().getConnectTimeoutMillis());
116 connected = true;
117
118
119 channel.in = new PushbackInputStream(channel.socket.getInputStream(), 1);
120 channel.out = channel.socket.getOutputStream();
121
122
123 future.setSuccess();
124 if (!bound) {
125 fireChannelBound(channel, channel.getLocalAddress());
126 }
127 fireChannelConnected(channel, channel.getRemoteAddress());
128
129
130 DeadLockProofWorker.start(
131 workerExecutor,
132 new ThreadRenamingRunnable(
133 new OioWorker(channel),
134 "Old I/O client worker (" + channel + ')'));
135 workerStarted = true;
136 } catch (Throwable t) {
137 future.setFailure(t);
138 fireExceptionCaught(channel, t);
139 } finally {
140 if (connected && !workerStarted) {
141 OioWorker.close(channel, future);
142 }
143 }
144 }
145 }