1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.embedder;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.lang.reflect.Array;
21 import java.util.ConcurrentModificationException;
22 import java.util.LinkedList;
23 import java.util.Queue;
24
25 import org.jboss.netty.buffer.ChannelBufferFactory;
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelEvent;
28 import org.jboss.netty.channel.ChannelHandler;
29 import org.jboss.netty.channel.ChannelHandlerContext;
30 import org.jboss.netty.channel.ChannelPipeline;
31 import org.jboss.netty.channel.ChannelPipelineException;
32 import org.jboss.netty.channel.ChannelSink;
33 import org.jboss.netty.channel.ChannelUpstreamHandler;
34 import org.jboss.netty.channel.DefaultChannelPipeline;
35 import org.jboss.netty.channel.ExceptionEvent;
36 import org.jboss.netty.channel.MessageEvent;
37
38
39
40
41
42
43
44
45 abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
46
47 private final Channel channel;
48 private final ChannelPipeline pipeline;
49 private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
50
51 final Queue<Object> productQueue = new LinkedList<Object>();
52
53
54
55
56
57 protected AbstractCodecEmbedder(ChannelHandler... handlers) {
58 pipeline = new EmbeddedChannelPipeline();
59 configurePipeline(handlers);
60 channel = new EmbeddedChannel(pipeline, sink);
61 fireInitialEvents();
62 }
63
64
65
66
67
68
69
70
71 protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
72 this(handlers);
73 getChannel().getConfig().setBufferFactory(bufferFactory);
74 }
75
76 private void fireInitialEvents() {
77
78 fireChannelOpen(channel);
79 fireChannelBound(channel, channel.getLocalAddress());
80 fireChannelConnected(channel, channel.getRemoteAddress());
81 }
82
83 private void configurePipeline(ChannelHandler... handlers) {
84 if (handlers == null) {
85 throw new NullPointerException("handlers");
86 }
87
88 if (handlers.length == 0) {
89 throw new IllegalArgumentException(
90 "handlers should contain at least one " +
91 ChannelHandler.class.getSimpleName() + '.');
92 }
93
94 for (int i = 0; i < handlers.length; i ++) {
95 ChannelHandler h = handlers[i];
96 if (h == null) {
97 throw new NullPointerException("handlers[" + i + "]");
98 }
99 pipeline.addLast(String.valueOf(i), handlers[i]);
100 }
101 pipeline.addLast("SINK", sink);
102 }
103
104 public boolean finish() {
105 close(channel);
106 fireChannelDisconnected(channel);
107 fireChannelUnbound(channel);
108 fireChannelClosed(channel);
109 return !productQueue.isEmpty();
110 }
111
112
113
114
115
116 protected final Channel getChannel() {
117 return channel;
118 }
119
120
121
122
123
124 protected final boolean isEmpty() {
125 return productQueue.isEmpty();
126 }
127
128 @SuppressWarnings("unchecked")
129 public final E poll() {
130 return (E) productQueue.poll();
131 }
132
133 @SuppressWarnings("unchecked")
134 public final E peek() {
135 return (E) productQueue.peek();
136 }
137
138 public final Object[] pollAll() {
139 final int size = size();
140 Object[] a = new Object[size];
141 for (int i = 0; i < size; i ++) {
142 E product = poll();
143 if (product == null) {
144 throw new ConcurrentModificationException();
145 }
146 a[i] = product;
147 }
148 return a;
149 }
150
151 @SuppressWarnings("unchecked")
152 public final <T> T[] pollAll(T[] a) {
153 if (a == null) {
154 throw new NullPointerException("a");
155 }
156
157 final int size = size();
158
159
160 if (a.length < size) {
161 a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
162 }
163
164 for (int i = 0;; i ++) {
165 T product = (T) poll();
166 if (product == null) {
167 break;
168 }
169 a[i] = product;
170 }
171
172
173 if (a.length > size) {
174 a[size] = null;
175 }
176
177 return a;
178 }
179
180 public final int size() {
181 return productQueue.size();
182 }
183
184 public ChannelPipeline getPipeline() {
185 return pipeline;
186 }
187
188 private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
189 EmbeddedChannelSink() {
190 super();
191 }
192
193 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
194 handleEvent(e);
195 }
196
197 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
198 handleEvent(e);
199 }
200
201 private void handleEvent(ChannelEvent e) {
202 if (e instanceof MessageEvent) {
203 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
204 assert offered;
205 } else if (e instanceof ExceptionEvent) {
206 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
207 }
208
209
210 }
211
212 public void exceptionCaught(
213 ChannelPipeline pipeline, ChannelEvent e,
214 ChannelPipelineException cause) throws Exception {
215 Throwable actualCause = cause.getCause();
216 if (actualCause == null) {
217 actualCause = cause;
218 }
219
220 throw new CodecEmbedderException(actualCause);
221 }
222 }
223
224 private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
225
226 EmbeddedChannelPipeline() {
227 super();
228 }
229
230 @Override
231 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
232 while (t instanceof ChannelPipelineException && t.getCause() != null) {
233 t = t.getCause();
234 }
235 if (t instanceof CodecEmbedderException) {
236 throw (CodecEmbedderException) t;
237 } else {
238 throw new CodecEmbedderException(t);
239 }
240 }
241 }
242 }