001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.server.reactive;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicReference;
021
022import org.apache.commons.logging.Log;
023import org.reactivestreams.Processor;
024import org.reactivestreams.Publisher;
025import org.reactivestreams.Subscriber;
026import org.reactivestreams.Subscription;
027
028import org.springframework.core.log.LogDelegateFactory;
029import org.springframework.lang.Nullable;
030import org.springframework.util.Assert;
031
032/**
033 * An alternative to {@link AbstractListenerWriteProcessor} but instead writing
034 * a {@code Publisher<Publisher<T>>} with flush boundaries enforces after
035 * the completion of each nested Publisher.
036 *
037 * @author Arjen Poutsma
038 * @author Violeta Georgieva
039 * @author Rossen Stoyanchev
040 * @since 5.0
041 * @param <T> the type of element signaled to the {@link Subscriber}
042 */
043public abstract class AbstractListenerWriteFlushProcessor<T> implements Processor<Publisher<? extends T>, Void> {
044
045        /**
046         * Special logger for debugging Reactive Streams signals.
047         * @see LogDelegateFactory#getHiddenLog(Class)
048         * @see AbstractListenerReadPublisher#rsReadLogger
049         * @see AbstractListenerWriteProcessor#rsWriteLogger
050         * @see WriteResultPublisher#rsWriteResultLogger
051         */
052        protected static final Log rsWriteFlushLogger =
053                        LogDelegateFactory.getHiddenLog(AbstractListenerWriteFlushProcessor.class);
054
055
056        private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
057
058        @Nullable
059        private Subscription subscription;
060
061        private volatile boolean subscriberCompleted;
062
063        private final WriteResultPublisher resultPublisher;
064
065        private final String logPrefix;
066
067
068        public AbstractListenerWriteFlushProcessor() {
069                this("");
070        }
071
072        /**
073         * Create an instance with the given log prefix.
074         * @since 5.1
075         */
076        public AbstractListenerWriteFlushProcessor(String logPrefix) {
077                this.logPrefix = logPrefix;
078                this.resultPublisher = new WriteResultPublisher(logPrefix);
079        }
080
081
082        /**
083         * Create an instance with the given log prefix.
084         * @since 5.1
085         */
086        public String getLogPrefix() {
087                return this.logPrefix;
088        }
089
090
091        // Subscriber methods and async I/O notification methods...
092
093        @Override
094        public final void onSubscribe(Subscription subscription) {
095                this.state.get().onSubscribe(this, subscription);
096        }
097
098        @Override
099        public final void onNext(Publisher<? extends T> publisher) {
100                if (rsWriteFlushLogger.isTraceEnabled()) {
101                        rsWriteFlushLogger.trace(getLogPrefix() + "Received onNext publisher");
102                }
103                this.state.get().onNext(this, publisher);
104        }
105
106        /**
107         * Error signal from the upstream, write Publisher. This is also used by
108         * sub-classes to delegate error notifications from the container.
109         */
110        @Override
111        public final void onError(Throwable ex) {
112                if (rsWriteFlushLogger.isTraceEnabled()) {
113                        rsWriteFlushLogger.trace(getLogPrefix() + "Received onError: " + ex);
114                }
115                this.state.get().onError(this, ex);
116        }
117
118        /**
119         * Completion signal from the upstream, write Publisher. This is also used
120         * by sub-classes to delegate completion notifications from the container.
121         */
122        @Override
123        public final void onComplete() {
124                if (rsWriteFlushLogger.isTraceEnabled()) {
125                        rsWriteFlushLogger.trace(getLogPrefix() + "Received onComplete");
126                }
127                this.state.get().onComplete(this);
128        }
129
130        /**
131         * Invoked when flushing is possible, either in the same thread after a check
132         * via {@link #isWritePossible()}, or as a callback from the underlying
133         * container.
134         */
135        protected final void onFlushPossible() {
136                this.state.get().onFlushPossible(this);
137        }
138
139        /**
140         * Invoked during an error or completion callback from the underlying
141         * container to cancel the upstream subscription.
142         */
143        protected void cancel() {
144                if (rsWriteFlushLogger.isTraceEnabled()) {
145                        rsWriteFlushLogger.trace(getLogPrefix() + "Received request to cancel");
146                }
147                if (this.subscription != null) {
148                        this.subscription.cancel();
149                }
150        }
151
152
153        // Publisher implementation for result notifications...
154
155        @Override
156        public final void subscribe(Subscriber<? super Void> subscriber) {
157                this.resultPublisher.subscribe(subscriber);
158        }
159
160
161        // Write API methods to be implemented or template methods to override...
162
163        /**
164         * Create a new processor for the current flush boundary.
165         */
166        protected abstract Processor<? super T, Void> createWriteProcessor();
167
168        /**
169         * Whether writing/flushing is possible.
170         */
171        protected abstract boolean isWritePossible();
172
173        /**
174         * Flush the output if ready, or otherwise {@link #isFlushPending()} should
175         * return true after.
176         * <p>This is primarily for the Servlet non-blocking I/O API where flush
177         * cannot be called without a readyToWrite check.
178         */
179        protected abstract void flush() throws IOException;
180
181        /**
182         * Whether flushing is pending.
183         * <p>This is primarily for the Servlet non-blocking I/O API where flush
184         * cannot be called without a readyToWrite check.
185         */
186        protected abstract boolean isFlushPending();
187
188        /**
189         * Invoked when an error happens while flushing. Sub-classes may choose
190         * to ignore this if they know the underlying API will provide an error
191         * notification in a container thread.
192         * <p>Defaults to no-op.
193         */
194        protected void flushingFailed(Throwable t) {
195        }
196
197
198        // Private methods for use in State...
199
200        private boolean changeState(State oldState, State newState) {
201                boolean result = this.state.compareAndSet(oldState, newState);
202                if (result && rsWriteFlushLogger.isTraceEnabled()) {
203                        rsWriteFlushLogger.trace(getLogPrefix() + oldState + " -> " + newState);
204                }
205                return result;
206        }
207
208        private void flushIfPossible() {
209                boolean result = isWritePossible();
210                if (rsWriteFlushLogger.isTraceEnabled()) {
211                        rsWriteFlushLogger.trace(getLogPrefix() + "isWritePossible[" + result + "]");
212                }
213                if (result) {
214                        onFlushPossible();
215                }
216        }
217
218
219        /**
220         * Represents a state for the {@link Processor} to be in.
221         *
222         * <p><pre>
223         *       UNSUBSCRIBED
224         *            |
225         *            v
226         *        REQUESTED <---> RECEIVED ------+
227         *            |              |           |
228         *            |              v           |
229         *            |           FLUSHING       |
230         *            |              |           |
231         *            |              v           |
232         *            +--------> COMPLETED <-----+
233         * </pre>
234         */
235        private enum State {
236
237                UNSUBSCRIBED {
238                        @Override
239                        public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> processor, Subscription subscription) {
240                                Assert.notNull(subscription, "Subscription must not be null");
241                                if (processor.changeState(this, REQUESTED)) {
242                                        processor.subscription = subscription;
243                                        subscription.request(1);
244                                }
245                                else {
246                                        super.onSubscribe(processor, subscription);
247                                }
248                        }
249
250                        @Override
251                        public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
252                                // This can happen on (very early) completion notification from container..
253                                if (processor.changeState(this, COMPLETED)) {
254                                        processor.resultPublisher.publishComplete();
255                                }
256                                else {
257                                        processor.state.get().onComplete(processor);
258                                }
259                        }
260                },
261
262                REQUESTED {
263                        @Override
264                        public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor,
265                                        Publisher<? extends T> currentPublisher) {
266
267                                if (processor.changeState(this, RECEIVED)) {
268                                        Processor<? super T, Void> currentProcessor = processor.createWriteProcessor();
269                                        currentPublisher.subscribe(currentProcessor);
270                                        currentProcessor.subscribe(new WriteResultSubscriber(processor));
271                                }
272                        }
273                        @Override
274                        public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
275                                if (processor.changeState(this, COMPLETED)) {
276                                        processor.resultPublisher.publishComplete();
277                                }
278                                else {
279                                        processor.state.get().onComplete(processor);
280                                }
281                        }
282                },
283
284                RECEIVED {
285                        @Override
286                        public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
287                                try {
288                                        processor.flush();
289                                }
290                                catch (Throwable ex) {
291                                        processor.flushingFailed(ex);
292                                        return;
293                                }
294                                if (processor.changeState(this, REQUESTED)) {
295                                        if (processor.subscriberCompleted) {
296                                                handleSubscriberCompleted(processor);
297                                        }
298                                        else {
299                                                Assert.state(processor.subscription != null, "No subscription");
300                                                processor.subscription.request(1);
301                                        }
302                                }
303                        }
304                        @Override
305                        public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
306                                processor.subscriberCompleted = true;
307                                // A competing write might have completed very quickly
308                                if (processor.state.get().equals(State.REQUESTED)) {
309                                        handleSubscriberCompleted(processor);
310                                }
311                        }
312
313                        private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) {
314                                if (processor.isFlushPending()) {
315                                        // Ensure the final flush
316                                        processor.changeState(State.REQUESTED, State.FLUSHING);
317                                        processor.flushIfPossible();
318                                }
319                                else if (processor.changeState(State.REQUESTED, State.COMPLETED)) {
320                                        processor.resultPublisher.publishComplete();
321                                }
322                                else {
323                                        processor.state.get().onComplete(processor);
324                                }
325                        }
326                },
327
328                FLUSHING {
329                        @Override
330                        public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
331                                try {
332                                        processor.flush();
333                                }
334                                catch (Throwable ex) {
335                                        processor.flushingFailed(ex);
336                                        return;
337                                }
338                                if (processor.changeState(this, COMPLETED)) {
339                                        processor.resultPublisher.publishComplete();
340                                }
341                                else {
342                                        processor.state.get().onComplete(processor);
343                                }
344                        }
345                        @Override
346                        public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
347                                // ignore
348                        }
349                        @Override
350                        public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
351                                // ignore
352                        }
353                },
354
355                COMPLETED {
356                        @Override
357                        public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
358                                // ignore
359                        }
360                        @Override
361                        public <T> void onError(AbstractListenerWriteFlushProcessor<T> processor, Throwable t) {
362                                // ignore
363                        }
364                        @Override
365                        public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
366                                // ignore
367                        }
368                };
369
370
371                public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> proc, Subscription subscription) {
372                        subscription.cancel();
373                }
374
375                public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
376                        throw new IllegalStateException(toString());
377                }
378
379                public <T> void onError(AbstractListenerWriteFlushProcessor<T> processor, Throwable ex) {
380                        if (processor.changeState(this, COMPLETED)) {
381                                processor.resultPublisher.publishError(ex);
382                        }
383                        else {
384                                processor.state.get().onError(processor, ex);
385                        }
386                }
387
388                public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
389                        throw new IllegalStateException(toString());
390                }
391
392                public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
393                        throw new IllegalStateException(toString());
394                }
395
396                public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
397                        // ignore
398                }
399
400
401                /**
402                 * Subscriber to receive and delegate completion notifications for from
403                 * the current Publisher, i.e. for the current flush boundary.
404                 */
405                private static class WriteResultSubscriber implements Subscriber<Void> {
406
407                        private final AbstractListenerWriteFlushProcessor<?> processor;
408
409
410                        public WriteResultSubscriber(AbstractListenerWriteFlushProcessor<?> processor) {
411                                this.processor = processor;
412                        }
413
414                        @Override
415                        public void onSubscribe(Subscription subscription) {
416                                subscription.request(Long.MAX_VALUE);
417                        }
418
419                        @Override
420                        public void onNext(Void aVoid) {
421                        }
422
423                        @Override
424                        public void onError(Throwable ex) {
425                                this.processor.cancel();
426                                this.processor.onError(ex);
427                        }
428
429                        @Override
430                        public void onComplete() {
431                                if (rsWriteFlushLogger.isTraceEnabled()) {
432                                        rsWriteFlushLogger.trace(this.processor.getLogPrefix() + this.processor.state + " writeComplete");
433                                }
434                                this.processor.state.get().writeComplete(this.processor);
435                        }
436                }
437        }
438
439}