1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import java.io.IOException;
19 import java.lang.ref.SoftReference;
20 import java.net.SocketAddress;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.DatagramChannel;
23 import java.nio.channels.WritableByteChannel;
24
25 import org.jboss.netty.buffer.ChannelBuffer;
26 import org.jboss.netty.channel.DefaultFileRegion;
27 import org.jboss.netty.channel.FileRegion;
28
29
30
31
32
33
34 final class SocketSendBufferPool {
35
36 private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
37
38 private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
39 private static final int ALIGN_SHIFT = 4;
40 private static final int ALIGN_MASK = 15;
41
42 PreallocationRef poolHead = null;
43 Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
44
45 SocketSendBufferPool() {
46 super();
47 }
48
49 final SendBuffer acquire(Object message) {
50 if (message instanceof ChannelBuffer) {
51 return acquire((ChannelBuffer) message);
52 } else if (message instanceof FileRegion) {
53 return acquire((FileRegion) message);
54 }
55
56 throw new IllegalArgumentException(
57 "unsupported message type: " + message.getClass());
58 }
59
60 private final SendBuffer acquire(FileRegion src) {
61 if (src.getCount() == 0) {
62 return EMPTY_BUFFER;
63 }
64 return new FileSendBuffer(src);
65 }
66
67 private final SendBuffer acquire(ChannelBuffer src) {
68 final int size = src.readableBytes();
69 if (size == 0) {
70 return EMPTY_BUFFER;
71 }
72
73 if (src.isDirect()) {
74 return new UnpooledSendBuffer(src.toByteBuffer());
75 }
76 if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
77 return new UnpooledSendBuffer(src.toByteBuffer());
78 }
79
80 Preallocation current = this.current;
81 ByteBuffer buffer = current.buffer;
82 int remaining = buffer.remaining();
83 PooledSendBuffer dst;
84
85 if (size < remaining) {
86 int nextPos = buffer.position() + size;
87 ByteBuffer slice = buffer.duplicate();
88 buffer.position(align(nextPos));
89 slice.limit(nextPos);
90 current.refCnt ++;
91 dst = new PooledSendBuffer(current, slice);
92 } else if (size > remaining) {
93 this.current = current = getPreallocation();
94 buffer = current.buffer;
95 ByteBuffer slice = buffer.duplicate();
96 buffer.position(align(size));
97 slice.limit(size);
98 current.refCnt ++;
99 dst = new PooledSendBuffer(current, slice);
100 } else {
101 current.refCnt ++;
102 this.current = getPreallocation0();
103 dst = new PooledSendBuffer(current, current.buffer);
104 }
105
106 ByteBuffer dstbuf = dst.buffer;
107 dstbuf.mark();
108 src.getBytes(src.readerIndex(), dstbuf);
109 dstbuf.reset();
110 return dst;
111 }
112
113 private final Preallocation getPreallocation() {
114 Preallocation current = this.current;
115 if (current.refCnt == 0) {
116 current.buffer.clear();
117 return current;
118 }
119
120 return getPreallocation0();
121 }
122
123 private final Preallocation getPreallocation0() {
124 PreallocationRef ref = poolHead;
125 if (ref != null) {
126 do {
127 Preallocation p = ref.get();
128 ref = ref.next;
129
130 if (p != null) {
131 poolHead = ref;
132 return p;
133 }
134 } while (ref != null);
135
136 poolHead = ref;
137 }
138
139 return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
140 }
141
142 private static final int align(int pos) {
143 int q = pos >>> ALIGN_SHIFT;
144 int r = pos & ALIGN_MASK;
145 if (r != 0) {
146 q ++;
147 }
148 return q << ALIGN_SHIFT;
149 }
150
151 private final class Preallocation {
152 final ByteBuffer buffer;
153 int refCnt;
154
155 Preallocation(int capacity) {
156 buffer = ByteBuffer.allocateDirect(capacity);
157 }
158 }
159
160 private final class PreallocationRef extends SoftReference<Preallocation> {
161 final PreallocationRef next;
162
163 PreallocationRef(Preallocation prealloation, PreallocationRef next) {
164 super(prealloation);
165 this.next = next;
166 }
167 }
168
169 interface SendBuffer {
170 boolean finished();
171 long writtenBytes();
172 long totalBytes();
173
174 long transferTo(WritableByteChannel ch) throws IOException;
175 long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
176
177 void release();
178 }
179
180 class UnpooledSendBuffer implements SendBuffer {
181
182 final ByteBuffer buffer;
183 final int initialPos;
184
185 UnpooledSendBuffer(ByteBuffer buffer) {
186 this.buffer = buffer;
187 initialPos = buffer.position();
188 }
189
190 public final boolean finished() {
191 return !buffer.hasRemaining();
192 }
193
194 public final long writtenBytes() {
195 return buffer.position() - initialPos;
196 }
197
198 public final long totalBytes() {
199 return buffer.limit() - initialPos;
200 }
201
202 public final long transferTo(WritableByteChannel ch) throws IOException {
203 return ch.write(buffer);
204 }
205
206 public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
207 return ch.send(buffer, raddr);
208 }
209
210 public void release() {
211
212 }
213 }
214
215 final class PooledSendBuffer implements SendBuffer {
216
217 private final Preallocation parent;
218 final ByteBuffer buffer;
219 final int initialPos;
220
221 PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
222 this.parent = parent;
223 this.buffer = buffer;
224 initialPos = buffer.position();
225 }
226
227 public boolean finished() {
228 return !buffer.hasRemaining();
229 }
230
231 public long writtenBytes() {
232 return buffer.position() - initialPos;
233 }
234
235 public long totalBytes() {
236 return buffer.limit() - initialPos;
237 }
238
239 public long transferTo(WritableByteChannel ch) throws IOException {
240 return ch.write(buffer);
241 }
242
243 public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
244 return ch.send(buffer, raddr);
245 }
246
247 public void release() {
248 final Preallocation parent = this.parent;
249 if (-- parent.refCnt == 0) {
250 parent.buffer.clear();
251 if (parent != current) {
252 poolHead = new PreallocationRef(parent, poolHead);
253 }
254 }
255 }
256 }
257
258 final class FileSendBuffer implements SendBuffer {
259
260 private final FileRegion file;
261 private long writtenBytes;
262
263
264 FileSendBuffer(FileRegion file) {
265 this.file = file;
266 }
267
268 public boolean finished() {
269 return writtenBytes >= file.getCount();
270 }
271
272 public long writtenBytes() {
273 return writtenBytes;
274 }
275
276 public long totalBytes() {
277 return file.getCount();
278 }
279
280 public long transferTo(WritableByteChannel ch) throws IOException {
281 long localWrittenBytes = file.transferTo(ch, writtenBytes);
282 writtenBytes += localWrittenBytes;
283 return localWrittenBytes;
284 }
285
286 public long transferTo(DatagramChannel ch, SocketAddress raddr)
287 throws IOException {
288 throw new UnsupportedOperationException();
289 }
290
291 public void release() {
292 if (file instanceof DefaultFileRegion) {
293 if (((DefaultFileRegion)file).releaseAfterTransfer()) {
294
295 file.releaseExternalResources();
296 }
297 }
298 }
299 }
300
301 static final class EmptySendBuffer implements SendBuffer {
302
303 EmptySendBuffer() {
304 super();
305 }
306
307 public final boolean finished() {
308 return true;
309 }
310
311 public final long writtenBytes() {
312 return 0;
313 }
314
315 public final long totalBytes() {
316 return 0;
317 }
318
319 public final long transferTo(WritableByteChannel ch) throws IOException {
320 return 0;
321 }
322
323 public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
324 return 0;
325 }
326
327 public void release() {
328
329 }
330 }
331 }