/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.server.reactive;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import org.springframework.core.log.LogDelegateFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
 * Abstract base class for {@code Processor} implementations that bridge between
 * event-listener write APIs and Reactive Streams.
 *
 * <p>Specifically a base class for writing to the HTTP response body with
 * Servlet 3.1 non-blocking I/O and Undertow XNIO as well for writing WebSocket
 * messages through the Java WebSocket API (JSR-356), Jetty, and Undertow.
 *
 * <p>The processor is driven by a small state machine (see the private
 * {@code State} enum): every Reactive Streams signal and every
 * {@link #onWritePossible()} callback is dispatched to the current state, and
 * transitions are performed with compare-and-set on an {@link AtomicReference}.
 *
 * @author Arjen Poutsma
 * @author Violeta Georgieva
 * @author Rossen Stoyanchev
 * @since 5.0
 * @param <T> the type of element signaled to the {@link Subscriber}
 */
public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {

	/**
	 * Special logger for debugging Reactive Streams signals.
	 * @see LogDelegateFactory#getHiddenLog(Class)
	 * @see AbstractListenerReadPublisher#rsReadLogger
	 * @see AbstractListenerWriteFlushProcessor#rsWriteFlushLogger
	 * @see WriteResultPublisher#rsWriteResultLogger
	 */
	protected static final Log rsWriteLogger = LogDelegateFactory.getHiddenLog(AbstractListenerWriteProcessor.class);


	/** Current state of the processor; only changed through {@link #changeState}. */
	private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);

	/** The upstream subscription, assigned once the UNSUBSCRIBED to REQUESTED transition succeeds. */
	@Nullable
	private Subscription subscription;

	/** The item received via {@code onNext} that is waiting to be, or is being, written. */
	@Nullable
	private volatile T currentData;

	/* Indicates "onComplete" was received during the (last) write. */
	private volatile boolean subscriberCompleted;

	/**
	 * Indicates we're waiting for one last isReady-onWritePossible cycle
	 * after "onComplete" because some Servlet containers expect this to take
	 * place prior to calling AsyncContext.complete().
	 * See https://github.com/eclipse-ee4j/servlet-api/issues/273
	 */
	private volatile boolean readyToCompleteAfterLastWrite;

	/** Publishes the overall write result (completion or error) to the result subscriber. */
	private final WriteResultPublisher resultPublisher;

	private final String logPrefix;


	/**
	 * Create an instance with an empty log prefix.
	 */
	public AbstractListenerWriteProcessor() {
		this("");
	}

	/**
	 * Create an instance with the given log prefix.
	 * <p>A prefix without text is normalized to the empty string, and the same
	 * normalized value is handed to the internal {@link WriteResultPublisher}
	 * so that both log with the same prefix.
	 * @param logPrefix the prefix to prepend to log messages
	 * @since 5.1
	 */
	public AbstractListenerWriteProcessor(String logPrefix) {
		this.logPrefix = (StringUtils.hasText(logPrefix) ? logPrefix : "");
		this.resultPublisher = new WriteResultPublisher(this.logPrefix);
	}


	/**
	 * Get the configured log prefix.
	 * @return the log prefix, possibly empty but never {@code null}
	 * @since 5.1
	 */
	public String getLogPrefix() {
		return this.logPrefix;
	}


	// Subscriber methods and async I/O notification methods...

	/**
	 * Subscription signal from the upstream, write Publisher; delegated to
	 * the current state.
	 */
	@Override
	public final void onSubscribe(Subscription subscription) {
		this.state.get().onSubscribe(this, subscription);
	}

	/**
	 * Data signal from the upstream, write Publisher; delegated to the
	 * current state.
	 */
	@Override
	public final void onNext(T data) {
		if (rsWriteLogger.isTraceEnabled()) {
			rsWriteLogger.trace(getLogPrefix() + "Item to write");
		}
		this.state.get().onNext(this, data);
	}

	/**
	 * Error signal from the upstream, write Publisher. This is also used by
	 * sub-classes to delegate error notifications from the container.
	 */
	@Override
	public final void onError(Throwable ex) {
		if (rsWriteLogger.isTraceEnabled()) {
			rsWriteLogger.trace(getLogPrefix() + "Write source error: " + ex);
		}
		this.state.get().onError(this, ex);
	}

	/**
	 * Completion signal from the upstream, write Publisher. This is also used
	 * by sub-classes to delegate completion notifications from the container.
	 */
	@Override
	public final void onComplete() {
		if (rsWriteLogger.isTraceEnabled()) {
			rsWriteLogger.trace(getLogPrefix() + "No more items to write");
		}
		this.state.get().onComplete(this);
	}

	/**
	 * Invoked when writing is possible, either in the same thread after a check
	 * via {@link #isWritePossible()}, or as a callback from the underlying
	 * container.
	 */
	public final void onWritePossible() {
		if (rsWriteLogger.isTraceEnabled()) {
			rsWriteLogger.trace(getLogPrefix() + "onWritePossible");
		}
		this.state.get().onWritePossible(this);
	}

	/**
	 * Invoked during an error or completion callback from the underlying
	 * container to cancel the upstream subscription.
	 * <p>Does nothing beyond logging if no subscription has been assigned yet.
	 */
	public void cancel() {
		// Guarded like every other trace call here to avoid building the message needlessly.
		if (rsWriteLogger.isTraceEnabled()) {
			rsWriteLogger.trace(getLogPrefix() + "Cancellation");
		}
		if (this.subscription != null) {
			this.subscription.cancel();
		}
	}

	// Publisher implementation for result notifications...

	/**
	 * Subscribe to the write result; delegates to the internal
	 * {@link WriteResultPublisher}.
	 */
	@Override
	public final void subscribe(Subscriber<? super Void> subscriber) {
		// Technically, cancellation from the result subscriber should be propagated
		// to the upstream subscription. In practice, HttpHandler server adapters
		// don't have a reason to cancel the result subscription.
		this.resultPublisher.subscribe(subscriber);
	}


	// Write API methods to be implemented or template methods to override...

	/**
	 * Whether the given data item is empty, i.e. has no content to write.
	 * <p>If {@code true} the item is not written and the next item is
	 * requested from the upstream instead.
	 * @param data the item received from the upstream
	 * @return {@code true} if there is nothing to write for this item
	 */
	protected abstract boolean isDataEmpty(T data);

	/**
	 * Template method invoked after a data item to write is received via
	 * {@link Subscriber#onNext(Object)}. The default implementation saves the
	 * data item for writing once that is possible.
	 * <p>If a previous item is unexpectedly still pending, the new item is
	 * discarded, the upstream is cancelled, an {@link IllegalStateException}
	 * is signalled through {@link #onError(Throwable)}, and the new item is
	 * <em>not</em> stored.
	 * @param data the non-empty item to save for writing
	 */
	protected void dataReceived(T data) {
		T prev = this.currentData;
		if (prev != null) {
			// This shouldn't happen:
			//   1. dataReceived can only be called from REQUESTED state
			//   2. currentData is cleared before requesting
			discardData(data);
			cancel();
			onError(new IllegalStateException("Received new data while current not processed yet."));
			// The new item has just been discarded: do not keep a reference to it.
			return;
		}
		this.currentData = data;
	}

	/**
	 * Whether writing is possible.
	 */
	protected abstract boolean isWritePossible();

	/**
	 * Write the given item.
	 * <p><strong>Note:</strong> Sub-classes are responsible for releasing any
	 * data buffer associated with the item, once fully written, if pooled
	 * buffers apply to the underlying container.
	 * @param data the item to write
	 * @return {@code true} if the current data item was written completely and
	 * a new item requested, or {@code false} if it was written partially and
	 * we'll need more write callbacks before it is fully written
	 * @throws IOException in case of an I/O failure while writing
	 */
	protected abstract boolean write(T data) throws IOException;

	/**
	 * Invoked after the current data has been written and before requesting
	 * the next item from the upstream, write Publisher.
	 * <p>The default implementation is a no-op.
	 * @deprecated originally introduced for Undertow to stop write notifications
	 * when no data is available, but deprecated as of 5.0.6 since constant
	 * switching on every requested item causes a significant slowdown.
	 */
	@Deprecated
	protected void writingPaused() {
	}

	/**
	 * Invoked after onComplete or onError notification.
	 * <p>The default implementation is a no-op.
	 */
	protected void writingComplete() {
	}

	/**
	 * Invoked when an I/O error occurs during a write. Sub-classes may choose
	 * to ignore this if they know the underlying API will provide an error
	 * notification in a container thread.
	 * <p>Defaults to no-op.
	 * @param ex the error raised by {@link #write(Object)}
	 */
	protected void writingFailed(Throwable ex) {
	}

	/**
	 * Invoked after any error (either from the upstream write Publisher, or
	 * from I/O operations to the underlying server) and cancellation
	 * to discard in-flight data that was in
	 * the process of being written when the error took place.
	 * @param data the data to be released
	 * @since 5.0.11
	 */
	protected abstract void discardData(T data);


	// Private methods for use from State's...

	/**
	 * Atomically move from {@code oldState} to {@code newState}.
	 * @return {@code true} if the transition took place, {@code false} if the
	 * current state was not {@code oldState}
	 */
	private boolean changeState(State oldState, State newState) {
		boolean result = this.state.compareAndSet(oldState, newState);
		if (result && rsWriteLogger.isTraceEnabled()) {
			rsWriteLogger.trace(getLogPrefix() + oldState + " -> " + newState);
		}
		return result;
	}

	/**
	 * Move to RECEIVED and, if that succeeded, attempt a write right away.
	 */
	private void changeStateToReceived(State oldState) {
		if (changeState(oldState, State.RECEIVED)) {
			writeIfPossible();
		}
	}

	/**
	 * Move to COMPLETED, releasing any pending item and publishing completion.
	 * If the state changed concurrently, re-dispatch the completion signal to
	 * whatever the current state is.
	 */
	private void changeStateToComplete(State oldState) {
		if (changeState(oldState, State.COMPLETED)) {
			discardCurrentData();
			writingComplete();
			this.resultPublisher.publishComplete();
		}
		else {
			this.state.get().onComplete(this);
		}
	}

	/**
	 * Trigger {@link #onWritePossible()} in the current thread if the
	 * sub-class reports that writing is possible.
	 */
	private void writeIfPossible() {
		boolean result = isWritePossible();
		if (!result && rsWriteLogger.isTraceEnabled()) {
			rsWriteLogger.trace(getLogPrefix() + "isWritePossible: false");
		}
		if (result) {
			onWritePossible();
		}
	}

	/**
	 * Clear the pending item, if any, and hand it to {@link #discardData}.
	 */
	private void discardCurrentData() {
		T data = this.currentData;
		this.currentData = null;
		if (data != null) {
			discardData(data);
		}
	}


	/**
	 * Represents a state for the {@link Processor} to be in.
	 *
	 * <p><pre>
	 *        UNSUBSCRIBED
	 *             |
	 *             v
	 *   +--- REQUESTED -------------> RECEIVED ---+
	 *   |        ^                       ^        |
	 *   |        |                       |        |
	 *   |        + ------ WRITING <------+        |
	 *   |                    |                    |
	 *   |                    v                    |
	 *   +--------------> COMPLETED <--------------+
	 * </pre>
	 */
	private enum State {

		UNSUBSCRIBED {
			@Override
			public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
				Assert.notNull(subscription, "Subscription must not be null");
				if (processor.changeState(this, REQUESTED)) {
					processor.subscription = subscription;
					subscription.request(1);
				}
				else {
					// Lost the race: fall back to the default, which cancels the subscription.
					super.onSubscribe(processor, subscription);
				}
			}

			@Override
			public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
				// This can happen on (very early) completion notification from container..
				processor.changeStateToComplete(this);
			}
		},

		REQUESTED {
			@Override
			public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
				if (processor.isDataEmpty(data)) {
					// Nothing to write for this item: skip it and ask for the next one.
					Assert.state(processor.subscription != null, "No subscription");
					processor.subscription.request(1);
				}
				else {
					processor.dataReceived(data);
					processor.changeStateToReceived(this);
				}
			}
			@Override
			public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
				// No write in progress: go through one last write-possible cycle before completing.
				processor.readyToCompleteAfterLastWrite = true;
				processor.changeStateToReceived(this);
			}
		},

		RECEIVED {
			@SuppressWarnings("deprecation")
			@Override
			public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
				if (processor.readyToCompleteAfterLastWrite) {
					processor.changeStateToComplete(RECEIVED);
				}
				else if (processor.changeState(this, WRITING)) {
					T data = processor.currentData;
					Assert.state(data != null, "No data");
					try {
						if (processor.write(data)) {
							// Fully written: clear the item and either finish or request more.
							if (processor.changeState(WRITING, REQUESTED)) {
								processor.currentData = null;
								if (processor.subscriberCompleted) {
									processor.readyToCompleteAfterLastWrite = true;
									processor.changeStateToReceived(REQUESTED);
								}
								else {
									processor.writingPaused();
									Assert.state(processor.subscription != null, "No subscription");
									processor.subscription.request(1);
								}
							}
						}
						else {
							// Partially written: go back to RECEIVED and try again when possible.
							processor.changeStateToReceived(WRITING);
						}
					}
					catch (IOException ex) {
						processor.writingFailed(ex);
					}
				}
			}

			@Override
			public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
				processor.subscriberCompleted = true;
				// A competing write might have completed very quickly
				if (processor.state.get().equals(State.REQUESTED)) {
					processor.changeStateToComplete(State.REQUESTED);
				}
			}
		},

		WRITING {
			@Override
			public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
				processor.subscriberCompleted = true;
				// A competing write might have completed very quickly
				if (processor.state.get().equals(State.REQUESTED)) {
					processor.changeStateToComplete(State.REQUESTED);
				}
			}
		},

		COMPLETED {
			@Override
			public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
				// ignore
			}
			@Override
			public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
				// ignore
			}
			@Override
			public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
				// ignore
			}
		};

		/**
		 * Default: a subscription is not expected in this state, so cancel it.
		 */
		public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
			subscription.cancel();
		}

		/**
		 * Default: an item is not expected in this state, so discard it,
		 * cancel the upstream and signal an error.
		 */
		public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
			processor.discardData(data);
			processor.cancel();
			processor.onError(new IllegalStateException("Illegal onNext without demand"));
		}

		/**
		 * Default: move to COMPLETED, release any pending item and publish the
		 * error; if the state changed concurrently, re-dispatch to the new state.
		 */
		public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
			if (processor.changeState(this, COMPLETED)) {
				processor.discardCurrentData();
				processor.writingComplete();
				processor.resultPublisher.publishError(ex);
			}
			else {
				processor.state.get().onError(processor, ex);
			}
		}

		/**
		 * Default: completion is not handled by this state; every constant
		 * declared above overrides this method.
		 */
		public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
			throw new IllegalStateException(toString());
		}

		/**
		 * Default: write notifications are ignored outside of RECEIVED.
		 */
		public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
			// ignore
		}
	}

}