1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.buffer.ChannelBuffers.*;
19
20 import java.nio.ByteBuffer;
21 import java.nio.channels.ReadableByteChannel;
22
23 import org.jboss.netty.buffer.ChannelBuffer;
24
25
26
27
28
29
30
31
32
33
34
35 public class ChunkedNioStream implements ChunkedInput {
36
37 private final ReadableByteChannel in;
38
39 private final int chunkSize;
40 private volatile long offset;
41
42
43
44
45 private final ByteBuffer byteBuffer;
46
47
48
49
50 public ChunkedNioStream(ReadableByteChannel in) {
51 this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
52 }
53
54
55
56
57
58
59
60 public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
61 if (in == null) {
62 throw new NullPointerException("in");
63 }
64 if (chunkSize <= 0) {
65 throw new IllegalArgumentException("chunkSize: " + chunkSize +
66 " (expected: a positive integer)");
67 }
68 this.in = in;
69 offset = 0;
70 this.chunkSize = chunkSize;
71 byteBuffer = ByteBuffer.allocate(chunkSize);
72 }
73
74
75
76
77 public long getTransferredBytes() {
78 return offset;
79 }
80
81 public boolean hasNextChunk() throws Exception {
82 if (byteBuffer.position() > 0) {
83
84 return true;
85 }
86 if (in.isOpen()) {
87
88 int b = in.read(byteBuffer);
89 if (b < 0) {
90 return false;
91 } else {
92 offset += b;
93 return true;
94 }
95 }
96 return false;
97 }
98
99 public boolean isEndOfInput() throws Exception {
100 return !hasNextChunk();
101 }
102
103 public void close() throws Exception {
104 in.close();
105 }
106
107 public Object nextChunk() throws Exception {
108 if (!hasNextChunk()) {
109 return null;
110 }
111
112 int readBytes = byteBuffer.position();
113 for (;;) {
114 int localReadBytes = in.read(byteBuffer);
115 if (localReadBytes < 0) {
116 break;
117 }
118 readBytes += localReadBytes;
119 offset += localReadBytes;
120
121 if (readBytes == chunkSize) {
122 break;
123 }
124 }
125 byteBuffer.flip();
126
127 ChannelBuffer buffer = copiedBuffer(byteBuffer);
128 byteBuffer.clear();
129 return buffer;
130 }
131 }