001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.core.io.buffer; 018 019import java.io.File; 020import java.io.IOException; 021import java.io.InputStream; 022import java.io.OutputStream; 023import java.nio.ByteBuffer; 024import java.nio.channels.AsynchronousFileChannel; 025import java.nio.channels.Channel; 026import java.nio.channels.Channels; 027import java.nio.channels.CompletionHandler; 028import java.nio.channels.ReadableByteChannel; 029import java.nio.channels.WritableByteChannel; 030import java.nio.file.OpenOption; 031import java.nio.file.Path; 032import java.nio.file.StandardOpenOption; 033import java.util.HashSet; 034import java.util.Set; 035import java.util.concurrent.Callable; 036import java.util.concurrent.atomic.AtomicBoolean; 037import java.util.concurrent.atomic.AtomicLong; 038import java.util.concurrent.atomic.AtomicReference; 039import java.util.function.Consumer; 040 041import org.apache.commons.logging.Log; 042import org.apache.commons.logging.LogFactory; 043import org.reactivestreams.Publisher; 044import org.reactivestreams.Subscription; 045import reactor.core.publisher.BaseSubscriber; 046import reactor.core.publisher.Flux; 047import reactor.core.publisher.FluxSink; 048import reactor.core.publisher.Mono; 049import reactor.core.publisher.SynchronousSink; 050 051import org.springframework.core.io.Resource; 052import org.springframework.lang.Nullable; 053import org.springframework.util.Assert; 054 055/** 056 * Utility class for working with {@link DataBuffer DataBuffers}. 057 * 058 * @author Arjen Poutsma 059 * @author Brian Clozel 060 * @since 5.0 061 */ 062public abstract class DataBufferUtils { 063 064 private final static Log logger = LogFactory.getLog(DataBufferUtils.class); 065 066 private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release; 067 068 069 //--------------------------------------------------------------------- 070 // Reading 071 //--------------------------------------------------------------------- 072 073 /** 074 * Obtain a {@link InputStream} from the given supplier, and read it into a 075 * {@code Flux} of {@code DataBuffer}s. Closes the input stream when the 076 * Flux is terminated. 077 * @param inputStreamSupplier the supplier for the input stream to read from 078 * @param bufferFactory the factory to create data buffers with 079 * @param bufferSize the maximum size of the data buffers 080 * @return a Flux of data buffers read from the given channel 081 */ 082 public static Flux<DataBuffer> readInputStream( 083 Callable<InputStream> inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize) { 084 085 Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null"); 086 return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), bufferFactory, bufferSize); 087 } 088 089 /** 090 * Obtain a {@link ReadableByteChannel} from the given supplier, and read 091 * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when 092 * the Flux is terminated. 093 * @param channelSupplier the supplier for the channel to read from 094 * @param bufferFactory the factory to create data buffers with 095 * @param bufferSize the maximum size of the data buffers 096 * @return a Flux of data buffers read from the given channel 097 */ 098 public static Flux<DataBuffer> readByteChannel( 099 Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) { 100 101 Assert.notNull(channelSupplier, "'channelSupplier' must not be null"); 102 Assert.notNull(bufferFactory, "'dataBufferFactory' must not be null"); 103 Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); 104 105 return Flux.using(channelSupplier, 106 channel -> Flux.generate(new ReadableByteChannelGenerator(channel, bufferFactory, bufferSize)), 107 DataBufferUtils::closeChannel); 108 109 // No doOnDiscard as operators used do not cache 110 } 111 112 /** 113 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and read 114 * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when 115 * the Flux is terminated. 116 * @param channelSupplier the supplier for the channel to read from 117 * @param bufferFactory the factory to create data buffers with 118 * @param bufferSize the maximum size of the data buffers 119 * @return a Flux of data buffers read from the given channel 120 */ 121 public static Flux<DataBuffer> readAsynchronousFileChannel( 122 Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) { 123 124 return readAsynchronousFileChannel(channelSupplier, 0, bufferFactory, bufferSize); 125 } 126 127 /** 128 * Obtain a {@code AsynchronousFileChannel} from the given supplier, and 129 * read it into a {@code Flux} of {@code DataBuffer}s, starting at the given 130 * position. Closes the channel when the Flux is terminated. 131 * @param channelSupplier the supplier for the channel to read from 132 * @param position the position to start reading from 133 * @param bufferFactory the factory to create data buffers with 134 * @param bufferSize the maximum size of the data buffers 135 * @return a Flux of data buffers read from the given channel 136 */ 137 public static Flux<DataBuffer> readAsynchronousFileChannel( 138 Callable<AsynchronousFileChannel> channelSupplier, long position, 139 DataBufferFactory bufferFactory, int bufferSize) { 140 141 Assert.notNull(channelSupplier, "'channelSupplier' must not be null"); 142 Assert.notNull(bufferFactory, "'dataBufferFactory' must not be null"); 143 Assert.isTrue(position >= 0, "'position' must be >= 0"); 144 Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); 145 146 Flux<DataBuffer> flux = Flux.using(channelSupplier, 147 channel -> Flux.create(sink -> { 148 ReadCompletionHandler handler = 149 new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); 150 sink.onCancel(handler::cancel); 151 sink.onRequest(handler::request); 152 }), 153 channel -> { 154 // Do not close channel from here, rather wait for the current read callback 155 // and then complete after releasing the DataBuffer. 156 }); 157 158 return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); 159 } 160 161 /** 162 * Read bytes from the given file {@code Path} into a {@code Flux} of {@code DataBuffer}s. 163 * The method ensures that the file is closed when the flux is terminated. 164 * @param path the path to read bytes from 165 * @param bufferFactory the factory to create data buffers with 166 * @param bufferSize the maximum size of the data buffers 167 * @return a Flux of data buffers read from the given channel 168 * @since 5.2 169 */ 170 public static Flux<DataBuffer> read( 171 Path path, DataBufferFactory bufferFactory, int bufferSize, OpenOption... options) { 172 173 Assert.notNull(path, "Path must not be null"); 174 Assert.notNull(bufferFactory, "BufferFactory must not be null"); 175 Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); 176 if (options.length > 0) { 177 for (OpenOption option : options) { 178 Assert.isTrue(!(option == StandardOpenOption.APPEND || option == StandardOpenOption.WRITE), 179 "'" + option + "' not allowed"); 180 } 181 } 182 183 return readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(path, options), 184 bufferFactory, bufferSize); 185 } 186 187 /** 188 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s. 189 * <p>If the resource is a file, it is read into an 190 * {@code AsynchronousFileChannel} and turned to {@code Flux} via 191 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else 192 * fall back to {@link #readByteChannel(Callable, DataBufferFactory, int)}. 193 * Closes the channel when the flux is terminated. 194 * @param resource the resource to read from 195 * @param bufferFactory the factory to create data buffers with 196 * @param bufferSize the maximum size of the data buffers 197 * @return a Flux of data buffers read from the given channel 198 */ 199 public static Flux<DataBuffer> read(Resource resource, DataBufferFactory bufferFactory, int bufferSize) { 200 return read(resource, 0, bufferFactory, bufferSize); 201 } 202 203 /** 204 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s 205 * starting at the given position. 206 * <p>If the resource is a file, it is read into an 207 * {@code AsynchronousFileChannel} and turned to {@code Flux} via 208 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)} or else 209 * fall back on {@link #readByteChannel(Callable, DataBufferFactory, int)}. 210 * Closes the channel when the flux is terminated. 211 * @param resource the resource to read from 212 * @param position the position to start reading from 213 * @param bufferFactory the factory to create data buffers with 214 * @param bufferSize the maximum size of the data buffers 215 * @return a Flux of data buffers read from the given channel 216 */ 217 public static Flux<DataBuffer> read( 218 Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) { 219 220 try { 221 if (resource.isFile()) { 222 File file = resource.getFile(); 223 return readAsynchronousFileChannel( 224 () -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ), 225 position, bufferFactory, bufferSize); 226 } 227 } 228 catch (IOException ignore) { 229 // fallback to resource.readableChannel(), below 230 } 231 Flux<DataBuffer> result = readByteChannel(resource::readableChannel, bufferFactory, bufferSize); 232 return position == 0 ? result : skipUntilByteCount(result, position); 233 } 234 235 236 //--------------------------------------------------------------------- 237 // Writing 238 //--------------------------------------------------------------------- 239 240 /** 241 * Write the given stream of {@link DataBuffer DataBuffers} to the given 242 * {@code OutputStream}. Does <strong>not</strong> close the output stream 243 * when the flux is terminated, and does <strong>not</strong> 244 * {@linkplain #release(DataBuffer) release} the data buffers in the source. 245 * If releasing is required, then subscribe to the returned {@code Flux} 246 * with a {@link #releaseConsumer()}. 247 * <p>Note that the writing process does not start until the returned 248 * {@code Flux} is subscribed to. 249 * @param source the stream of data buffers to be written 250 * @param outputStream the output stream to write to 251 * @return a Flux containing the same buffers as in {@code source}, that 252 * starts the writing process when subscribed to, and that publishes any 253 * writing errors and the completion signal 254 */ 255 public static Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream outputStream) { 256 Assert.notNull(source, "'source' must not be null"); 257 Assert.notNull(outputStream, "'outputStream' must not be null"); 258 259 WritableByteChannel channel = Channels.newChannel(outputStream); 260 return write(source, channel); 261 } 262 263 /** 264 * Write the given stream of {@link DataBuffer DataBuffers} to the given 265 * {@code WritableByteChannel}. Does <strong>not</strong> close the channel 266 * when the flux is terminated, and does <strong>not</strong> 267 * {@linkplain #release(DataBuffer) release} the data buffers in the source. 268 * If releasing is required, then subscribe to the returned {@code Flux} 269 * with a {@link #releaseConsumer()}. 270 * <p>Note that the writing process does not start until the returned 271 * {@code Flux} is subscribed to. 272 * @param source the stream of data buffers to be written 273 * @param channel the channel to write to 274 * @return a Flux containing the same buffers as in {@code source}, that 275 * starts the writing process when subscribed to, and that publishes any 276 * writing errors and the completion signal 277 */ 278 public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) { 279 Assert.notNull(source, "'source' must not be null"); 280 Assert.notNull(channel, "'channel' must not be null"); 281 282 Flux<DataBuffer> flux = Flux.from(source); 283 return Flux.create(sink -> { 284 WritableByteChannelSubscriber subscriber = new WritableByteChannelSubscriber(sink, channel); 285 sink.onDispose(subscriber); 286 flux.subscribe(subscriber); 287 }); 288 } 289 290 /** 291 * Write the given stream of {@link DataBuffer DataBuffers} to the given 292 * {@code AsynchronousFileChannel}. Does <strong>not</strong> close the 293 * channel when the flux is terminated, and does <strong>not</strong> 294 * {@linkplain #release(DataBuffer) release} the data buffers in the source. 295 * If releasing is required, then subscribe to the returned {@code Flux} 296 * with a {@link #releaseConsumer()}. 297 * <p>Note that the writing process does not start until the returned 298 * {@code Flux} is subscribed to. 299 * @param source the stream of data buffers to be written 300 * @param channel the channel to write to 301 * @return a Flux containing the same buffers as in {@code source}, that 302 * starts the writing process when subscribed to, and that publishes any 303 * writing errors and the completion signal 304 * @since 5.0.10 305 */ 306 public static Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel) { 307 return write(source, channel, 0); 308 } 309 310 /** 311 * Write the given stream of {@link DataBuffer DataBuffers} to the given 312 * {@code AsynchronousFileChannel}. Does <strong>not</strong> close the channel 313 * when the flux is terminated, and does <strong>not</strong> 314 * {@linkplain #release(DataBuffer) release} the data buffers in the source. 315 * If releasing is required, then subscribe to the returned {@code Flux} with a 316 * {@link #releaseConsumer()}. 317 * <p>Note that the writing process does not start until the returned 318 * {@code Flux} is subscribed to. 319 * @param source the stream of data buffers to be written 320 * @param channel the channel to write to 321 * @param position the file position where writing is to begin; must be non-negative 322 * @return a flux containing the same buffers as in {@code source}, that 323 * starts the writing process when subscribed to, and that publishes any 324 * writing errors and the completion signal 325 */ 326 public static Flux<DataBuffer> write( 327 Publisher<? extends DataBuffer> source, AsynchronousFileChannel channel, long position) { 328 329 Assert.notNull(source, "'source' must not be null"); 330 Assert.notNull(channel, "'channel' must not be null"); 331 Assert.isTrue(position >= 0, "'position' must be >= 0"); 332 333 Flux<DataBuffer> flux = Flux.from(source); 334 return Flux.create(sink -> { 335 WriteCompletionHandler handler = new WriteCompletionHandler(sink, channel, position); 336 sink.onDispose(handler); 337 flux.subscribe(handler); 338 }); 339 340 341 } 342 343 /** 344 * Write the given stream of {@link DataBuffer DataBuffers} to the given 345 * file {@link Path}. The optional {@code options} parameter specifies 346 * how the file is created or opened (defaults to 347 * {@link StandardOpenOption#CREATE CREATE}, 348 * {@link StandardOpenOption#TRUNCATE_EXISTING TRUNCATE_EXISTING}, and 349 * {@link StandardOpenOption#WRITE WRITE}). 350 * @param source the stream of data buffers to be written 351 * @param destination the path to the file 352 * @param options the options specifying how the file is opened 353 * @return a {@link Mono} that indicates completion or error 354 * @since 5.2 355 */ 356 public static Mono<Void> write(Publisher<DataBuffer> source, Path destination, OpenOption... options) { 357 Assert.notNull(source, "Source must not be null"); 358 Assert.notNull(destination, "Destination must not be null"); 359 360 Set<OpenOption> optionSet = checkWriteOptions(options); 361 362 return Mono.create(sink -> { 363 try { 364 AsynchronousFileChannel channel = AsynchronousFileChannel.open(destination, optionSet, null); 365 sink.onDispose(() -> closeChannel(channel)); 366 write(source, channel).subscribe(DataBufferUtils::release, 367 sink::error, 368 sink::success); 369 } 370 catch (IOException ex) { 371 sink.error(ex); 372 } 373 }); 374 } 375 376 private static Set<OpenOption> checkWriteOptions(OpenOption[] options) { 377 int length = options.length; 378 Set<OpenOption> result = new HashSet<>(length + 3); 379 if (length == 0) { 380 result.add(StandardOpenOption.CREATE); 381 result.add(StandardOpenOption.TRUNCATE_EXISTING); 382 } 383 else { 384 for (OpenOption opt : options) { 385 if (opt == StandardOpenOption.READ) { 386 throw new IllegalArgumentException("READ not allowed"); 387 } 388 result.add(opt); 389 } 390 } 391 result.add(StandardOpenOption.WRITE); 392 return result; 393 } 394 395 static void closeChannel(@Nullable Channel channel) { 396 if (channel != null && channel.isOpen()) { 397 try { 398 channel.close(); 399 } 400 catch (IOException ignored) { 401 } 402 } 403 } 404 405 406 //--------------------------------------------------------------------- 407 // Various 408 //--------------------------------------------------------------------- 409 410 /** 411 * Relay buffers from the given {@link Publisher} until the total 412 * {@linkplain DataBuffer#readableByteCount() byte count} reaches 413 * the given maximum byte count, or until the publisher is complete. 414 * @param publisher the publisher to filter 415 * @param maxByteCount the maximum byte count 416 * @return a flux whose maximum byte count is {@code maxByteCount} 417 */ 418 public static Flux<DataBuffer> takeUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) { 419 Assert.notNull(publisher, "Publisher must not be null"); 420 Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); 421 422 return Flux.defer(() -> { 423 AtomicLong countDown = new AtomicLong(maxByteCount); 424 return Flux.from(publisher) 425 .map(buffer -> { 426 long remainder = countDown.addAndGet(-buffer.readableByteCount()); 427 if (remainder < 0) { 428 int length = buffer.readableByteCount() + (int) remainder; 429 return buffer.slice(0, length); 430 } 431 else { 432 return buffer; 433 } 434 }) 435 .takeUntil(buffer -> countDown.get() <= 0); 436 }); 437 438 // No doOnDiscard as operators used do not cache (and drop) buffers 439 } 440 441 /** 442 * Skip buffers from the given {@link Publisher} until the total 443 * {@linkplain DataBuffer#readableByteCount() byte count} reaches 444 * the given maximum byte count, or until the publisher is complete. 445 * @param publisher the publisher to filter 446 * @param maxByteCount the maximum byte count 447 * @return a flux with the remaining part of the given publisher 448 */ 449 public static Flux<DataBuffer> skipUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) { 450 Assert.notNull(publisher, "Publisher must not be null"); 451 Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be a positive number"); 452 453 return Flux.defer(() -> { 454 AtomicLong countDown = new AtomicLong(maxByteCount); 455 return Flux.from(publisher) 456 .skipUntil(buffer -> { 457 long remainder = countDown.addAndGet(-buffer.readableByteCount()); 458 return remainder < 0; 459 }) 460 .map(buffer -> { 461 long remainder = countDown.get(); 462 if (remainder < 0) { 463 countDown.set(0); 464 int start = buffer.readableByteCount() + (int)remainder; 465 int length = (int) -remainder; 466 return buffer.slice(start, length); 467 } 468 else { 469 return buffer; 470 } 471 }); 472 }).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); 473 } 474 475 /** 476 * Retain the given data buffer, if it is a {@link PooledDataBuffer}. 477 * @param dataBuffer the data buffer to retain 478 * @return the retained buffer 479 */ 480 @SuppressWarnings("unchecked") 481 public static <T extends DataBuffer> T retain(T dataBuffer) { 482 if (dataBuffer instanceof PooledDataBuffer) { 483 return (T) ((PooledDataBuffer) dataBuffer).retain(); 484 } 485 else { 486 return dataBuffer; 487 } 488 } 489 490 /** 491 * Release the given data buffer, if it is a {@link PooledDataBuffer} and 492 * has been {@linkplain PooledDataBuffer#isAllocated() allocated}. 493 * @param dataBuffer the data buffer to release 494 * @return {@code true} if the buffer was released; {@code false} otherwise. 495 */ 496 public static boolean release(@Nullable DataBuffer dataBuffer) { 497 if (dataBuffer instanceof PooledDataBuffer) { 498 PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer; 499 if (pooledDataBuffer.isAllocated()) { 500 try { 501 return pooledDataBuffer.release(); 502 } 503 catch (IllegalStateException ex) { 504 // Avoid dependency on Netty: IllegalReferenceCountException 505 if (logger.isDebugEnabled()) { 506 logger.debug("Failed to release PooledDataBuffer", ex); 507 } 508 return false; 509 } 510 } 511 } 512 return false; 513 } 514 515 /** 516 * Return a consumer that calls {@link #release(DataBuffer)} on all 517 * passed data buffers. 518 */ 519 public static Consumer<DataBuffer> releaseConsumer() { 520 return RELEASE_CONSUMER; 521 } 522 523 /** 524 * Return a new {@code DataBuffer} composed from joining together the given 525 * {@code dataBuffers} elements. Depending on the {@link DataBuffer} type, 526 * the returned buffer may be a single buffer containing all data of the 527 * provided buffers, or it may be a zero-copy, composite with references to 528 * the given buffers. 529 * <p>If {@code dataBuffers} produces an error or if there is a cancel 530 * signal, then all accumulated buffers will be 531 * {@linkplain #release(DataBuffer) released}. 532 * <p>Note that the given data buffers do <strong>not</strong> have to be 533 * released. They will be released as part of the returned composite. 534 * @param dataBuffers the data buffers that are to be composed 535 * @return a buffer that is composed from the {@code dataBuffers} argument 536 * @since 5.0.3 537 */ 538 public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) { 539 return join(dataBuffers, -1); 540 } 541 542 /** 543 * Variant of {@link #join(Publisher)} that behaves the same way up until 544 * the specified max number of bytes to buffer. Once the limit is exceeded, 545 * {@link DataBufferLimitException} is raised. 546 * @param buffers the data buffers that are to be composed 547 * @param maxByteCount the max number of bytes to buffer, or -1 for unlimited 548 * @return a buffer with the aggregated content, possibly an empty Mono if 549 * the max number of bytes to buffer is exceeded. 550 * @throws DataBufferLimitException if maxByteCount is exceeded 551 * @since 5.1.11 552 */ 553 @SuppressWarnings("unchecked") 554 public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) { 555 Assert.notNull(buffers, "'dataBuffers' must not be null"); 556 557 if (buffers instanceof Mono) { 558 return (Mono<DataBuffer>) buffers; 559 } 560 561 return Flux.from(buffers) 562 .collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add) 563 .filter(list -> !list.isEmpty()) 564 .map(list -> list.get(0).factory().join(list)) 565 .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); 566 } 567 568 /** 569 * Return a {@link Matcher} for the given delimiter. 570 * The matcher can be used to find the delimiters in a stream of data buffers. 571 * @param delimiter the delimiter bytes to find 572 * @return the matcher 573 * @since 5.2 574 */ 575 public static Matcher matcher(byte[] delimiter) { 576 return createMatcher(delimiter); 577 } 578 579 /** 580 * Return a {@link Matcher} for the given delimiters. 581 * The matcher can be used to find the delimiters in a stream of data buffers. 582 * @param delimiters the delimiters bytes to find 583 * @return the matcher 584 * @since 5.2 585 */ 586 public static Matcher matcher(byte[]... delimiters) { 587 Assert.isTrue(delimiters.length > 0, "Delimiters must not be empty"); 588 return (delimiters.length == 1 ? createMatcher(delimiters[0]) : new CompositeMatcher(delimiters)); 589 } 590 591 private static NestedMatcher createMatcher(byte[] delimiter) { 592 Assert.isTrue(delimiter.length > 0, "Delimiter must not be empty"); 593 switch (delimiter.length) { 594 case 1: 595 return (delimiter[0] == 10 ? SingleByteMatcher.NEWLINE_MATCHER : new SingleByteMatcher(delimiter)); 596 case 2: 597 return new TwoByteMatcher(delimiter); 598 default: 599 return new KnuthMorrisPrattMatcher(delimiter); 600 } 601 } 602 603 604 /** 605 * Contract to find delimiter(s) against one or more data buffers that can 606 * be passed one at a time to the {@link #match(DataBuffer)} method. 607 * 608 * @since 5.2 609 * @see #match(DataBuffer) 610 */ 611 public interface Matcher { 612 613 /** 614 * Find the first matching delimiter and return the index of the last 615 * byte of the delimiter, or {@code -1} if not found. 616 */ 617 int match(DataBuffer dataBuffer); 618 619 /** 620 * Return the delimiter from the last invocation of {@link #match(DataBuffer)}. 621 */ 622 byte[] delimiter(); 623 624 /** 625 * Reset the state of this matcher. 626 */ 627 void reset(); 628 } 629 630 631 /** 632 * Matcher that supports searching for multiple delimiters. 633 */ 634 private static class CompositeMatcher implements Matcher { 635 636 private static final byte[] NO_DELIMITER = new byte[0]; 637 638 639 private final NestedMatcher[] matchers; 640 641 byte[] longestDelimiter = NO_DELIMITER; 642 643 CompositeMatcher(byte[][] delimiters) { 644 this.matchers = initMatchers(delimiters); 645 } 646 647 private static NestedMatcher[] initMatchers(byte[][] delimiters) { 648 NestedMatcher[] matchers = new NestedMatcher[delimiters.length]; 649 for (int i = 0; i < delimiters.length; i++) { 650 matchers[i] = createMatcher(delimiters[i]); 651 } 652 return matchers; 653 } 654 655 @Override 656 public int match(DataBuffer dataBuffer) { 657 this.longestDelimiter = NO_DELIMITER; 658 659 for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) { 660 byte b = dataBuffer.getByte(pos); 661 662 for (NestedMatcher matcher : this.matchers) { 663 if (matcher.match(b) && matcher.delimiter().length > this.longestDelimiter.length) { 664 this.longestDelimiter = matcher.delimiter(); 665 } 666 } 667 668 if (this.longestDelimiter != NO_DELIMITER) { 669 reset(); 670 return pos; 671 } 672 } 673 return -1; 674 } 675 676 @Override 677 public byte[] delimiter() { 678 Assert.state(this.longestDelimiter != NO_DELIMITER, "Illegal state!"); 679 return this.longestDelimiter; 680 } 681 682 @Override 683 public void reset() { 684 for (NestedMatcher matcher : this.matchers) { 685 matcher.reset(); 686 } 687 } 688 } 689 690 691 /** 692 * Matcher that can be nested within {@link CompositeMatcher} where multiple 693 * matchers advance together using the same index, one byte at a time. 694 */ 695 private interface NestedMatcher extends Matcher { 696 697 /** 698 * Perform a match against the next byte of the stream and return true 699 * if the delimiter is fully matched. 700 */ 701 boolean match(byte b); 702 703 } 704 705 706 /** 707 * Matcher for a single byte delimiter. 708 */ 709 private static class SingleByteMatcher implements NestedMatcher { 710 711 static SingleByteMatcher NEWLINE_MATCHER = new SingleByteMatcher(new byte[] {10}); 712 713 private final byte[] delimiter; 714 715 SingleByteMatcher(byte[] delimiter) { 716 Assert.isTrue(delimiter.length == 1, "Expected a 1 byte delimiter"); 717 this.delimiter = delimiter; 718 } 719 720 @Override 721 public int match(DataBuffer dataBuffer) { 722 for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) { 723 byte b = dataBuffer.getByte(pos); 724 if (match(b)) { 725 return pos; 726 } 727 } 728 return -1; 729 } 730 731 @Override 732 public boolean match(byte b) { 733 return this.delimiter[0] == b; 734 } 735 736 @Override 737 public byte[] delimiter() { 738 return this.delimiter; 739 } 740 741 @Override 742 public void reset() { 743 } 744 } 745 746 747 /** 748 * Base class for a {@link NestedMatcher}. 749 */ 750 private static abstract class AbstractNestedMatcher implements NestedMatcher { 751 752 private final byte[] delimiter; 753 754 private int matches = 0; 755 756 757 protected AbstractNestedMatcher(byte[] delimiter) { 758 this.delimiter = delimiter; 759 } 760 761 protected void setMatches(int index) { 762 this.matches = index; 763 } 764 765 protected int getMatches() { 766 return this.matches; 767 } 768 769 @Override 770 public int match(DataBuffer dataBuffer) { 771 for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) { 772 byte b = dataBuffer.getByte(pos); 773 if (match(b)) { 774 reset(); 775 return pos; 776 } 777 } 778 return -1; 779 } 780 781 @Override 782 public boolean match(byte b) { 783 if (b == this.delimiter[this.matches]) { 784 this.matches++; 785 return (this.matches == delimiter().length); 786 } 787 return false; 788 } 789 790 @Override 791 public byte[] delimiter() { 792 return this.delimiter; 793 } 794 795 @Override 796 public void reset() { 797 this.matches = 0; 798 } 799 } 800 801 802 /** 803 * Matcher with a 2 byte delimiter that does not benefit from a 804 * Knuth-Morris-Pratt suffix-prefix table. 805 */ 806 private static class TwoByteMatcher extends AbstractNestedMatcher { 807 808 protected TwoByteMatcher(byte[] delimiter) { 809 super(delimiter); 810 Assert.isTrue(delimiter.length == 2, "Expected a 2-byte delimiter"); 811 } 812 } 813 814 815 /** 816 * Implementation of {@link Matcher} that uses the Knuth-Morris-Pratt algorithm. 817 * @see <a href="https://www.nayuki.io/page/knuth-morris-pratt-string-matching">Knuth-Morris-Pratt string matching</a> 818 */ 819 private static class KnuthMorrisPrattMatcher extends AbstractNestedMatcher { 820 821 private final int[] table; 822 823 public KnuthMorrisPrattMatcher(byte[] delimiter) { 824 super(delimiter); 825 this.table = longestSuffixPrefixTable(delimiter); 826 } 827 828 private static int[] longestSuffixPrefixTable(byte[] delimiter) { 829 int[] result = new int[delimiter.length]; 830 result[0] = 0; 831 for (int i = 1; i < delimiter.length; i++) { 832 int j = result[i - 1]; 833 while (j > 0 && delimiter[i] != delimiter[j]) { 834 j = result[j - 1]; 835 } 836 if (delimiter[i] == delimiter[j]) { 837 j++; 838 } 839 result[i] = j; 840 } 841 return result; 842 } 843 844 @Override 845 public boolean match(byte b) { 846 while (getMatches() > 0 && b != delimiter()[getMatches()]) { 847 setMatches(this.table[getMatches() - 1]); 848 } 849 return super.match(b); 850 } 851 } 852 853 854 private static class ReadableByteChannelGenerator implements Consumer<SynchronousSink<DataBuffer>> { 855 856 private final ReadableByteChannel channel; 857 858 private final DataBufferFactory dataBufferFactory; 859 860 private final int bufferSize; 861 862 public ReadableByteChannelGenerator( 863 ReadableByteChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) { 864 865 this.channel = channel; 866 this.dataBufferFactory = dataBufferFactory; 867 this.bufferSize = bufferSize; 868 } 869 870 @Override 871 public void accept(SynchronousSink<DataBuffer> sink) { 872 boolean release = true; 873 DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); 874 try { 875 int read; 876 ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity()); 877 if ((read = this.channel.read(byteBuffer)) >= 0) { 878 dataBuffer.writePosition(read); 879 release = false; 880 sink.next(dataBuffer); 881 } 882 else { 883 sink.complete(); 884 } 885 } 886 catch (IOException ex) { 887 sink.error(ex); 888 } 889 finally { 890 if (release) { 891 release(dataBuffer); 892 } 893 } 894 } 895 } 896 897 898 private static class ReadCompletionHandler implements CompletionHandler<Integer, DataBuffer> { 899 900 private final AsynchronousFileChannel channel; 901 902 private final FluxSink<DataBuffer> sink; 903 904 private final DataBufferFactory dataBufferFactory; 905 906 private final int bufferSize; 907 908 private final AtomicLong position; 909 910 private final AtomicReference<State> state = new AtomicReference<>(State.IDLE); 911 912 public ReadCompletionHandler(AsynchronousFileChannel channel, 913 FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) { 914 915 this.channel = channel; 916 this.sink = sink; 917 this.position = new AtomicLong(position); 918 this.dataBufferFactory = dataBufferFactory; 919 this.bufferSize = bufferSize; 920 } 921 922 /** 923 * Invoked when Reactive Streams consumer signals demand. 924 */ 925 public void request(long n) { 926 tryRead(); 927 } 928 929 /** 930 * Invoked when Reactive Streams consumer cancels. 931 */ 932 public void cancel() { 933 this.state.getAndSet(State.DISPOSED); 934 935 // According java.nio.channels.AsynchronousChannel "if an I/O operation is outstanding 936 // on the channel and the channel's close method is invoked, then the I/O operation 937 // fails with the exception AsynchronousCloseException". That should invoke the failed 938 // callback below which and the current DataBuffer should be released. 939 940 closeChannel(this.channel); 941 } 942 943 private void tryRead() { 944 if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) { 945 read(); 946 } 947 } 948 949 private void read() { 950 DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); 951 ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize); 952 this.channel.read(byteBuffer, this.position.get(), dataBuffer, this); 953 } 954 955 @Override 956 public void completed(Integer read, DataBuffer dataBuffer) { 957 if (this.state.get().equals(State.DISPOSED)) { 958 release(dataBuffer); 959 closeChannel(this.channel); 960 return; 961 } 962 963 if (read == -1) { 964 release(dataBuffer); 965 closeChannel(this.channel); 966 this.state.set(State.DISPOSED); 967 this.sink.complete(); 968 return; 969 } 970 971 this.position.addAndGet(read); 972 dataBuffer.writePosition(read); 973 this.sink.next(dataBuffer); 974 975 // Stay in READING mode if there is demand 976 if (this.sink.requestedFromDownstream() > 0) { 977 read(); 978 return; 979 } 980 981 // Release READING mode and then try again in case of concurrent "request" 982 if (this.state.compareAndSet(State.READING, State.IDLE)) { 983 tryRead(); 984 } 985 } 986 987 @Override 988 public void failed(Throwable exc, DataBuffer dataBuffer) { 989 release(dataBuffer); 990 closeChannel(this.channel); 991 this.state.set(State.DISPOSED); 992 this.sink.error(exc); 993 } 994 995 private enum State { 996 IDLE, READING, DISPOSED 997 } 998 } 999 1000 1001 private static class WritableByteChannelSubscriber extends BaseSubscriber<DataBuffer> { 1002 1003 private final FluxSink<DataBuffer> sink; 1004 1005 private final WritableByteChannel channel; 1006 1007 public WritableByteChannelSubscriber(FluxSink<DataBuffer> sink, WritableByteChannel channel) { 1008 this.sink = sink; 1009 this.channel = channel; 1010 } 1011 1012 @Override 1013 protected void hookOnSubscribe(Subscription subscription) { 1014 request(1); 1015 } 1016 1017 @Override 1018 protected void hookOnNext(DataBuffer dataBuffer) { 1019 try { 1020 ByteBuffer byteBuffer = dataBuffer.asByteBuffer(); 1021 while (byteBuffer.hasRemaining()) { 1022 this.channel.write(byteBuffer); 1023 } 1024 this.sink.next(dataBuffer); 1025 request(1); 1026 } 1027 catch (IOException ex) { 1028 this.sink.next(dataBuffer); 1029 this.sink.error(ex); 1030 } 1031 } 1032 1033 @Override 1034 protected void hookOnError(Throwable throwable) { 1035 this.sink.error(throwable); 1036 } 1037 1038 @Override 1039 protected void hookOnComplete() { 1040 this.sink.complete(); 1041 } 1042 } 1043 1044 1045 private static class WriteCompletionHandler extends BaseSubscriber<DataBuffer> 1046 implements CompletionHandler<Integer, ByteBuffer> { 1047 1048 private final FluxSink<DataBuffer> sink; 1049 1050 private final AsynchronousFileChannel channel; 1051 1052 private final AtomicBoolean completed = new AtomicBoolean(); 1053 1054 private final AtomicReference<Throwable> error = new AtomicReference<>(); 1055 1056 private final AtomicLong position; 1057 1058 private final AtomicReference<DataBuffer> dataBuffer = new AtomicReference<>(); 1059 1060 public WriteCompletionHandler( 1061 FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) { 1062 1063 this.sink = sink; 1064 this.channel = channel; 1065 this.position = new AtomicLong(position); 1066 } 1067 1068 @Override 1069 protected void hookOnSubscribe(Subscription subscription) { 1070 request(1); 1071 } 1072 1073 @Override 1074 protected void hookOnNext(DataBuffer value) { 1075 if (!this.dataBuffer.compareAndSet(null, value)) { 1076 throw new IllegalStateException(); 1077 } 1078 ByteBuffer byteBuffer = value.asByteBuffer(); 1079 this.channel.write(byteBuffer, this.position.get(), byteBuffer, this); 1080 } 1081 1082 @Override 1083 protected void hookOnError(Throwable throwable) { 1084 this.error.set(throwable); 1085 1086 if (this.dataBuffer.get() == null) { 1087 this.sink.error(throwable); 1088 } 1089 } 1090 1091 @Override 1092 protected void hookOnComplete() { 1093 this.completed.set(true); 1094 1095 if (this.dataBuffer.get() == null) { 1096 this.sink.complete(); 1097 } 1098 } 1099 1100 @Override 1101 public void completed(Integer written, ByteBuffer byteBuffer) { 1102 long pos = this.position.addAndGet(written); 1103 if (byteBuffer.hasRemaining()) { 1104 this.channel.write(byteBuffer, pos, byteBuffer, this); 1105 return; 1106 } 1107 sinkDataBuffer(); 1108 1109 Throwable throwable = this.error.get(); 1110 if (throwable != null) { 1111 this.sink.error(throwable); 1112 } 1113 else if (this.completed.get()) { 1114 this.sink.complete(); 1115 } 1116 else { 1117 request(1); 1118 } 1119 } 1120 1121 @Override 1122 public void failed(Throwable exc, ByteBuffer byteBuffer) { 1123 sinkDataBuffer(); 1124 this.sink.error(exc); 1125 } 1126 1127 private void sinkDataBuffer() { 1128 DataBuffer dataBuffer = this.dataBuffer.get(); 1129 Assert.state(dataBuffer != null, "DataBuffer should not be null"); 1130 this.sink.next(dataBuffer); 1131 this.dataBuffer.set(null); 1132 } 1133 } 1134 1135}