1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import org.jboss.netty.channel.AbstractChannelSink;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelEvent;
23 import org.jboss.netty.channel.ChannelException;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelPipeline;
26 import org.jboss.netty.channel.ChannelState;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.MessageEvent;
29
30
31
32
33
34
35
36 final class LocalServerChannelSink extends AbstractChannelSink {
37
38 LocalServerChannelSink() {
39 super();
40 }
41
42 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
43 Channel channel = e.getChannel();
44 if (channel instanceof DefaultLocalServerChannel) {
45 handleServerChannel(e);
46 }
47 else if (channel instanceof DefaultLocalChannel) {
48 handleAcceptedChannel(e);
49 }
50 }
51
52 private void handleServerChannel(ChannelEvent e) {
53 if (!(e instanceof ChannelStateEvent)) {
54 return;
55 }
56
57 ChannelStateEvent event = (ChannelStateEvent) e;
58 DefaultLocalServerChannel channel =
59 (DefaultLocalServerChannel) event.getChannel();
60 ChannelFuture future = event.getFuture();
61 ChannelState state = event.getState();
62 Object value = event.getValue();
63 switch (state) {
64 case OPEN:
65 if (Boolean.FALSE.equals(value)) {
66 close(channel, future);
67 }
68 break;
69 case BOUND:
70 if (value != null) {
71 bind(channel, future, (LocalAddress) value);
72 } else {
73 close(channel, future);
74 }
75 break;
76 }
77 }
78
79 private void handleAcceptedChannel(ChannelEvent e) {
80 if (e instanceof ChannelStateEvent) {
81 ChannelStateEvent event = (ChannelStateEvent) e;
82 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
83 ChannelFuture future = event.getFuture();
84 ChannelState state = event.getState();
85 Object value = event.getValue();
86
87 switch (state) {
88 case OPEN:
89 if (Boolean.FALSE.equals(value)) {
90 channel.closeNow(future);
91 }
92 break;
93 case BOUND:
94 case CONNECTED:
95 if (value == null) {
96 channel.closeNow(future);
97 }
98 break;
99 case INTEREST_OPS:
100
101 future.setSuccess();
102 break;
103 }
104 } else if (e instanceof MessageEvent) {
105 MessageEvent event = (MessageEvent) e;
106 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
107 boolean offered = channel.writeBuffer.offer(event);
108 assert offered;
109 channel.flushWriteBuffer();
110 }
111 }
112
113 private void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
114 try {
115 if (!LocalChannelRegistry.register(localAddress, channel)) {
116 throw new ChannelException("address already in use: " + localAddress);
117 }
118 if (!channel.bound.compareAndSet(false, true)) {
119 throw new ChannelException("already bound");
120 }
121
122 channel.localAddress = localAddress;
123 future.setSuccess();
124 fireChannelBound(channel, localAddress);
125 } catch (Throwable t) {
126 LocalChannelRegistry.unregister(localAddress);
127 future.setFailure(t);
128 fireExceptionCaught(channel, t);
129 }
130 }
131
132 private void close(DefaultLocalServerChannel channel, ChannelFuture future) {
133 try {
134 if (channel.setClosed()) {
135 future.setSuccess();
136 LocalAddress localAddress = channel.localAddress;
137 if (channel.bound.compareAndSet(true, false)) {
138 channel.localAddress = null;
139 LocalChannelRegistry.unregister(localAddress);
140 fireChannelUnbound(channel);
141 }
142 fireChannelClosed(channel);
143 } else {
144 future.setSuccess();
145 }
146 } catch (Throwable t) {
147 future.setFailure(t);
148 fireExceptionCaught(channel, t);
149 }
150 }
151 }