1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.SocketAddress;
21 import java.util.concurrent.Executor;
22
23 import org.jboss.netty.channel.AbstractChannelSink;
24 import org.jboss.netty.channel.ChannelEvent;
25 import org.jboss.netty.channel.ChannelFuture;
26 import org.jboss.netty.channel.ChannelFutureListener;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelState;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.MessageEvent;
31 import org.jboss.netty.util.ThreadRenamingRunnable;
32 import org.jboss.netty.util.internal.DeadLockProofWorker;
33
34
35
36
37
38
39
40
41
42 class OioDatagramPipelineSink extends AbstractChannelSink {
43
44 private final Executor workerExecutor;
45
46 OioDatagramPipelineSink(Executor workerExecutor) {
47 this.workerExecutor = workerExecutor;
48 }
49
50 public void eventSunk(
51 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
52 OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
53 ChannelFuture future = e.getFuture();
54 if (e instanceof ChannelStateEvent) {
55 ChannelStateEvent stateEvent = (ChannelStateEvent) e;
56 ChannelState state = stateEvent.getState();
57 Object value = stateEvent.getValue();
58 switch (state) {
59 case OPEN:
60 if (Boolean.FALSE.equals(value)) {
61 OioDatagramWorker.close(channel, future);
62 }
63 break;
64 case BOUND:
65 if (value != null) {
66 bind(channel, future, (SocketAddress) value);
67 } else {
68 OioDatagramWorker.close(channel, future);
69 }
70 break;
71 case CONNECTED:
72 if (value != null) {
73 connect(channel, future, (SocketAddress) value);
74 } else {
75 OioDatagramWorker.disconnect(channel, future);
76 }
77 break;
78 case INTEREST_OPS:
79 OioDatagramWorker.setInterestOps(channel, future, ((Integer) value).intValue());
80 break;
81 }
82 } else if (e instanceof MessageEvent) {
83 MessageEvent evt = (MessageEvent) e;
84 OioDatagramWorker.write(
85 channel, future, evt.getMessage(), evt.getRemoteAddress());
86 }
87 }
88
89 private void bind(
90 OioDatagramChannel channel, ChannelFuture future,
91 SocketAddress localAddress) {
92 boolean bound = false;
93 boolean workerStarted = false;
94 try {
95 channel.socket.bind(localAddress);
96 bound = true;
97
98
99 future.setSuccess();
100 fireChannelBound(channel, channel.getLocalAddress());
101
102
103 DeadLockProofWorker.start(
104 workerExecutor,
105 new ThreadRenamingRunnable(
106 new OioDatagramWorker(channel),
107 "Old I/O datagram worker (" + channel + ')'));
108 workerStarted = true;
109 } catch (Throwable t) {
110 future.setFailure(t);
111 fireExceptionCaught(channel, t);
112 } finally {
113 if (bound && !workerStarted) {
114 OioDatagramWorker.close(channel, future);
115 }
116 }
117 }
118
119 private void connect(
120 OioDatagramChannel channel, ChannelFuture future,
121 SocketAddress remoteAddress) {
122
123 boolean bound = channel.isBound();
124 boolean connected = false;
125 boolean workerStarted = false;
126
127 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
128
129
130
131 channel.remoteAddress = null;
132
133 try {
134 channel.socket.connect(remoteAddress);
135 connected = true;
136
137
138 future.setSuccess();
139 if (!bound) {
140 fireChannelBound(channel, channel.getLocalAddress());
141 }
142 fireChannelConnected(channel, channel.getRemoteAddress());
143
144 String threadName = "Old I/O datagram worker (" + channel + ')';
145 if (!bound) {
146
147 DeadLockProofWorker.start(
148 workerExecutor,
149 new ThreadRenamingRunnable(
150 new OioDatagramWorker(channel), threadName));
151 } else {
152
153 Thread workerThread = channel.workerThread;
154 if (workerThread != null) {
155 try {
156 workerThread.setName(threadName);
157 } catch (SecurityException e) {
158
159 }
160 }
161 }
162
163 workerStarted = true;
164 } catch (Throwable t) {
165 future.setFailure(t);
166 fireExceptionCaught(channel, t);
167 } finally {
168 if (connected && !workerStarted) {
169 OioDatagramWorker.close(channel, future);
170 }
171 }
172 }
173 }