001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.server.reactive;
018
019import java.util.function.Function;
020
021import org.reactivestreams.Publisher;
022import org.reactivestreams.Subscriber;
023import org.reactivestreams.Subscription;
024import reactor.core.CoreSubscriber;
025import reactor.core.Scannable;
026import reactor.core.publisher.Flux;
027import reactor.core.publisher.Mono;
028import reactor.core.publisher.Operators;
029import reactor.util.context.Context;
030
031import org.springframework.core.io.buffer.DataBuffer;
032import org.springframework.core.io.buffer.DataBufferUtils;
033import org.springframework.lang.Nullable;
034import org.springframework.util.Assert;
035
036/**
037 * Given a write function that accepts a source {@code Publisher<T>} to write
038 * with and returns {@code Publisher<Void>} for the result, this operator helps
039 * to defer the invocation of the write function, until we know if the source
040 * publisher will begin publishing without an error. If the first emission is
041 * an error, the write function is bypassed, and the error is sent directly
042 * through the result publisher. Otherwise the write function is invoked.
043 *
044 * @author Rossen Stoyanchev
045 * @author Stephane Maldini
046 * @since 5.0
047 * @param <T> the type of element signaled
048 */
049public class ChannelSendOperator<T> extends Mono<Void> implements Scannable {
050
051        private final Function<Publisher<T>, Publisher<Void>> writeFunction;
052
053        private final Flux<T> source;
054
055
056        public ChannelSendOperator(Publisher<? extends T> source, Function<Publisher<T>, Publisher<Void>> writeFunction) {
057                this.source = Flux.from(source);
058                this.writeFunction = writeFunction;
059        }
060
061
062        @Override
063        @Nullable
064        @SuppressWarnings("rawtypes")
065        public Object scanUnsafe(Attr key) {
066                if (key == Attr.PREFETCH) {
067                        return Integer.MAX_VALUE;
068                }
069                if (key == Attr.PARENT) {
070                        return this.source;
071                }
072                return null;
073        }
074
075        @Override
076        public void subscribe(CoreSubscriber<? super Void> actual) {
077                this.source.subscribe(new WriteBarrier(actual));
078        }
079
080
        /**
         * The phases a {@link WriteBarrier} goes through, from waiting for the
         * first upstream signal to plain pass-through of all signals.
         */
        private enum State {

                /** No emissions from the upstream source yet. */
                NEW,

                /**
                 * At least one signal of any kind has been received. For an item or
                 * a completion signal the write function is called and actual writing
                 * can proceed; for an error signal the write function is bypassed
                 * and the error goes to the completion subscriber directly.
                 */
                FIRST_SIGNAL_RECEIVED,

                /**
                 * The write subscriber has subscribed and requested; we're going to
                 * emit the cached signals.
                 */
                EMITTING_CACHED_SIGNALS,

                /**
                 * The write subscriber has subscribed, and cached signals have been
                 * emitted to it; we're ready to switch to a simple pass-through mode
                 * for all remaining signals.
                 */
                READY_TO_WRITE

        }
106
107
108        /**
109         * A barrier inserted between the write source and the write subscriber
110         * (i.e. the HTTP server adapter) that pre-fetches and waits for the first
111         * signal before deciding whether to hook in to the write subscriber.
112         *
113         * <p>Acts as:
114         * <ul>
115         * <li>Subscriber to the write source.
116         * <li>Subscription to the write subscriber.
117         * <li>Publisher to the write subscriber.
118         * </ul>
119         *
120         * <p>Also uses {@link WriteCompletionBarrier} to communicate completion
121         * and detect cancel signals from the completion subscriber.
122         */
123        private class WriteBarrier implements CoreSubscriber<T>, Subscription, Publisher<T> {
124
125                /* Bridges signals to and from the completionSubscriber */
126                private final WriteCompletionBarrier writeCompletionBarrier;
127
128                /* Upstream write source subscription */
129                @Nullable
130                private Subscription subscription;
131
132                /** Cached data item before readyToWrite. */
133                @Nullable
134                private T item;
135
136                /** Cached error signal before readyToWrite. */
137                @Nullable
138                private Throwable error;
139
140                /** Cached onComplete signal before readyToWrite. */
141                private boolean completed = false;
142
143                /** Recursive demand while emitting cached signals. */
144                private long demandBeforeReadyToWrite;
145
146                /** Current state. */
147                private State state = State.NEW;
148
149                /** The actual writeSubscriber from the HTTP server adapter. */
150                @Nullable
151                private Subscriber<? super T> writeSubscriber;
152
153
                /**
                 * Create a barrier and the {@link WriteCompletionBarrier} that bridges
                 * between it and the given completion subscriber.
                 * @param completionSubscriber the subscriber to be notified of the
                 * outcome of the write
                 */
                WriteBarrier(CoreSubscriber<? super Void> completionSubscriber) {
                        this.writeCompletionBarrier = new WriteCompletionBarrier(completionSubscriber, this);
                }
157
158
159                // Subscriber<T> methods (we're the subscriber to the write source)..
160
161                @Override
162                public final void onSubscribe(Subscription s) {
163                        if (Operators.validate(this.subscription, s)) {
164                                this.subscription = s;
165                                this.writeCompletionBarrier.connect();
166                                s.request(1);
167                        }
168                }
169
170                @Override
171                public final void onNext(T item) {
172                        if (this.state == State.READY_TO_WRITE) {
173                                requiredWriteSubscriber().onNext(item);
174                                return;
175                        }
176                        //FIXME revisit in case of reentrant sync deadlock
177                        synchronized (this) {
178                                if (this.state == State.READY_TO_WRITE) {
179                                        requiredWriteSubscriber().onNext(item);
180                                }
181                                else if (this.state == State.NEW) {
182                                        this.item = item;
183                                        this.state = State.FIRST_SIGNAL_RECEIVED;
184                                        Publisher<Void> result;
185                                        try {
186                                                result = writeFunction.apply(this);
187                                        }
188                                        catch (Throwable ex) {
189                                                this.writeCompletionBarrier.onError(ex);
190                                                return;
191                                        }
192                                        result.subscribe(this.writeCompletionBarrier);
193                                }
194                                else {
195                                        if (this.subscription != null) {
196                                                this.subscription.cancel();
197                                        }
198                                        this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item."));
199                                }
200                        }
201                }
202
203                private Subscriber<? super T> requiredWriteSubscriber() {
204                        Assert.state(this.writeSubscriber != null, "No write subscriber");
205                        return this.writeSubscriber;
206                }
207
208                @Override
209                public final void onError(Throwable ex) {
210                        if (this.state == State.READY_TO_WRITE) {
211                                requiredWriteSubscriber().onError(ex);
212                                return;
213                        }
214                        synchronized (this) {
215                                if (this.state == State.READY_TO_WRITE) {
216                                        requiredWriteSubscriber().onError(ex);
217                                }
218                                else if (this.state == State.NEW) {
219                                        this.state = State.FIRST_SIGNAL_RECEIVED;
220                                        this.writeCompletionBarrier.onError(ex);
221                                }
222                                else {
223                                        this.error = ex;
224                                }
225                        }
226                }
227
228                @Override
229                public final void onComplete() {
230                        if (this.state == State.READY_TO_WRITE) {
231                                requiredWriteSubscriber().onComplete();
232                                return;
233                        }
234                        synchronized (this) {
235                                if (this.state == State.READY_TO_WRITE) {
236                                        requiredWriteSubscriber().onComplete();
237                                }
238                                else if (this.state == State.NEW) {
239                                        this.completed = true;
240                                        this.state = State.FIRST_SIGNAL_RECEIVED;
241                                        Publisher<Void> result;
242                                        try {
243                                                result = writeFunction.apply(this);
244                                        }
245                                        catch (Throwable ex) {
246                                                this.writeCompletionBarrier.onError(ex);
247                                                return;
248                                        }
249                                        result.subscribe(this.writeCompletionBarrier);
250                                }
251                                else {
252                                        this.completed = true;
253                                }
254                        }
255                }
256
257                @Override
258                public Context currentContext() {
259                        return this.writeCompletionBarrier.currentContext();
260                }
261
262
263                // Subscription methods (we're the Subscription to the writeSubscriber)..
264
265                @Override
266                public void request(long n) {
267                        Subscription s = this.subscription;
268                        if (s == null) {
269                                return;
270                        }
271                        if (this.state == State.READY_TO_WRITE) {
272                                s.request(n);
273                                return;
274                        }
275                        synchronized (this) {
276                                if (this.writeSubscriber != null) {
277                                        if (this.state == State.EMITTING_CACHED_SIGNALS) {
278                                                this.demandBeforeReadyToWrite = n;
279                                                return;
280                                        }
281                                        try {
282                                                this.state = State.EMITTING_CACHED_SIGNALS;
283                                                if (emitCachedSignals()) {
284                                                        return;
285                                                }
286                                                n = n + this.demandBeforeReadyToWrite - 1;
287                                                if (n == 0) {
288                                                        return;
289                                                }
290                                        }
291                                        finally {
292                                                this.state = State.READY_TO_WRITE;
293                                        }
294                                }
295                        }
296                        s.request(n);
297                }
298
299                private boolean emitCachedSignals() {
300                        if (this.error != null) {
301                                try {
302                                        requiredWriteSubscriber().onError(this.error);
303                                }
304                                finally {
305                                        releaseCachedItem();
306                                }
307                                return true;
308                        }
309                        T item = this.item;
310                        this.item = null;
311                        if (item != null) {
312                                requiredWriteSubscriber().onNext(item);
313                        }
314                        if (this.completed) {
315                                requiredWriteSubscriber().onComplete();
316                                return true;
317                        }
318                        return false;
319                }
320
321                @Override
322                public void cancel() {
323                        Subscription s = this.subscription;
324                        if (s != null) {
325                                this.subscription = null;
326                                try {
327                                        s.cancel();
328                                }
329                                finally {
330                                        releaseCachedItem();
331                                }
332                        }
333                }
334
335                private void releaseCachedItem() {
336                        synchronized (this) {
337                                Object item = this.item;
338                                if (item instanceof DataBuffer) {
339                                        DataBufferUtils.release((DataBuffer) item);
340                                }
341                                this.item = null;
342                        }
343                }
344
345
346                // Publisher<T> methods (we're the Publisher to the writeSubscriber)..
347
348                @Override
349                public void subscribe(Subscriber<? super T> writeSubscriber) {
350                        synchronized (this) {
351                                Assert.state(this.writeSubscriber == null, "Only one write subscriber supported");
352                                this.writeSubscriber = writeSubscriber;
353                                if (this.error != null || this.completed) {
354                                        this.writeSubscriber.onSubscribe(Operators.emptySubscription());
355                                        emitCachedSignals();
356                                }
357                                else {
358                                        this.writeSubscriber.onSubscribe(this);
359                                }
360                        }
361                }
362        }
363
364
365        /**
366         * We need an extra barrier between the WriteBarrier itself and the actual
367         * completion subscriber.
368         *
369         * <p>The completionSubscriber is subscribed initially to the WriteBarrier.
370         * Later after the first signal is received, we need one more subscriber
371         * instance (per spec can only subscribe once) to subscribe to the write
372         * function and switch to delegating completion signals from it.
373         */
374        private class WriteCompletionBarrier implements CoreSubscriber<Void>, Subscription {
375
376                /* Downstream write completion subscriber */
377                private final CoreSubscriber<? super Void> completionSubscriber;
378
379                private final WriteBarrier writeBarrier;
380
381                @Nullable
382                private Subscription subscription;
383
384
385                public WriteCompletionBarrier(CoreSubscriber<? super Void> subscriber, WriteBarrier writeBarrier) {
386                        this.completionSubscriber = subscriber;
387                        this.writeBarrier = writeBarrier;
388                }
389
390
391                /**
392                 * Connect the underlying completion subscriber to this barrier in order
393                 * to track cancel signals and pass them on to the write barrier.
394                 */
395                public void connect() {
396                        this.completionSubscriber.onSubscribe(this);
397                }
398
399                // Subscriber methods (we're the subscriber to the write function)..
400
401                @Override
402                public void onSubscribe(Subscription subscription) {
403                        this.subscription = subscription;
404                        subscription.request(Long.MAX_VALUE);
405                }
406
407                @Override
408                public void onNext(Void aVoid) {
409                }
410
411                @Override
412                public void onError(Throwable ex) {
413                        try {
414                                this.completionSubscriber.onError(ex);
415                        }
416                        finally {
417                                this.writeBarrier.releaseCachedItem();
418                        }
419                }
420
421                @Override
422                public void onComplete() {
423                        this.completionSubscriber.onComplete();
424                }
425
426                @Override
427                public Context currentContext() {
428                        return this.completionSubscriber.currentContext();
429                }
430
431
432                @Override
433                public void request(long n) {
434                        // Ignore: we don't produce data
435                }
436
437                @Override
438                public void cancel() {
439                        this.writeBarrier.cancel();
440                        Subscription subscription = this.subscription;
441                        if (subscription != null) {
442                                subscription.cancel();
443                        }
444                }
445        }
446
447}