001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.rsocket;
018
019import java.net.URI;
020import java.util.function.Consumer;
021
022import io.rsocket.ConnectionSetupPayload;
023import io.rsocket.Payload;
024import io.rsocket.RSocket;
025import io.rsocket.transport.ClientTransport;
026import io.rsocket.transport.netty.client.TcpClientTransport;
027import io.rsocket.transport.netty.client.WebsocketClientTransport;
028import org.reactivestreams.Publisher;
029import reactor.core.publisher.Flux;
030import reactor.core.publisher.Mono;
031
032import org.springframework.core.ParameterizedTypeReference;
033import org.springframework.core.ReactiveAdapterRegistry;
034import org.springframework.core.codec.Decoder;
035import org.springframework.lang.Nullable;
036import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
037import org.springframework.util.MimeType;
038
039/**
040 * A thin wrapper around a sending {@link RSocket} with a fluent API accepting
041 * and returning higher level Objects for input and for output, along with
042 * methods to prepare routing and other metadata.
043 *
044 * @author Rossen Stoyanchev
045 * @author Brian Clozel
046 * @since 5.2
047 */
048public interface RSocketRequester {
049
050        /**
051         * Return the underlying sending RSocket.
052         */
053        RSocket rsocket();
054
055        /**
056         * Return the data {@code MimeType} selected for the underlying RSocket
057         * at connection time. On the client side this is configured via
058         * {@link RSocketRequester.Builder#dataMimeType(MimeType)} while on the
059         * server side it's obtained from the {@link ConnectionSetupPayload}.
060         */
061        MimeType dataMimeType();
062
063        /**
064         * Return the metadata {@code MimeType} selected for the underlying RSocket
065         * at connection time. On the client side this is configured via
066         * {@link RSocketRequester.Builder#metadataMimeType(MimeType)} while on the
067         * server side it's obtained from the {@link ConnectionSetupPayload}.
068         */
069        MimeType metadataMimeType();
070
071        /**
072         * Begin to specify a new request with the given route to a remote handler.
073         * <p>The route can be a template with placeholders, e.g.
074         * {@code "flight.{code}"} in which case the supplied route variables are
075         * formatted via {@code toString()} and expanded into the template.
076         * If a formatted variable contains a "." it is replaced with the escape
077         * sequence "%2E" to avoid treating it as a separator by the responder.
078         * <p>If the connection is set to use composite metadata, the route is
079         * encoded as {@code "message/x.rsocket.routing.v0"}. Otherwise the route
080         * is encoded according to the mime type for the connection.
081         * @param route the route expressing a remote handler mapping
082         * @param routeVars variables to be expanded into the route template
083         * @return a spec for further defining and executing the request
084         */
085        RequestSpec route(String route, Object... routeVars);
086
087        /**
088         * Begin to specify a new request with the given metadata value, which can
089         * be a concrete value or any producer of a single value that can be adapted
090         * to a {@link Publisher} via {@link ReactiveAdapterRegistry}.
091         * @param metadata the metadata value to encode
092         * @param mimeType the mime type that describes the metadata; this is
093         * required for connections using composite metadata, otherwise the value
094         * is encoded per the connection's mime type and this may be {@code null}
095         * @return a spec for further defining and executing the request
096         */
097        RequestSpec metadata(Object metadata, @Nullable MimeType mimeType);
098
099
100        /**
101         * Obtain a builder to create a client {@link RSocketRequester} by connecting
102         * to an RSocket server.
103         */
104        static RSocketRequester.Builder builder() {
105                return new DefaultRSocketRequesterBuilder();
106        }
107
108        /**
109         * Wrap an existing {@link RSocket}. Typically used in client or server
110         * responders to wrap the {@code RSocket} for the remote side.
111         */
112        static RSocketRequester wrap(
113                        RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
114                        RSocketStrategies strategies) {
115
116                return new DefaultRSocketRequester(rsocket, dataMimeType, metadataMimeType, strategies);
117        }
118
119
120        /**
121         * Builder to create a requester by connecting to a server.
122         */
123        interface Builder {
124
125                /**
126                 * Configure the payload data MimeType to specify on the {@code SETUP}
127                 * frame that applies to the whole connection.
128                 * <p>If not set, this will be initialized to the MimeType of the first
129                 * {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default}
130                 * {@code Decoder}, or otherwise the MimeType of the first decoder.
131                 */
132                RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType);
133
134                /**
135                 * Configure the payload metadata MimeType to specify on the {@code SETUP}
136                 * frame that applies to the whole connection.
137                 * <p>By default this is set to
138                 * {@code "message/x.rsocket.composite-metadata.v0"} in which case the
139                 * route, if provided, is encoded as a {@code "message/x.rsocket.routing.v0"}
140                 * composite metadata entry. If this is set to any other MimeType, it is
141                 * assumed that's the MimeType for the route, if provided.
142                 */
143                RSocketRequester.Builder metadataMimeType(MimeType mimeType);
144
145                /**
146                 * Set the data for the setup payload. The data will be encoded
147                 * according to the configured {@link #dataMimeType(MimeType)}.
148                 * The data can be a concrete value or any producer of a single value that
149                 * can be adapted to a {@link Publisher} via {@link ReactiveAdapterRegistry}.
150                 * <p>By default this is not set.
151                 */
152                RSocketRequester.Builder setupData(Object data);
153
154                /**
155                 * Set the route for the setup payload. The rules for formatting and
156                 * encoding the route are the same as those for a request route as
157                 * described in {@link RSocketRequester#route(String, Object...)}.
158                 * <p>By default this is not set.
159                 */
160                RSocketRequester.Builder setupRoute(String route, Object... routeVars);
161
162                /**
163                 * Add a metadata entry to the setup payload. Composite metadata must be
164                 * in use if this is called more than once or in addition to
165                 * {@link #setupRoute(String, Object...)}. The metadata value can be a
166                 * concrete value or any producer of a single value that can be adapted
167                 * to a {@link Publisher} via {@link ReactiveAdapterRegistry}.
168                 */
169                RSocketRequester.Builder setupMetadata(Object value, @Nullable MimeType mimeType);
170
171                /**
172                 * Provide the {@link RSocketStrategies} to use.
173                 * <p>This is useful for changing the default settings, yet still allowing
174                 * further customizations via {@link #rsocketStrategies(Consumer)}.
175                 * If not set, defaults are obtained from {@link RSocketStrategies#builder()}.
176                 * @param strategies the strategies to use
177                 */
178                RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
179
180                /**
181                 * Customize the {@link RSocketStrategies}.
182                 * <p>Allows further customization on {@link RSocketStrategies},
183                 * mutating them if they were {@link #rsocketStrategies(RSocketStrategies) set},
184                 * or starting from {@link RSocketStrategies#builder() defaults}.
185                 */
186                RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
187
188                /**
189                 * Callback to configure the {@code RSocketConnector} directly.
190                 * <ul>
191                 * <li>The data and metadata mime types cannot be set directly
192                 * on the {@code RSocketConnector} and will be overridden. Use the
193                 * shortcuts {@link #dataMimeType(MimeType)} and
194                 * {@link #metadataMimeType(MimeType)} on this builder instead.
195                 * <li>The frame decoder also cannot be set directly and instead is set
196                 * to match the configured {@code DataBufferFactory}.
197                 * <li>For the
198                 * {@link io.rsocket.core.RSocketConnector#setupPayload(Payload)
199                 * setupPayload}, consider using methods on this builder to specify the
200                 * route, other metadata, and data as Object values to be encoded.
201                 * <li>To configure client side responding, see
202                 * {@link RSocketMessageHandler#responder(RSocketStrategies, Object...)}.
203                 * </ul>
204                 * @since 5.2.6
205                 */
206                RSocketRequester.Builder rsocketConnector(RSocketConnectorConfigurer configurer);
207
208                /**
209                 * Callback to configure the {@code ClientRSocketFactory} directly.
210                 * <ul>
211                 * <li>The data and metadata mime types cannot be set directly
212                 * on the {@code ClientRSocketFactory} and will be overridden. Use the
213                 * shortcuts {@link #dataMimeType(MimeType)} and
214                 * {@link #metadataMimeType(MimeType)} on this builder instead.
215                 * <li>The frame decoder also cannot be set directly and instead is set
216                 * to match the configured {@code DataBufferFactory}.
217                 * <li>For the
218                 * {@link io.rsocket.RSocketFactory.ClientRSocketFactory#setupPayload(Payload)
219                 * setupPayload}, consider using methods on this builder to specify the
220                 * route, other metadata, and data as Object values to be encoded.
221                 * <li>To configure client side responding, see
222                 * {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
223                 * </ul>
224                 * @deprecated as of 5.2.6 following the deprecation of
225                 * {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory}
226                 * in RSocket 1.0 RC7. Please, use {@link #rsocketConnector(RSocketConnectorConfigurer)}.
227                 */
228                @Deprecated
229                RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
230
231                /**
232                 * Configure this builder through a {@code Consumer}. This enables
233                 * libraries such as Spring Security to provide shortcuts for applying
234                 * a set of related customizations.
235                 * @param configurer the configurer to apply
236                 */
237                RSocketRequester.Builder apply(Consumer<RSocketRequester.Builder> configurer);
238
239                /**
240                 * Connect to the server over TCP.
241                 * @param host the server host
242                 * @param port the server port
243                 * @return an {@code RSocketRequester} for the connection
244                 * @see TcpClientTransport
245                 */
246                Mono<RSocketRequester> connectTcp(String host, int port);
247
248                /**
249                 * Connect to the server over WebSocket.
250                 * @param uri the RSocket server endpoint URI
251                 * @return an {@code RSocketRequester} for the connection
252                 * @see WebsocketClientTransport
253                 */
254                Mono<RSocketRequester> connectWebSocket(URI uri);
255
256                /**
257                 * Connect to the server with the given {@code ClientTransport}.
258                 * @param transport the client transport to use
259                 * @return an {@code RSocketRequester} for the connection
260                 */
261                Mono<RSocketRequester> connect(ClientTransport transport);
262
263        }
264
265        /**
266         * Spec to declare the input for an RSocket request.
267         */
268        interface RequestSpec extends MetadataSpec<RequestSpec>, RetrieveSpec {
269
270                /**
271                 * Append additional metadata entries through a {@code Consumer}.
272                 * This enables libraries such as Spring Security to provide shortcuts
273                 * for applying a set of customizations.
274                 * @param configurer the configurer to apply
275                 * @throws IllegalArgumentException if not using composite metadata.
276                 */
277                RequestSpec metadata(Consumer<MetadataSpec<?>> configurer);
278
279                /**
280                 * Provide payload data for the request. This can be one of:
281                 * <ul>
282                 * <li>Concrete value
283                 * <li>{@link Publisher} of value(s)
284                 * <li>Any other producer of value(s) that can be adapted to a
285                 * {@link Publisher} via {@link ReactiveAdapterRegistry}
286                 * </ul>
287                 * @param data the Object value for the payload data
288                 * @return spec to declare the expected response
289                 */
290                RetrieveSpec data(Object data);
291
292                /**
293                 * Variant of {@link #data(Object)} that also accepts a hint for the
294                 * types of values that will be produced. The class hint is used to
295                 * find a compatible {@code Encoder} once, up front vs per value.
296                 * @param producer the source of payload data value(s). This must be a
297                 * {@link Publisher} or another producer adaptable to a
298                 * {@code Publisher} via {@link ReactiveAdapterRegistry}
299                 * @param elementClass the type of values to be produced
300                 * @return spec to declare the expected response
301                 */
302                RetrieveSpec data(Object producer, Class<?> elementClass);
303
304                /**
305                 * Variant of {@link #data(Object, Class)} for when the type hint has
306                 * to have a generic type. See {@link ParameterizedTypeReference}.
307                 * @param producer the source of payload data value(s). This must be a
308                 * {@link Publisher} or another producer adaptable to a
309                 * {@code Publisher} via {@link ReactiveAdapterRegistry}
310                 * @param elementTypeRef the type of values to be produced
311                 * @return spec to declare the expected response
312                 */
313                RetrieveSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
314        }
315
316
317        /**
318         * Spec for providing additional composite metadata entries.
319         *
320         * @param <S> a self reference to the spec type
321         */
322        interface MetadataSpec<S extends MetadataSpec<S>> {
323
324                /**
325                 * Use this to append additional metadata entries when using composite
326                 * metadata. An {@link IllegalArgumentException} is raised if this
327                 * method is used when not using composite metadata.
328                 * The metadata value can be a concrete value or any producer of a single
329                 * value that can be adapted to a {@link Publisher} via
330                 * {@link ReactiveAdapterRegistry}.
331                 * @param metadata an Object to be encoded with a suitable
332                 * {@link org.springframework.core.codec.Encoder Encoder}, or a
333                 * {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
334                 * @param mimeType the mime type that describes the metadata
335                 */
336                S metadata(Object metadata, MimeType mimeType);
337        }
338
339
340        /**
341         * Spec to declare the expected output for an RSocket request.
342         * @since 5.2.2
343         */
344        interface RetrieveSpec {
345
346                /**
347                 * Perform a {@link RSocket#fireAndForget fireAndForget}.
348                 */
349                Mono<Void> send();
350
351                /**
352                 * Perform a {@link RSocket#requestResponse requestResponse} exchange.
353                 * <p>If the return type is {@code Mono<Void>}, the {@code Mono} will
354                 * complete after all data is consumed.
355                 * <p><strong>Note:</strong> This method will raise an error if
356                 * the request payload is a multi-valued {@link Publisher} as there is
357                 * no many-to-one RSocket interaction.
358                 * @param dataType the expected data type for the response
359                 * @param <T> parameter for the expected data type
360                 * @return the decoded response
361                 */
362                <T> Mono<T> retrieveMono(Class<T> dataType);
363
364                /**
365                 * Variant of {@link #retrieveMono(Class)} for when the dataType has
366                 * to have a generic type. See {@link ParameterizedTypeReference}.
367                 */
368                <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);
369
370                /**
371                 * Perform an {@link RSocket#requestStream requestStream} or a
372                 * {@link RSocket#requestChannel requestChannel} exchange depending on
373                 * whether the request input is single or multi-payload.
374                 * <p>If the return type is {@code Flux<Void>}, the {@code Flux} will
375                 * complete after all data is consumed.
376                 * @param dataType the expected type for values in the response
377                 * @param <T> parameter for the expected type of values
378                 * @return the decoded response
379                 */
380                <T> Flux<T> retrieveFlux(Class<T> dataType);
381
382                /**
383                 * Variant of {@link #retrieveFlux(Class)} for when the dataType has
384                 * to have a generic type. See {@link ParameterizedTypeReference}.
385                 */
386                <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef);
387        }
388
389}