/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.server.reactive;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import org.springframework.core.log.LogDelegateFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * An alternative to {@link AbstractListenerWriteProcessor} but instead writing
 * a {@code Publisher<Publisher<T>>} with flush boundaries enforced after
 * the completion of each nested Publisher.
 *
 * <p>Each nested Publisher received via {@link #onNext(Publisher)} is handed to
 * a processor obtained from {@link #createWriteProcessor()}. When that write
 * completes, {@link #flush()} is invoked and one more nested Publisher is
 * requested from the upstream. The overall outcome (completion or error) is
 * published to the {@code Subscriber<Void>} registered via
 * {@link #subscribe(Subscriber)}.
 *
 * @author Arjen Poutsma
 * @author Violeta Georgieva
 * @author Rossen Stoyanchev
 * @since 5.0
 * @param <T> the type of element signaled to the {@link Subscriber}
 */
public abstract class AbstractListenerWriteFlushProcessor<T> implements Processor<Publisher<? extends T>, Void> {

    /**
     * Special logger for debugging Reactive Streams signals.
     * @see LogDelegateFactory#getHiddenLog(Class)
     * @see AbstractListenerReadPublisher#rsReadLogger
     * @see AbstractListenerWriteProcessor#rsWriteLogger
     * @see WriteResultPublisher#rsWriteResultLogger
     */
    protected static final Log rsWriteFlushLogger =
            LogDelegateFactory.getHiddenLog(AbstractListenerWriteFlushProcessor.class);


    /** Current state; only ever changed through compare-and-set in {@link #changeState}. */
    private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);

    /** The upstream subscription; assigned once the UNSUBSCRIBED -> REQUESTED transition succeeds. */
    @Nullable
    private Subscription subscription;

    /** Set when the upstream completes while a write is still in progress (state RECEIVED). */
    private volatile boolean subscriberCompleted;

    /** Publishes the final completion or error signal to the subscriber of this processor. */
    private final WriteResultPublisher resultPublisher;

    /** Prefix prepended to every trace log message of this processor. */
    private final String logPrefix;


    /**
     * Create an instance with an empty log prefix.
     */
    public AbstractListenerWriteFlushProcessor() {
        this("");
    }

    /**
     * Create an instance with the given log prefix.
     * @param logPrefix the prefix to prepend to trace log messages; it is also
     * passed on to the internal {@link WriteResultPublisher}
     * @since 5.1
     */
    public AbstractListenerWriteFlushProcessor(String logPrefix) {
        this.logPrefix = logPrefix;
        this.resultPublisher = new WriteResultPublisher(logPrefix);
    }


    /**
     * Return the log prefix this instance was created with.
     * @since 5.1
     */
    public String getLogPrefix() {
        return this.logPrefix;
    }


    // Subscriber methods and async I/O notification methods...

    /**
     * Subscription signal from the upstream, write Publisher. Delegates to
     * the current {@link State}.
     */
    @Override
    public final void onSubscribe(Subscription subscription) {
        this.state.get().onSubscribe(this, subscription);
    }

    /**
     * A nested Publisher from the upstream, i.e. the content for the next
     * flush boundary. Delegates to the current {@link State}.
     */
    @Override
    public final void onNext(Publisher<? extends T> publisher) {
        if (rsWriteFlushLogger.isTraceEnabled()) {
            rsWriteFlushLogger.trace(getLogPrefix() + "Received onNext publisher");
        }
        this.state.get().onNext(this, publisher);
    }

    /**
     * Error signal from the upstream, write Publisher. This is also used by
     * sub-classes to delegate error notifications from the container.
     */
    @Override
    public final void onError(Throwable ex) {
        if (rsWriteFlushLogger.isTraceEnabled()) {
            rsWriteFlushLogger.trace(getLogPrefix() + "Received onError: " + ex);
        }
        this.state.get().onError(this, ex);
    }

    /**
     * Completion signal from the upstream, write Publisher. This is also used
     * by sub-classes to delegate completion notifications from the container.
     */
    @Override
    public final void onComplete() {
        if (rsWriteFlushLogger.isTraceEnabled()) {
            rsWriteFlushLogger.trace(getLogPrefix() + "Received onComplete");
        }
        this.state.get().onComplete(this);
    }

    /**
     * Invoked when flushing is possible, either in the same thread after a check
     * via {@link #isWritePossible()}, or as a callback from the underlying
     * container.
     */
    protected final void onFlushPossible() {
        this.state.get().onFlushPossible(this);
    }

    /**
     * Invoked during an error or completion callback from the underlying
     * container to cancel the upstream subscription.
     * <p>Does nothing if no subscription has been assigned yet.
     */
    protected void cancel() {
        if (rsWriteFlushLogger.isTraceEnabled()) {
            rsWriteFlushLogger.trace(getLogPrefix() + "Received request to cancel");
        }
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }


    // Publisher implementation for result notifications...

    /**
     * Register the subscriber for the overall write result by delegating to
     * the internal {@link WriteResultPublisher}.
     */
    @Override
    public final void subscribe(Subscriber<? super Void> subscriber) {
        this.resultPublisher.subscribe(subscriber);
    }


    // Write API methods to be implemented or template methods to override...

    /**
     * Create a new processor for the current flush boundary.
     */
    protected abstract Processor<? super T, Void> createWriteProcessor();

    /**
     * Whether writing/flushing is possible.
     */
    protected abstract boolean isWritePossible();

    /**
     * Flush the output if ready, or otherwise {@link #isFlushPending()} should
     * return true after.
     * <p>This is primarily for the Servlet non-blocking I/O API where flush
     * cannot be called without a readyToWrite check.
     * @throws IOException if the underlying flush fails
     */
    protected abstract void flush() throws IOException;

    /**
     * Whether flushing is pending.
     * <p>This is primarily for the Servlet non-blocking I/O API where flush
     * cannot be called without a readyToWrite check.
     */
    protected abstract boolean isFlushPending();

    /**
     * Invoked when an error happens while flushing. Sub-classes may choose
     * to ignore this if they know the underlying API will provide an error
     * notification in a container thread.
     * <p>Defaults to no-op.
     * @param t the error raised by {@link #flush()}
     */
    protected void flushingFailed(Throwable t) {
    }


    // Private methods for use in State...

    /**
     * Atomically move from {@code oldState} to {@code newState}, logging the
     * transition at trace level when it succeeds.
     * @return {@code true} if the current state was {@code oldState} and has
     * been replaced, {@code false} otherwise
     */
    private boolean changeState(State oldState, State newState) {
        boolean result = this.state.compareAndSet(oldState, newState);
        if (result && rsWriteFlushLogger.isTraceEnabled()) {
            rsWriteFlushLogger.trace(getLogPrefix() + oldState + " -> " + newState);
        }
        return result;
    }

    /**
     * Check {@link #isWritePossible()} and, if so, trigger
     * {@link #onFlushPossible()} in the current thread.
     */
    private void flushIfPossible() {
        boolean result = isWritePossible();
        if (rsWriteFlushLogger.isTraceEnabled()) {
            rsWriteFlushLogger.trace(getLogPrefix() + "isWritePossible[" + result + "]");
        }
        if (result) {
            onFlushPossible();
        }
    }


    /**
     * Represents a state for the {@link Processor} to be in.
     *
     * <p><pre>
     *       UNSUBSCRIBED
     *            |
     *            v
     *        REQUESTED <---> RECEIVED ------+
     *            |              |           |
     *            |              v           |
     *            |           FLUSHING       |
     *            |              |           |
     *            |              v           |
     *            +--------> COMPLETED <-----+
     * </pre>
     *
     * <p>Note that the transition into FLUSHING is attempted from REQUESTED,
     * i.e. after a completed write has moved RECEIVED back to REQUESTED, and
     * that the default {@link #onError} attempts a transition to COMPLETED
     * from whichever state is current.
     */
    private enum State {

        UNSUBSCRIBED {
            @Override
            public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> processor, Subscription subscription) {
                Assert.notNull(subscription, "Subscription must not be null");
                if (processor.changeState(this, REQUESTED)) {
                    processor.subscription = subscription;
                    // Ask for the first nested Publisher
                    subscription.request(1);
                }
                else {
                    // State changed concurrently: fall back on the default which cancels
                    super.onSubscribe(processor, subscription);
                }
            }

            @Override
            public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
                // This can happen on (very early) completion notification from container..
                if (processor.changeState(this, COMPLETED)) {
                    processor.resultPublisher.publishComplete();
                }
                else {
                    // Lost the race: let the now-current state handle the completion
                    processor.state.get().onComplete(processor);
                }
            }
        },

        REQUESTED {
            @Override
            public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor,
                    Publisher<? extends T> currentPublisher) {

                if (processor.changeState(this, RECEIVED)) {
                    // Write the nested Publisher through a fresh processor and listen for its result
                    Processor<? super T, Void> currentProcessor = processor.createWriteProcessor();
                    currentPublisher.subscribe(currentProcessor);
                    currentProcessor.subscribe(new WriteResultSubscriber(processor));
                }
            }

            @Override
            public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
                if (processor.changeState(this, COMPLETED)) {
                    processor.resultPublisher.publishComplete();
                }
                else {
                    // Lost the race: let the now-current state handle the completion
                    processor.state.get().onComplete(processor);
                }
            }
        },

        RECEIVED {
            @Override
            public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
                try {
                    processor.flush();
                }
                catch (Throwable ex) {
                    // Hand the failure to the sub-class hook; the state is left as is
                    processor.flushingFailed(ex);
                    return;
                }
                if (processor.changeState(this, REQUESTED)) {
                    if (processor.subscriberCompleted) {
                        // Upstream completed while the write was in progress
                        handleSubscriberCompleted(processor);
                    }
                    else {
                        Assert.state(processor.subscription != null, "No subscription");
                        processor.subscription.request(1);
                    }
                }
            }

            @Override
            public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
                // Remember the completion for writeComplete to pick up
                processor.subscriberCompleted = true;
                // A competing write might have completed very quickly
                if (processor.state.get() == State.REQUESTED) {
                    handleSubscriberCompleted(processor);
                }
            }

            /**
             * Finish up after the upstream has completed and the last write is
             * done: either move to FLUSHING for a pending flush, or complete.
             */
            private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) {
                if (processor.isFlushPending()) {
                    // Ensure the final flush
                    processor.changeState(State.REQUESTED, State.FLUSHING);
                    processor.flushIfPossible();
                }
                else if (processor.changeState(State.REQUESTED, State.COMPLETED)) {
                    processor.resultPublisher.publishComplete();
                }
                else {
                    // Lost the race: let the now-current state handle the completion
                    processor.state.get().onComplete(processor);
                }
            }
        },

        FLUSHING {
            @Override
            public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
                try {
                    processor.flush();
                }
                catch (Throwable ex) {
                    // Hand the failure to the sub-class hook; the state is left as is
                    processor.flushingFailed(ex);
                    return;
                }
                if (processor.changeState(this, COMPLETED)) {
                    processor.resultPublisher.publishComplete();
                }
                else {
                    // Lost the race: let the now-current state handle the completion
                    processor.state.get().onComplete(processor);
                }
            }

            @Override
            public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
                // ignore
            }

            @Override
            public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
                // ignore
            }
        },

        COMPLETED {
            @Override
            public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
                // ignore
            }

            @Override
            public <T> void onError(AbstractListenerWriteFlushProcessor<T> processor, Throwable t) {
                // ignore
            }

            @Override
            public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
                // ignore
            }
        };


        /**
         * Default handling: cancel the given subscription.
         */
        public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> proc, Subscription subscription) {
            subscription.cancel();
        }

        /**
         * Default handling: reject the signal.
         * @throws IllegalStateException always, with the name of this state
         */
        public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
            throw new IllegalStateException(toString());
        }

        /**
         * Default handling: move to COMPLETED and publish the error, or if the
         * state changed concurrently, pass the error to the now-current state.
         */
        public <T> void onError(AbstractListenerWriteFlushProcessor<T> processor, Throwable ex) {
            if (processor.changeState(this, COMPLETED)) {
                processor.resultPublisher.publishError(ex);
            }
            else {
                processor.state.get().onError(processor, ex);
            }
        }

        /**
         * Default handling: reject the signal.
         * @throws IllegalStateException always, with the name of this state
         */
        public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
            throw new IllegalStateException(toString());
        }

        /**
         * Default handling: reject the signal.
         * @throws IllegalStateException always, with the name of this state
         */
        public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
            throw new IllegalStateException(toString());
        }

        /**
         * Default handling: no-op.
         */
        public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
            // ignore
        }


        /**
         * Subscriber to receive and delegate completion notifications from
         * the current Publisher, i.e. for the current flush boundary.
         */
        private static class WriteResultSubscriber implements Subscriber<Void> {

            private final AbstractListenerWriteFlushProcessor<?> processor;


            public WriteResultSubscriber(AbstractListenerWriteFlushProcessor<?> processor) {
                this.processor = processor;
            }

            @Override
            public void onSubscribe(Subscription subscription) {
                // Only terminal signals are of interest, so request without bound
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Void aVoid) {
            }

            @Override
            public void onError(Throwable ex) {
                // A failed write ends the whole exchange: stop the upstream, then report the error
                this.processor.cancel();
                this.processor.onError(ex);
            }

            @Override
            public void onComplete() {
                if (rsWriteFlushLogger.isTraceEnabled()) {
                    rsWriteFlushLogger.trace(this.processor.getLogPrefix() + this.processor.state + " writeComplete");
                }
                this.processor.state.get().writeComplete(this.processor);
            }
        }
    }

}