1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.buffer.ChannelBuffers.*;
19
20 import java.io.InputStream;
21 import java.io.PushbackInputStream;
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 public class ChunkedStream implements ChunkedInput {
38
39 static final int DEFAULT_CHUNK_SIZE = 8192;
40
41 private final PushbackInputStream in;
42 private final int chunkSize;
43 private volatile long offset;
44
45
46
47
48 public ChunkedStream(InputStream in) {
49 this(in, DEFAULT_CHUNK_SIZE);
50 }
51
52
53
54
55
56
57
58 public ChunkedStream(InputStream in, int chunkSize) {
59 if (in == null) {
60 throw new NullPointerException("in");
61 }
62 if (chunkSize <= 0) {
63 throw new IllegalArgumentException(
64 "chunkSize: " + chunkSize +
65 " (expected: a positive integer)");
66 }
67
68 if (in instanceof PushbackInputStream) {
69 this.in = (PushbackInputStream) in;
70 } else {
71 this.in = new PushbackInputStream(in);
72 }
73 this.chunkSize = chunkSize;
74 }
75
76
77
78
79 public long getTransferredBytes() {
80 return offset;
81 }
82
83 public boolean hasNextChunk() throws Exception {
84 int b = in.read();
85 if (b < 0) {
86 return false;
87 } else {
88 in.unread(b);
89 return true;
90 }
91 }
92
93 public boolean isEndOfInput() throws Exception {
94 return !hasNextChunk();
95 }
96
97 public void close() throws Exception {
98 in.close();
99 }
100
101 public Object nextChunk() throws Exception {
102 if (!hasNextChunk()) {
103 return null;
104 }
105
106 final int availableBytes = in.available();
107 final int chunkSize;
108 if (availableBytes <= 0) {
109 chunkSize = this.chunkSize;
110 } else {
111 chunkSize = Math.min(this.chunkSize, in.available());
112 }
113 final byte[] chunk = new byte[chunkSize];
114 int readBytes = 0;
115 for (;;) {
116 int localReadBytes = in.read(chunk, readBytes, chunkSize - readBytes);
117 if (localReadBytes < 0) {
118 break;
119 }
120 readBytes += localReadBytes;
121 offset += localReadBytes;
122
123 if (readBytes == chunkSize) {
124 break;
125 }
126 }
127
128 return wrappedBuffer(chunk, 0, readBytes);
129 }
130 }