001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.server.reactive; 018 019import java.util.function.Function; 020 021import org.reactivestreams.Publisher; 022import org.reactivestreams.Subscriber; 023import org.reactivestreams.Subscription; 024import reactor.core.CoreSubscriber; 025import reactor.core.Scannable; 026import reactor.core.publisher.Flux; 027import reactor.core.publisher.Mono; 028import reactor.core.publisher.Operators; 029import reactor.util.context.Context; 030 031import org.springframework.core.io.buffer.DataBuffer; 032import org.springframework.core.io.buffer.DataBufferUtils; 033import org.springframework.lang.Nullable; 034import org.springframework.util.Assert; 035 036/** 037 * Given a write function that accepts a source {@code Publisher<T>} to write 038 * with and returns {@code Publisher<Void>} for the result, this operator helps 039 * to defer the invocation of the write function, until we know if the source 040 * publisher will begin publishing without an error. If the first emission is 041 * an error, the write function is bypassed, and the error is sent directly 042 * through the result publisher. Otherwise the write function is invoked. 043 * 044 * @author Rossen Stoyanchev 045 * @author Stephane Maldini 046 * @since 5.0 047 * @param <T> the type of element signaled 048 */ 049public class ChannelSendOperator<T> extends Mono<Void> implements Scannable { 050 051 private final Function<Publisher<T>, Publisher<Void>> writeFunction; 052 053 private final Flux<T> source; 054 055 056 public ChannelSendOperator(Publisher<? extends T> source, Function<Publisher<T>, Publisher<Void>> writeFunction) { 057 this.source = Flux.from(source); 058 this.writeFunction = writeFunction; 059 } 060 061 062 @Override 063 @Nullable 064 @SuppressWarnings("rawtypes") 065 public Object scanUnsafe(Attr key) { 066 if (key == Attr.PREFETCH) { 067 return Integer.MAX_VALUE; 068 } 069 if (key == Attr.PARENT) { 070 return this.source; 071 } 072 return null; 073 } 074 075 @Override 076 public void subscribe(CoreSubscriber<? super Void> actual) { 077 this.source.subscribe(new WriteBarrier(actual)); 078 } 079 080 081 private enum State { 082 083 /** No emissions from the upstream source yet. */ 084 NEW, 085 086 /** 087 * At least one signal of any kind has been received; we're ready to 088 * call the write function and proceed with actual writing. 089 */ 090 FIRST_SIGNAL_RECEIVED, 091 092 /** 093 * The write subscriber has subscribed and requested; we're going to 094 * emit the cached signals. 095 */ 096 EMITTING_CACHED_SIGNALS, 097 098 /** 099 * The write subscriber has subscribed, and cached signals have been 100 * emitted to it; we're ready to switch to a simple pass-through mode 101 * for all remaining signals. 102 **/ 103 READY_TO_WRITE 104 105 } 106 107 108 /** 109 * A barrier inserted between the write source and the write subscriber 110 * (i.e. the HTTP server adapter) that pre-fetches and waits for the first 111 * signal before deciding whether to hook in to the write subscriber. 112 * 113 * <p>Acts as: 114 * <ul> 115 * <li>Subscriber to the write source. 116 * <li>Subscription to the write subscriber. 117 * <li>Publisher to the write subscriber. 118 * </ul> 119 * 120 * <p>Also uses {@link WriteCompletionBarrier} to communicate completion 121 * and detect cancel signals from the completion subscriber. 122 */ 123 private class WriteBarrier implements CoreSubscriber<T>, Subscription, Publisher<T> { 124 125 /* Bridges signals to and from the completionSubscriber */ 126 private final WriteCompletionBarrier writeCompletionBarrier; 127 128 /* Upstream write source subscription */ 129 @Nullable 130 private Subscription subscription; 131 132 /** Cached data item before readyToWrite. */ 133 @Nullable 134 private T item; 135 136 /** Cached error signal before readyToWrite. */ 137 @Nullable 138 private Throwable error; 139 140 /** Cached onComplete signal before readyToWrite. */ 141 private boolean completed = false; 142 143 /** Recursive demand while emitting cached signals. */ 144 private long demandBeforeReadyToWrite; 145 146 /** Current state. */ 147 private State state = State.NEW; 148 149 /** The actual writeSubscriber from the HTTP server adapter. */ 150 @Nullable 151 private Subscriber<? super T> writeSubscriber; 152 153 154 WriteBarrier(CoreSubscriber<? super Void> completionSubscriber) { 155 this.writeCompletionBarrier = new WriteCompletionBarrier(completionSubscriber, this); 156 } 157 158 159 // Subscriber<T> methods (we're the subscriber to the write source).. 160 161 @Override 162 public final void onSubscribe(Subscription s) { 163 if (Operators.validate(this.subscription, s)) { 164 this.subscription = s; 165 this.writeCompletionBarrier.connect(); 166 s.request(1); 167 } 168 } 169 170 @Override 171 public final void onNext(T item) { 172 if (this.state == State.READY_TO_WRITE) { 173 requiredWriteSubscriber().onNext(item); 174 return; 175 } 176 //FIXME revisit in case of reentrant sync deadlock 177 synchronized (this) { 178 if (this.state == State.READY_TO_WRITE) { 179 requiredWriteSubscriber().onNext(item); 180 } 181 else if (this.state == State.NEW) { 182 this.item = item; 183 this.state = State.FIRST_SIGNAL_RECEIVED; 184 Publisher<Void> result; 185 try { 186 result = writeFunction.apply(this); 187 } 188 catch (Throwable ex) { 189 this.writeCompletionBarrier.onError(ex); 190 return; 191 } 192 result.subscribe(this.writeCompletionBarrier); 193 } 194 else { 195 if (this.subscription != null) { 196 this.subscription.cancel(); 197 } 198 this.writeCompletionBarrier.onError(new IllegalStateException("Unexpected item.")); 199 } 200 } 201 } 202 203 private Subscriber<? super T> requiredWriteSubscriber() { 204 Assert.state(this.writeSubscriber != null, "No write subscriber"); 205 return this.writeSubscriber; 206 } 207 208 @Override 209 public final void onError(Throwable ex) { 210 if (this.state == State.READY_TO_WRITE) { 211 requiredWriteSubscriber().onError(ex); 212 return; 213 } 214 synchronized (this) { 215 if (this.state == State.READY_TO_WRITE) { 216 requiredWriteSubscriber().onError(ex); 217 } 218 else if (this.state == State.NEW) { 219 this.state = State.FIRST_SIGNAL_RECEIVED; 220 this.writeCompletionBarrier.onError(ex); 221 } 222 else { 223 this.error = ex; 224 } 225 } 226 } 227 228 @Override 229 public final void onComplete() { 230 if (this.state == State.READY_TO_WRITE) { 231 requiredWriteSubscriber().onComplete(); 232 return; 233 } 234 synchronized (this) { 235 if (this.state == State.READY_TO_WRITE) { 236 requiredWriteSubscriber().onComplete(); 237 } 238 else if (this.state == State.NEW) { 239 this.completed = true; 240 this.state = State.FIRST_SIGNAL_RECEIVED; 241 Publisher<Void> result; 242 try { 243 result = writeFunction.apply(this); 244 } 245 catch (Throwable ex) { 246 this.writeCompletionBarrier.onError(ex); 247 return; 248 } 249 result.subscribe(this.writeCompletionBarrier); 250 } 251 else { 252 this.completed = true; 253 } 254 } 255 } 256 257 @Override 258 public Context currentContext() { 259 return this.writeCompletionBarrier.currentContext(); 260 } 261 262 263 // Subscription methods (we're the Subscription to the writeSubscriber).. 264 265 @Override 266 public void request(long n) { 267 Subscription s = this.subscription; 268 if (s == null) { 269 return; 270 } 271 if (this.state == State.READY_TO_WRITE) { 272 s.request(n); 273 return; 274 } 275 synchronized (this) { 276 if (this.writeSubscriber != null) { 277 if (this.state == State.EMITTING_CACHED_SIGNALS) { 278 this.demandBeforeReadyToWrite = n; 279 return; 280 } 281 try { 282 this.state = State.EMITTING_CACHED_SIGNALS; 283 if (emitCachedSignals()) { 284 return; 285 } 286 n = n + this.demandBeforeReadyToWrite - 1; 287 if (n == 0) { 288 return; 289 } 290 } 291 finally { 292 this.state = State.READY_TO_WRITE; 293 } 294 } 295 } 296 s.request(n); 297 } 298 299 private boolean emitCachedSignals() { 300 if (this.error != null) { 301 try { 302 requiredWriteSubscriber().onError(this.error); 303 } 304 finally { 305 releaseCachedItem(); 306 } 307 return true; 308 } 309 T item = this.item; 310 this.item = null; 311 if (item != null) { 312 requiredWriteSubscriber().onNext(item); 313 } 314 if (this.completed) { 315 requiredWriteSubscriber().onComplete(); 316 return true; 317 } 318 return false; 319 } 320 321 @Override 322 public void cancel() { 323 Subscription s = this.subscription; 324 if (s != null) { 325 this.subscription = null; 326 try { 327 s.cancel(); 328 } 329 finally { 330 releaseCachedItem(); 331 } 332 } 333 } 334 335 private void releaseCachedItem() { 336 synchronized (this) { 337 Object item = this.item; 338 if (item instanceof DataBuffer) { 339 DataBufferUtils.release((DataBuffer) item); 340 } 341 this.item = null; 342 } 343 } 344 345 346 // Publisher<T> methods (we're the Publisher to the writeSubscriber).. 347 348 @Override 349 public void subscribe(Subscriber<? super T> writeSubscriber) { 350 synchronized (this) { 351 Assert.state(this.writeSubscriber == null, "Only one write subscriber supported"); 352 this.writeSubscriber = writeSubscriber; 353 if (this.error != null || this.completed) { 354 this.writeSubscriber.onSubscribe(Operators.emptySubscription()); 355 emitCachedSignals(); 356 } 357 else { 358 this.writeSubscriber.onSubscribe(this); 359 } 360 } 361 } 362 } 363 364 365 /** 366 * We need an extra barrier between the WriteBarrier itself and the actual 367 * completion subscriber. 368 * 369 * <p>The completionSubscriber is subscribed initially to the WriteBarrier. 370 * Later after the first signal is received, we need one more subscriber 371 * instance (per spec can only subscribe once) to subscribe to the write 372 * function and switch to delegating completion signals from it. 373 */ 374 private class WriteCompletionBarrier implements CoreSubscriber<Void>, Subscription { 375 376 /* Downstream write completion subscriber */ 377 private final CoreSubscriber<? super Void> completionSubscriber; 378 379 private final WriteBarrier writeBarrier; 380 381 @Nullable 382 private Subscription subscription; 383 384 385 public WriteCompletionBarrier(CoreSubscriber<? super Void> subscriber, WriteBarrier writeBarrier) { 386 this.completionSubscriber = subscriber; 387 this.writeBarrier = writeBarrier; 388 } 389 390 391 /** 392 * Connect the underlying completion subscriber to this barrier in order 393 * to track cancel signals and pass them on to the write barrier. 394 */ 395 public void connect() { 396 this.completionSubscriber.onSubscribe(this); 397 } 398 399 // Subscriber methods (we're the subscriber to the write function).. 400 401 @Override 402 public void onSubscribe(Subscription subscription) { 403 this.subscription = subscription; 404 subscription.request(Long.MAX_VALUE); 405 } 406 407 @Override 408 public void onNext(Void aVoid) { 409 } 410 411 @Override 412 public void onError(Throwable ex) { 413 try { 414 this.completionSubscriber.onError(ex); 415 } 416 finally { 417 this.writeBarrier.releaseCachedItem(); 418 } 419 } 420 421 @Override 422 public void onComplete() { 423 this.completionSubscriber.onComplete(); 424 } 425 426 @Override 427 public Context currentContext() { 428 return this.completionSubscriber.currentContext(); 429 } 430 431 432 @Override 433 public void request(long n) { 434 // Ignore: we don't produce data 435 } 436 437 @Override 438 public void cancel() { 439 this.writeBarrier.cancel(); 440 Subscription subscription = this.subscription; 441 if (subscription != null) { 442 subscription.cancel(); 443 } 444 } 445 } 446 447}