1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.nio.channels.ClosedChannelException;
21 import java.nio.channels.NotYetConnectedException;
22 import java.util.Queue;
23 import java.util.concurrent.atomic.AtomicInteger;
24
25 import org.jboss.netty.channel.AbstractChannel;
26 import org.jboss.netty.channel.ChannelConfig;
27 import org.jboss.netty.channel.ChannelException;
28 import org.jboss.netty.channel.ChannelFactory;
29 import org.jboss.netty.channel.ChannelFuture;
30 import org.jboss.netty.channel.ChannelFutureListener;
31 import org.jboss.netty.channel.ChannelPipeline;
32 import org.jboss.netty.channel.ChannelSink;
33 import org.jboss.netty.channel.DefaultChannelConfig;
34 import org.jboss.netty.channel.MessageEvent;
35 import org.jboss.netty.util.internal.LinkedTransferQueue;
36 import org.jboss.netty.util.internal.ThreadLocalBoolean;
37
38
39
40
41
42
43
44 final class DefaultLocalChannel extends AbstractChannel implements LocalChannel {
45
46
47 private static final int ST_OPEN = 0;
48 private static final int ST_BOUND = 1;
49 private static final int ST_CONNECTED = 2;
50 private static final int ST_CLOSED = -1;
51 final AtomicInteger state = new AtomicInteger(ST_OPEN);
52
53 private final ChannelConfig config;
54 private final ThreadLocalBoolean delivering = new ThreadLocalBoolean();
55
56 final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
57
58 volatile DefaultLocalChannel pairedChannel;
59 volatile LocalAddress localAddress;
60 volatile LocalAddress remoteAddress;
61
62 DefaultLocalChannel(LocalServerChannel parent, ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink, DefaultLocalChannel pairedChannel) {
63 super(parent, factory, pipeline, sink);
64 this.pairedChannel = pairedChannel;
65 config = new DefaultChannelConfig();
66
67
68
69 getCloseFuture().addListener(new ChannelFutureListener() {
70 public void operationComplete(ChannelFuture future) throws Exception {
71 state.set(ST_CLOSED);
72 }
73 });
74
75 fireChannelOpen(this);
76 }
77
78 public ChannelConfig getConfig() {
79 return config;
80 }
81
82 @Override
83 public boolean isOpen() {
84 return state.get() >= ST_OPEN;
85 }
86
87 public boolean isBound() {
88 return state.get() >= ST_BOUND;
89 }
90
91 public boolean isConnected() {
92 return state.get() == ST_CONNECTED;
93 }
94
95 final void setBound() throws ClosedChannelException {
96 if (!state.compareAndSet(ST_OPEN, ST_BOUND)) {
97 switch (state.get()) {
98 case ST_CLOSED:
99 throw new ClosedChannelException();
100 default:
101 throw new ChannelException("already bound");
102 }
103 }
104 }
105
106 final void setConnected() {
107 if (state.get() != ST_CLOSED) {
108 state.set(ST_CONNECTED);
109 }
110 }
111
112 @Override
113 protected boolean setClosed() {
114 return super.setClosed();
115 }
116
117 public LocalAddress getLocalAddress() {
118 return localAddress;
119 }
120
121 public LocalAddress getRemoteAddress() {
122 return remoteAddress;
123 }
124
125 void closeNow(ChannelFuture future) {
126 LocalAddress localAddress = this.localAddress;
127 try {
128
129 if (!setClosed()) {
130 return;
131 }
132
133 DefaultLocalChannel pairedChannel = this.pairedChannel;
134 if (pairedChannel != null) {
135 this.pairedChannel = null;
136 fireChannelDisconnected(this);
137 fireChannelUnbound(this);
138 }
139 fireChannelClosed(this);
140
141
142 if (pairedChannel == null || !pairedChannel.setClosed()) {
143 return;
144 }
145
146 DefaultLocalChannel me = pairedChannel.pairedChannel;
147 if (me != null) {
148 pairedChannel.pairedChannel = null;
149 fireChannelDisconnected(pairedChannel);
150 fireChannelUnbound(pairedChannel);
151 }
152 fireChannelClosed(pairedChannel);
153 } finally {
154 future.setSuccess();
155 if (localAddress != null && getParent() == null) {
156 LocalChannelRegistry.unregister(localAddress);
157 }
158 }
159 }
160
161 void flushWriteBuffer() {
162 DefaultLocalChannel pairedChannel = this.pairedChannel;
163 if (pairedChannel != null) {
164 if (pairedChannel.isConnected()){
165
166
167 if (!delivering.get()) {
168 delivering.set(true);
169 try {
170 for (;;) {
171 MessageEvent e = writeBuffer.poll();
172 if(e == null) {
173 break;
174 }
175
176 e.getFuture().setSuccess();
177 fireMessageReceived(pairedChannel, e.getMessage());
178 fireWriteComplete(this, 1);
179 }
180 } finally {
181 delivering.set(false);
182 }
183 }
184 } else {
185
186
187 }
188 } else {
189
190 Exception cause;
191 if (isOpen()) {
192 cause = new NotYetConnectedException();
193 } else {
194 cause = new ClosedChannelException();
195 }
196
197 for (;;) {
198 MessageEvent e = writeBuffer.poll();
199 if(e == null) {
200 break;
201 }
202
203 e.getFuture().setFailure(cause);
204 fireExceptionCaught(this, cause);
205 }
206 }
207 }
208 }