View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.buffer;
17  
18  import java.io.IOException;
19  import java.io.InputStream;
20  import java.io.OutputStream;
21  import java.nio.ByteBuffer;
22  import java.nio.ByteOrder;
23  import java.nio.channels.GatheringByteChannel;
24  import java.nio.channels.ScatteringByteChannel;
25  import java.util.ArrayList;
26  import java.util.Collections;
27  import java.util.List;
28  
29  
30  /**
31   * A virtual buffer which shows multiple buffers as a single merged buffer.  It
32   * is recommended to use {@link ChannelBuffers#wrappedBuffer(ChannelBuffer...)}
33   * instead of calling the constructor explicitly.
34   *
35   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
36   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
37   * @author Frederic Bregier (fredbregier@free.fr)
38   *
39   * @version $Rev: 2206 $, $Date: 2010-03-03 14:35:01 +0900 (Wed, 03 Mar 2010) $
40   *
41   */
42  public class CompositeChannelBuffer extends AbstractChannelBuffer {
43  
44      private final ByteOrder order;
45      private ChannelBuffer[] components;
46      private int[] indices;
47      private int lastAccessedComponentId;
48  
49      public CompositeChannelBuffer(ByteOrder endianness, List<ChannelBuffer> buffers) {
50          order = endianness;
51          setComponents(buffers);
52      }
53  
54      /**
55       * Same with {@link #slice(int, int)} except that this method returns a list.
56       */
57      public List<ChannelBuffer> decompose(int index, int length) {
58          if (length == 0) {
59              return Collections.emptyList();
60          }
61  
62          if (index + length > capacity()) {
63              throw new IndexOutOfBoundsException();
64          }
65  
66          int componentId = componentId(index);
67          List<ChannelBuffer> slice = new ArrayList<ChannelBuffer>(components.length);
68  
69          // The first component
70          ChannelBuffer first = components[componentId].duplicate();
71          first.readerIndex(index - indices[componentId]);
72  
73          ChannelBuffer buf = first;
74          int bytesToSlice = length;
75          do {
76              int readableBytes = buf.readableBytes();
77              if (bytesToSlice <= readableBytes) {
78                  // Last component
79                  buf.writerIndex(buf.readerIndex() + bytesToSlice);
80                  slice.add(buf);
81                  break;
82              } else {
83                  // Not the last component
84                  slice.add(buf);
85                  bytesToSlice -= readableBytes;
86                  componentId ++;
87  
88                  // Fetch the next component.
89                  buf = components[componentId].duplicate();
90              }
91          } while (bytesToSlice > 0);
92  
93          // Slice all components because only readable bytes are interesting.
94          for (int i = 0; i < slice.size(); i ++) {
95              slice.set(i, slice.get(i).slice());
96          }
97  
98          return slice;
99      }
100 
101     /**
102      * Setup this ChannelBuffer from the list
103      */
104     private void setComponents(List<ChannelBuffer> newComponents) {
105         assert !newComponents.isEmpty();
106 
107         // Clear the cache.
108         lastAccessedComponentId = 0;
109 
110         // Build the component array.
111         components = new ChannelBuffer[newComponents.size()];
112         for (int i = 0; i < components.length; i ++) {
113             ChannelBuffer c = newComponents.get(i);
114             if (c.order() != order()) {
115                 throw new IllegalArgumentException(
116                         "All buffers must have the same endianness.");
117             }
118 
119             assert c.readerIndex() == 0;
120             assert c.writerIndex() == c.capacity();
121 
122             components[i] = c;
123         }
124 
125         // Build the component lookup table.
126         indices = new int[components.length + 1];
127         indices[0] = 0;
128         for (int i = 1; i <= components.length; i ++) {
129             indices[i] = indices[i - 1] + components[i - 1].capacity();
130         }
131 
132         // Reset the indexes.
133         setIndex(0, capacity());
134     }
135 
136     private CompositeChannelBuffer(CompositeChannelBuffer buffer) {
137         order = buffer.order;
138         components = buffer.components.clone();
139         indices = buffer.indices.clone();
140         setIndex(buffer.readerIndex(), buffer.writerIndex());
141     }
142 
143     public ChannelBufferFactory factory() {
144         return HeapChannelBufferFactory.getInstance(order());
145     }
146 
147     public ByteOrder order() {
148         return order;
149     }
150 
151     public boolean isDirect() {
152         return false;
153     }
154 
155     public boolean hasArray() {
156         return false;
157     }
158 
159     public byte[] array() {
160         throw new UnsupportedOperationException();
161     }
162 
163     public int arrayOffset() {
164         throw new UnsupportedOperationException();
165     }
166 
167     public int capacity() {
168         return indices[components.length];
169     }
170 
171     public byte getByte(int index) {
172         int componentId = componentId(index);
173         return components[componentId].getByte(index - indices[componentId]);
174     }
175 
176     public short getShort(int index) {
177         int componentId = componentId(index);
178         if (index + 2 <= indices[componentId + 1]) {
179             return components[componentId].getShort(index - indices[componentId]);
180         } else if (order() == ByteOrder.BIG_ENDIAN) {
181             return (short) ((getByte(index) & 0xff) << 8 | getByte(index + 1) & 0xff);
182         } else {
183             return (short) (getByte(index) & 0xff | (getByte(index + 1) & 0xff) << 8);
184         }
185     }
186 
187     public int getUnsignedMedium(int index) {
188         int componentId = componentId(index);
189         if (index + 3 <= indices[componentId + 1]) {
190             return components[componentId].getUnsignedMedium(index - indices[componentId]);
191         } else if (order() == ByteOrder.BIG_ENDIAN) {
192             return (getShort(index) & 0xffff) << 8 | getByte(index + 2) & 0xff;
193         } else {
194             return getShort(index) & 0xFFFF | (getByte(index + 2) & 0xFF) << 16;
195         }
196     }
197 
198     public int getInt(int index) {
199         int componentId = componentId(index);
200         if (index + 4 <= indices[componentId + 1]) {
201             return components[componentId].getInt(index - indices[componentId]);
202         } else if (order() == ByteOrder.BIG_ENDIAN) {
203             return (getShort(index) & 0xffff) << 16 | getShort(index + 2) & 0xffff;
204         } else {
205             return getShort(index) & 0xFFFF | (getShort(index + 2) & 0xFFFF) << 16;
206         }
207     }
208 
209     public long getLong(int index) {
210         int componentId = componentId(index);
211         if (index + 8 <= indices[componentId + 1]) {
212             return components[componentId].getLong(index - indices[componentId]);
213         } else if (order() == ByteOrder.BIG_ENDIAN) {
214             return (getInt(index) & 0xffffffffL) << 32 | getInt(index + 4) & 0xffffffffL;
215         } else {
216             return getInt(index) & 0xFFFFFFFFL | (getInt(index + 4) & 0xFFFFFFFFL) << 32;
217         }
218     }
219 
220     public void getBytes(int index, byte[] dst, int dstIndex, int length) {
221         int componentId = componentId(index);
222         if (index > capacity() - length || dstIndex > dst.length - length) {
223             throw new IndexOutOfBoundsException();
224         }
225 
226         int i = componentId;
227         while (length > 0) {
228             ChannelBuffer s = components[i];
229             int adjustment = indices[i];
230             int localLength = Math.min(length, s.capacity() - (index - adjustment));
231             s.getBytes(index - adjustment, dst, dstIndex, localLength);
232             index += localLength;
233             dstIndex += localLength;
234             length -= localLength;
235             i ++;
236         }
237     }
238 
239     public void getBytes(int index, ByteBuffer dst) {
240         int componentId = componentId(index);
241         int limit = dst.limit();
242         int length = dst.remaining();
243         if (index > capacity() - length) {
244             throw new IndexOutOfBoundsException();
245         }
246 
247         int i = componentId;
248         try {
249             while (length > 0) {
250                 ChannelBuffer s = components[i];
251                 int adjustment = indices[i];
252                 int localLength = Math.min(length, s.capacity() - (index - adjustment));
253                 dst.limit(dst.position() + localLength);
254                 s.getBytes(index - adjustment, dst);
255                 index += localLength;
256                 length -= localLength;
257                 i ++;
258             }
259         } finally {
260             dst.limit(limit);
261         }
262     }
263 
264     public void getBytes(int index, ChannelBuffer dst, int dstIndex, int length) {
265         int componentId = componentId(index);
266         if (index > capacity() - length || dstIndex > dst.capacity() - length) {
267             throw new IndexOutOfBoundsException();
268         }
269 
270         int i = componentId;
271         while (length > 0) {
272             ChannelBuffer s = components[i];
273             int adjustment = indices[i];
274             int localLength = Math.min(length, s.capacity() - (index - adjustment));
275             s.getBytes(index - adjustment, dst, dstIndex, localLength);
276             index += localLength;
277             dstIndex += localLength;
278             length -= localLength;
279             i ++;
280         }
281     }
282 
283     public int getBytes(int index, GatheringByteChannel out, int length)
284             throws IOException {
285         // XXX Gathering write is not supported because of a known issue.
286         //     See http://bugs.sun.com/view_bug.do?bug_id=6210541
287         //     This issue appeared in 2004 and is still unresolved!?
288         return out.write(toByteBuffer(index, length));
289     }
290 
291     public void getBytes(int index, OutputStream out, int length)
292             throws IOException {
293         int componentId = componentId(index);
294         if (index > capacity() - length) {
295             throw new IndexOutOfBoundsException();
296         }
297 
298         int i = componentId;
299         while (length > 0) {
300             ChannelBuffer s = components[i];
301             int adjustment = indices[i];
302             int localLength = Math.min(length, s.capacity() - (index - adjustment));
303             s.getBytes(index - adjustment, out, localLength);
304             index += localLength;
305             length -= localLength;
306             i ++;
307         }
308     }
309 
310     public void setByte(int index, int value) {
311         int componentId = componentId(index);
312         components[componentId].setByte(index - indices[componentId], value);
313     }
314 
315     public void setShort(int index, int value) {
316         int componentId = componentId(index);
317         if (index + 2 <= indices[componentId + 1]) {
318             components[componentId].setShort(index - indices[componentId], value);
319         } else if (order() == ByteOrder.BIG_ENDIAN) {
320             setByte(index, (byte) (value >>> 8));
321             setByte(index + 1, (byte) value);
322         } else {
323             setByte(index    , (byte) value);
324             setByte(index + 1, (byte) (value >>> 8));
325         }
326     }
327 
328     public void setMedium(int index, int value) {
329         int componentId = componentId(index);
330         if (index + 3 <= indices[componentId + 1]) {
331             components[componentId].setMedium(index - indices[componentId], value);
332         } else if (order() == ByteOrder.BIG_ENDIAN) {
333             setShort(index, (short) (value >> 8));
334             setByte(index + 2, (byte) value);
335         } else {
336             setShort(index    , (short) value);
337             setByte (index + 2, (byte) (value >>> 16));
338         }
339     }
340 
341     public void setInt(int index, int value) {
342         int componentId = componentId(index);
343         if (index + 4 <= indices[componentId + 1]) {
344             components[componentId].setInt(index - indices[componentId], value);
345         } else if (order() == ByteOrder.BIG_ENDIAN) {
346             setShort(index, (short) (value >>> 16));
347             setShort(index + 2, (short) value);
348         } else {
349             setShort(index    , (short) value);
350             setShort(index + 2, (short) (value >>> 16));
351         }
352     }
353 
354     public void setLong(int index, long value) {
355         int componentId = componentId(index);
356         if (index + 8 <= indices[componentId + 1]) {
357             components[componentId].setLong(index - indices[componentId], value);
358         } else if (order() == ByteOrder.BIG_ENDIAN) {
359             setInt(index, (int) (value >>> 32));
360             setInt(index + 4, (int) value);
361         } else {
362             setInt(index    , (int) value);
363             setInt(index + 4, (int) (value >>> 32));
364         }
365     }
366 
367     public void setBytes(int index, byte[] src, int srcIndex, int length) {
368         int componentId = componentId(index);
369         if (index > capacity() - length || srcIndex > src.length - length) {
370             throw new IndexOutOfBoundsException();
371         }
372 
373         int i = componentId;
374         while (length > 0) {
375             ChannelBuffer s = components[i];
376             int adjustment = indices[i];
377             int localLength = Math.min(length, s.capacity() - (index - adjustment));
378             s.setBytes(index - adjustment, src, srcIndex, localLength);
379             index += localLength;
380             srcIndex += localLength;
381             length -= localLength;
382             i ++;
383         }
384     }
385 
386     public void setBytes(int index, ByteBuffer src) {
387         int componentId = componentId(index);
388         int limit = src.limit();
389         int length = src.remaining();
390         if (index > capacity() - length) {
391             throw new IndexOutOfBoundsException();
392         }
393 
394         int i = componentId;
395         try {
396             while (length > 0) {
397                 ChannelBuffer s = components[i];
398                 int adjustment = indices[i];
399                 int localLength = Math.min(length, s.capacity() - (index - adjustment));
400                 src.limit(src.position() + localLength);
401                 s.setBytes(index - adjustment, src);
402                 index += localLength;
403                 length -= localLength;
404                 i ++;
405             }
406         } finally {
407             src.limit(limit);
408         }
409     }
410 
411     public void setBytes(int index, ChannelBuffer src, int srcIndex, int length) {
412         int componentId = componentId(index);
413         if (index > capacity() - length || srcIndex > src.capacity() - length) {
414             throw new IndexOutOfBoundsException();
415         }
416 
417         int i = componentId;
418         while (length > 0) {
419             ChannelBuffer s = components[i];
420             int adjustment = indices[i];
421             int localLength = Math.min(length, s.capacity() - (index - adjustment));
422             s.setBytes(index - adjustment, src, srcIndex, localLength);
423             index += localLength;
424             srcIndex += localLength;
425             length -= localLength;
426             i ++;
427         }
428     }
429 
430     public int setBytes(int index, InputStream in, int length)
431             throws IOException {
432         int componentId = componentId(index);
433         if (index > capacity() - length) {
434             throw new IndexOutOfBoundsException();
435         }
436 
437         int i = componentId;
438         int readBytes = 0;
439 
440         do {
441             ChannelBuffer s = components[i];
442             int adjustment = indices[i];
443             int localLength = Math.min(length, s.capacity() - (index - adjustment));
444             int localReadBytes = s.setBytes(index - adjustment, in, localLength);
445             if (localReadBytes < 0) {
446                 if (readBytes == 0) {
447                     return -1;
448                 } else {
449                     break;
450                 }
451             }
452 
453             if (localReadBytes == localLength) {
454                 index += localLength;
455                 length -= localLength;
456                 readBytes += localLength;
457                 i ++;
458             } else {
459                 index += localReadBytes;
460                 length -= localReadBytes;
461                 readBytes += localReadBytes;
462             }
463         } while (length > 0);
464 
465         return readBytes;
466     }
467 
468     public int setBytes(int index, ScatteringByteChannel in, int length)
469             throws IOException {
470         int componentId = componentId(index);
471         if (index > capacity() - length) {
472             throw new IndexOutOfBoundsException();
473         }
474 
475         int i = componentId;
476         int readBytes = 0;
477         do {
478             ChannelBuffer s = components[i];
479             int adjustment = indices[i];
480             int localLength = Math.min(length, s.capacity() - (index - adjustment));
481             int localReadBytes = s.setBytes(index - adjustment, in, localLength);
482 
483             if (localReadBytes == localLength) {
484                 index += localLength;
485                 length -= localLength;
486                 readBytes += localLength;
487                 i ++;
488             } else {
489                 index += localReadBytes;
490                 length -= localReadBytes;
491                 readBytes += localReadBytes;
492             }
493         } while (length > 0);
494 
495         return readBytes;
496     }
497 
498     public ChannelBuffer duplicate() {
499         ChannelBuffer duplicate = new CompositeChannelBuffer(this);
500         duplicate.setIndex(readerIndex(), writerIndex());
501         return duplicate;
502     }
503 
504     public ChannelBuffer copy(int index, int length) {
505         int componentId = componentId(index);
506         if (index > capacity() - length) {
507             throw new IndexOutOfBoundsException();
508         }
509 
510         ChannelBuffer dst = factory().getBuffer(order(), length);
511         copyTo(index, length, componentId, dst);
512         return dst;
513     }
514 
515     private void copyTo(int index, int length, int componentId, ChannelBuffer dst) {
516         int dstIndex = 0;
517         int i = componentId;
518 
519         while (length > 0) {
520             ChannelBuffer s = components[i];
521             int adjustment = indices[i];
522             int localLength = Math.min(length, s.capacity() - (index - adjustment));
523             s.getBytes(index - adjustment, dst, dstIndex, localLength);
524             index += localLength;
525             dstIndex += localLength;
526             length -= localLength;
527             i ++;
528         }
529 
530         dst.writerIndex(dst.capacity());
531     }
532 
533     public ChannelBuffer slice(int index, int length) {
534         if (index == 0) {
535             if (length == 0) {
536                 return ChannelBuffers.EMPTY_BUFFER;
537             }
538         } else if (index < 0 || index > capacity() - length) {
539             throw new IndexOutOfBoundsException();
540         } else if (length == 0) {
541             return ChannelBuffers.EMPTY_BUFFER;
542         }
543 
544         List<ChannelBuffer> components = decompose(index, length);
545         switch (components.size()) {
546         case 0:
547             return ChannelBuffers.EMPTY_BUFFER;
548         case 1:
549             return components.get(0);
550         default:
551             return new CompositeChannelBuffer(order(), components);
552         }
553     }
554 
555     public ByteBuffer toByteBuffer(int index, int length) {
556         if (components.length == 1) {
557             return components[0].toByteBuffer(index, length);
558         }
559 
560         ByteBuffer[] buffers = toByteBuffers(index, length);
561         ByteBuffer merged = ByteBuffer.allocate(length).order(order());
562         for (ByteBuffer b: buffers) {
563             merged.put(b);
564         }
565         merged.flip();
566         return merged;
567     }
568 
569     @Override
570     public ByteBuffer[] toByteBuffers(int index, int length) {
571         int componentId = componentId(index);
572         if (index + length > capacity()) {
573             throw new IndexOutOfBoundsException();
574         }
575 
576         List<ByteBuffer> buffers = new ArrayList<ByteBuffer>(components.length);
577 
578         int i = componentId;
579         while (length > 0) {
580             ChannelBuffer s = components[i];
581             int adjustment = indices[i];
582             int localLength = Math.min(length, s.capacity() - (index - adjustment));
583             buffers.add(s.toByteBuffer(index - adjustment, localLength));
584             index += localLength;
585             length -= localLength;
586             i ++;
587         }
588 
589         return buffers.toArray(new ByteBuffer[buffers.size()]);
590     }
591 
592     private int componentId(int index) {
593         int lastComponentId = lastAccessedComponentId;
594         if (index >= indices[lastComponentId]) {
595             if (index < indices[lastComponentId + 1]) {
596                 return lastComponentId;
597             }
598 
599             // Search right
600             for (int i = lastComponentId + 1; i < components.length; i ++) {
601                 if (index < indices[i + 1]) {
602                     lastAccessedComponentId = i;
603                     return i;
604                 }
605             }
606         } else {
607             // Search left
608             for (int i = lastComponentId - 1; i >= 0; i --) {
609                 if (index >= indices[i]) {
610                     lastAccessedComponentId = i;
611                     return i;
612                 }
613             }
614         }
615 
616         throw new IndexOutOfBoundsException();
617     }
618 
619     @Override
620     public void discardReadBytes() {
621         // Only the bytes between readerIndex and writerIndex will be kept.
622         // New readerIndex and writerIndex will become 0 and
623         // (previous writerIndex - previous readerIndex) respectively.
624 
625         final int localReaderIndex = this.readerIndex();
626         if (localReaderIndex == 0) {
627             return;
628         }
629         int localWriterIndex = this.writerIndex();
630 
631         final int bytesToMove = capacity() - localReaderIndex;
632         List<ChannelBuffer> list = decompose(localReaderIndex, bytesToMove);
633 
634         // Add a new buffer so that the capacity of this composite buffer does
635         // not decrease due to the discarded components.
636         // XXX Might create too many components if discarded by small amount.
637         final ChannelBuffer padding = ChannelBuffers.buffer(order(), localReaderIndex);
638         padding.writerIndex(localReaderIndex);
639         list.add(padding);
640 
641         // Reset the index markers to get the index marker values.
642         int localMarkedReaderIndex = localReaderIndex;
643         try {
644             resetReaderIndex();
645             localMarkedReaderIndex = this.readerIndex();
646         } catch (IndexOutOfBoundsException e) {
647             // ignore
648         }
649         int localMarkedWriterIndex = localWriterIndex;
650         try {
651             resetWriterIndex();
652             localMarkedWriterIndex = this.writerIndex();
653         } catch (IndexOutOfBoundsException e) {
654             // ignore
655         }
656 
657         setComponents(list);
658 
659         // reset marked Indexes
660         localMarkedReaderIndex = Math.max(localMarkedReaderIndex - localReaderIndex, 0);
661         localMarkedWriterIndex = Math.max(localMarkedWriterIndex - localReaderIndex, 0);
662         setIndex(localMarkedReaderIndex, localMarkedWriterIndex);
663         markReaderIndex();
664         markWriterIndex();
665         // reset real indexes
666         localWriterIndex = Math.max(localWriterIndex - localReaderIndex, 0);
667         setIndex(0, localWriterIndex);
668     }
669 
670     @Override
671     public String toString() {
672         String result = super.toString();
673         result = result.substring(0, result.length() - 1);
674         return result + ", components=" + components.length + ")";
675     }
676 }