1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.codec.compression;
17
18 import java.util.concurrent.atomic.AtomicBoolean;
19
20 import org.jboss.netty.buffer.ChannelBuffer;
21 import org.jboss.netty.buffer.ChannelBuffers;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.Channels;
29 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
30 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
31 import org.jboss.netty.util.internal.jzlib.JZlib;
32 import org.jboss.netty.util.internal.jzlib.ZStream;
33
34
35
36
37
38
39
40
41
42
43
44
45 public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler {
46
47 private static final byte[] EMPTY_ARRAY = new byte[0];
48
49 private final ZStream z = new ZStream();
50 private final AtomicBoolean finished = new AtomicBoolean();
51 private volatile ChannelHandlerContext ctx;
52
53
54
55
56
57
58
59 public ZlibEncoder() {
60 this(6);
61 }
62
63
64
65
66
67
68
69
70
71
72
73
74 public ZlibEncoder(int compressionLevel) {
75 this(ZlibWrapper.ZLIB, compressionLevel);
76 }
77
78
79
80
81
82
83
84 public ZlibEncoder(ZlibWrapper wrapper) {
85 this(wrapper, 6);
86 }
87
88
89
90
91
92
93
94
95
96
97
98 public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99 if (compressionLevel < 0 || compressionLevel > 9) {
100 throw new IllegalArgumentException(
101 "compressionLevel: " + compressionLevel +
102 " (expected: 0-9)");
103 }
104 if (wrapper == null) {
105 throw new NullPointerException("wrapper");
106 }
107 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
108 throw new IllegalArgumentException(
109 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
110 "allowed for compression.");
111 }
112
113 synchronized (z) {
114 int resultCode = z.deflateInit(compressionLevel, ZlibUtil.convertWrapperType(wrapper));
115 if (resultCode != JZlib.Z_OK) {
116 ZlibUtil.fail(z, "initialization failure", resultCode);
117 }
118 }
119 }
120
121
122
123
124
125
126
127
128
129
130
131 public ZlibEncoder(byte[] dictionary) {
132 this(6, dictionary);
133 }
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public ZlibEncoder(int compressionLevel, byte[] dictionary) {
150 if (compressionLevel < 0 || compressionLevel > 9) {
151 throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)");
152 }
153
154 if (dictionary == null) {
155 throw new NullPointerException("dictionary");
156 }
157
158 synchronized (z) {
159 int resultCode;
160 resultCode = z.deflateInit(compressionLevel, JZlib.W_ZLIB);
161 if (resultCode != JZlib.Z_OK) {
162 ZlibUtil.fail(z, "initialization failure", resultCode);
163 } else {
164 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
165 if (resultCode != JZlib.Z_OK){
166 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
167 }
168 }
169 }
170 }
171
172 public ChannelFuture close() {
173 ChannelHandlerContext ctx = this.ctx;
174 if (ctx == null) {
175 throw new IllegalStateException("not added to a pipeline");
176 }
177 return finishEncode(ctx, null);
178 }
179
180 public boolean isClosed() {
181 return finished.get();
182 }
183
184 @Override
185 protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
186 if (!(msg instanceof ChannelBuffer) || finished.get()) {
187 return msg;
188 }
189
190 ChannelBuffer result;
191 synchronized (z) {
192 try {
193
194 ChannelBuffer uncompressed = (ChannelBuffer) msg;
195 byte[] in = new byte[uncompressed.readableBytes()];
196 uncompressed.readBytes(in);
197 z.next_in = in;
198 z.next_in_index = 0;
199 z.avail_in = in.length;
200
201
202 byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
203 z.next_out = out;
204 z.next_out_index = 0;
205 z.avail_out = out.length;
206
207
208 int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
209 if (resultCode != JZlib.Z_OK) {
210 ZlibUtil.fail(z, "compression failure", resultCode);
211 }
212
213 if (z.next_out_index != 0) {
214 result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
215 uncompressed.order(), out, 0, z.next_out_index);
216 } else {
217 result = ChannelBuffers.EMPTY_BUFFER;
218 }
219 } finally {
220
221
222
223
224 z.next_in = null;
225 z.next_out = null;
226 }
227 }
228
229 return result;
230 }
231
232 @Override
233 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
234 throws Exception {
235 if (evt instanceof ChannelStateEvent) {
236 ChannelStateEvent e = (ChannelStateEvent) evt;
237 switch (e.getState()) {
238 case OPEN:
239 case CONNECTED:
240 case BOUND:
241 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
242 finishEncode(ctx, evt);
243 return;
244 }
245 }
246 }
247
248 super.handleDownstream(ctx, evt);
249 }
250
251 private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
252 if (!finished.compareAndSet(false, true)) {
253 if (evt != null) {
254 ctx.sendDownstream(evt);
255 }
256 return Channels.succeededFuture(ctx.getChannel());
257 }
258
259 ChannelBuffer footer;
260 ChannelFuture future;
261 synchronized (z) {
262 try {
263
264 z.next_in = EMPTY_ARRAY;
265 z.next_in_index = 0;
266 z.avail_in = 0;
267
268
269 byte[] out = new byte[32];
270 z.next_out = out;
271 z.next_out_index = 0;
272 z.avail_out = out.length;
273
274
275 int resultCode = z.deflate(JZlib.Z_FINISH);
276 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
277 future = Channels.failedFuture(
278 ctx.getChannel(),
279 ZlibUtil.exception(z, "compression failure", resultCode));
280 footer = null;
281 } else if (z.next_out_index != 0) {
282 future = Channels.future(ctx.getChannel());
283 footer =
284 ctx.getChannel().getConfig().getBufferFactory().getBuffer(
285 out, 0, z.next_out_index);
286 } else {
287
288
289
290 future = Channels.future(ctx.getChannel());
291 footer = ChannelBuffers.EMPTY_BUFFER;
292 }
293 } finally {
294 z.deflateEnd();
295
296
297
298
299
300 z.next_in = null;
301 z.next_out = null;
302 }
303 }
304
305 if (footer != null) {
306 Channels.write(ctx, future, footer);
307 }
308
309 if (evt != null) {
310 future.addListener(new ChannelFutureListener() {
311 public void operationComplete(ChannelFuture future) throws Exception {
312 ctx.sendDownstream(evt);
313 }
314 });
315 }
316
317 return future;
318 }
319
320 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
321 this.ctx = ctx;
322 }
323
324 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
325
326 }
327
328 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
329
330 }
331
332 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
333
334 }
335 }