View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.compression;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBuffers;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelHandlerContext;
22  import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
23  import org.jboss.netty.util.internal.jzlib.JZlib;
24  import org.jboss.netty.util.internal.jzlib.ZStream;
25  
26  
27  /**
28   * Decompresses a {@link ChannelBuffer} using the deflate algorithm.
29   *
30   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
31   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
32   * @version $Rev: 2241 $, $Date: 2010-04-16 13:12:43 +0900 (Fri, 16 Apr 2010) $
33   *
34   * @apiviz.landmark
35   * @apiviz.has org.jboss.netty.handler.codec.compression.ZlibWrapper
36   */
37  public class ZlibDecoder extends OneToOneDecoder {
38  
39      private final ZStream z = new ZStream();
40      private volatile boolean finished;
41  
42      /**
43       * Creates a new instance with the default wrapper ({@link ZlibWrapper#ZLIB}).
44       *
45       * @throws CompressionException if failed to initialize zlib
46       */
47      public ZlibDecoder() {
48          this(ZlibWrapper.ZLIB);
49      }
50  
51      /**
52       * Creates a new instance with the specified wrapper.
53       *
54       * @throws CompressionException if failed to initialize zlib
55       */
56      public ZlibDecoder(ZlibWrapper wrapper) {
57          if (wrapper == null) {
58              throw new NullPointerException("wrapper");
59          }
60  
61          synchronized (z) {
62              int resultCode = z.inflateInit(ZlibUtil.convertWrapperType(wrapper));
63              if (resultCode != JZlib.Z_OK) {
64                  ZlibUtil.fail(z, "initialization failure", resultCode);
65              }
66          }
67      }
68  
69      /**
70       * Creates a new instance with the specified preset dictionary. The wrapper
71       * is always {@link ZlibWrapper#ZLIB} because it is the only format that
72       * supports the preset dictionary.
73       *
74       * @throws CompressionException if failed to initialize zlib
75       */
76      public ZlibDecoder(byte[] dictionary) {
77          if (dictionary == null) {
78              throw new NullPointerException("dictionary");
79          }
80  
81          synchronized (z) {
82              int resultCode;
83              resultCode = z.inflateInit(JZlib.W_ZLIB);
84              if (resultCode != JZlib.Z_OK) {
85                  ZlibUtil.fail(z, "initialization failure", resultCode);
86              } else {
87                  resultCode = z.inflateSetDictionary(dictionary, dictionary.length);
88                  if (resultCode != JZlib.Z_OK) {
89                      ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
90                  }
91              }
92          }
93      }
94  
95      /**
96       * Returns {@code true} if and only if the end of the compressed stream
97       * has been reached.
98       */
99      public boolean isClosed() {
100         return finished;
101     }
102 
103     @Override
104     protected Object decode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
105         if (!(msg instanceof ChannelBuffer) || finished) {
106             return msg;
107         }
108 
109         synchronized (z) {
110             try {
111                 // Configure input.
112                 ChannelBuffer compressed = (ChannelBuffer) msg;
113                 byte[] in = new byte[compressed.readableBytes()];
114                 compressed.readBytes(in);
115                 z.next_in = in;
116                 z.next_in_index = 0;
117                 z.avail_in = in.length;
118 
119                 // Configure output.
120                 byte[] out = new byte[in.length << 1];
121                 ChannelBuffer decompressed = ChannelBuffers.dynamicBuffer(
122                         compressed.order(), out.length,
123                         ctx.getChannel().getConfig().getBufferFactory());
124                 z.next_out = out;
125                 z.next_out_index = 0;
126                 z.avail_out = out.length;
127 
128 
129                 loop: for (;;) {
130                     // Decompress 'in' into 'out'
131                     int resultCode = z.inflate(JZlib.Z_SYNC_FLUSH);
132                     if (z.next_out_index > 0) {
133                         decompressed.writeBytes(out, 0, z.next_out_index);
134                         z.avail_out = out.length;
135                     }
136                     z.next_out_index = 0;
137 
138                     switch (resultCode) {
139                     case JZlib.Z_STREAM_END:
140                         finished = true; // Do not decode anymore.
141                         z.inflateEnd();
142                         break loop;
143                     case JZlib.Z_OK:
144                         break;
145                     case JZlib.Z_BUF_ERROR:
146                         if (z.avail_in <= 0) {
147                             break loop;
148                         }
149                         break;
150                     default:
151                         ZlibUtil.fail(z, "decompression failure", resultCode);
152                     }
153                 }
154 
155                 if (decompressed.writerIndex() != 0) { // readerIndex is always 0
156                     return decompressed;
157                 } else {
158                     return null;
159                 }
160             } finally {
161                 // Deference the external references explicitly to tell the VM that
162                 // the allocated byte arrays are temporary so that the call stack
163                 // can be utilized.
164                 // I'm not sure if the modern VMs do this optimization though.
165                 z.next_in = null;
166                 z.next_out = null;
167             }
168         }
169     }
170 }