View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.compression;
17  
18  import java.util.concurrent.atomic.AtomicBoolean;
19  
20  import org.jboss.netty.buffer.ChannelBuffer;
21  import org.jboss.netty.buffer.ChannelBuffers;
22  import org.jboss.netty.channel.Channel;
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.Channels;
29  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
30  import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
31  import org.jboss.netty.util.internal.jzlib.JZlib;
32  import org.jboss.netty.util.internal.jzlib.ZStream;
33  
34  
35  /**
36   * Compresses a {@link ChannelBuffer} using the deflate algorithm.
37   *
38   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
39   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
40   * @version $Rev: 2241 $, $Date: 2010-04-16 13:12:43 +0900 (Fri, 16 Apr 2010) $
41   *
42   * @apiviz.landmark
43   * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
44   */
45  public class ZlibEncoder extends OneToOneEncoder implements LifeCycleAwareChannelHandler {
46  
47      private static final byte[] EMPTY_ARRAY = new byte[0];
48  
49      private final ZStream z = new ZStream();
50      private final AtomicBoolean finished = new AtomicBoolean();
51      private volatile ChannelHandlerContext ctx;
52  
53      /**
54       * Creates a new zlib encoder with the default compression level ({@code 6})
55       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
56       *
57       * @throws CompressionException if failed to initialize zlib
58       */
59      public ZlibEncoder() {
60          this(6);
61      }
62  
63      /**
64       * Creates a new zlib encoder with the specified {@code compressionLevel}
65       * and the default wrapper ({@link ZlibWrapper#ZLIB}).
66       *
67       * @param compressionLevel
68       *        {@code 1} yields the fastest compression and {@code 9} yields the
69       *        best compression.  {@code 0} means no compression.  The default
70       *        compression level is {@code 6}.
71       *
72       * @throws CompressionException if failed to initialize zlib
73       */
74      public ZlibEncoder(int compressionLevel) {
75          this(ZlibWrapper.ZLIB, compressionLevel);
76      }
77  
78      /**
79       * Creates a new zlib encoder with the default compression level ({@code 6})
80       * and the specified wrapper.
81       *
82       * @throws CompressionException if failed to initialize zlib
83       */
84      public ZlibEncoder(ZlibWrapper wrapper) {
85          this(wrapper, 6);
86      }
87  
88      /**
89       * Creates a new zlib encoder with the specified {@code compressionLevel}
90       * and the specified wrapper.
91       * @param compressionLevel
92       *        {@code 1} yields the fastest compression and {@code 9} yields the
93       *        best compression.  {@code 0} means no compression.  The default
94       *        compression level is {@code 6}.
95       *
96       * @throws CompressionException if failed to initialize zlib
97       */
98      public ZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
99          if (compressionLevel < 0 || compressionLevel > 9) {
100             throw new IllegalArgumentException(
101                     "compressionLevel: " + compressionLevel +
102                     " (expected: 0-9)");
103         }
104         if (wrapper == null) {
105             throw new NullPointerException("wrapper");
106         }
107         if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
108             throw new IllegalArgumentException(
109                     "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
110                     "allowed for compression.");
111         }
112 
113         synchronized (z) {
114             int resultCode = z.deflateInit(compressionLevel, ZlibUtil.convertWrapperType(wrapper));
115             if (resultCode != JZlib.Z_OK) {
116                 ZlibUtil.fail(z, "initialization failure", resultCode);
117             }
118         }
119     }
120 
121     /**
122      * Creates a new zlib encoder with the default compression level ({@code 6})
123      * and the specified preset dictionary.  The wrapper is always
124      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
125      * the preset dictionary.
126      *
127      * @param dictionary  the preset dictionary
128      *
129      * @throws CompressionException if failed to initialize zlib
130      */
131     public ZlibEncoder(byte[] dictionary) {
132         this(6, dictionary);
133     }
134 
135     /**
136      * Creates a new zlib encoder with the specified {@code compressionLevel}
137      * and the specified preset dictionary.  The wrapper is always
138      * {@link ZlibWrapper#ZLIB} because it is the only format that supports
139      * the preset dictionary.
140      *
141      * @param compressionLevel
142      *        {@code 1} yields the fastest compression and {@code 9} yields the
143      *        best compression.  {@code 0} means no compression.  The default
144      *        compression level is {@code 6}.
145      * @param dictionary  the preset dictionary
146      *
147      * @throws CompressionException if failed to initialize zlib
148      */
149     public ZlibEncoder(int compressionLevel, byte[] dictionary) {
150         if (compressionLevel < 0 || compressionLevel > 9) {
151             throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)");
152         }
153 
154         if (dictionary == null) {
155             throw new NullPointerException("dictionary");
156         }
157 
158         synchronized (z) {
159             int resultCode;
160             resultCode = z.deflateInit(compressionLevel, JZlib.W_ZLIB); // Default: ZLIB format
161             if (resultCode != JZlib.Z_OK) {
162                 ZlibUtil.fail(z, "initialization failure", resultCode);
163             } else {
164                 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
165                 if (resultCode != JZlib.Z_OK){
166                     ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
167                 }
168             }
169         }
170     }
171 
172     public ChannelFuture close() {
173         ChannelHandlerContext ctx = this.ctx;
174         if (ctx == null) {
175             throw new IllegalStateException("not added to a pipeline");
176         }
177         return finishEncode(ctx, null);
178     }
179 
180     public boolean isClosed() {
181         return finished.get();
182     }
183 
184     @Override
185     protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
186         if (!(msg instanceof ChannelBuffer) || finished.get()) {
187             return msg;
188         }
189 
190         ChannelBuffer result;
191         synchronized (z) {
192             try {
193                 // Configure input.
194                 ChannelBuffer uncompressed = (ChannelBuffer) msg;
195                 byte[] in = new byte[uncompressed.readableBytes()];
196                 uncompressed.readBytes(in);
197                 z.next_in = in;
198                 z.next_in_index = 0;
199                 z.avail_in = in.length;
200 
201                 // Configure output.
202                 byte[] out = new byte[(int) Math.ceil(in.length * 1.001) + 12];
203                 z.next_out = out;
204                 z.next_out_index = 0;
205                 z.avail_out = out.length;
206 
207                 // Note that Z_PARTIAL_FLUSH has been deprecated.
208                 int resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
209                 if (resultCode != JZlib.Z_OK) {
210                     ZlibUtil.fail(z, "compression failure", resultCode);
211                 }
212 
213                 if (z.next_out_index != 0) {
214                     result = ctx.getChannel().getConfig().getBufferFactory().getBuffer(
215                             uncompressed.order(), out, 0, z.next_out_index);
216                 } else {
217                     result = ChannelBuffers.EMPTY_BUFFER;
218                 }
219             } finally {
220                 // Deference the external references explicitly to tell the VM that
221                 // the allocated byte arrays are temporary so that the call stack
222                 // can be utilized.
223                 // I'm not sure if the modern VMs do this optimization though.
224                 z.next_in = null;
225                 z.next_out = null;
226             }
227         }
228 
229         return result;
230     }
231 
232     @Override
233     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent evt)
234             throws Exception {
235         if (evt instanceof ChannelStateEvent) {
236             ChannelStateEvent e = (ChannelStateEvent) evt;
237             switch (e.getState()) {
238             case OPEN:
239             case CONNECTED:
240             case BOUND:
241                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
242                     finishEncode(ctx, evt);
243                     return;
244                 }
245             }
246         }
247 
248         super.handleDownstream(ctx, evt);
249     }
250 
251     private ChannelFuture finishEncode(final ChannelHandlerContext ctx, final ChannelEvent evt) {
252         if (!finished.compareAndSet(false, true)) {
253             if (evt != null) {
254                 ctx.sendDownstream(evt);
255             }
256             return Channels.succeededFuture(ctx.getChannel());
257         }
258 
259         ChannelBuffer footer;
260         ChannelFuture future;
261         synchronized (z) {
262             try {
263                 // Configure input.
264                 z.next_in = EMPTY_ARRAY;
265                 z.next_in_index = 0;
266                 z.avail_in = 0;
267 
268                 // Configure output.
269                 byte[] out = new byte[32]; // room for ADLER32 + ZLIB / CRC32 + GZIP header
270                 z.next_out = out;
271                 z.next_out_index = 0;
272                 z.avail_out = out.length;
273 
274                 // Write the ADLER32 checksum (stream footer).
275                 int resultCode = z.deflate(JZlib.Z_FINISH);
276                 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
277                     future = Channels.failedFuture(
278                             ctx.getChannel(),
279                             ZlibUtil.exception(z, "compression failure", resultCode));
280                     footer = null;
281                 } else if (z.next_out_index != 0) {
282                     future = Channels.future(ctx.getChannel());
283                     footer =
284                         ctx.getChannel().getConfig().getBufferFactory().getBuffer(
285                                 out, 0, z.next_out_index);
286                 } else {
287                     // Note that we should never use a SucceededChannelFuture
288                     // here just in case any downstream handler or a sink wants
289                     // to notify a write error.
290                     future = Channels.future(ctx.getChannel());
291                     footer = ChannelBuffers.EMPTY_BUFFER;
292                 }
293             } finally {
294                 z.deflateEnd();
295 
296                 // Deference the external references explicitly to tell the VM that
297                 // the allocated byte arrays are temporary so that the call stack
298                 // can be utilized.
299                 // I'm not sure if the modern VMs do this optimization though.
300                 z.next_in = null;
301                 z.next_out = null;
302             }
303         }
304 
305         if (footer != null) {
306             Channels.write(ctx, future, footer);
307         }
308 
309         if (evt != null) {
310             future.addListener(new ChannelFutureListener() {
311                 public void operationComplete(ChannelFuture future) throws Exception {
312                     ctx.sendDownstream(evt);
313                 }
314             });
315         }
316 
317         return future;
318     }
319 
320     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
321         this.ctx = ctx;
322     }
323 
324     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
325         // Unused
326     }
327 
328     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
329         // Unused
330     }
331 
332     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
333         // Unused
334     }
335 }