001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.test.web.reactive.server;
018
019import java.util.function.Consumer;
020
021import reactor.core.publisher.Flux;
022
023/**
024 * {@code ExchangeResult} variant with the response body decoded as
025 * {@code Flux<T>} but not yet consumed.
026 *
027 * @author Rossen Stoyanchev
028 * @since 5.0
029 * @param <T> the type of elements in the response body
030 * @see EntityExchangeResult
031 */
032public class FluxExchangeResult<T> extends ExchangeResult {
033
034        private final Flux<T> body;
035
036
037        FluxExchangeResult(ExchangeResult result, Flux<T> body) {
038                super(result);
039                this.body = body;
040        }
041
042
043        /**
044         * Return the response body as a {@code Flux<T>} of decoded elements.
045         *
046         * <p>The response body stream can then be consumed further with the
047         * "reactor-test" {@code StepVerifier} and cancelled when enough elements have been
048         * consumed from the (possibly infinite) stream:
049         *
050         * <pre>
051         * FluxExchangeResult&lt;Person&gt; result = this.client.get()
052         *      .uri("/persons")
053         *      .accept(TEXT_EVENT_STREAM)
054         *      .exchange()
055         *      .expectStatus().isOk()
056         *      .expectHeader().contentType(TEXT_EVENT_STREAM)
057         *      .expectBody(Person.class)
058         *      .returnResult();
059         *
060         * StepVerifier.create(result.getResponseBody())
061         *      .expectNext(new Person("Jane"), new Person("Jason"))
062         *      .expectNextCount(4)
063         *      .expectNext(new Person("Jay"))
064         *      .thenCancel()
065         *      .verify();
066         * </pre>
067         */
068        public Flux<T> getResponseBody() {
069                return this.body;
070        }
071
072        /**
073         * Invoke the given consumer within {@link #assertWithDiagnostics(Runnable)}
074         * passing {@code "this"} instance to it. This method allows the following,
075         * without leaving the {@code WebTestClient} chain of calls:
076         * <pre class="code">
077         *      client.get()
078         *              .uri("/persons")
079         *              .accept(TEXT_EVENT_STREAM)
080         *              .exchange()
081         *              .expectStatus().isOk()
082         *              .returnResult()
083         *              .consumeWith(result -> assertThat(...);
084         * </pre>
085         * @param consumer the consumer for {@code "this"} instance
086         */
087        public void consumeWith(Consumer<FluxExchangeResult<T>> consumer) {
088                assertWithDiagnostics(() -> consumer.accept(this));
089        }
090
091}