001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.core.io.buffer;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.nio.Buffer;
023import java.nio.ByteBuffer;
024import java.nio.charset.Charset;
025import java.util.Arrays;
026import java.util.function.IntPredicate;
027
028import org.springframework.lang.Nullable;
029import org.springframework.util.Assert;
030import org.springframework.util.ObjectUtils;
031
032/**
033 * Default implementation of the {@link DataBuffer} interface that uses a
034 * {@link ByteBuffer} internally. with separate read and write positions.
035 * Constructed using the {@link DefaultDataBufferFactory}.
036 *
037 * <p>Inspired by Netty's {@code ByteBuf}. Introduced so that non-Netty runtimes
038 * (i.e. Servlet) do not require Netty on the classpath.
039 *
040 * @author Arjen Poutsma
041 * @author Juergen Hoeller
042 * @author Brian Clozel
043 * @since 5.0
044 * @see DefaultDataBufferFactory
045 */
046public class DefaultDataBuffer implements DataBuffer {
047
048        private static final int MAX_CAPACITY = Integer.MAX_VALUE;
049
050        private static final int CAPACITY_THRESHOLD = 1024 * 1024 * 4;
051
052
053        private final DefaultDataBufferFactory dataBufferFactory;
054
055        private ByteBuffer byteBuffer;
056
057        private int capacity;
058
059        private int readPosition;
060
061        private int writePosition;
062
063
064        private DefaultDataBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
065                Assert.notNull(dataBufferFactory, "DefaultDataBufferFactory must not be null");
066                Assert.notNull(byteBuffer, "ByteBuffer must not be null");
067                this.dataBufferFactory = dataBufferFactory;
068                ByteBuffer slice = byteBuffer.slice();
069                this.byteBuffer = slice;
070                this.capacity = slice.remaining();
071        }
072
073        static DefaultDataBuffer fromFilledByteBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
074                DefaultDataBuffer dataBuffer = new DefaultDataBuffer(dataBufferFactory, byteBuffer);
075                dataBuffer.writePosition(byteBuffer.remaining());
076                return dataBuffer;
077        }
078
079        static DefaultDataBuffer fromEmptyByteBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
080                return new DefaultDataBuffer(dataBufferFactory, byteBuffer);
081        }
082
083
084        /**
085         * Directly exposes the native {@code ByteBuffer} that this buffer is based
086         * on also updating the {@code ByteBuffer's} position and limit to match
087         * the current {@link #readPosition()} and {@link #readableByteCount()}.
088         * @return the wrapped byte buffer
089         */
090        public ByteBuffer getNativeBuffer() {
091                this.byteBuffer.position(this.readPosition);
092                this.byteBuffer.limit(readableByteCount());
093                return this.byteBuffer;
094        }
095
096        private void setNativeBuffer(ByteBuffer byteBuffer) {
097                this.byteBuffer = byteBuffer;
098                this.capacity = byteBuffer.remaining();
099        }
100
101
102        @Override
103        public DefaultDataBufferFactory factory() {
104                return this.dataBufferFactory;
105        }
106
107        @Override
108        public int indexOf(IntPredicate predicate, int fromIndex) {
109                Assert.notNull(predicate, "IntPredicate must not be null");
110                if (fromIndex < 0) {
111                        fromIndex = 0;
112                }
113                else if (fromIndex >= this.writePosition) {
114                        return -1;
115                }
116                for (int i = fromIndex; i < this.writePosition; i++) {
117                        byte b = this.byteBuffer.get(i);
118                        if (predicate.test(b)) {
119                                return i;
120                        }
121                }
122                return -1;
123        }
124
125        @Override
126        public int lastIndexOf(IntPredicate predicate, int fromIndex) {
127                Assert.notNull(predicate, "IntPredicate must not be null");
128                int i = Math.min(fromIndex, this.writePosition - 1);
129                for (; i >= 0; i--) {
130                        byte b = this.byteBuffer.get(i);
131                        if (predicate.test(b)) {
132                                return i;
133                        }
134                }
135                return -1;
136        }
137
138        @Override
139        public int readableByteCount() {
140                return this.writePosition - this.readPosition;
141        }
142
143        @Override
144        public int writableByteCount() {
145                return this.capacity - this.writePosition;
146        }
147
148        @Override
149        public int readPosition() {
150                return this.readPosition;
151        }
152
153        @Override
154        public DefaultDataBuffer readPosition(int readPosition) {
155                assertIndex(readPosition >= 0, "'readPosition' %d must be >= 0", readPosition);
156                assertIndex(readPosition <= this.writePosition, "'readPosition' %d must be <= %d",
157                                readPosition, this.writePosition);
158                this.readPosition = readPosition;
159                return this;
160        }
161
162        @Override
163        public int writePosition() {
164                return this.writePosition;
165        }
166
167        @Override
168        public DefaultDataBuffer writePosition(int writePosition) {
169                assertIndex(writePosition >= this.readPosition, "'writePosition' %d must be >= %d",
170                                writePosition, this.readPosition);
171                assertIndex(writePosition <= this.capacity, "'writePosition' %d must be <= %d",
172                                writePosition, this.capacity);
173                this.writePosition = writePosition;
174                return this;
175        }
176
177        @Override
178        public int capacity() {
179                return this.capacity;
180        }
181
182        @Override
183        public DefaultDataBuffer capacity(int newCapacity) {
184                if (newCapacity <= 0) {
185                        throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", newCapacity));
186                }
187                int readPosition = readPosition();
188                int writePosition = writePosition();
189                int oldCapacity = capacity();
190
191                if (newCapacity > oldCapacity) {
192                        ByteBuffer oldBuffer = this.byteBuffer;
193                        ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect());
194                        oldBuffer.position(0).limit(oldBuffer.capacity());
195                        newBuffer.position(0).limit(oldBuffer.capacity());
196                        newBuffer.put(oldBuffer);
197                        newBuffer.clear();
198                        setNativeBuffer(newBuffer);
199                }
200                else if (newCapacity < oldCapacity) {
201                        ByteBuffer oldBuffer = this.byteBuffer;
202                        ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect());
203                        if (readPosition < newCapacity) {
204                                if (writePosition > newCapacity) {
205                                        writePosition = newCapacity;
206                                        writePosition(writePosition);
207                                }
208                                oldBuffer.position(readPosition).limit(writePosition);
209                                newBuffer.position(readPosition).limit(writePosition);
210                                newBuffer.put(oldBuffer);
211                                newBuffer.clear();
212                        }
213                        else {
214                                readPosition(newCapacity);
215                                writePosition(newCapacity);
216                        }
217                        setNativeBuffer(newBuffer);
218                }
219                return this;
220        }
221
222        @Override
223        public DataBuffer ensureCapacity(int length) {
224                if (length > writableByteCount()) {
225                        int newCapacity = calculateCapacity(this.writePosition + length);
226                        capacity(newCapacity);
227                }
228                return this;
229        }
230
231        private static ByteBuffer allocate(int capacity, boolean direct) {
232                return (direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity));
233        }
234
235        @Override
236        public byte getByte(int index) {
237                assertIndex(index >= 0, "index %d must be >= 0", index);
238                assertIndex(index <= this.writePosition - 1, "index %d must be <= %d", index, this.writePosition - 1);
239                return this.byteBuffer.get(index);
240        }
241
242        @Override
243        public byte read() {
244                assertIndex(this.readPosition <= this.writePosition - 1, "readPosition %d must be <= %d",
245                                this.readPosition, this.writePosition - 1);
246                int pos = this.readPosition;
247                byte b = this.byteBuffer.get(pos);
248                this.readPosition = pos + 1;
249                return b;
250        }
251
252        @Override
253        public DefaultDataBuffer read(byte[] destination) {
254                Assert.notNull(destination, "Byte array must not be null");
255                read(destination, 0, destination.length);
256                return this;
257        }
258
259        @Override
260        public DefaultDataBuffer read(byte[] destination, int offset, int length) {
261                Assert.notNull(destination, "Byte array must not be null");
262                assertIndex(this.readPosition <= this.writePosition - length,
263                                "readPosition %d and length %d should be smaller than writePosition %d",
264                                this.readPosition, length, this.writePosition);
265
266                ByteBuffer tmp = this.byteBuffer.duplicate();
267                int limit = this.readPosition + length;
268                tmp.clear().position(this.readPosition).limit(limit);
269                tmp.get(destination, offset, length);
270
271                this.readPosition += length;
272                return this;
273        }
274
275        @Override
276        public DefaultDataBuffer write(byte b) {
277                ensureCapacity(1);
278                int pos = this.writePosition;
279                this.byteBuffer.put(pos, b);
280                this.writePosition = pos + 1;
281                return this;
282        }
283
284        @Override
285        public DefaultDataBuffer write(byte[] source) {
286                Assert.notNull(source, "Byte array must not be null");
287                write(source, 0, source.length);
288                return this;
289        }
290
291        @Override
292        public DefaultDataBuffer write(byte[] source, int offset, int length) {
293                Assert.notNull(source, "Byte array must not be null");
294                ensureCapacity(length);
295
296                ByteBuffer tmp = this.byteBuffer.duplicate();
297                int limit = this.writePosition + length;
298                tmp.clear().position(this.writePosition).limit(limit);
299                tmp.put(source, offset, length);
300
301                this.writePosition += length;
302                return this;
303        }
304
305        @Override
306        public DefaultDataBuffer write(DataBuffer... buffers) {
307                if (!ObjectUtils.isEmpty(buffers)) {
308                        write(Arrays.stream(buffers).map(DataBuffer::asByteBuffer).toArray(ByteBuffer[]::new));
309                }
310                return this;
311        }
312
313        @Override
314        public DefaultDataBuffer write(ByteBuffer... buffers) {
315                if (!ObjectUtils.isEmpty(buffers)) {
316                        int capacity = Arrays.stream(buffers).mapToInt(ByteBuffer::remaining).sum();
317                        ensureCapacity(capacity);
318                        Arrays.stream(buffers).forEach(this::write);
319                }
320                return this;
321        }
322
323        private void write(ByteBuffer source) {
324                int length = source.remaining();
325                ByteBuffer tmp = this.byteBuffer.duplicate();
326                int limit = this.writePosition + source.remaining();
327                tmp.clear().position(this.writePosition).limit(limit);
328                tmp.put(source);
329                this.writePosition += length;
330        }
331
332        @Override
333        public DefaultDataBuffer slice(int index, int length) {
334                checkIndex(index, length);
335                int oldPosition = this.byteBuffer.position();
336                // Explicit access via Buffer base type for compatibility
337                // with covariant return type on JDK 9's ByteBuffer...
338                Buffer buffer = this.byteBuffer;
339                try {
340                        buffer.position(index);
341                        ByteBuffer slice = this.byteBuffer.slice();
342                        // Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer
343                        slice.limit(length);
344                        return new SlicedDefaultDataBuffer(slice, this.dataBufferFactory, length);
345                }
346                finally {
347                        buffer.position(oldPosition);
348                }
349        }
350
351        @Override
352        public ByteBuffer asByteBuffer() {
353                return asByteBuffer(this.readPosition, readableByteCount());
354        }
355
356        @Override
357        public ByteBuffer asByteBuffer(int index, int length) {
358                checkIndex(index, length);
359
360                ByteBuffer duplicate = this.byteBuffer.duplicate();
361                // Explicit access via Buffer base type for compatibility
362                // with covariant return type on JDK 9's ByteBuffer...
363                Buffer buffer = duplicate;
364                buffer.position(index);
365                buffer.limit(index + length);
366                return duplicate.slice();
367        }
368
369        @Override
370        public InputStream asInputStream() {
371                return new DefaultDataBufferInputStream();
372        }
373
374        @Override
375        public InputStream asInputStream(boolean releaseOnClose) {
376                return new DefaultDataBufferInputStream();
377        }
378
379        @Override
380        public OutputStream asOutputStream() {
381                return new DefaultDataBufferOutputStream();
382        }
383
384
385        @Override
386        public String toString(int index, int length, Charset charset) {
387                checkIndex(index, length);
388                Assert.notNull(charset, "Charset must not be null");
389
390                byte[] bytes;
391                int offset;
392
393                if (this.byteBuffer.hasArray()) {
394                        bytes = this.byteBuffer.array();
395                        offset = this.byteBuffer.arrayOffset() + index;
396                }
397                else {
398                        bytes = new byte[length];
399                        offset = 0;
400                        ByteBuffer duplicate = this.byteBuffer.duplicate();
401                        duplicate.clear().position(index).limit(index + length);
402                        duplicate.get(bytes, 0, length);
403                }
404                return new String(bytes, offset, length, charset);
405        }
406
407        /**
408         * Calculate the capacity of the buffer.
409         * @see io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)
410         */
411        private int calculateCapacity(int neededCapacity) {
412                Assert.isTrue(neededCapacity >= 0, "'neededCapacity' must >= 0");
413
414                if (neededCapacity == CAPACITY_THRESHOLD) {
415                        return CAPACITY_THRESHOLD;
416                }
417                else if (neededCapacity > CAPACITY_THRESHOLD) {
418                        int newCapacity = neededCapacity / CAPACITY_THRESHOLD * CAPACITY_THRESHOLD;
419                        if (newCapacity > MAX_CAPACITY - CAPACITY_THRESHOLD) {
420                                newCapacity = MAX_CAPACITY;
421                        }
422                        else {
423                                newCapacity += CAPACITY_THRESHOLD;
424                        }
425                        return newCapacity;
426                }
427                else {
428                        int newCapacity = 64;
429                        while (newCapacity < neededCapacity) {
430                                newCapacity <<= 1;
431                        }
432                        return Math.min(newCapacity, MAX_CAPACITY);
433                }
434        }
435
436
437        @Override
438        public boolean equals(@Nullable Object other) {
439                if (this == other) {
440                        return true;
441                }
442                if (!(other instanceof DefaultDataBuffer)) {
443                        return false;
444                }
445                DefaultDataBuffer otherBuffer = (DefaultDataBuffer) other;
446                return (this.readPosition == otherBuffer.readPosition &&
447                                this.writePosition == otherBuffer.writePosition &&
448                                this.byteBuffer.equals(otherBuffer.byteBuffer));
449        }
450
451        @Override
452        public int hashCode() {
453                return this.byteBuffer.hashCode();
454        }
455
456        @Override
457        public String toString() {
458                return String.format("DefaultDataBuffer (r: %d, w: %d, c: %d)",
459                                this.readPosition, this.writePosition, this.capacity);
460        }
461
462
463        private void checkIndex(int index, int length) {
464                assertIndex(index >= 0, "index %d must be >= 0", index);
465                assertIndex(length >= 0, "length %d must be >= 0", index);
466                assertIndex(index <= this.capacity, "index %d must be <= %d", index, this.capacity);
467                assertIndex(length <= this.capacity, "length %d must be <= %d", index, this.capacity);
468        }
469
470        private void assertIndex(boolean expression, String format, Object... args) {
471                if (!expression) {
472                        String message = String.format(format, args);
473                        throw new IndexOutOfBoundsException(message);
474                }
475        }
476
477
478        private class DefaultDataBufferInputStream extends InputStream {
479
480                @Override
481                public int available() {
482                        return readableByteCount();
483                }
484
485                @Override
486                public int read() {
487                        return available() > 0 ? DefaultDataBuffer.this.read() & 0xFF : -1;
488                }
489
490                @Override
491                public int read(byte[] bytes, int off, int len) throws IOException {
492                        int available = available();
493                        if (available > 0) {
494                                len = Math.min(len, available);
495                                DefaultDataBuffer.this.read(bytes, off, len);
496                                return len;
497                        }
498                        else {
499                                return -1;
500                        }
501                }
502        }
503
504
505        private class DefaultDataBufferOutputStream extends OutputStream {
506
507                @Override
508                public void write(int b) throws IOException {
509                        DefaultDataBuffer.this.write((byte) b);
510                }
511
512                @Override
513                public void write(byte[] bytes, int off, int len) throws IOException {
514                        DefaultDataBuffer.this.write(bytes, off, len);
515                }
516        }
517
518
519        private static class SlicedDefaultDataBuffer extends DefaultDataBuffer {
520
521                SlicedDefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory, int length) {
522                        super(dataBufferFactory, byteBuffer);
523                        writePosition(length);
524                }
525
526                @Override
527                public DefaultDataBuffer capacity(int newCapacity) {
528                        throw new UnsupportedOperationException("Changing the capacity of a sliced buffer is not supported");
529                }
530        }
531
532}