1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.ConnectException;
22
23 import org.jboss.netty.channel.AbstractChannelSink;
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelException;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelPipeline;
29 import org.jboss.netty.channel.ChannelState;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34
35
36
37
38
39
40
41 final class LocalClientChannelSink extends AbstractChannelSink {
42
43 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
44
45 LocalClientChannelSink() {
46 super();
47 }
48
49 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
50 if (e instanceof ChannelStateEvent) {
51 ChannelStateEvent event = (ChannelStateEvent) e;
52
53 DefaultLocalChannel channel =
54 (DefaultLocalChannel) event.getChannel();
55 ChannelFuture future = event.getFuture();
56 ChannelState state = event.getState();
57 Object value = event.getValue();
58 switch (state) {
59 case OPEN:
60 if (Boolean.FALSE.equals(value)) {
61 channel.closeNow(future);
62 }
63 break;
64 case BOUND:
65 if (value != null) {
66 bind(channel, future, (LocalAddress) value);
67 } else {
68 channel.closeNow(future);
69 }
70 break;
71 case CONNECTED:
72 if (value != null) {
73 connect(channel, future, (LocalAddress) value);
74 } else {
75 channel.closeNow(future);
76 }
77 break;
78 case INTEREST_OPS:
79
80 future.setSuccess();
81 break;
82 }
83 }
84 else if (e instanceof MessageEvent) {
85 MessageEvent event = (MessageEvent) e;
86 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
87 boolean offered = channel.writeBuffer.offer(event);
88 assert offered;
89 channel.flushWriteBuffer();
90 }
91 }
92
93 private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
94 try {
95 if (!LocalChannelRegistry.register(localAddress, channel)) {
96 throw new ChannelException("address already in use: " + localAddress);
97 }
98
99 channel.setBound();
100 channel.localAddress = localAddress;
101 future.setSuccess();
102 fireChannelBound(channel, localAddress);
103 } catch (Throwable t) {
104 LocalChannelRegistry.unregister(localAddress);
105 future.setFailure(t);
106 fireExceptionCaught(channel, t);
107 }
108 }
109
110 private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
111 Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
112 if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
113 future.setFailure(new ConnectException("connection refused"));
114 return;
115 }
116
117 DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
118 ChannelPipeline pipeline;
119 try {
120 pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
121 } catch (Exception e) {
122 future.setFailure(e);
123 fireExceptionCaught(channel, e);
124 logger.warn(
125 "Failed to initialize an accepted socket.", e);
126 return;
127 }
128
129 future.setSuccess();
130 DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
131 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
132 channel.pairedChannel = acceptedChannel;
133
134 bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
135 channel.remoteAddress = serverChannel.getLocalAddress();
136 channel.setConnected();
137 fireChannelConnected(channel, serverChannel.getLocalAddress());
138
139 acceptedChannel.localAddress = serverChannel.getLocalAddress();
140 try {
141 acceptedChannel.setBound();
142 } catch (IOException e) {
143 throw new Error(e);
144 }
145 fireChannelBound(acceptedChannel, channel.getRemoteAddress());
146 acceptedChannel.remoteAddress = channel.getLocalAddress();
147 acceptedChannel.setConnected();
148 fireChannelConnected(acceptedChannel, channel.getLocalAddress());
149
150
151 channel.flushWriteBuffer();
152 acceptedChannel.flushWriteBuffer();
153 }
154 }