001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.rsocket; 018 019import java.net.URI; 020import java.util.function.Consumer; 021 022import io.rsocket.ConnectionSetupPayload; 023import io.rsocket.Payload; 024import io.rsocket.RSocket; 025import io.rsocket.transport.ClientTransport; 026import io.rsocket.transport.netty.client.TcpClientTransport; 027import io.rsocket.transport.netty.client.WebsocketClientTransport; 028import org.reactivestreams.Publisher; 029import reactor.core.publisher.Flux; 030import reactor.core.publisher.Mono; 031 032import org.springframework.core.ParameterizedTypeReference; 033import org.springframework.core.ReactiveAdapterRegistry; 034import org.springframework.core.codec.Decoder; 035import org.springframework.lang.Nullable; 036import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; 037import org.springframework.util.MimeType; 038 039/** 040 * A thin wrapper around a sending {@link RSocket} with a fluent API accepting 041 * and returning higher level Objects for input and for output, along with 042 * methods to prepare routing and other metadata. 043 * 044 * @author Rossen Stoyanchev 045 * @author Brian Clozel 046 * @since 5.2 047 */ 048public interface RSocketRequester { 049 050 /** 051 * Return the underlying sending RSocket. 052 */ 053 RSocket rsocket(); 054 055 /** 056 * Return the data {@code MimeType} selected for the underlying RSocket 057 * at connection time. On the client side this is configured via 058 * {@link RSocketRequester.Builder#dataMimeType(MimeType)} while on the 059 * server side it's obtained from the {@link ConnectionSetupPayload}. 060 */ 061 MimeType dataMimeType(); 062 063 /** 064 * Return the metadata {@code MimeType} selected for the underlying RSocket 065 * at connection time. On the client side this is configured via 066 * {@link RSocketRequester.Builder#metadataMimeType(MimeType)} while on the 067 * server side it's obtained from the {@link ConnectionSetupPayload}. 068 */ 069 MimeType metadataMimeType(); 070 071 /** 072 * Begin to specify a new request with the given route to a remote handler. 073 * <p>The route can be a template with placeholders, e.g. 074 * {@code "flight.{code}"} in which case the supplied route variables are 075 * formatted via {@code toString()} and expanded into the template. 076 * If a formatted variable contains a "." it is replaced with the escape 077 * sequence "%2E" to avoid treating it as separator by the responder . 078 * <p>If the connection is set to use composite metadata, the route is 079 * encoded as {@code "message/x.rsocket.routing.v0"}. Otherwise the route 080 * is encoded according to the mime type for the connection. 081 * @param route the route expressing a remote handler mapping 082 * @param routeVars variables to be expanded into the route template 083 * @return a spec for further defining and executing the request 084 */ 085 RequestSpec route(String route, Object... routeVars); 086 087 /** 088 * Begin to specify a new request with the given metadata value, which can 089 * be a concrete value or any producer of a single value that can be adapted 090 * to a {@link Publisher} via {@link ReactiveAdapterRegistry}. 091 * @param metadata the metadata value to encode 092 * @param mimeType the mime type that describes the metadata; 093 * This is required for connection using composite metadata. Otherwise the 094 * value is encoded according to the mime type for the connection and this 095 * argument may be left as {@code null}. 096 */ 097 RequestSpec metadata(Object metadata, @Nullable MimeType mimeType); 098 099 100 /** 101 * Obtain a builder to create a client {@link RSocketRequester} by connecting 102 * to an RSocket server. 103 */ 104 static RSocketRequester.Builder builder() { 105 return new DefaultRSocketRequesterBuilder(); 106 } 107 108 /** 109 * Wrap an existing {@link RSocket}. Typically used in client or server 110 * responders to wrap the {@code RSocket} for the remote side. 111 */ 112 static RSocketRequester wrap( 113 RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType, 114 RSocketStrategies strategies) { 115 116 return new DefaultRSocketRequester(rsocket, dataMimeType, metadataMimeType, strategies); 117 } 118 119 120 /** 121 * Builder to create a requester by connecting to a server. 122 */ 123 interface Builder { 124 125 /** 126 * Configure the payload data MimeType to specify on the {@code SETUP} 127 * frame that applies to the whole connection. 128 * <p>If not set, this will be initialized to the MimeType of the first 129 * {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default} 130 * {@code Decoder}, or otherwise the MimeType of the first decoder. 131 */ 132 RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType); 133 134 /** 135 * Configure the payload metadata MimeType to specify on the {@code SETUP} 136 * frame that applies to the whole connection. 137 * <p>By default this is set to 138 * {@code "message/x.rsocket.composite-metadata.v0"} in which case the 139 * route, if provided, is encoded as a {@code "message/x.rsocket.routing.v0"} 140 * composite metadata entry. If this is set to any other MimeType, it is 141 * assumed that's the MimeType for the route, if provided. 142 */ 143 RSocketRequester.Builder metadataMimeType(MimeType mimeType); 144 145 /** 146 * Set the data for the setup payload. The data will be encoded 147 * according to the configured {@link #dataMimeType(MimeType)}. 148 * The data be a concrete value or any producer of a single value that 149 * can be adapted to a {@link Publisher} via {@link ReactiveAdapterRegistry}. 150 * <p>By default this is not set. 151 */ 152 RSocketRequester.Builder setupData(Object data); 153 154 /** 155 * Set the route for the setup payload. The rules for formatting and 156 * encoding the route are the same as those for a request route as 157 * described in {@link #route(String, Object...)}. 158 * <p>By default this is not set. 159 */ 160 RSocketRequester.Builder setupRoute(String route, Object... routeVars); 161 162 /** 163 * Add metadata entry to the setup payload. Composite metadata must be 164 * in use if this is called more than once or in addition to 165 * {@link #setupRoute(String, Object...)}. The metadata value be a 166 * concrete value or any producer of a single value that can be adapted 167 * to a {@link Publisher} via {@link ReactiveAdapterRegistry}. 168 */ 169 RSocketRequester.Builder setupMetadata(Object value, @Nullable MimeType mimeType); 170 171 /** 172 * Provide the {@link RSocketStrategies} to use. 173 * <p>This is useful for changing the default settings, yet still allowing 174 * further customizations via {@link #rsocketStrategies(Consumer)}. 175 * If not set, defaults are obtained from {@link RSocketStrategies#builder()}. 176 * @param strategies the strategies to use 177 */ 178 RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies); 179 180 /** 181 * Customize the {@link RSocketStrategies}. 182 * <p>Allows further customization on {@link RSocketStrategies}, 183 * mutating them if they were {@link #rsocketStrategies(RSocketStrategies) set}, 184 * or starting from {@link RSocketStrategies#builder()} defaults}. 185 */ 186 RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer); 187 188 /** 189 * Callback to configure the {@code RSocketConnector} directly. 190 * <ul> 191 * <li>The data and metadata mime types cannot be set directly 192 * on the {@code RSocketConnector} and will be overridden. Use the 193 * shortcuts {@link #dataMimeType(MimeType)} and 194 * {@link #metadataMimeType(MimeType)} on this builder instead. 195 * <li>The frame decoder also cannot be set directly and instead is set 196 * to match the configured {@code DataBufferFactory}. 197 * <li>For the 198 * {@link io.rsocket.core.RSocketConnector#setupPayload(Payload) 199 * setupPayload}, consider using methods on this builder to specify the 200 * route, other metadata, and data as Object values to be encoded. 201 * <li>To configure client side responding, see 202 * {@link RSocketMessageHandler#responder(RSocketStrategies, Object...)}. 203 * </ul> 204 * @since 5.2.6 205 */ 206 RSocketRequester.Builder rsocketConnector(RSocketConnectorConfigurer configurer); 207 208 /** 209 * Callback to configure the {@code ClientRSocketFactory} directly. 210 * <ul> 211 * <li>The data and metadata mime types cannot be set directly 212 * on the {@code ClientRSocketFactory} and will be overridden. Use the 213 * shortcuts {@link #dataMimeType(MimeType)} and 214 * {@link #metadataMimeType(MimeType)} on this builder instead. 215 * <li>The frame decoder also cannot be set directly and instead is set 216 * to match the configured {@code DataBufferFactory}. 217 * <li>For the 218 * {@link io.rsocket.RSocketFactory.ClientRSocketFactory#setupPayload(Payload) 219 * setupPayload}, consider using methods on this builder to specify the 220 * route, other metadata, and data as Object values to be encoded. 221 * <li>To configure client side responding, see 222 * {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}. 223 * </ul> 224 * @deprecated as of 5.2.6 following the deprecation of 225 * {@link io.rsocket.RSocketFactory.ClientRSocketFactory RSocketFactory.ClientRSocketFactory} 226 * in RSocket 1.0 RC7. Please, use {@link #rsocketConnector(RSocketConnectorConfigurer)}. 227 */ 228 @Deprecated 229 RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer); 230 231 /** 232 * Configure this builder through a {@code Consumer}. This enables 233 * libraries such as Spring Security to provide shortcuts for applying 234 * a set of related customizations. 235 * @param configurer the configurer to apply 236 */ 237 RSocketRequester.Builder apply(Consumer<RSocketRequester.Builder> configurer); 238 239 /** 240 * Connect to the server over TCP. 241 * @param host the server host 242 * @param port the server port 243 * @return an {@code RSocketRequester} for the connection 244 * @see TcpClientTransport 245 */ 246 Mono<RSocketRequester> connectTcp(String host, int port); 247 248 /** 249 * Connect to the server over WebSocket. 250 * @param uri the RSocket server endpoint URI 251 * @return an {@code RSocketRequester} for the connection 252 * @see WebsocketClientTransport 253 */ 254 Mono<RSocketRequester> connectWebSocket(URI uri); 255 256 /** 257 * Connect to the server with the given {@code ClientTransport}. 258 * @param transport the client transport to use 259 * @return an {@code RSocketRequester} for the connection 260 */ 261 Mono<RSocketRequester> connect(ClientTransport transport); 262 263 } 264 265 /** 266 * Spec to declare the input for an RSocket request. 267 */ 268 interface RequestSpec extends MetadataSpec<RequestSpec>, RetrieveSpec { 269 270 /** 271 * Append additional metadata entries through a {@code Consumer}. 272 * This enables libraries such as Spring Security to provide shortcuts 273 * for applying a set of customizations. 274 * @param configurer the configurer to apply 275 * @throws IllegalArgumentException if not using composite metadata. 276 */ 277 RequestSpec metadata(Consumer<MetadataSpec<?>> configurer); 278 279 /** 280 * Provide payload data for the request. This can be one of: 281 * <ul> 282 * <li>Concrete value 283 * <li>{@link Publisher} of value(s) 284 * <li>Any other producer of value(s) that can be adapted to a 285 * {@link Publisher} via {@link ReactiveAdapterRegistry} 286 * </ul> 287 * @param data the Object value for the payload data 288 * @return spec to declare the expected response 289 */ 290 RetrieveSpec data(Object data); 291 292 /** 293 * Variant of {@link #data(Object)} that also accepts a hint for the 294 * types of values that will be produced. The class hint is used to 295 * find a compatible {@code Encoder} once, up front vs per value. 296 * @param producer the source of payload data value(s). This must be a 297 * {@link Publisher} or another producer adaptable to a 298 * {@code Publisher} via {@link ReactiveAdapterRegistry} 299 * @param elementClass the type of values to be produced 300 * @return spec to declare the expected response 301 */ 302 RetrieveSpec data(Object producer, Class<?> elementClass); 303 304 /** 305 * Variant of {@link #data(Object, Class)} for when the type hint has 306 * to have a generic type. See {@link ParameterizedTypeReference}. 307 * @param producer the source of payload data value(s). This must be a 308 * {@link Publisher} or another producer adaptable to a 309 * {@code Publisher} via {@link ReactiveAdapterRegistry} 310 * @param elementTypeRef the type of values to be produced 311 * @return spec to declare the expected response 312 */ 313 RetrieveSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef); 314 } 315 316 317 /** 318 * Spec for providing additional composite metadata entries. 319 * 320 * @param <S> a self reference to the spec type 321 */ 322 interface MetadataSpec<S extends MetadataSpec<S>> { 323 324 /** 325 * Use this to append additional metadata entries when using composite 326 * metadata. An {@link IllegalArgumentException} is raised if this 327 * method is used when not using composite metadata. 328 * The metadata value be a concrete value or any producer of a single 329 * value that can be adapted to a {@link Publisher} via 330 * {@link ReactiveAdapterRegistry}. 331 * @param metadata an Object to be encoded with a suitable 332 * {@link org.springframework.core.codec.Encoder Encoder}, or a 333 * {@link org.springframework.core.io.buffer.DataBuffer DataBuffer} 334 * @param mimeType the mime type that describes the metadata 335 */ 336 S metadata(Object metadata, MimeType mimeType); 337 } 338 339 340 /** 341 * Spec to declare the expected output for an RSocket request. 342 * @since 5.2.2 343 */ 344 interface RetrieveSpec { 345 346 /** 347 * Perform a {@link RSocket#fireAndForget fireAndForget}. 348 */ 349 Mono<Void> send(); 350 351 /** 352 * Perform a {@link RSocket#requestResponse requestResponse} exchange. 353 * <p>If the return type is {@code Mono<Void>}, the {@code Mono} will 354 * complete after all data is consumed. 355 * <p><strong>Note:</strong> This method will raise an error if 356 * the request payload is a multi-valued {@link Publisher} as there is 357 * no many-to-one RSocket interaction. 358 * @param dataType the expected data type for the response 359 * @param <T> parameter for the expected data type 360 * @return the decoded response 361 */ 362 <T> Mono<T> retrieveMono(Class<T> dataType); 363 364 /** 365 * Variant of {@link #retrieveMono(Class)} for when the dataType has 366 * to have a generic type. See {@link ParameterizedTypeReference}. 367 */ 368 <T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef); 369 370 /** 371 * Perform an {@link RSocket#requestStream requestStream} or a 372 * {@link RSocket#requestChannel requestChannel} exchange depending on 373 * whether the request input is single or multi-payload. 374 * <p>If the return type is {@code Flux<Void>}, the {@code Flux} will 375 * complete after all data is consumed. 376 * @param dataType the expected type for values in the response 377 * @param <T> parameterize the expected type of values 378 * @return the decoded response 379 */ 380 <T> Flux<T> retrieveFlux(Class<T> dataType); 381 382 /** 383 * Variant of {@link #retrieveFlux(Class)} for when the dataType has 384 * to have a generic type. See {@link ParameterizedTypeReference}. 385 */ 386 <T> Flux<T> retrieveFlux(ParameterizedTypeReference<T> dataTypeRef); 387 } 388 389}