001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.core.io.buffer; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.io.OutputStream; 022import java.nio.Buffer; 023import java.nio.ByteBuffer; 024import java.nio.charset.Charset; 025import java.util.Arrays; 026import java.util.function.IntPredicate; 027 028import org.springframework.lang.Nullable; 029import org.springframework.util.Assert; 030import org.springframework.util.ObjectUtils; 031 032/** 033 * Default implementation of the {@link DataBuffer} interface that uses a 034 * {@link ByteBuffer} internally. with separate read and write positions. 035 * Constructed using the {@link DefaultDataBufferFactory}. 036 * 037 * <p>Inspired by Netty's {@code ByteBuf}. Introduced so that non-Netty runtimes 038 * (i.e. Servlet) do not require Netty on the classpath. 039 * 040 * @author Arjen Poutsma 041 * @author Juergen Hoeller 042 * @author Brian Clozel 043 * @since 5.0 044 * @see DefaultDataBufferFactory 045 */ 046public class DefaultDataBuffer implements DataBuffer { 047 048 private static final int MAX_CAPACITY = Integer.MAX_VALUE; 049 050 private static final int CAPACITY_THRESHOLD = 1024 * 1024 * 4; 051 052 053 private final DefaultDataBufferFactory dataBufferFactory; 054 055 private ByteBuffer byteBuffer; 056 057 private int capacity; 058 059 private int readPosition; 060 061 private int writePosition; 062 063 064 private DefaultDataBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) { 065 Assert.notNull(dataBufferFactory, "DefaultDataBufferFactory must not be null"); 066 Assert.notNull(byteBuffer, "ByteBuffer must not be null"); 067 this.dataBufferFactory = dataBufferFactory; 068 ByteBuffer slice = byteBuffer.slice(); 069 this.byteBuffer = slice; 070 this.capacity = slice.remaining(); 071 } 072 073 static DefaultDataBuffer fromFilledByteBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) { 074 DefaultDataBuffer dataBuffer = new DefaultDataBuffer(dataBufferFactory, byteBuffer); 075 dataBuffer.writePosition(byteBuffer.remaining()); 076 return dataBuffer; 077 } 078 079 static DefaultDataBuffer fromEmptyByteBuffer(DefaultDataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) { 080 return new DefaultDataBuffer(dataBufferFactory, byteBuffer); 081 } 082 083 084 /** 085 * Directly exposes the native {@code ByteBuffer} that this buffer is based 086 * on also updating the {@code ByteBuffer's} position and limit to match 087 * the current {@link #readPosition()} and {@link #readableByteCount()}. 088 * @return the wrapped byte buffer 089 */ 090 public ByteBuffer getNativeBuffer() { 091 this.byteBuffer.position(this.readPosition); 092 this.byteBuffer.limit(readableByteCount()); 093 return this.byteBuffer; 094 } 095 096 private void setNativeBuffer(ByteBuffer byteBuffer) { 097 this.byteBuffer = byteBuffer; 098 this.capacity = byteBuffer.remaining(); 099 } 100 101 102 @Override 103 public DefaultDataBufferFactory factory() { 104 return this.dataBufferFactory; 105 } 106 107 @Override 108 public int indexOf(IntPredicate predicate, int fromIndex) { 109 Assert.notNull(predicate, "IntPredicate must not be null"); 110 if (fromIndex < 0) { 111 fromIndex = 0; 112 } 113 else if (fromIndex >= this.writePosition) { 114 return -1; 115 } 116 for (int i = fromIndex; i < this.writePosition; i++) { 117 byte b = this.byteBuffer.get(i); 118 if (predicate.test(b)) { 119 return i; 120 } 121 } 122 return -1; 123 } 124 125 @Override 126 public int lastIndexOf(IntPredicate predicate, int fromIndex) { 127 Assert.notNull(predicate, "IntPredicate must not be null"); 128 int i = Math.min(fromIndex, this.writePosition - 1); 129 for (; i >= 0; i--) { 130 byte b = this.byteBuffer.get(i); 131 if (predicate.test(b)) { 132 return i; 133 } 134 } 135 return -1; 136 } 137 138 @Override 139 public int readableByteCount() { 140 return this.writePosition - this.readPosition; 141 } 142 143 @Override 144 public int writableByteCount() { 145 return this.capacity - this.writePosition; 146 } 147 148 @Override 149 public int readPosition() { 150 return this.readPosition; 151 } 152 153 @Override 154 public DefaultDataBuffer readPosition(int readPosition) { 155 assertIndex(readPosition >= 0, "'readPosition' %d must be >= 0", readPosition); 156 assertIndex(readPosition <= this.writePosition, "'readPosition' %d must be <= %d", 157 readPosition, this.writePosition); 158 this.readPosition = readPosition; 159 return this; 160 } 161 162 @Override 163 public int writePosition() { 164 return this.writePosition; 165 } 166 167 @Override 168 public DefaultDataBuffer writePosition(int writePosition) { 169 assertIndex(writePosition >= this.readPosition, "'writePosition' %d must be >= %d", 170 writePosition, this.readPosition); 171 assertIndex(writePosition <= this.capacity, "'writePosition' %d must be <= %d", 172 writePosition, this.capacity); 173 this.writePosition = writePosition; 174 return this; 175 } 176 177 @Override 178 public int capacity() { 179 return this.capacity; 180 } 181 182 @Override 183 public DefaultDataBuffer capacity(int newCapacity) { 184 if (newCapacity <= 0) { 185 throw new IllegalArgumentException(String.format("'newCapacity' %d must be higher than 0", newCapacity)); 186 } 187 int readPosition = readPosition(); 188 int writePosition = writePosition(); 189 int oldCapacity = capacity(); 190 191 if (newCapacity > oldCapacity) { 192 ByteBuffer oldBuffer = this.byteBuffer; 193 ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect()); 194 oldBuffer.position(0).limit(oldBuffer.capacity()); 195 newBuffer.position(0).limit(oldBuffer.capacity()); 196 newBuffer.put(oldBuffer); 197 newBuffer.clear(); 198 setNativeBuffer(newBuffer); 199 } 200 else if (newCapacity < oldCapacity) { 201 ByteBuffer oldBuffer = this.byteBuffer; 202 ByteBuffer newBuffer = allocate(newCapacity, oldBuffer.isDirect()); 203 if (readPosition < newCapacity) { 204 if (writePosition > newCapacity) { 205 writePosition = newCapacity; 206 writePosition(writePosition); 207 } 208 oldBuffer.position(readPosition).limit(writePosition); 209 newBuffer.position(readPosition).limit(writePosition); 210 newBuffer.put(oldBuffer); 211 newBuffer.clear(); 212 } 213 else { 214 readPosition(newCapacity); 215 writePosition(newCapacity); 216 } 217 setNativeBuffer(newBuffer); 218 } 219 return this; 220 } 221 222 @Override 223 public DataBuffer ensureCapacity(int length) { 224 if (length > writableByteCount()) { 225 int newCapacity = calculateCapacity(this.writePosition + length); 226 capacity(newCapacity); 227 } 228 return this; 229 } 230 231 private static ByteBuffer allocate(int capacity, boolean direct) { 232 return (direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity)); 233 } 234 235 @Override 236 public byte getByte(int index) { 237 assertIndex(index >= 0, "index %d must be >= 0", index); 238 assertIndex(index <= this.writePosition - 1, "index %d must be <= %d", index, this.writePosition - 1); 239 return this.byteBuffer.get(index); 240 } 241 242 @Override 243 public byte read() { 244 assertIndex(this.readPosition <= this.writePosition - 1, "readPosition %d must be <= %d", 245 this.readPosition, this.writePosition - 1); 246 int pos = this.readPosition; 247 byte b = this.byteBuffer.get(pos); 248 this.readPosition = pos + 1; 249 return b; 250 } 251 252 @Override 253 public DefaultDataBuffer read(byte[] destination) { 254 Assert.notNull(destination, "Byte array must not be null"); 255 read(destination, 0, destination.length); 256 return this; 257 } 258 259 @Override 260 public DefaultDataBuffer read(byte[] destination, int offset, int length) { 261 Assert.notNull(destination, "Byte array must not be null"); 262 assertIndex(this.readPosition <= this.writePosition - length, 263 "readPosition %d and length %d should be smaller than writePosition %d", 264 this.readPosition, length, this.writePosition); 265 266 ByteBuffer tmp = this.byteBuffer.duplicate(); 267 int limit = this.readPosition + length; 268 tmp.clear().position(this.readPosition).limit(limit); 269 tmp.get(destination, offset, length); 270 271 this.readPosition += length; 272 return this; 273 } 274 275 @Override 276 public DefaultDataBuffer write(byte b) { 277 ensureCapacity(1); 278 int pos = this.writePosition; 279 this.byteBuffer.put(pos, b); 280 this.writePosition = pos + 1; 281 return this; 282 } 283 284 @Override 285 public DefaultDataBuffer write(byte[] source) { 286 Assert.notNull(source, "Byte array must not be null"); 287 write(source, 0, source.length); 288 return this; 289 } 290 291 @Override 292 public DefaultDataBuffer write(byte[] source, int offset, int length) { 293 Assert.notNull(source, "Byte array must not be null"); 294 ensureCapacity(length); 295 296 ByteBuffer tmp = this.byteBuffer.duplicate(); 297 int limit = this.writePosition + length; 298 tmp.clear().position(this.writePosition).limit(limit); 299 tmp.put(source, offset, length); 300 301 this.writePosition += length; 302 return this; 303 } 304 305 @Override 306 public DefaultDataBuffer write(DataBuffer... buffers) { 307 if (!ObjectUtils.isEmpty(buffers)) { 308 write(Arrays.stream(buffers).map(DataBuffer::asByteBuffer).toArray(ByteBuffer[]::new)); 309 } 310 return this; 311 } 312 313 @Override 314 public DefaultDataBuffer write(ByteBuffer... buffers) { 315 if (!ObjectUtils.isEmpty(buffers)) { 316 int capacity = Arrays.stream(buffers).mapToInt(ByteBuffer::remaining).sum(); 317 ensureCapacity(capacity); 318 Arrays.stream(buffers).forEach(this::write); 319 } 320 return this; 321 } 322 323 private void write(ByteBuffer source) { 324 int length = source.remaining(); 325 ByteBuffer tmp = this.byteBuffer.duplicate(); 326 int limit = this.writePosition + source.remaining(); 327 tmp.clear().position(this.writePosition).limit(limit); 328 tmp.put(source); 329 this.writePosition += length; 330 } 331 332 @Override 333 public DefaultDataBuffer slice(int index, int length) { 334 checkIndex(index, length); 335 int oldPosition = this.byteBuffer.position(); 336 // Explicit access via Buffer base type for compatibility 337 // with covariant return type on JDK 9's ByteBuffer... 338 Buffer buffer = this.byteBuffer; 339 try { 340 buffer.position(index); 341 ByteBuffer slice = this.byteBuffer.slice(); 342 // Explicit cast for compatibility with covariant return type on JDK 9's ByteBuffer 343 slice.limit(length); 344 return new SlicedDefaultDataBuffer(slice, this.dataBufferFactory, length); 345 } 346 finally { 347 buffer.position(oldPosition); 348 } 349 } 350 351 @Override 352 public ByteBuffer asByteBuffer() { 353 return asByteBuffer(this.readPosition, readableByteCount()); 354 } 355 356 @Override 357 public ByteBuffer asByteBuffer(int index, int length) { 358 checkIndex(index, length); 359 360 ByteBuffer duplicate = this.byteBuffer.duplicate(); 361 // Explicit access via Buffer base type for compatibility 362 // with covariant return type on JDK 9's ByteBuffer... 363 Buffer buffer = duplicate; 364 buffer.position(index); 365 buffer.limit(index + length); 366 return duplicate.slice(); 367 } 368 369 @Override 370 public InputStream asInputStream() { 371 return new DefaultDataBufferInputStream(); 372 } 373 374 @Override 375 public InputStream asInputStream(boolean releaseOnClose) { 376 return new DefaultDataBufferInputStream(); 377 } 378 379 @Override 380 public OutputStream asOutputStream() { 381 return new DefaultDataBufferOutputStream(); 382 } 383 384 385 @Override 386 public String toString(int index, int length, Charset charset) { 387 checkIndex(index, length); 388 Assert.notNull(charset, "Charset must not be null"); 389 390 byte[] bytes; 391 int offset; 392 393 if (this.byteBuffer.hasArray()) { 394 bytes = this.byteBuffer.array(); 395 offset = this.byteBuffer.arrayOffset() + index; 396 } 397 else { 398 bytes = new byte[length]; 399 offset = 0; 400 ByteBuffer duplicate = this.byteBuffer.duplicate(); 401 duplicate.clear().position(index).limit(index + length); 402 duplicate.get(bytes, 0, length); 403 } 404 return new String(bytes, offset, length, charset); 405 } 406 407 /** 408 * Calculate the capacity of the buffer. 409 * @see io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int) 410 */ 411 private int calculateCapacity(int neededCapacity) { 412 Assert.isTrue(neededCapacity >= 0, "'neededCapacity' must >= 0"); 413 414 if (neededCapacity == CAPACITY_THRESHOLD) { 415 return CAPACITY_THRESHOLD; 416 } 417 else if (neededCapacity > CAPACITY_THRESHOLD) { 418 int newCapacity = neededCapacity / CAPACITY_THRESHOLD * CAPACITY_THRESHOLD; 419 if (newCapacity > MAX_CAPACITY - CAPACITY_THRESHOLD) { 420 newCapacity = MAX_CAPACITY; 421 } 422 else { 423 newCapacity += CAPACITY_THRESHOLD; 424 } 425 return newCapacity; 426 } 427 else { 428 int newCapacity = 64; 429 while (newCapacity < neededCapacity) { 430 newCapacity <<= 1; 431 } 432 return Math.min(newCapacity, MAX_CAPACITY); 433 } 434 } 435 436 437 @Override 438 public boolean equals(@Nullable Object other) { 439 if (this == other) { 440 return true; 441 } 442 if (!(other instanceof DefaultDataBuffer)) { 443 return false; 444 } 445 DefaultDataBuffer otherBuffer = (DefaultDataBuffer) other; 446 return (this.readPosition == otherBuffer.readPosition && 447 this.writePosition == otherBuffer.writePosition && 448 this.byteBuffer.equals(otherBuffer.byteBuffer)); 449 } 450 451 @Override 452 public int hashCode() { 453 return this.byteBuffer.hashCode(); 454 } 455 456 @Override 457 public String toString() { 458 return String.format("DefaultDataBuffer (r: %d, w: %d, c: %d)", 459 this.readPosition, this.writePosition, this.capacity); 460 } 461 462 463 private void checkIndex(int index, int length) { 464 assertIndex(index >= 0, "index %d must be >= 0", index); 465 assertIndex(length >= 0, "length %d must be >= 0", index); 466 assertIndex(index <= this.capacity, "index %d must be <= %d", index, this.capacity); 467 assertIndex(length <= this.capacity, "length %d must be <= %d", index, this.capacity); 468 } 469 470 private void assertIndex(boolean expression, String format, Object... args) { 471 if (!expression) { 472 String message = String.format(format, args); 473 throw new IndexOutOfBoundsException(message); 474 } 475 } 476 477 478 private class DefaultDataBufferInputStream extends InputStream { 479 480 @Override 481 public int available() { 482 return readableByteCount(); 483 } 484 485 @Override 486 public int read() { 487 return available() > 0 ? DefaultDataBuffer.this.read() & 0xFF : -1; 488 } 489 490 @Override 491 public int read(byte[] bytes, int off, int len) throws IOException { 492 int available = available(); 493 if (available > 0) { 494 len = Math.min(len, available); 495 DefaultDataBuffer.this.read(bytes, off, len); 496 return len; 497 } 498 else { 499 return -1; 500 } 501 } 502 } 503 504 505 private class DefaultDataBufferOutputStream extends OutputStream { 506 507 @Override 508 public void write(int b) throws IOException { 509 DefaultDataBuffer.this.write((byte) b); 510 } 511 512 @Override 513 public void write(byte[] bytes, int off, int len) throws IOException { 514 DefaultDataBuffer.this.write(bytes, off, len); 515 } 516 } 517 518 519 private static class SlicedDefaultDataBuffer extends DefaultDataBuffer { 520 521 SlicedDefaultDataBuffer(ByteBuffer byteBuffer, DefaultDataBufferFactory dataBufferFactory, int length) { 522 super(dataBufferFactory, byteBuffer); 523 writePosition(length); 524 } 525 526 @Override 527 public DefaultDataBuffer capacity(int newCapacity) { 528 throw new UnsupportedOperationException("Changing the capacity of a sliced buffer is not supported"); 529 } 530 } 531 532}