/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.core.io.buffer;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.Channel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

import org.springframework.core.io.Resource;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * Utility class for working with {@link DataBuffer DataBuffers}.
 *
 * @author Arjen Poutsma
 * @author Brian Clozel
 * @since 5.0
 */
public abstract class DataBufferUtils {

	private static final Log logger = LogFactory.getLog(DataBufferUtils.class);

	private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release;


	//---------------------------------------------------------------------
	// Reading
	//---------------------------------------------------------------------

	/**
	 * Obtain an {@link InputStream} from the given supplier, and read it into
	 * a {@code Flux} of {@code DataBuffer}s. Closes the input stream when the
	 * Flux is terminated.
	 * @param inputStreamSupplier the supplier for the input stream to read from
	 * @param bufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a Flux of data buffers read from the given input stream
	 */
	public static Flux<DataBuffer> readInputStream(
			Callable<InputStream> inputStreamSupplier, DataBufferFactory bufferFactory, int bufferSize) {

		Assert.notNull(inputStreamSupplier, "'inputStreamSupplier' must not be null");
		return readByteChannel(() -> Channels.newChannel(inputStreamSupplier.call()), bufferFactory, bufferSize);
	}
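
	// Usage sketch (illustrative, not part of the class API): reading a plain InputStream
	// into a Flux of DataBuffers and releasing each buffer after use. The file name and
	// the 4096-byte buffer size are arbitrary assumptions.
	//
	//   DataBufferFactory factory = new DefaultDataBufferFactory();
	//   Flux<DataBuffer> buffers = DataBufferUtils.readInputStream(
	//           () -> new FileInputStream("data.bin"), factory, 4096);
	//   buffers.subscribe(DataBufferUtils.releaseConsumer());
	//
	// The supplier is only invoked on subscription, and the stream is closed when the
	// Flux terminates.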

	/**
	 * Obtain a {@link ReadableByteChannel} from the given supplier, and read
	 * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when
	 * the Flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param bufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a Flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readByteChannel(
			Callable<ReadableByteChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) {

		Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
		Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");

		return Flux.using(channelSupplier,
				channel -> Flux.generate(new ReadableByteChannelGenerator(channel, bufferFactory, bufferSize)),
				DataBufferUtils::closeChannel);

		// No doOnDiscard as the operators used do not cache buffers
	}
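
	// Usage sketch (illustrative, not part of the class API): reading from an arbitrary
	// ReadableByteChannel, here one opened for a Path; the path and buffer size are
	// assumptions.
	//
	//   DataBufferFactory factory = new DefaultDataBufferFactory();
	//   Flux<DataBuffer> buffers = DataBufferUtils.readByteChannel(
	//           () -> Files.newByteChannel(Paths.get("data.bin"), StandardOpenOption.READ),
	//           factory, 4096);
	//
	// The channel is opened lazily on subscription and closed by Flux.using when the Flux
	// terminates or is cancelled.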

	/**
	 * Obtain an {@code AsynchronousFileChannel} from the given supplier, and read
	 * it into a {@code Flux} of {@code DataBuffer}s. Closes the channel when
	 * the Flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param bufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a Flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readAsynchronousFileChannel(
			Callable<AsynchronousFileChannel> channelSupplier, DataBufferFactory bufferFactory, int bufferSize) {

		return readAsynchronousFileChannel(channelSupplier, 0, bufferFactory, bufferSize);
	}

	/**
	 * Obtain an {@code AsynchronousFileChannel} from the given supplier, and
	 * read it into a {@code Flux} of {@code DataBuffer}s, starting at the given
	 * position. Closes the channel when the Flux is terminated.
	 * @param channelSupplier the supplier for the channel to read from
	 * @param position the position to start reading from
	 * @param bufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a Flux of data buffers read from the given channel
	 */
	public static Flux<DataBuffer> readAsynchronousFileChannel(
			Callable<AsynchronousFileChannel> channelSupplier, long position,
			DataBufferFactory bufferFactory, int bufferSize) {

		Assert.notNull(channelSupplier, "'channelSupplier' must not be null");
		Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");

		Flux<DataBuffer> flux = Flux.using(channelSupplier,
				channel -> Flux.create(sink -> {
					ReadCompletionHandler handler =
							new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize);
					sink.onCancel(handler::cancel);
					sink.onRequest(handler::request);
				}),
				channel -> {
					// Do not close channel from here, rather wait for the current read callback
					// and then complete after releasing the DataBuffer.
				});

		return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
	}

	/**
	 * Read bytes from the given file {@code Path} into a {@code Flux} of {@code DataBuffer}s.
	 * The method ensures that the file is closed when the Flux is terminated.
	 * @param path the path to read bytes from
	 * @param bufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @param options the options for opening the file channel; must not include
	 * {@link StandardOpenOption#APPEND APPEND} or {@link StandardOpenOption#WRITE WRITE}
	 * @return a Flux of data buffers read from the given path
	 * @since 5.2
	 */
	public static Flux<DataBuffer> read(
			Path path, DataBufferFactory bufferFactory, int bufferSize, OpenOption... options) {

		Assert.notNull(path, "Path must not be null");
		Assert.notNull(bufferFactory, "BufferFactory must not be null");
		Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0");
		if (options.length > 0) {
			for (OpenOption option : options) {
				Assert.isTrue(!(option == StandardOpenOption.APPEND || option == StandardOpenOption.WRITE),
						"'" + option + "' not allowed");
			}
		}

		return readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(path, options),
				bufferFactory, bufferSize);
	}
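
	// Usage sketch (illustrative, not part of the class API): reading a file by Path and
	// counting its bytes; the path and buffer size are assumptions.
	//
	//   DataBufferFactory factory = new DefaultDataBufferFactory();
	//   Mono<Long> byteCount = DataBufferUtils.read(Paths.get("data.bin"), factory, 4096)
	//           .map(buffer -> {
	//               long count = buffer.readableByteCount();
	//               DataBufferUtils.release(buffer);
	//               return count;
	//           })
	//           .reduce(0L, Long::sum);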

	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned into a {@code Flux} via
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)}; otherwise
	 * this method falls back to {@link #readByteChannel(Callable, DataBufferFactory, int)}.
	 * Closes the channel when the Flux is terminated.
	 * @param resource the resource to read from
	 * @param bufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a Flux of data buffers read from the given resource
	 */
	public static Flux<DataBuffer> read(Resource resource, DataBufferFactory bufferFactory, int bufferSize) {
		return read(resource, 0, bufferFactory, bufferSize);
	}

	/**
	 * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s
	 * starting at the given position.
	 * <p>If the resource is a file, it is read into an
	 * {@code AsynchronousFileChannel} and turned into a {@code Flux} via
	 * {@link #readAsynchronousFileChannel(Callable, DataBufferFactory, int)}; otherwise
	 * this method falls back to {@link #readByteChannel(Callable, DataBufferFactory, int)}.
	 * Closes the channel when the Flux is terminated.
	 * @param resource the resource to read from
	 * @param position the position to start reading from
	 * @param bufferFactory the factory to create data buffers with
	 * @param bufferSize the maximum size of the data buffers
	 * @return a Flux of data buffers read from the given resource
	 */
	public static Flux<DataBuffer> read(
			Resource resource, long position, DataBufferFactory bufferFactory, int bufferSize) {

		try {
			if (resource.isFile()) {
				File file = resource.getFile();
				return readAsynchronousFileChannel(
						() -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.READ),
						position, bufferFactory, bufferSize);
			}
		}
		catch (IOException ignore) {
			// fall back to resource.readableChannel(), below
		}
		Flux<DataBuffer> result = readByteChannel(resource::readableChannel, bufferFactory, bufferSize);
		return position == 0 ? result : skipUntilByteCount(result, position);
	}
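
	// Usage sketch (illustrative, not part of the class API): reading a classpath resource
	// from a byte offset; the resource location and the offset of 128 are assumptions.
	//
	//   Resource resource = new ClassPathResource("data.bin");
	//   Flux<DataBuffer> buffers = DataBufferUtils.read(
	//           resource, 128, new DefaultDataBufferFactory(), 4096);
	//
	// For a resource inside a jar, isFile() returns false, so this takes the readByteChannel
	// path and drops the first 128 bytes via skipUntilByteCount.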


	//---------------------------------------------------------------------
	// Writing
	//---------------------------------------------------------------------

	/**
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given
	 * {@code OutputStream}. Does <strong>not</strong> close the output stream
	 * when the flux is terminated, and does <strong>not</strong>
	 * {@linkplain #release(DataBuffer) release} the data buffers in the source.
	 * If releasing is required, then subscribe to the returned {@code Flux}
	 * with a {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned
	 * {@code Flux} is subscribed to.
	 * @param source the stream of data buffers to be written
	 * @param outputStream the output stream to write to
	 * @return a Flux containing the same buffers as in {@code source}, that
	 * starts the writing process when subscribed to, and that publishes any
	 * writing errors and the completion signal
	 */
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source, OutputStream outputStream) {
		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(outputStream, "'outputStream' must not be null");

		WritableByteChannel channel = Channels.newChannel(outputStream);
		return write(source, channel);
	}
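
	// Usage sketch (illustrative, not part of the class API): copying a body of DataBuffers
	// to an OutputStream and releasing each buffer once written. The source Flux and the
	// target path are assumptions.
	//
	//   Flux<DataBuffer> source = ...;
	//   OutputStream out = Files.newOutputStream(Paths.get("copy.bin"));
	//   DataBufferUtils.write(source, out)
	//           .subscribe(DataBufferUtils.releaseConsumer(), Throwable::printStackTrace);
	//
	// The returned Flux re-publishes the written buffers, so the subscriber is responsible
	// for releasing (or otherwise consuming) them; the OutputStream itself is not closed.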

	/**
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given
	 * {@code WritableByteChannel}. Does <strong>not</strong> close the channel
	 * when the flux is terminated, and does <strong>not</strong>
	 * {@linkplain #release(DataBuffer) release} the data buffers in the source.
	 * If releasing is required, then subscribe to the returned {@code Flux}
	 * with a {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned
	 * {@code Flux} is subscribed to.
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
	 * @return a Flux containing the same buffers as in {@code source}, that
	 * starts the writing process when subscribed to, and that publishes any
	 * writing errors and the completion signal
	 */
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source, WritableByteChannel channel) {
		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");

		Flux<DataBuffer> flux = Flux.from(source);
		return Flux.create(sink -> {
			WritableByteChannelSubscriber subscriber = new WritableByteChannelSubscriber(sink, channel);
			sink.onDispose(subscriber);
			flux.subscribe(subscriber);
		});
	}

	/**
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given
	 * {@code AsynchronousFileChannel}. Does <strong>not</strong> close the
	 * channel when the flux is terminated, and does <strong>not</strong>
	 * {@linkplain #release(DataBuffer) release} the data buffers in the source.
	 * If releasing is required, then subscribe to the returned {@code Flux}
	 * with a {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned
	 * {@code Flux} is subscribed to.
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
	 * @return a Flux containing the same buffers as in {@code source}, that
	 * starts the writing process when subscribed to, and that publishes any
	 * writing errors and the completion signal
	 * @since 5.0.10
	 */
	public static Flux<DataBuffer> write(Publisher<DataBuffer> source, AsynchronousFileChannel channel) {
		return write(source, channel, 0);
	}

	/**
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given
	 * {@code AsynchronousFileChannel}. Does <strong>not</strong> close the channel
	 * when the flux is terminated, and does <strong>not</strong>
	 * {@linkplain #release(DataBuffer) release} the data buffers in the source.
	 * If releasing is required, then subscribe to the returned {@code Flux} with a
	 * {@link #releaseConsumer()}.
	 * <p>Note that the writing process does not start until the returned
	 * {@code Flux} is subscribed to.
	 * @param source the stream of data buffers to be written
	 * @param channel the channel to write to
	 * @param position the file position where writing is to begin; must be non-negative
	 * @return a flux containing the same buffers as in {@code source}, that
	 * starts the writing process when subscribed to, and that publishes any
	 * writing errors and the completion signal
	 */
	public static Flux<DataBuffer> write(
			Publisher<? extends DataBuffer> source, AsynchronousFileChannel channel, long position) {

		Assert.notNull(source, "'source' must not be null");
		Assert.notNull(channel, "'channel' must not be null");
		Assert.isTrue(position >= 0, "'position' must be >= 0");

		Flux<DataBuffer> flux = Flux.from(source);
		return Flux.create(sink -> {
			WriteCompletionHandler handler = new WriteCompletionHandler(sink, channel, position);
			sink.onDispose(handler);
			flux.subscribe(handler);
		});
	}

	/**
	 * Write the given stream of {@link DataBuffer DataBuffers} to the given
	 * file {@link Path}. The optional {@code options} parameter specifies
	 * how the file is created or opened (defaults to
	 * {@link StandardOpenOption#CREATE CREATE},
	 * {@link StandardOpenOption#TRUNCATE_EXISTING TRUNCATE_EXISTING}, and
	 * {@link StandardOpenOption#WRITE WRITE}).
	 * @param source the stream of data buffers to be written
	 * @param destination the path to the file
	 * @param options the options specifying how the file is opened
	 * @return a {@link Mono} that indicates completion or error
	 * @since 5.2
	 */
	public static Mono<Void> write(Publisher<DataBuffer> source, Path destination, OpenOption... options) {
		Assert.notNull(source, "Source must not be null");
		Assert.notNull(destination, "Destination must not be null");

		Set<OpenOption> optionSet = checkWriteOptions(options);

		return Mono.create(sink -> {
			try {
				AsynchronousFileChannel channel = AsynchronousFileChannel.open(destination, optionSet, null);
				sink.onDispose(() -> closeChannel(channel));
				write(source, channel).subscribe(DataBufferUtils::release,
						sink::error,
						sink::success);
			}
			catch (IOException ex) {
				sink.error(ex);
			}
		});
	}
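
	// Usage sketch (illustrative, not part of the class API): persisting a stream of
	// DataBuffers to a file and blocking until done (appropriate in a test, not inside a
	// reactive pipeline). The source Flux and the target path are assumptions.
	//
	//   Flux<DataBuffer> source = ...;
	//   DataBufferUtils.write(source, Paths.get("out.bin")).block();
	//
	// Unlike the channel-based write variants, this method releases the written buffers
	// itself and returns a Mono<Void> that only signals completion or error.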

	private static Set<OpenOption> checkWriteOptions(OpenOption[] options) {
		int length = options.length;
		Set<OpenOption> result = new HashSet<>(length + 3);
		if (length == 0) {
			result.add(StandardOpenOption.CREATE);
			result.add(StandardOpenOption.TRUNCATE_EXISTING);
		}
		else {
			for (OpenOption opt : options) {
				if (opt == StandardOpenOption.READ) {
					throw new IllegalArgumentException("READ not allowed");
				}
				result.add(opt);
			}
		}
		result.add(StandardOpenOption.WRITE);
		return result;
	}

	static void closeChannel(@Nullable Channel channel) {
		if (channel != null && channel.isOpen()) {
			try {
				channel.close();
			}
			catch (IOException ignored) {
			}
		}
	}


	//---------------------------------------------------------------------
	// Various
	//---------------------------------------------------------------------

	/**
	 * Relay buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux whose maximum byte count is {@code maxByteCount}
	 */
	public static Flux<DataBuffer> takeUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be >= 0");

		return Flux.defer(() -> {
			AtomicLong countDown = new AtomicLong(maxByteCount);
			return Flux.from(publisher)
					.map(buffer -> {
						long remainder = countDown.addAndGet(-buffer.readableByteCount());
						if (remainder < 0) {
							int length = buffer.readableByteCount() + (int) remainder;
							return buffer.slice(0, length);
						}
						else {
							return buffer;
						}
					})
					.takeUntil(buffer -> countDown.get() <= 0);
		});

		// No doOnDiscard as the operators used do not cache (and drop) buffers
	}
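
	// Usage sketch (illustrative, not part of the class API): limiting a body to its first
	// 1024 bytes. If a buffer crosses the limit, only a slice of it is emitted: with 1000
	// bytes already relayed and a 100-byte buffer arriving, the remainder becomes -76 and
	// a 24-byte slice (100 + (-76)) is emitted before the Flux completes.
	//
	//   Flux<DataBuffer> body = ...;
	//   Flux<DataBuffer> limited = DataBufferUtils.takeUntilByteCount(body, 1024);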

	/**
	 * Skip buffers from the given {@link Publisher} until the total
	 * {@linkplain DataBuffer#readableByteCount() byte count} reaches
	 * the given maximum byte count, or until the publisher is complete.
	 * @param publisher the publisher to filter
	 * @param maxByteCount the maximum byte count
	 * @return a flux with the remaining part of the given publisher
	 */
	public static Flux<DataBuffer> skipUntilByteCount(Publisher<? extends DataBuffer> publisher, long maxByteCount) {
		Assert.notNull(publisher, "Publisher must not be null");
		Assert.isTrue(maxByteCount >= 0, "'maxByteCount' must be >= 0");

		return Flux.defer(() -> {
			AtomicLong countDown = new AtomicLong(maxByteCount);
			return Flux.from(publisher)
					.skipUntil(buffer -> {
						long remainder = countDown.addAndGet(-buffer.readableByteCount());
						return remainder < 0;
					})
					.map(buffer -> {
						long remainder = countDown.get();
						if (remainder < 0) {
							countDown.set(0);
							int start = buffer.readableByteCount() + (int) remainder;
							int length = (int) -remainder;
							return buffer.slice(start, length);
						}
						else {
							return buffer;
						}
					});
		}).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
	}
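
	// Usage sketch (illustrative, not part of the class API): resuming from byte offset 512
	// over a source that cannot seek; the offset is an assumption. The buffer straddling
	// the offset is sliced so that only bytes from position 512 onward are emitted, and
	// skipped buffers are released through the doOnDiscard hook.
	//
	//   Flux<DataBuffer> body = ...;
	//   Flux<DataBuffer> remainder = DataBufferUtils.skipUntilByteCount(body, 512);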

	/**
	 * Retain the given data buffer, if it is a {@link PooledDataBuffer}.
	 * @param dataBuffer the data buffer to retain
	 * @return the retained buffer
	 */
	@SuppressWarnings("unchecked")
	public static <T extends DataBuffer> T retain(T dataBuffer) {
		if (dataBuffer instanceof PooledDataBuffer) {
			return (T) ((PooledDataBuffer) dataBuffer).retain();
		}
		else {
			return dataBuffer;
		}
	}

	/**
	 * Release the given data buffer, if it is a {@link PooledDataBuffer} and
	 * has been {@linkplain PooledDataBuffer#isAllocated() allocated}.
	 * @param dataBuffer the data buffer to release
	 * @return {@code true} if the buffer was released; {@code false} otherwise.
	 */
	public static boolean release(@Nullable DataBuffer dataBuffer) {
		if (dataBuffer instanceof PooledDataBuffer) {
			PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer;
			if (pooledDataBuffer.isAllocated()) {
				try {
					return pooledDataBuffer.release();
				}
				catch (IllegalStateException ex) {
					// Avoid dependency on Netty: IllegalReferenceCountException
					if (logger.isDebugEnabled()) {
						logger.debug("Failed to release PooledDataBuffer", ex);
					}
					return false;
				}
			}
		}
		return false;
	}

	/**
	 * Return a consumer that calls {@link #release(DataBuffer)} on all
	 * passed data buffers.
	 */
	public static Consumer<DataBuffer> releaseConsumer() {
		return RELEASE_CONSUMER;
	}

	/**
	 * Return a new {@code DataBuffer} composed from joining together the given
	 * {@code dataBuffers} elements. Depending on the {@link DataBuffer} type,
	 * the returned buffer may be a single buffer containing all data of the
	 * provided buffers, or it may be a zero-copy composite with references to
	 * the given buffers.
	 * <p>If {@code dataBuffers} produces an error or if there is a cancel
	 * signal, then all accumulated buffers will be
	 * {@linkplain #release(DataBuffer) released}.
	 * <p>Note that the given data buffers do <strong>not</strong> have to be
	 * released. They will be released as part of the returned composite.
	 * @param dataBuffers the data buffers that are to be composed
	 * @return a buffer that is composed from the {@code dataBuffers} argument
	 * @since 5.0.3
	 */
	public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> dataBuffers) {
		return join(dataBuffers, -1);
	}

	/**
	 * Variant of {@link #join(Publisher)} that behaves the same way up until
	 * the specified max number of bytes to buffer. Once the limit is exceeded,
	 * {@link DataBufferLimitException} is raised.
	 * @param buffers the data buffers that are to be composed
	 * @param maxByteCount the max number of bytes to buffer, or -1 for unlimited
	 * @return a buffer with the aggregated content, or an empty Mono if the
	 * source completes without emitting any buffers
	 * @throws DataBufferLimitException if maxByteCount is exceeded
	 * @since 5.1.11
	 */
	@SuppressWarnings("unchecked")
	public static Mono<DataBuffer> join(Publisher<? extends DataBuffer> buffers, int maxByteCount) {
		Assert.notNull(buffers, "'buffers' must not be null");

		if (buffers instanceof Mono) {
			return (Mono<DataBuffer>) buffers;
		}

		return Flux.from(buffers)
				.collect(() -> new LimitedDataBufferList(maxByteCount), LimitedDataBufferList::add)
				.filter(list -> !list.isEmpty())
				.map(list -> list.get(0).factory().join(list))
				.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
	}
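
	// Usage sketch (illustrative, not part of the class API): aggregating a multi-buffer
	// body into a single buffer, capped at 256 KB, and decoding it as UTF-8 text. The body
	// publisher, the limit, and the charset are assumptions.
	//
	//   Flux<DataBuffer> body = ...;
	//   Mono<String> text = DataBufferUtils.join(body, 256 * 1024)
	//           .map(buffer -> {
	//               String result = buffer.toString(StandardCharsets.UTF_8);
	//               DataBufferUtils.release(buffer);
	//               return result;
	//           });
	//
	// If the accumulated size exceeds the limit, the Mono terminates with a
	// DataBufferLimitException.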

	/**
	 * Return a {@link Matcher} for the given delimiter.
	 * The matcher can be used to find the delimiter in a stream of data buffers.
	 * @param delimiter the delimiter bytes to find
	 * @return the matcher
	 * @since 5.2
	 */
	public static Matcher matcher(byte[] delimiter) {
		return createMatcher(delimiter);
	}

	/**
	 * Return a {@link Matcher} for the given delimiters.
	 * The matcher can be used to find the delimiters in a stream of data buffers.
	 * @param delimiters the delimiter byte sequences to find
	 * @return the matcher
	 * @since 5.2
	 */
	public static Matcher matcher(byte[]... delimiters) {
		Assert.isTrue(delimiters.length > 0, "Delimiters must not be empty");
		return (delimiters.length == 1 ? createMatcher(delimiters[0]) : new CompositeMatcher(delimiters));
	}
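
	// Usage sketch (illustrative, not part of the class API): locating a CRLF delimiter in
	// an incoming buffer (the dataBuffer variable is assumed). match() returns the index of
	// the last byte of the first matched delimiter within this buffer, or -1; partial-match
	// state carries over, so a delimiter split across two buffers is still detected on the
	// next call.
	//
	//   DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(new byte[] {'\r', '\n'});
	//   int endIndex = matcher.match(dataBuffer);
	//   if (endIndex != -1) {
	//       byte[] matched = matcher.delimiter();  // the delimiter that was found
	//   }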

	private static NestedMatcher createMatcher(byte[] delimiter) {
		Assert.isTrue(delimiter.length > 0, "Delimiter must not be empty");
		switch (delimiter.length) {
			case 1:
				return (delimiter[0] == 10 ? SingleByteMatcher.NEWLINE_MATCHER : new SingleByteMatcher(delimiter));
			case 2:
				return new TwoByteMatcher(delimiter);
			default:
				return new KnuthMorrisPrattMatcher(delimiter);
		}
	}


	/**
	 * Contract to find delimiter(s) against one or more data buffers that can
	 * be passed one at a time to the {@link #match(DataBuffer)} method.
	 *
	 * @since 5.2
	 * @see #match(DataBuffer)
	 */
	public interface Matcher {

		/**
		 * Find the first matching delimiter and return the index of the last
		 * byte of the delimiter, or {@code -1} if not found.
		 */
		int match(DataBuffer dataBuffer);

		/**
		 * Return the delimiter from the last invocation of {@link #match(DataBuffer)}.
		 */
		byte[] delimiter();

		/**
		 * Reset the state of this matcher.
		 */
		void reset();
	}


	/**
	 * Matcher that supports searching for multiple delimiters.
	 */
	private static class CompositeMatcher implements Matcher {

		private static final byte[] NO_DELIMITER = new byte[0];


		private final NestedMatcher[] matchers;

		byte[] longestDelimiter = NO_DELIMITER;

		CompositeMatcher(byte[][] delimiters) {
			this.matchers = initMatchers(delimiters);
		}

		private static NestedMatcher[] initMatchers(byte[][] delimiters) {
			NestedMatcher[] matchers = new NestedMatcher[delimiters.length];
			for (int i = 0; i < delimiters.length; i++) {
				matchers[i] = createMatcher(delimiters[i]);
			}
			return matchers;
		}

		@Override
		public int match(DataBuffer dataBuffer) {
			this.longestDelimiter = NO_DELIMITER;

			for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) {
				byte b = dataBuffer.getByte(pos);

				for (NestedMatcher matcher : this.matchers) {
					if (matcher.match(b) && matcher.delimiter().length > this.longestDelimiter.length) {
						this.longestDelimiter = matcher.delimiter();
					}
				}

				if (this.longestDelimiter != NO_DELIMITER) {
					reset();
					return pos;
				}
			}
			return -1;
		}

		@Override
		public byte[] delimiter() {
			Assert.state(this.longestDelimiter != NO_DELIMITER, "No delimiter found");
			return this.longestDelimiter;
		}

		@Override
		public void reset() {
			for (NestedMatcher matcher : this.matchers) {
				matcher.reset();
			}
		}
	}


	/**
	 * Matcher that can be nested within {@link CompositeMatcher} where multiple
	 * matchers advance together using the same index, one byte at a time.
	 */
	private interface NestedMatcher extends Matcher {

		/**
		 * Perform a match against the next byte of the stream and return true
		 * if the delimiter is fully matched.
		 */
		boolean match(byte b);

	}


	/**
	 * Matcher for a single byte delimiter.
	 */
	private static class SingleByteMatcher implements NestedMatcher {

		static SingleByteMatcher NEWLINE_MATCHER = new SingleByteMatcher(new byte[] {10});

		private final byte[] delimiter;

		SingleByteMatcher(byte[] delimiter) {
			Assert.isTrue(delimiter.length == 1, "Expected a 1 byte delimiter");
			this.delimiter = delimiter;
		}

		@Override
		public int match(DataBuffer dataBuffer) {
			for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) {
				byte b = dataBuffer.getByte(pos);
				if (match(b)) {
					return pos;
				}
			}
			return -1;
		}

		@Override
		public boolean match(byte b) {
			return this.delimiter[0] == b;
		}

		@Override
		public byte[] delimiter() {
			return this.delimiter;
		}

		@Override
		public void reset() {
		}
	}


	/**
	 * Base class for a {@link NestedMatcher}.
	 */
	private static abstract class AbstractNestedMatcher implements NestedMatcher {

		private final byte[] delimiter;

		private int matches = 0;


		protected AbstractNestedMatcher(byte[] delimiter) {
			this.delimiter = delimiter;
		}

		protected void setMatches(int index) {
			this.matches = index;
		}

		protected int getMatches() {
			return this.matches;
		}

		@Override
		public int match(DataBuffer dataBuffer) {
			for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) {
				byte b = dataBuffer.getByte(pos);
				if (match(b)) {
					reset();
					return pos;
				}
			}
			return -1;
		}

		@Override
		public boolean match(byte b) {
			if (b == this.delimiter[this.matches]) {
				this.matches++;
				return (this.matches == delimiter().length);
			}
			// On a mismatch, allow the current byte to start a new match rather than keeping
			// a stale partial match (which would report false positives, e.g. "\r\n" in "\ra\n").
			// This is exact for the 2-byte case; KnuthMorrisPrattMatcher adjusts its state via
			// the suffix-prefix table before delegating here, so for it this is a no-op.
			this.matches = (b == this.delimiter[0] ? 1 : 0);
			return false;
		}

		@Override
		public byte[] delimiter() {
			return this.delimiter;
		}

		@Override
		public void reset() {
			this.matches = 0;
		}
	}


	/**
	 * Matcher with a 2 byte delimiter that does not benefit from a
	 * Knuth-Morris-Pratt suffix-prefix table.
	 */
	private static class TwoByteMatcher extends AbstractNestedMatcher {

		protected TwoByteMatcher(byte[] delimiter) {
			super(delimiter);
			Assert.isTrue(delimiter.length == 2, "Expected a 2-byte delimiter");
		}
	}


	/**
	 * Implementation of {@link Matcher} that uses the Knuth-Morris-Pratt algorithm.
	 * @see <a href="https://www.nayuki.io/page/knuth-morris-pratt-string-matching">Knuth-Morris-Pratt string matching</a>
	 */
	private static class KnuthMorrisPrattMatcher extends AbstractNestedMatcher {

		private final int[] table;

		public KnuthMorrisPrattMatcher(byte[] delimiter) {
			super(delimiter);
			this.table = longestSuffixPrefixTable(delimiter);
		}

		private static int[] longestSuffixPrefixTable(byte[] delimiter) {
			int[] result = new int[delimiter.length];
			result[0] = 0;
			for (int i = 1; i < delimiter.length; i++) {
				int j = result[i - 1];
				while (j > 0 && delimiter[i] != delimiter[j]) {
					j = result[j - 1];
				}
				if (delimiter[i] == delimiter[j]) {
					j++;
				}
				result[i] = j;
			}
			return result;
		}
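
		// Worked example (illustrative): for the delimiter {'a', 'b', 'a', 'b'} the table is
		// [0, 0, 1, 2]: the longest proper prefix that is also a suffix is empty for "a" and
		// "ab", "a" (length 1) for "aba", and "ab" (length 2) for "abab". On a mismatch after
		// k matched bytes, match(byte) falls back to table[k - 1] rather than re-scanning
		// from the start of the delimiter.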

		@Override
		public boolean match(byte b) {
			while (getMatches() > 0 && b != delimiter()[getMatches()]) {
				setMatches(this.table[getMatches() - 1]);
			}
			return super.match(b);
		}
	}


	private static class ReadableByteChannelGenerator implements Consumer<SynchronousSink<DataBuffer>> {

		private final ReadableByteChannel channel;

		private final DataBufferFactory dataBufferFactory;

		private final int bufferSize;

		public ReadableByteChannelGenerator(
				ReadableByteChannel channel, DataBufferFactory dataBufferFactory, int bufferSize) {

			this.channel = channel;
			this.dataBufferFactory = dataBufferFactory;
			this.bufferSize = bufferSize;
		}

		@Override
		public void accept(SynchronousSink<DataBuffer> sink) {
			boolean release = true;
			DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
			try {
				int read;
				ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, dataBuffer.capacity());
				if ((read = this.channel.read(byteBuffer)) >= 0) {
					dataBuffer.writePosition(read);
					release = false;
					sink.next(dataBuffer);
				}
				else {
					sink.complete();
				}
			}
			catch (IOException ex) {
				sink.error(ex);
			}
			finally {
				if (release) {
					release(dataBuffer);
				}
			}
		}
	}


	private static class ReadCompletionHandler implements CompletionHandler<Integer, DataBuffer> {

		private final AsynchronousFileChannel channel;

		private final FluxSink<DataBuffer> sink;

		private final DataBufferFactory dataBufferFactory;

		private final int bufferSize;

		private final AtomicLong position;

		private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);

		public ReadCompletionHandler(AsynchronousFileChannel channel,
				FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {

			this.channel = channel;
			this.sink = sink;
			this.position = new AtomicLong(position);
			this.dataBufferFactory = dataBufferFactory;
			this.bufferSize = bufferSize;
		}

		/**
		 * Invoked when the Reactive Streams consumer signals demand.
		 */
		public void request(long n) {
			tryRead();
		}

		/**
		 * Invoked when the Reactive Streams consumer cancels.
		 */
		public void cancel() {
			this.state.getAndSet(State.DISPOSED);

			// According to java.nio.channels.AsynchronousChannel, "if an I/O operation is outstanding
			// on the channel and the channel's close method is invoked, then the I/O operation
			// fails with the exception AsynchronousCloseException". That should invoke the failed
			// callback below, where the current DataBuffer is released.

			closeChannel(this.channel);
		}

		private void tryRead() {
			if (this.sink.requestedFromDownstream() > 0 && this.state.compareAndSet(State.IDLE, State.READING)) {
				read();
			}
		}

		private void read() {
			DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize);
			ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, this.bufferSize);
			this.channel.read(byteBuffer, this.position.get(), dataBuffer, this);
		}

		@Override
		public void completed(Integer read, DataBuffer dataBuffer) {
			if (this.state.get().equals(State.DISPOSED)) {
				release(dataBuffer);
				closeChannel(this.channel);
				return;
			}

			if (read == -1) {
				release(dataBuffer);
				closeChannel(this.channel);
				this.state.set(State.DISPOSED);
				this.sink.complete();
				return;
			}

			this.position.addAndGet(read);
			dataBuffer.writePosition(read);
			this.sink.next(dataBuffer);

			// Stay in READING mode if there is demand
			if (this.sink.requestedFromDownstream() > 0) {
				read();
				return;
			}

			// Release READING mode and then try again in case of concurrent "request"
			if (this.state.compareAndSet(State.READING, State.IDLE)) {
				tryRead();
			}
		}

		@Override
		public void failed(Throwable exc, DataBuffer dataBuffer) {
			release(dataBuffer);
			closeChannel(this.channel);
			this.state.set(State.DISPOSED);
			this.sink.error(exc);
		}

		private enum State {
			IDLE, READING, DISPOSED
		}
	}


	private static class WritableByteChannelSubscriber extends BaseSubscriber<DataBuffer> {

		private final FluxSink<DataBuffer> sink;

		private final WritableByteChannel channel;

		public WritableByteChannelSubscriber(FluxSink<DataBuffer> sink, WritableByteChannel channel) {
			this.sink = sink;
			this.channel = channel;
		}

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			request(1);
		}

		@Override
		protected void hookOnNext(DataBuffer dataBuffer) {
			try {
				ByteBuffer byteBuffer = dataBuffer.asByteBuffer();
				while (byteBuffer.hasRemaining()) {
					this.channel.write(byteBuffer);
				}
				this.sink.next(dataBuffer);
				request(1);
			}
			catch (IOException ex) {
				this.sink.next(dataBuffer);
				this.sink.error(ex);
			}
		}

		@Override
		protected void hookOnError(Throwable throwable) {
			this.sink.error(throwable);
		}

		@Override
		protected void hookOnComplete() {
			this.sink.complete();
		}
	}


	private static class WriteCompletionHandler extends BaseSubscriber<DataBuffer>
			implements CompletionHandler<Integer, ByteBuffer> {

		private final FluxSink<DataBuffer> sink;

		private final AsynchronousFileChannel channel;

		private final AtomicBoolean completed = new AtomicBoolean();

		private final AtomicReference<Throwable> error = new AtomicReference<>();

		private final AtomicLong position;

		private final AtomicReference<DataBuffer> dataBuffer = new AtomicReference<>();

		public WriteCompletionHandler(
				FluxSink<DataBuffer> sink, AsynchronousFileChannel channel, long position) {

			this.sink = sink;
			this.channel = channel;
			this.position = new AtomicLong(position);
		}

		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			request(1);
		}

		@Override
		protected void hookOnNext(DataBuffer value) {
			if (!this.dataBuffer.compareAndSet(null, value)) {
				throw new IllegalStateException();
			}
			ByteBuffer byteBuffer = value.asByteBuffer();
			this.channel.write(byteBuffer, this.position.get(), byteBuffer, this);
		}

		@Override
		protected void hookOnError(Throwable throwable) {
			this.error.set(throwable);

			if (this.dataBuffer.get() == null) {
				this.sink.error(throwable);
			}
		}

		@Override
		protected void hookOnComplete() {
			this.completed.set(true);

			if (this.dataBuffer.get() == null) {
				this.sink.complete();
			}
		}

		@Override
		public void completed(Integer written, ByteBuffer byteBuffer) {
			long pos = this.position.addAndGet(written);
			if (byteBuffer.hasRemaining()) {
				this.channel.write(byteBuffer, pos, byteBuffer, this);
				return;
			}
			sinkDataBuffer();

			Throwable throwable = this.error.get();
			if (throwable != null) {
				this.sink.error(throwable);
			}
			else if (this.completed.get()) {
				this.sink.complete();
			}
			else {
				request(1);
			}
		}

		@Override
		public void failed(Throwable exc, ByteBuffer byteBuffer) {
			sinkDataBuffer();
			this.sink.error(exc);
		}

		private void sinkDataBuffer() {
			DataBuffer dataBuffer = this.dataBuffer.get();
			Assert.state(dataBuffer != null, "DataBuffer should not be null");
			this.sink.next(dataBuffer);
			this.dataBuffer.set(null);
		}
	}

}