View Javadoc

1   /*
2    * Copyright 2010 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import java.io.IOException;
19  import java.lang.ref.SoftReference;
20  import java.net.SocketAddress;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.DatagramChannel;
23  import java.nio.channels.WritableByteChannel;
24  
25  import org.jboss.netty.buffer.ChannelBuffer;
26  import org.jboss.netty.channel.DefaultFileRegion;
27  import org.jboss.netty.channel.FileRegion;
28  
29  /**
30   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
31   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
32   * @version $Rev: 2174 $, $Date: 2010-02-19 09:57:23 +0900 (Fri, 19 Feb 2010) $
33   */
34  final class SocketSendBufferPool {
35  
36      private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
37  
38      private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
39      private static final int ALIGN_SHIFT = 4;
40      private static final int ALIGN_MASK = 15;
41  
42      PreallocationRef poolHead = null;
43      Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
44  
45      SocketSendBufferPool() {
46          super();
47      }
48  
49      final SendBuffer acquire(Object message) {
50          if (message instanceof ChannelBuffer) {
51              return acquire((ChannelBuffer) message);
52          } else if (message instanceof FileRegion) {
53              return acquire((FileRegion) message);
54          }
55  
56          throw new IllegalArgumentException(
57                  "unsupported message type: " + message.getClass());
58      }
59  
60      private final SendBuffer acquire(FileRegion src) {
61          if (src.getCount() == 0) {
62              return EMPTY_BUFFER;
63          }
64          return new FileSendBuffer(src);
65      }
66  
67      private final SendBuffer acquire(ChannelBuffer src) {
68          final int size = src.readableBytes();
69          if (size == 0) {
70              return EMPTY_BUFFER;
71          }
72  
73          if (src.isDirect()) {
74              return new UnpooledSendBuffer(src.toByteBuffer());
75          }
76          if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
77              return new UnpooledSendBuffer(src.toByteBuffer());
78          }
79  
80          Preallocation current = this.current;
81          ByteBuffer buffer = current.buffer;
82          int remaining = buffer.remaining();
83          PooledSendBuffer dst;
84  
85          if (size < remaining) {
86              int nextPos = buffer.position() + size;
87              ByteBuffer slice = buffer.duplicate();
88              buffer.position(align(nextPos));
89              slice.limit(nextPos);
90              current.refCnt ++;
91              dst = new PooledSendBuffer(current, slice);
92          } else if (size > remaining) {
93              this.current = current = getPreallocation();
94              buffer = current.buffer;
95              ByteBuffer slice = buffer.duplicate();
96              buffer.position(align(size));
97              slice.limit(size);
98              current.refCnt ++;
99              dst = new PooledSendBuffer(current, slice);
100         } else { // size == remaining
101             current.refCnt ++;
102             this.current = getPreallocation0();
103             dst = new PooledSendBuffer(current, current.buffer);
104         }
105 
106         ByteBuffer dstbuf = dst.buffer;
107         dstbuf.mark();
108         src.getBytes(src.readerIndex(), dstbuf);
109         dstbuf.reset();
110         return dst;
111     }
112 
113     private final Preallocation getPreallocation() {
114         Preallocation current = this.current;
115         if (current.refCnt == 0) {
116             current.buffer.clear();
117             return current;
118         }
119 
120         return getPreallocation0();
121     }
122 
123     private final Preallocation getPreallocation0() {
124         PreallocationRef ref = poolHead;
125         if (ref != null) {
126             do {
127                 Preallocation p = ref.get();
128                 ref = ref.next;
129 
130                 if (p != null) {
131                     poolHead = ref;
132                     return p;
133                 }
134             } while (ref != null);
135 
136             poolHead = ref;
137         }
138 
139         return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
140     }
141 
142     private static final int align(int pos) {
143         int q = pos >>> ALIGN_SHIFT;
144         int r = pos & ALIGN_MASK;
145         if (r != 0) {
146             q ++;
147         }
148         return q << ALIGN_SHIFT;
149     }
150 
151     private final class Preallocation {
152         final ByteBuffer buffer;
153         int refCnt;
154 
155         Preallocation(int capacity) {
156             buffer = ByteBuffer.allocateDirect(capacity);
157         }
158     }
159 
160     private final class PreallocationRef extends SoftReference<Preallocation> {
161         final PreallocationRef next;
162 
163         PreallocationRef(Preallocation prealloation, PreallocationRef next) {
164             super(prealloation);
165             this.next = next;
166         }
167     }
168 
169     interface SendBuffer {
170         boolean finished();
171         long writtenBytes();
172         long totalBytes();
173 
174         long transferTo(WritableByteChannel ch) throws IOException;
175         long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
176 
177         void release();
178     }
179 
180     class UnpooledSendBuffer implements SendBuffer {
181 
182         final ByteBuffer buffer;
183         final int initialPos;
184 
185         UnpooledSendBuffer(ByteBuffer buffer) {
186             this.buffer = buffer;
187             initialPos = buffer.position();
188         }
189 
190         public final boolean finished() {
191             return !buffer.hasRemaining();
192         }
193 
194         public final long writtenBytes() {
195             return buffer.position() - initialPos;
196         }
197 
198         public final long totalBytes() {
199             return buffer.limit() - initialPos;
200         }
201 
202         public final long transferTo(WritableByteChannel ch) throws IOException {
203             return ch.write(buffer);
204         }
205 
206         public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
207             return ch.send(buffer, raddr);
208         }
209 
210         public void release() {
211             // Unpooled.
212         }
213     }
214 
215     final class PooledSendBuffer implements SendBuffer {
216 
217         private final Preallocation parent;
218         final ByteBuffer buffer;
219         final int initialPos;
220 
221         PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
222             this.parent = parent;
223             this.buffer = buffer;
224             initialPos = buffer.position();
225         }
226 
227         public boolean finished() {
228             return !buffer.hasRemaining();
229         }
230 
231         public long writtenBytes() {
232             return buffer.position() - initialPos;
233         }
234 
235         public long totalBytes() {
236             return buffer.limit() - initialPos;
237         }
238 
239         public long transferTo(WritableByteChannel ch) throws IOException {
240             return ch.write(buffer);
241         }
242 
243         public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
244             return ch.send(buffer, raddr);
245         }
246 
247         public void release() {
248             final Preallocation parent = this.parent;
249             if (-- parent.refCnt == 0) {
250                 parent.buffer.clear();
251                 if (parent != current) {
252                     poolHead = new PreallocationRef(parent, poolHead);
253                 }
254             }
255         }
256     }
257 
258     final class FileSendBuffer implements SendBuffer {
259 
260         private final FileRegion file;
261         private long writtenBytes;
262 
263 
264         FileSendBuffer(FileRegion file) {
265             this.file = file;
266         }
267 
268         public boolean finished() {
269             return writtenBytes >= file.getCount();
270         }
271 
272         public long writtenBytes() {
273             return writtenBytes;
274         }
275 
276         public long totalBytes() {
277             return file.getCount();
278         }
279 
280         public long transferTo(WritableByteChannel ch) throws IOException {
281             long localWrittenBytes = file.transferTo(ch, writtenBytes);
282             writtenBytes += localWrittenBytes;
283             return localWrittenBytes;
284         }
285 
286         public long transferTo(DatagramChannel ch, SocketAddress raddr)
287                 throws IOException {
288             throw new UnsupportedOperationException();
289         }
290 
291         public void release() {
292             if (file instanceof DefaultFileRegion) {
293                 if (((DefaultFileRegion)file).releaseAfterTransfer()) {
294                     // Make sure the FileRegion resource are released otherwise it may cause a FD leak or something similar
295                     file.releaseExternalResources();
296                 }
297             }
298         }
299     }
300 
301     static final class EmptySendBuffer implements SendBuffer {
302 
303         EmptySendBuffer() {
304             super();
305         }
306 
307         public final boolean finished() {
308             return true;
309         }
310 
311         public final long writtenBytes() {
312             return 0;
313         }
314 
315         public final long totalBytes() {
316             return 0;
317         }
318 
319         public final long transferTo(WritableByteChannel ch) throws IOException {
320             return 0;
321         }
322 
323         public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
324             return 0;
325         }
326 
327         public void release() {
328             // Unpooled.
329         }
330     }
331 }