001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.server.reactive; 018 019import java.io.IOException; 020import java.util.concurrent.atomic.AtomicLongFieldUpdater; 021import java.util.concurrent.atomic.AtomicReference; 022 023import org.apache.commons.logging.Log; 024import org.reactivestreams.Publisher; 025import org.reactivestreams.Subscriber; 026import org.reactivestreams.Subscription; 027import reactor.core.publisher.Operators; 028 029import org.springframework.core.log.LogDelegateFactory; 030import org.springframework.lang.Nullable; 031import org.springframework.util.Assert; 032 033/** 034 * Abstract base class for {@code Publisher} implementations that bridge between 035 * event-listener read APIs and Reactive Streams. 036 * 037 * <p>Specifically a base class for reading from the HTTP request body with 038 * Servlet 3.1 non-blocking I/O and Undertow XNIO as well as handling incoming 039 * WebSocket messages with standard Java WebSocket (JSR-356), Jetty, and 040 * Undertow. 041 * 042 * @author Arjen Poutsma 043 * @author Violeta Georgieva 044 * @author Rossen Stoyanchev 045 * @since 5.0 046 * @param <T> the type of element signaled 047 */ 048public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { 049 050 /** 051 * Special logger for debugging Reactive Streams signals. 052 * @see LogDelegateFactory#getHiddenLog(Class) 053 * @see AbstractListenerWriteProcessor#rsWriteLogger 054 * @see AbstractListenerWriteFlushProcessor#rsWriteFlushLogger 055 * @see WriteResultPublisher#rsWriteResultLogger 056 */ 057 protected static Log rsReadLogger = LogDelegateFactory.getHiddenLog(AbstractListenerReadPublisher.class); 058 059 060 private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED); 061 062 private volatile long demand; 063 064 @SuppressWarnings("rawtypes") 065 private static final AtomicLongFieldUpdater<AbstractListenerReadPublisher> DEMAND_FIELD_UPDATER = 066 AtomicLongFieldUpdater.newUpdater(AbstractListenerReadPublisher.class, "demand"); 067 068 @Nullable 069 private volatile Subscriber<? super T> subscriber; 070 071 private volatile boolean completionBeforeDemand; 072 073 @Nullable 074 private volatile Throwable errorBeforeDemand; 075 076 private final String logPrefix; 077 078 079 public AbstractListenerReadPublisher() { 080 this(""); 081 } 082 083 /** 084 * Create an instance with the given log prefix. 085 * @since 5.1 086 */ 087 public AbstractListenerReadPublisher(String logPrefix) { 088 this.logPrefix = logPrefix; 089 } 090 091 092 /** 093 * Return the configured log message prefix. 094 * @since 5.1 095 */ 096 public String getLogPrefix() { 097 return this.logPrefix; 098 } 099 100 101 // Publisher implementation... 102 103 @Override 104 public void subscribe(Subscriber<? super T> subscriber) { 105 this.state.get().subscribe(this, subscriber); 106 } 107 108 109 // Async I/O notification methods... 110 111 /** 112 * Invoked when reading is possible, either in the same thread after a check 113 * via {@link #checkOnDataAvailable()}, or as a callback from the underlying 114 * container. 115 */ 116 public final void onDataAvailable() { 117 rsReadLogger.trace(getLogPrefix() + "onDataAvailable"); 118 this.state.get().onDataAvailable(this); 119 } 120 121 /** 122 * Sub-classes can call this method to delegate a contain notification when 123 * all data has been read. 124 */ 125 public void onAllDataRead() { 126 rsReadLogger.trace(getLogPrefix() + "onAllDataRead"); 127 this.state.get().onAllDataRead(this); 128 } 129 130 /** 131 * Sub-classes can call this to delegate container error notifications. 132 */ 133 public final void onError(Throwable ex) { 134 if (rsReadLogger.isTraceEnabled()) { 135 rsReadLogger.trace(getLogPrefix() + "Connection error: " + ex); 136 } 137 this.state.get().onError(this, ex); 138 } 139 140 141 // Read API methods to be implemented or template methods to override... 142 143 /** 144 * Check if data is available and either call {@link #onDataAvailable()} 145 * immediately or schedule a notification. 146 */ 147 protected abstract void checkOnDataAvailable(); 148 149 /** 150 * Read once from the input, if possible. 151 * @return the item that was read; or {@code null} 152 */ 153 @Nullable 154 protected abstract T read() throws IOException; 155 156 /** 157 * Invoked when reading is paused due to a lack of demand. 158 * <p><strong>Note:</strong> This method is guaranteed not to compete with 159 * {@link #checkOnDataAvailable()} so it can be used to safely suspend 160 * reading, if the underlying API supports it, i.e. without competing with 161 * an implicit call to resume via {@code checkOnDataAvailable()}. 162 * @since 5.0.2 163 */ 164 protected abstract void readingPaused(); 165 166 /** 167 * Invoked after an I/O read error from the underlying server or after a 168 * cancellation signal from the downstream consumer to allow sub-classes 169 * to discard any current cached data they might have. 170 * @since 5.0.11 171 */ 172 protected abstract void discardData(); 173 174 175 // Private methods for use in State... 176 177 /** 178 * Read and publish data one at a time until there is no more data, no more 179 * demand, or perhaps we completed in the mean time. 180 * @return {@code true} if there is more demand; {@code false} if there is 181 * no more demand or we have completed. 182 */ 183 private boolean readAndPublish() throws IOException { 184 long r; 185 while ((r = this.demand) > 0 && !this.state.get().equals(State.COMPLETED)) { 186 T data = read(); 187 if (data != null) { 188 if (r != Long.MAX_VALUE) { 189 DEMAND_FIELD_UPDATER.addAndGet(this, -1L); 190 } 191 Subscriber<? super T> subscriber = this.subscriber; 192 Assert.state(subscriber != null, "No subscriber"); 193 if (rsReadLogger.isTraceEnabled()) { 194 rsReadLogger.trace(getLogPrefix() + "Publishing data read"); 195 } 196 subscriber.onNext(data); 197 } 198 else { 199 if (rsReadLogger.isTraceEnabled()) { 200 rsReadLogger.trace(getLogPrefix() + "No more data to read"); 201 } 202 return true; 203 } 204 } 205 return false; 206 } 207 208 private boolean changeState(State oldState, State newState) { 209 boolean result = this.state.compareAndSet(oldState, newState); 210 if (result && rsReadLogger.isTraceEnabled()) { 211 rsReadLogger.trace(getLogPrefix() + oldState + " -> " + newState); 212 } 213 return result; 214 } 215 216 private void changeToDemandState(State oldState) { 217 if (changeState(oldState, State.DEMAND)) { 218 // Protect from infinite recursion in Undertow, where we can't check if data 219 // is available, so all we can do is to try to read. 220 // Generally, no need to check if we just came out of readAndPublish()... 221 if (!oldState.equals(State.READING)) { 222 checkOnDataAvailable(); 223 } 224 } 225 } 226 227 private void handleCompletionOrErrorBeforeDemand() { 228 State state = this.state.get(); 229 if (!state.equals(State.UNSUBSCRIBED) && !state.equals(State.SUBSCRIBING)) { 230 if (this.completionBeforeDemand) { 231 rsReadLogger.trace(getLogPrefix() + "Completed before demand"); 232 this.state.get().onAllDataRead(this); 233 } 234 Throwable ex = this.errorBeforeDemand; 235 if (ex != null) { 236 if (rsReadLogger.isTraceEnabled()) { 237 rsReadLogger.trace(getLogPrefix() + "Completed with error before demand: " + ex); 238 } 239 this.state.get().onError(this, ex); 240 } 241 } 242 } 243 244 private Subscription createSubscription() { 245 return new ReadSubscription(); 246 } 247 248 249 /** 250 * Subscription that delegates signals to State. 251 */ 252 private final class ReadSubscription implements Subscription { 253 254 255 @Override 256 public final void request(long n) { 257 if (rsReadLogger.isTraceEnabled()) { 258 rsReadLogger.trace(getLogPrefix() + n + " requested"); 259 } 260 state.get().request(AbstractListenerReadPublisher.this, n); 261 } 262 263 @Override 264 public final void cancel() { 265 if (rsReadLogger.isTraceEnabled()) { 266 rsReadLogger.trace(getLogPrefix() + "Cancellation"); 267 } 268 state.get().cancel(AbstractListenerReadPublisher.this); 269 } 270 } 271 272 273 /** 274 * Represents a state for the {@link Publisher} to be in. 275 * <p><pre> 276 * UNSUBSCRIBED 277 * | 278 * v 279 * SUBSCRIBING 280 * | 281 * v 282 * +---- NO_DEMAND ---------------> DEMAND ---+ 283 * | ^ ^ | 284 * | | | | 285 * | +------- READING <--------+ | 286 * | | | 287 * | v | 288 * +--------------> COMPLETED <---------------+ 289 * </pre> 290 */ 291 private enum State { 292 293 UNSUBSCRIBED { 294 @Override 295 <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) { 296 Assert.notNull(publisher, "Publisher must not be null"); 297 Assert.notNull(subscriber, "Subscriber must not be null"); 298 if (publisher.changeState(this, SUBSCRIBING)) { 299 Subscription subscription = publisher.createSubscription(); 300 publisher.subscriber = subscriber; 301 subscriber.onSubscribe(subscription); 302 publisher.changeState(SUBSCRIBING, NO_DEMAND); 303 publisher.handleCompletionOrErrorBeforeDemand(); 304 } 305 else { 306 throw new IllegalStateException("Failed to transition to SUBSCRIBING, " + 307 "subscriber: " + subscriber); 308 } 309 } 310 311 @Override 312 <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { 313 publisher.completionBeforeDemand = true; 314 publisher.handleCompletionOrErrorBeforeDemand(); 315 } 316 317 @Override 318 <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) { 319 publisher.errorBeforeDemand = ex; 320 publisher.handleCompletionOrErrorBeforeDemand(); 321 } 322 }, 323 324 /** 325 * Very brief state where we know we have a Subscriber but must not 326 * send onComplete and onError until we after onSubscribe. 327 */ 328 SUBSCRIBING { 329 @Override 330 <T> void request(AbstractListenerReadPublisher<T> publisher, long n) { 331 if (Operators.validate(n)) { 332 Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); 333 publisher.changeToDemandState(this); 334 } 335 } 336 337 @Override 338 <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { 339 publisher.completionBeforeDemand = true; 340 publisher.handleCompletionOrErrorBeforeDemand(); 341 } 342 343 @Override 344 <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) { 345 publisher.errorBeforeDemand = ex; 346 publisher.handleCompletionOrErrorBeforeDemand(); 347 } 348 }, 349 350 NO_DEMAND { 351 @Override 352 <T> void request(AbstractListenerReadPublisher<T> publisher, long n) { 353 if (Operators.validate(n)) { 354 Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); 355 publisher.changeToDemandState(this); 356 } 357 } 358 }, 359 360 DEMAND { 361 @Override 362 <T> void request(AbstractListenerReadPublisher<T> publisher, long n) { 363 if (Operators.validate(n)) { 364 Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); 365 // Did a concurrent read transition to NO_DEMAND just before us? 366 publisher.changeToDemandState(NO_DEMAND); 367 } 368 } 369 370 @Override 371 <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) { 372 if (publisher.changeState(this, READING)) { 373 try { 374 boolean demandAvailable = publisher.readAndPublish(); 375 if (demandAvailable) { 376 publisher.changeToDemandState(READING); 377 } 378 else { 379 publisher.readingPaused(); 380 if (publisher.changeState(READING, NO_DEMAND)) { 381 // Demand may have arrived since readAndPublish returned 382 long r = publisher.demand; 383 if (r > 0) { 384 publisher.changeToDemandState(NO_DEMAND); 385 } 386 } 387 } 388 } 389 catch (IOException ex) { 390 publisher.onError(ex); 391 } 392 } 393 // Else, either competing onDataAvailable (request vs container), or concurrent completion 394 } 395 }, 396 397 READING { 398 @Override 399 <T> void request(AbstractListenerReadPublisher<T> publisher, long n) { 400 if (Operators.validate(n)) { 401 Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); 402 // Did a concurrent read transition to NO_DEMAND just before us? 403 publisher.changeToDemandState(NO_DEMAND); 404 } 405 } 406 }, 407 408 COMPLETED { 409 @Override 410 <T> void request(AbstractListenerReadPublisher<T> publisher, long n) { 411 // ignore 412 } 413 @Override 414 <T> void cancel(AbstractListenerReadPublisher<T> publisher) { 415 // ignore 416 } 417 @Override 418 <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { 419 // ignore 420 } 421 @Override 422 <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) { 423 // ignore 424 } 425 }; 426 427 <T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) { 428 throw new IllegalStateException(toString()); 429 } 430 431 <T> void request(AbstractListenerReadPublisher<T> publisher, long n) { 432 throw new IllegalStateException(toString()); 433 } 434 435 <T> void cancel(AbstractListenerReadPublisher<T> publisher) { 436 if (publisher.changeState(this, COMPLETED)) { 437 publisher.discardData(); 438 } 439 else { 440 publisher.state.get().cancel(publisher); 441 } 442 } 443 444 <T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) { 445 // ignore 446 } 447 448 <T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) { 449 if (publisher.changeState(this, COMPLETED)) { 450 Subscriber<? super T> s = publisher.subscriber; 451 if (s != null) { 452 s.onComplete(); 453 } 454 } 455 else { 456 publisher.state.get().onAllDataRead(publisher); 457 } 458 } 459 460 <T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) { 461 if (publisher.changeState(this, COMPLETED)) { 462 publisher.discardData(); 463 Subscriber<? super T> s = publisher.subscriber; 464 if (s != null) { 465 s.onError(t); 466 } 467 } 468 else { 469 publisher.state.get().onError(publisher, t); 470 } 471 } 472 } 473 474}