001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.server.reactive;
018
019import java.io.IOException;
020import java.util.concurrent.atomic.AtomicLongFieldUpdater;
021import java.util.concurrent.atomic.AtomicReference;
022
023import org.apache.commons.logging.Log;
024import org.reactivestreams.Publisher;
025import org.reactivestreams.Subscriber;
026import org.reactivestreams.Subscription;
027import reactor.core.publisher.Operators;
028
029import org.springframework.core.log.LogDelegateFactory;
030import org.springframework.lang.Nullable;
031import org.springframework.util.Assert;
032
033/**
034 * Abstract base class for {@code Publisher} implementations that bridge between
035 * event-listener read APIs and Reactive Streams.
036 *
037 * <p>Specifically a base class for reading from the HTTP request body with
038 * Servlet 3.1 non-blocking I/O and Undertow XNIO as well as handling incoming
039 * WebSocket messages with standard Java WebSocket (JSR-356), Jetty, and
040 * Undertow.
041 *
042 * @author Arjen Poutsma
043 * @author Violeta Georgieva
044 * @author Rossen Stoyanchev
045 * @since 5.0
046 * @param <T> the type of element signaled
047 */
048public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
049
050        /**
051         * Special logger for debugging Reactive Streams signals.
052         * @see LogDelegateFactory#getHiddenLog(Class)
053         * @see AbstractListenerWriteProcessor#rsWriteLogger
054         * @see AbstractListenerWriteFlushProcessor#rsWriteFlushLogger
055         * @see WriteResultPublisher#rsWriteResultLogger
056         */
057        protected static Log rsReadLogger = LogDelegateFactory.getHiddenLog(AbstractListenerReadPublisher.class);
058
059
060        private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
061
062        private volatile long demand;
063
064        @SuppressWarnings("rawtypes")
065        private static final AtomicLongFieldUpdater<AbstractListenerReadPublisher> DEMAND_FIELD_UPDATER =
066                        AtomicLongFieldUpdater.newUpdater(AbstractListenerReadPublisher.class, "demand");
067
068        @Nullable
069        private volatile Subscriber<? super T> subscriber;
070
071        private volatile boolean completionBeforeDemand;
072
073        @Nullable
074        private volatile Throwable errorBeforeDemand;
075
076        private final String logPrefix;
077
078
079        public AbstractListenerReadPublisher() {
080                this("");
081        }
082
083        /**
084         * Create an instance with the given log prefix.
085         * @since 5.1
086         */
087        public AbstractListenerReadPublisher(String logPrefix) {
088                this.logPrefix = logPrefix;
089        }
090
091
092        /**
093         * Return the configured log message prefix.
094         * @since 5.1
095         */
096        public String getLogPrefix() {
097                return this.logPrefix;
098        }
099
100
101        // Publisher implementation...
102
103        @Override
104        public void subscribe(Subscriber<? super T> subscriber) {
105                this.state.get().subscribe(this, subscriber);
106        }
107
108
109        // Async I/O notification methods...
110
111        /**
112         * Invoked when reading is possible, either in the same thread after a check
113         * via {@link #checkOnDataAvailable()}, or as a callback from the underlying
114         * container.
115         */
116        public final void onDataAvailable() {
117                rsReadLogger.trace(getLogPrefix() + "onDataAvailable");
118                this.state.get().onDataAvailable(this);
119        }
120
121        /**
122         * Sub-classes can call this method to delegate a contain notification when
123         * all data has been read.
124         */
125        public void onAllDataRead() {
126                rsReadLogger.trace(getLogPrefix() + "onAllDataRead");
127                this.state.get().onAllDataRead(this);
128        }
129
130        /**
131         * Sub-classes can call this to delegate container error notifications.
132         */
133        public final void onError(Throwable ex) {
134                if (rsReadLogger.isTraceEnabled()) {
135                        rsReadLogger.trace(getLogPrefix() + "Connection error: " + ex);
136                }
137                this.state.get().onError(this, ex);
138        }
139
140
141        // Read API methods to be implemented or template methods to override...
142
143        /**
144         * Check if data is available and either call {@link #onDataAvailable()}
145         * immediately or schedule a notification.
146         */
147        protected abstract void checkOnDataAvailable();
148
149        /**
150         * Read once from the input, if possible.
151         * @return the item that was read; or {@code null}
152         */
153        @Nullable
154        protected abstract T read() throws IOException;
155
156        /**
157         * Invoked when reading is paused due to a lack of demand.
158         * <p><strong>Note:</strong> This method is guaranteed not to compete with
159         * {@link #checkOnDataAvailable()} so it can be used to safely suspend
160         * reading, if the underlying API supports it, i.e. without competing with
161         * an implicit call to resume via {@code checkOnDataAvailable()}.
162         * @since 5.0.2
163         */
164        protected abstract void readingPaused();
165
166        /**
167         * Invoked after an I/O read error from the underlying server or after a
168         * cancellation signal from the downstream consumer to allow sub-classes
169         * to discard any current cached data they might have.
170         * @since 5.0.11
171         */
172        protected abstract void discardData();
173
174
175        // Private methods for use in State...
176
177        /**
178         * Read and publish data one at a time until there is no more data, no more
179         * demand, or perhaps we completed in the mean time.
180         * @return {@code true} if there is more demand; {@code false} if there is
181         * no more demand or we have completed.
182         */
183        private boolean readAndPublish() throws IOException {
184                long r;
185                while ((r = this.demand) > 0 && !this.state.get().equals(State.COMPLETED)) {
186                        T data = read();
187                        if (data != null) {
188                                if (r != Long.MAX_VALUE) {
189                                        DEMAND_FIELD_UPDATER.addAndGet(this, -1L);
190                                }
191                                Subscriber<? super T> subscriber = this.subscriber;
192                                Assert.state(subscriber != null, "No subscriber");
193                                if (rsReadLogger.isTraceEnabled()) {
194                                        rsReadLogger.trace(getLogPrefix() + "Publishing data read");
195                                }
196                                subscriber.onNext(data);
197                        }
198                        else {
199                                if (rsReadLogger.isTraceEnabled()) {
200                                        rsReadLogger.trace(getLogPrefix() + "No more data to read");
201                                }
202                                return true;
203                        }
204                }
205                return false;
206        }
207
208        private boolean changeState(State oldState, State newState) {
209                boolean result = this.state.compareAndSet(oldState, newState);
210                if (result && rsReadLogger.isTraceEnabled()) {
211                        rsReadLogger.trace(getLogPrefix() + oldState + " -> " + newState);
212                }
213                return result;
214        }
215
216        private void changeToDemandState(State oldState) {
217                if (changeState(oldState, State.DEMAND)) {
218                        // Protect from infinite recursion in Undertow, where we can't check if data
219                        // is available, so all we can do is to try to read.
220                        // Generally, no need to check if we just came out of readAndPublish()...
221                        if (!oldState.equals(State.READING)) {
222                                checkOnDataAvailable();
223                        }
224                }
225        }
226
227        private void handleCompletionOrErrorBeforeDemand() {
228                State state = this.state.get();
229                if (!state.equals(State.UNSUBSCRIBED) && !state.equals(State.SUBSCRIBING)) {
230                        if (this.completionBeforeDemand) {
231                                rsReadLogger.trace(getLogPrefix() + "Completed before demand");
232                                this.state.get().onAllDataRead(this);
233                        }
234                        Throwable ex = this.errorBeforeDemand;
235                        if (ex != null) {
236                                if (rsReadLogger.isTraceEnabled()) {
237                                        rsReadLogger.trace(getLogPrefix() + "Completed with error before demand: " + ex);
238                                }
239                                this.state.get().onError(this, ex);
240                        }
241                }
242        }
243
244        private Subscription createSubscription() {
245                return new ReadSubscription();
246        }
247
248
249        /**
250         * Subscription that delegates signals to State.
251         */
252        private final class ReadSubscription implements Subscription {
253
254
255                @Override
256                public final void request(long n) {
257                        if (rsReadLogger.isTraceEnabled()) {
258                                rsReadLogger.trace(getLogPrefix() + n + " requested");
259                        }
260                        state.get().request(AbstractListenerReadPublisher.this, n);
261                }
262
263                @Override
264                public final void cancel() {
265                        if (rsReadLogger.isTraceEnabled()) {
266                                rsReadLogger.trace(getLogPrefix() + "Cancellation");
267                        }
268                        state.get().cancel(AbstractListenerReadPublisher.this);
269                }
270        }
271
272
273        /**
274         * Represents a state for the {@link Publisher} to be in.
275         * <p><pre>
276         *        UNSUBSCRIBED
277         *             |
278         *             v
279         *        SUBSCRIBING
280         *             |
281         *             v
282         *    +---- NO_DEMAND ---------------> DEMAND ---+
283         *    |        ^                         ^       |
284         *    |        |                         |       |
285         *    |        +------- READING <--------+       |
286         *    |                    |                     |
287         *    |                    v                     |
288         *    +--------------> COMPLETED <---------------+
289         * </pre>
290         */
291        private enum State {
292
293                UNSUBSCRIBED {
294                        @Override
295                        <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
296                                Assert.notNull(publisher, "Publisher must not be null");
297                                Assert.notNull(subscriber, "Subscriber must not be null");
298                                if (publisher.changeState(this, SUBSCRIBING)) {
299                                        Subscription subscription = publisher.createSubscription();
300                                        publisher.subscriber = subscriber;
301                                        subscriber.onSubscribe(subscription);
302                                        publisher.changeState(SUBSCRIBING, NO_DEMAND);
303                                        publisher.handleCompletionOrErrorBeforeDemand();
304                                }
305                                else {
306                                        throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
307                                                        "subscriber: " + subscriber);
308                                }
309                        }
310
311                        @Override
312                        <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
313                                publisher.completionBeforeDemand = true;
314                                publisher.handleCompletionOrErrorBeforeDemand();
315                        }
316
317                        @Override
318                        <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
319                                publisher.errorBeforeDemand = ex;
320                                publisher.handleCompletionOrErrorBeforeDemand();
321                        }
322                },
323
324                /**
325                 * Very brief state where we know we have a Subscriber but must not
326                 * send onComplete and onError until we after onSubscribe.
327                 */
328                SUBSCRIBING {
329                        @Override
330                        <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
331                                if (Operators.validate(n)) {
332                                        Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
333                                        publisher.changeToDemandState(this);
334                                }
335                        }
336
337                        @Override
338                        <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
339                                publisher.completionBeforeDemand = true;
340                                publisher.handleCompletionOrErrorBeforeDemand();
341                        }
342
343                        @Override
344                        <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
345                                publisher.errorBeforeDemand = ex;
346                                publisher.handleCompletionOrErrorBeforeDemand();
347                        }
348                },
349
350                NO_DEMAND {
351                        @Override
352                        <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
353                                if (Operators.validate(n)) {
354                                        Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
355                                        publisher.changeToDemandState(this);
356                                }
357                        }
358                },
359
360                DEMAND {
361                        @Override
362                        <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
363                                if (Operators.validate(n)) {
364                                        Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
365                                        // Did a concurrent read transition to NO_DEMAND just before us?
366                                        publisher.changeToDemandState(NO_DEMAND);
367                                }
368                        }
369
370                        @Override
371                        <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
372                                if (publisher.changeState(this, READING)) {
373                                        try {
374                                                boolean demandAvailable = publisher.readAndPublish();
375                                                if (demandAvailable) {
376                                                        publisher.changeToDemandState(READING);
377                                                }
378                                                else {
379                                                        publisher.readingPaused();
380                                                        if (publisher.changeState(READING, NO_DEMAND)) {
381                                                                // Demand may have arrived since readAndPublish returned
382                                                                long r = publisher.demand;
383                                                                if (r > 0) {
384                                                                        publisher.changeToDemandState(NO_DEMAND);
385                                                                }
386                                                        }
387                                                }
388                                        }
389                                        catch (IOException ex) {
390                                                publisher.onError(ex);
391                                        }
392                                }
393                                // Else, either competing onDataAvailable (request vs container), or concurrent completion
394                        }
395                },
396
397                READING {
398                        @Override
399                        <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
400                                if (Operators.validate(n)) {
401                                        Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
402                                        // Did a concurrent read transition to NO_DEMAND just before us?
403                                        publisher.changeToDemandState(NO_DEMAND);
404                                }
405                        }
406                },
407
408                COMPLETED {
409                        @Override
410                        <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
411                                // ignore
412                        }
413                        @Override
414                        <T> void cancel(AbstractListenerReadPublisher<T> publisher) {
415                                // ignore
416                        }
417                        @Override
418                        <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
419                                // ignore
420                        }
421                        @Override
422                        <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
423                                // ignore
424                        }
425                };
426
427                <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
428                        throw new IllegalStateException(toString());
429                }
430
431                <T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
432                        throw new IllegalStateException(toString());
433                }
434
435                <T> void cancel(AbstractListenerReadPublisher<T> publisher) {
436                        if (publisher.changeState(this, COMPLETED)) {
437                                publisher.discardData();
438                        }
439                        else {
440                                publisher.state.get().cancel(publisher);
441                        }
442                }
443
444                <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
445                        // ignore
446                }
447
448                <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
449                        if (publisher.changeState(this, COMPLETED)) {
450                                Subscriber<? super T> s = publisher.subscriber;
451                                if (s != null) {
452                                        s.onComplete();
453                                }
454                        }
455                        else {
456                                publisher.state.get().onAllDataRead(publisher);
457                        }
458                }
459
460                <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
461                        if (publisher.changeState(this, COMPLETED)) {
462                                publisher.discardData();
463                                Subscriber<? super T> s = publisher.subscriber;
464                                if (s != null) {
465                                        s.onError(t);
466                                }
467                        }
468                        else {
469                                publisher.state.get().onError(publisher, t);
470                        }
471                }
472        }
473
474}