001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.test.web.reactive.server; 018 019import java.util.function.Consumer; 020 021import reactor.core.publisher.Flux; 022 023/** 024 * {@code ExchangeResult} variant with the response body decoded as 025 * {@code Flux<T>} but not yet consumed. 026 * 027 * @author Rossen Stoyanchev 028 * @since 5.0 029 * @param <T> the type of elements in the response body 030 * @see EntityExchangeResult 031 */ 032public class FluxExchangeResult<T> extends ExchangeResult { 033 034 private final Flux<T> body; 035 036 037 FluxExchangeResult(ExchangeResult result, Flux<T> body) { 038 super(result); 039 this.body = body; 040 } 041 042 043 /** 044 * Return the response body as a {@code Flux<T>} of decoded elements. 045 * 046 * <p>The response body stream can then be consumed further with the 047 * "reactor-test" {@code StepVerifier} and cancelled when enough elements have been 048 * consumed from the (possibly infinite) stream: 049 * 050 * <pre> 051 * FluxExchangeResult<Person> result = this.client.get() 052 * .uri("/persons") 053 * .accept(TEXT_EVENT_STREAM) 054 * .exchange() 055 * .expectStatus().isOk() 056 * .expectHeader().contentType(TEXT_EVENT_STREAM) 057 * .expectBody(Person.class) 058 * .returnResult(); 059 * 060 * StepVerifier.create(result.getResponseBody()) 061 * .expectNext(new Person("Jane"), new Person("Jason")) 062 * .expectNextCount(4) 063 * .expectNext(new Person("Jay")) 064 * .thenCancel() 065 * .verify(); 066 * </pre> 067 */ 068 public Flux<T> getResponseBody() { 069 return this.body; 070 } 071 072 /** 073 * Invoke the given consumer within {@link #assertWithDiagnostics(Runnable)} 074 * passing {@code "this"} instance to it. This method allows the following, 075 * without leaving the {@code WebTestClient} chain of calls: 076 * <pre class="code"> 077 * client.get() 078 * .uri("/persons") 079 * .accept(TEXT_EVENT_STREAM) 080 * .exchange() 081 * .expectStatus().isOk() 082 * .returnResult() 083 * .consumeWith(result -> assertThat(...); 084 * </pre> 085 * @param consumer the consumer for {@code "this"} instance 086 */ 087 public void consumeWith(Consumer<FluxExchangeResult<T>> consumer) { 088 assertWithDiagnostics(() -> consumer.accept(this)); 089 } 090 091}