001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.server.reactive;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicReference;
021
022import org.apache.commons.logging.Log;
023import org.reactivestreams.Processor;
024import org.reactivestreams.Subscriber;
025import org.reactivestreams.Subscription;
026
027import org.springframework.core.log.LogDelegateFactory;
028import org.springframework.lang.Nullable;
029import org.springframework.util.Assert;
030import org.springframework.util.StringUtils;
031
032/**
033 * Abstract base class for {@code Processor} implementations that bridge between
034 * event-listener write APIs and Reactive Streams.
035 *
036 * <p>Specifically a base class for writing to the HTTP response body with
037 * Servlet 3.1 non-blocking I/O and Undertow XNIO as well for writing WebSocket
038 * messages through the Java WebSocket API (JSR-356), Jetty, and Undertow.
039 *
040 * @author Arjen Poutsma
041 * @author Violeta Georgieva
042 * @author Rossen Stoyanchev
043 * @since 5.0
044 * @param <T> the type of element signaled to the {@link Subscriber}
045 */
046public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {
047
048        /**
049         * Special logger for debugging Reactive Streams signals.
050         * @see LogDelegateFactory#getHiddenLog(Class)
051         * @see AbstractListenerReadPublisher#rsReadLogger
052         * @see AbstractListenerWriteFlushProcessor#rsWriteFlushLogger
053         * @see WriteResultPublisher#rsWriteResultLogger
054         */
055        protected static final Log rsWriteLogger = LogDelegateFactory.getHiddenLog(AbstractListenerWriteProcessor.class);
056
057
058        private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
059
060        @Nullable
061        private Subscription subscription;
062
063        @Nullable
064        private volatile T currentData;
065
066        /* Indicates "onComplete" was received during the (last) write. */
067        private volatile boolean subscriberCompleted;
068
069        /**
070         * Indicates we're waiting for one last isReady-onWritePossible cycle
071         * after "onComplete" because some Servlet containers expect this to take
072         * place prior to calling AsyncContext.complete().
073         * See https://github.com/eclipse-ee4j/servlet-api/issues/273
074         */
075        private volatile boolean readyToCompleteAfterLastWrite;
076
077        private final WriteResultPublisher resultPublisher;
078
079        private final String logPrefix;
080
081
082        public AbstractListenerWriteProcessor() {
083                this("");
084        }
085
086        /**
087         * Create an instance with the given log prefix.
088         * @since 5.1
089         */
090        public AbstractListenerWriteProcessor(String logPrefix) {
091                this.logPrefix = (StringUtils.hasText(logPrefix) ? logPrefix : "");
092                this.resultPublisher = new WriteResultPublisher(logPrefix);
093        }
094
095
096        /**
097         * Get the configured log prefix.
098         * @since 5.1
099         */
100        public String getLogPrefix() {
101                return this.logPrefix;
102        }
103
104
105        // Subscriber methods and async I/O notification methods...
106
107        @Override
108        public final void onSubscribe(Subscription subscription) {
109                this.state.get().onSubscribe(this, subscription);
110        }
111
112        @Override
113        public final void onNext(T data) {
114                if (rsWriteLogger.isTraceEnabled()) {
115                        rsWriteLogger.trace(getLogPrefix() + "Item to write");
116                }
117                this.state.get().onNext(this, data);
118        }
119
120        /**
121         * Error signal from the upstream, write Publisher. This is also used by
122         * sub-classes to delegate error notifications from the container.
123         */
124        @Override
125        public final void onError(Throwable ex) {
126                if (rsWriteLogger.isTraceEnabled()) {
127                        rsWriteLogger.trace(getLogPrefix() + "Write source error: " + ex);
128                }
129                this.state.get().onError(this, ex);
130        }
131
132        /**
133         * Completion signal from the upstream, write Publisher. This is also used
134         * by sub-classes to delegate completion notifications from the container.
135         */
136        @Override
137        public final void onComplete() {
138                if (rsWriteLogger.isTraceEnabled()) {
139                        rsWriteLogger.trace(getLogPrefix() + "No more items to write");
140                }
141                this.state.get().onComplete(this);
142        }
143
144        /**
145         * Invoked when writing is possible, either in the same thread after a check
146         * via {@link #isWritePossible()}, or as a callback from the underlying
147         * container.
148         */
149        public final void onWritePossible() {
150                if (rsWriteLogger.isTraceEnabled()) {
151                        rsWriteLogger.trace(getLogPrefix() + "onWritePossible");
152                }
153                this.state.get().onWritePossible(this);
154        }
155
156        /**
157         * Invoked during an error or completion callback from the underlying
158         * container to cancel the upstream subscription.
159         */
160        public void cancel() {
161                rsWriteLogger.trace(getLogPrefix() + "Cancellation");
162                if (this.subscription != null) {
163                        this.subscription.cancel();
164                }
165        }
166
167        // Publisher implementation for result notifications...
168
169        @Override
170        public final void subscribe(Subscriber<? super Void> subscriber) {
171                // Technically, cancellation from the result subscriber should be propagated
172                // to the upstream subscription. In practice, HttpHandler server adapters
173                // don't have a reason to cancel the result subscription.
174                this.resultPublisher.subscribe(subscriber);
175        }
176
177
178        // Write API methods to be implemented or template methods to override...
179
180        /**
181         * Whether the given data item has any content to write.
182         * If false the item is not written.
183         */
184        protected abstract boolean isDataEmpty(T data);
185
186        /**
187         * Template method invoked after a data item to write is received via
188         * {@link Subscriber#onNext(Object)}. The default implementation saves the
189         * data item for writing once that is possible.
190         */
191        protected void dataReceived(T data) {
192                T prev = this.currentData;
193                if (prev != null) {
194                        // This shouldn't happen:
195                        //   1. dataReceived can only be called from REQUESTED state
196                        //   2. currentData is cleared before requesting
197                        discardData(data);
198                        cancel();
199                        onError(new IllegalStateException("Received new data while current not processed yet."));
200                }
201                this.currentData = data;
202        }
203
204        /**
205         * Whether writing is possible.
206         */
207        protected abstract boolean isWritePossible();
208
209        /**
210         * Write the given item.
211         * <p><strong>Note:</strong> Sub-classes are responsible for releasing any
212         * data buffer associated with the item, once fully written, if pooled
213         * buffers apply to the underlying container.
214         * @param data the item to write
215         * @return {@code true} if the current data item was written completely and
216         * a new item requested, or {@code false} if it was written partially and
217         * we'll need more write callbacks before it is fully written
218         */
219        protected abstract boolean write(T data) throws IOException;
220
221        /**
222         * Invoked after the current data has been written and before requesting
223         * the next item from the upstream, write Publisher.
224         * <p>The default implementation is a no-op.
225         * @deprecated originally introduced for Undertow to stop write notifications
226         * when no data is available, but deprecated as of as of 5.0.6 since constant
227         * switching on every requested item causes a significant slowdown.
228         */
229        @Deprecated
230        protected void writingPaused() {
231        }
232
233        /**
234         * Invoked after onComplete or onError notification.
235         * <p>The default implementation is a no-op.
236         */
237        protected void writingComplete() {
238        }
239
240        /**
241         * Invoked when an I/O error occurs during a write. Sub-classes may choose
242         * to ignore this if they know the underlying API will provide an error
243         * notification in a container thread.
244         * <p>Defaults to no-op.
245         */
246        protected void writingFailed(Throwable ex) {
247        }
248
249        /**
250         * Invoked after any error (either from the upstream write Publisher, or
251         * from I/O operations to the underlying server) and cancellation
252         * to discard in-flight data that was in
253         * the process of being written when the error took place.
254         * @param data the data to be released
255         * @since 5.0.11
256         */
257        protected abstract void discardData(T data);
258
259
260        // Private methods for use from State's...
261
262        private boolean changeState(State oldState, State newState) {
263                boolean result = this.state.compareAndSet(oldState, newState);
264                if (result && rsWriteLogger.isTraceEnabled()) {
265                        rsWriteLogger.trace(getLogPrefix() + oldState + " -> " + newState);
266                }
267                return result;
268        }
269
270        private void changeStateToReceived(State oldState) {
271                if (changeState(oldState, State.RECEIVED)) {
272                        writeIfPossible();
273                }
274        }
275
276        private void changeStateToComplete(State oldState) {
277                if (changeState(oldState, State.COMPLETED)) {
278                        discardCurrentData();
279                        writingComplete();
280                        this.resultPublisher.publishComplete();
281                }
282                else {
283                        this.state.get().onComplete(this);
284                }
285        }
286
287        private void writeIfPossible() {
288                boolean result = isWritePossible();
289                if (!result && rsWriteLogger.isTraceEnabled()) {
290                        rsWriteLogger.trace(getLogPrefix() + "isWritePossible: false");
291                }
292                if (result) {
293                        onWritePossible();
294                }
295        }
296
297        private void discardCurrentData() {
298                T data = this.currentData;
299                this.currentData = null;
300                if (data != null) {
301                        discardData(data);
302                }
303        }
304
305
306        /**
307         * Represents a state for the {@link Processor} to be in.
308         *
309         * <p><pre>
310         *        UNSUBSCRIBED
311         *             |
312         *             v
313         *   +--- REQUESTED -------------> RECEIVED ---+
314         *   |        ^                       ^        |
315         *   |        |                       |        |
316         *   |        + ------ WRITING <------+        |
317         *   |                    |                    |
318         *   |                    v                    |
319         *   +--------------> COMPLETED <--------------+
320         * </pre>
321         */
322        private enum State {
323
324                UNSUBSCRIBED {
325                        @Override
326                        public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
327                                Assert.notNull(subscription, "Subscription must not be null");
328                                if (processor.changeState(this, REQUESTED)) {
329                                        processor.subscription = subscription;
330                                        subscription.request(1);
331                                }
332                                else {
333                                        super.onSubscribe(processor, subscription);
334                                }
335                        }
336
337                        @Override
338                        public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
339                                // This can happen on (very early) completion notification from container..
340                                processor.changeStateToComplete(this);
341                        }
342                },
343
344                REQUESTED {
345                        @Override
346                        public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
347                                if (processor.isDataEmpty(data)) {
348                                        Assert.state(processor.subscription != null, "No subscription");
349                                        processor.subscription.request(1);
350                                }
351                                else {
352                                        processor.dataReceived(data);
353                                        processor.changeStateToReceived(this);
354                                }
355                        }
356                        @Override
357                        public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
358                                processor.readyToCompleteAfterLastWrite = true;
359                                processor.changeStateToReceived(this);
360                        }
361                },
362
363                RECEIVED {
364                        @SuppressWarnings("deprecation")
365                        @Override
366                        public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
367                                if (processor.readyToCompleteAfterLastWrite) {
368                                        processor.changeStateToComplete(RECEIVED);
369                                }
370                                else if (processor.changeState(this, WRITING)) {
371                                        T data = processor.currentData;
372                                        Assert.state(data != null, "No data");
373                                        try {
374                                                if (processor.write(data)) {
375                                                        if (processor.changeState(WRITING, REQUESTED)) {
376                                                                processor.currentData = null;
377                                                                if (processor.subscriberCompleted) {
378                                                                        processor.readyToCompleteAfterLastWrite = true;
379                                                                        processor.changeStateToReceived(REQUESTED);
380                                                                }
381                                                                else {
382                                                                        processor.writingPaused();
383                                                                        Assert.state(processor.subscription != null, "No subscription");
384                                                                        processor.subscription.request(1);
385                                                                }
386                                                        }
387                                                }
388                                                else {
389                                                        processor.changeStateToReceived(WRITING);
390                                                }
391                                        }
392                                        catch (IOException ex) {
393                                                processor.writingFailed(ex);
394                                        }
395                                }
396                        }
397
398                        @Override
399                        public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
400                                processor.subscriberCompleted = true;
401                                // A competing write might have completed very quickly
402                                if (processor.state.get().equals(State.REQUESTED)) {
403                                        processor.changeStateToComplete(State.REQUESTED);
404                                }
405                        }
406                },
407
408                WRITING {
409                        @Override
410                        public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
411                                processor.subscriberCompleted = true;
412                                // A competing write might have completed very quickly
413                                if (processor.state.get().equals(State.REQUESTED)) {
414                                        processor.changeStateToComplete(State.REQUESTED);
415                                }
416                        }
417                },
418
419                COMPLETED {
420                        @Override
421                        public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
422                                // ignore
423                        }
424                        @Override
425                        public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
426                                // ignore
427                        }
428                        @Override
429                        public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
430                                // ignore
431                        }
432                };
433
434                public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
435                        subscription.cancel();
436                }
437
438                public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
439                        processor.discardData(data);
440                        processor.cancel();
441                        processor.onError(new IllegalStateException("Illegal onNext without demand"));
442                }
443
444                public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
445                        if (processor.changeState(this, COMPLETED)) {
446                                processor.discardCurrentData();
447                                processor.writingComplete();
448                                processor.resultPublisher.publishError(ex);
449                        }
450                        else {
451                                processor.state.get().onError(processor, ex);
452                        }
453                }
454
455                public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
456                        throw new IllegalStateException(toString());
457                }
458
459                public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
460                        // ignore
461                }
462        }
463
464}