View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.stream;
17  
18  import static org.jboss.netty.buffer.ChannelBuffers.*;
19  
20  import java.io.InputStream;
21  import java.io.PushbackInputStream;
22  
23  /**
24   * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
25   * chunk.
26   * <p>
27   * Please note that the {@link InputStream} instance that feeds data into
28   * {@link ChunkedStream} must implement {@link InputStream#available()} as
29   * accurately as possible, rather than using the default implementation.
30   * Otherwise, {@link ChunkedStream} will generate many too small chunks or
31   * block unnecessarily often.
32   *
33   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
34   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
35   * @version $Rev: 2236 $, $Date: 2010-04-12 19:22:51 +0900 (Mon, 12 Apr 2010) $
36   */
37  public class ChunkedStream implements ChunkedInput {
38  
39      static final int DEFAULT_CHUNK_SIZE = 8192;
40  
41      private final PushbackInputStream in;
42      private final int chunkSize;
43      private volatile long offset;
44  
45      /**
46       * Creates a new instance that fetches data from the specified stream.
47       */
48      public ChunkedStream(InputStream in) {
49          this(in, DEFAULT_CHUNK_SIZE);
50      }
51  
52      /**
53       * Creates a new instance that fetches data from the specified stream.
54       *
55       * @param chunkSize the number of bytes to fetch on each
56       *                  {@link #nextChunk()} call
57       */
58      public ChunkedStream(InputStream in, int chunkSize) {
59          if (in == null) {
60              throw new NullPointerException("in");
61          }
62          if (chunkSize <= 0) {
63              throw new IllegalArgumentException(
64                      "chunkSize: " + chunkSize +
65                      " (expected: a positive integer)");
66          }
67  
68          if (in instanceof PushbackInputStream) {
69              this.in = (PushbackInputStream) in;
70          } else {
71              this.in = new PushbackInputStream(in);
72          }
73          this.chunkSize = chunkSize;
74      }
75  
76      /**
77       * Returns the number of transferred bytes.
78       */
79      public long getTransferredBytes() {
80          return offset;
81      }
82  
83      public boolean hasNextChunk() throws Exception {
84          int b = in.read();
85          if (b < 0) {
86              return false;
87          } else {
88              in.unread(b);
89              return true;
90          }
91      }
92  
93      public boolean isEndOfInput() throws Exception {
94          return !hasNextChunk();
95      }
96  
97      public void close() throws Exception {
98          in.close();
99      }
100 
101     public Object nextChunk() throws Exception {
102         if (!hasNextChunk()) {
103             return null;
104         }
105 
106         final int availableBytes = in.available();
107         final int chunkSize;
108         if (availableBytes <= 0) {
109             chunkSize = this.chunkSize;
110         } else {
111             chunkSize = Math.min(this.chunkSize, in.available());
112         }
113         final byte[] chunk = new byte[chunkSize];
114         int readBytes = 0;
115         for (;;) {
116             int localReadBytes = in.read(chunk, readBytes, chunkSize - readBytes);
117             if (localReadBytes < 0) {
118                 break;
119             }
120             readBytes += localReadBytes;
121             offset += localReadBytes;
122 
123             if (readBytes == chunkSize) {
124                 break;
125             }
126         }
127 
128         return wrappedBuffer(chunk, 0, readBytes);
129     }
130 }