001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.web.reactive.function; 018 019import java.util.List; 020import java.util.Optional; 021import java.util.function.Function; 022import java.util.function.Supplier; 023import java.util.stream.Collectors; 024 025import org.reactivestreams.Publisher; 026import reactor.core.publisher.Flux; 027import reactor.core.publisher.Mono; 028 029import org.springframework.core.ParameterizedTypeReference; 030import org.springframework.core.ResolvableType; 031import org.springframework.core.io.buffer.DataBuffer; 032import org.springframework.core.io.buffer.DataBufferUtils; 033import org.springframework.http.MediaType; 034import org.springframework.http.ReactiveHttpInputMessage; 035import org.springframework.http.client.reactive.ClientHttpResponse; 036import org.springframework.http.codec.HttpMessageReader; 037import org.springframework.http.codec.multipart.Part; 038import org.springframework.http.server.reactive.ServerHttpRequest; 039import org.springframework.util.MultiValueMap; 040 041/** 042 * Static factory methods for {@link BodyExtractor} implementations. 043 * 044 * @author Arjen Poutsma 045 * @author Sebastien Deleuze 046 * @author Rossen Stoyanchev 047 * @author Brian Clozel 048 * @since 5.0 049 */ 050public abstract class BodyExtractors { 051 052 private static final ResolvableType FORM_DATA_TYPE = 053 ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class); 054 055 private static final ResolvableType MULTIPART_DATA_TYPE = ResolvableType.forClassWithGenerics( 056 MultiValueMap.class, String.class, Part.class); 057 058 private static final ResolvableType PART_TYPE = ResolvableType.forClass(Part.class); 059 060 private static final ResolvableType VOID_TYPE = ResolvableType.forClass(Void.class); 061 062 063 /** 064 * Extractor to decode the input content into {@code Mono<T>}. 065 * @param elementClass the class of the element type to decode to 066 * @param <T> the element type to decode to 067 * @return {@code BodyExtractor} for {@code Mono<T>} 068 */ 069 public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(Class<? extends T> elementClass) { 070 return toMono(ResolvableType.forClass(elementClass)); 071 } 072 073 /** 074 * Variant of {@link #toMono(Class)} for type information with generics. 075 * @param elementTypeRef the type reference for the type to decode to 076 * @param <T> the element type to decode to 077 * @return {@code BodyExtractor} for {@code Mono<T>} 078 */ 079 public static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ParameterizedTypeReference<T> elementTypeRef) { 080 return toMono(ResolvableType.forType(elementTypeRef.getType())); 081 } 082 083 private static <T> BodyExtractor<Mono<T>, ReactiveHttpInputMessage> toMono(ResolvableType elementType) { 084 return (inputMessage, context) -> 085 readWithMessageReaders(inputMessage, context, elementType, 086 (HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader), 087 ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)), 088 skipBodyAsMono(inputMessage)); 089 } 090 091 /** 092 * Extractor to decode the input content into {@code Flux<T>}. 093 * @param elementClass the class of the element type to decode to 094 * @param <T> the element type to decode to 095 * @return {@code BodyExtractor} for {@code Flux<T>} 096 */ 097 public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(Class<? extends T> elementClass) { 098 return toFlux(ResolvableType.forClass(elementClass)); 099 } 100 101 /** 102 * Variant of {@link #toFlux(Class)} for type information with generics. 103 * @param typeRef the type reference for the type to decode to 104 * @param <T> the element type to decode to 105 * @return {@code BodyExtractor} for {@code Flux<T>} 106 */ 107 public static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ParameterizedTypeReference<T> typeRef) { 108 return toFlux(ResolvableType.forType(typeRef.getType())); 109 } 110 111 @SuppressWarnings("unchecked") 112 private static <T> BodyExtractor<Flux<T>, ReactiveHttpInputMessage> toFlux(ResolvableType elementType) { 113 return (inputMessage, context) -> 114 readWithMessageReaders(inputMessage, context, elementType, 115 (HttpMessageReader<T> reader) -> readToFlux(inputMessage, context, elementType, reader), 116 ex -> unsupportedErrorHandler(inputMessage, ex), 117 skipBodyAsFlux(inputMessage)); 118 } 119 120 121 // Extractors for specific content .. 122 123 /** 124 * Extractor to read form data into {@code MultiValueMap<String, String>}. 125 * <p>As of 5.1 this method can also be used on the client side to read form 126 * data from a server response (e.g. OAuth). 127 * @return {@code BodyExtractor} for form data 128 */ 129 public static BodyExtractor<Mono<MultiValueMap<String, String>>, ReactiveHttpInputMessage> toFormData() { 130 return (message, context) -> { 131 ResolvableType elementType = FORM_DATA_TYPE; 132 MediaType mediaType = MediaType.APPLICATION_FORM_URLENCODED; 133 HttpMessageReader<MultiValueMap<String, String>> reader = findReader(elementType, mediaType, context); 134 return readToMono(message, context, elementType, reader); 135 }; 136 } 137 138 /** 139 * Extractor to read multipart data into a {@code MultiValueMap<String, Part>}. 140 * @return {@code BodyExtractor} for multipart data 141 */ 142 // Parameterized for server-side use 143 public static BodyExtractor<Mono<MultiValueMap<String, Part>>, ServerHttpRequest> toMultipartData() { 144 return (serverRequest, context) -> { 145 ResolvableType elementType = MULTIPART_DATA_TYPE; 146 MediaType mediaType = MediaType.MULTIPART_FORM_DATA; 147 HttpMessageReader<MultiValueMap<String, Part>> reader = findReader(elementType, mediaType, context); 148 return readToMono(serverRequest, context, elementType, reader); 149 }; 150 } 151 152 /** 153 * Extractor to read multipart data into {@code Flux<Part>}. 154 * @return {@code BodyExtractor} for multipart request parts 155 */ 156 // Parameterized for server-side use 157 public static BodyExtractor<Flux<Part>, ServerHttpRequest> toParts() { 158 return (serverRequest, context) -> { 159 ResolvableType elementType = PART_TYPE; 160 MediaType mediaType = MediaType.MULTIPART_FORM_DATA; 161 HttpMessageReader<Part> reader = findReader(elementType, mediaType, context); 162 return readToFlux(serverRequest, context, elementType, reader); 163 }; 164 } 165 166 /** 167 * Extractor that returns the raw {@link DataBuffer DataBuffers}. 168 * <p><strong>Note:</strong> the data buffers should be 169 * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) 170 * released} after being used. 171 * @return {@code BodyExtractor} for data buffers 172 */ 173 public static BodyExtractor<Flux<DataBuffer>, ReactiveHttpInputMessage> toDataBuffers() { 174 return (inputMessage, context) -> inputMessage.getBody(); 175 } 176 177 178 // Private support methods 179 180 private static <T, S extends Publisher<T>> S readWithMessageReaders( 181 ReactiveHttpInputMessage message, BodyExtractor.Context context, ResolvableType elementType, 182 Function<HttpMessageReader<T>, S> readerFunction, 183 Function<UnsupportedMediaTypeException, S> errorFunction, 184 Supplier<S> emptySupplier) { 185 186 if (VOID_TYPE.equals(elementType)) { 187 return emptySupplier.get(); 188 } 189 MediaType contentType = Optional.ofNullable(message.getHeaders().getContentType()) 190 .orElse(MediaType.APPLICATION_OCTET_STREAM); 191 192 return context.messageReaders().stream() 193 .filter(reader -> reader.canRead(elementType, contentType)) 194 .findFirst() 195 .map(BodyExtractors::<T>cast) 196 .map(readerFunction) 197 .orElseGet(() -> { 198 List<MediaType> mediaTypes = context.messageReaders().stream() 199 .flatMap(reader -> reader.getReadableMediaTypes().stream()) 200 .collect(Collectors.toList()); 201 return errorFunction.apply( 202 new UnsupportedMediaTypeException(contentType, mediaTypes, elementType)); 203 }); 204 } 205 206 private static <T> Mono<T> readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context, 207 ResolvableType type, HttpMessageReader<T> reader) { 208 209 return context.serverResponse() 210 .map(response -> reader.readMono(type, type, (ServerHttpRequest) message, response, context.hints())) 211 .orElseGet(() -> reader.readMono(type, message, context.hints())); 212 } 213 214 private static <T> Flux<T> readToFlux(ReactiveHttpInputMessage message, BodyExtractor.Context context, 215 ResolvableType type, HttpMessageReader<T> reader) { 216 217 return context.serverResponse() 218 .map(response -> reader.read(type, type, (ServerHttpRequest) message, response, context.hints())) 219 .orElseGet(() -> reader.read(type, message, context.hints())); 220 } 221 222 private static <T> Flux<T> unsupportedErrorHandler( 223 ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) { 224 225 Flux<T> result; 226 if (message.getHeaders().getContentType() == null) { 227 // Maybe it's okay there is no content type, if there is no content.. 228 result = message.getBody().map(buffer -> { 229 DataBufferUtils.release(buffer); 230 throw ex; 231 }); 232 } 233 else { 234 result = message instanceof ClientHttpResponse ? 235 consumeAndCancel(message).thenMany(Flux.error(ex)) : Flux.error(ex); 236 } 237 return result; 238 } 239 240 private static <T> HttpMessageReader<T> findReader( 241 ResolvableType elementType, MediaType mediaType, BodyExtractor.Context context) { 242 243 return context.messageReaders().stream() 244 .filter(messageReader -> messageReader.canRead(elementType, mediaType)) 245 .findFirst() 246 .map(BodyExtractors::<T>cast) 247 .orElseThrow(() -> new IllegalStateException( 248 "No HttpMessageReader for \"" + mediaType + "\" and \"" + elementType + "\"")); 249 } 250 251 @SuppressWarnings("unchecked") 252 private static <T> HttpMessageReader<T> cast(HttpMessageReader<?> reader) { 253 return (HttpMessageReader<T>) reader; 254 } 255 256 private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message) { 257 return message instanceof ClientHttpResponse ? 258 () -> consumeAndCancel(message).thenMany(Mono.empty()) : Flux::empty; 259 } 260 261 @SuppressWarnings("unchecked") 262 private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message) { 263 return message instanceof ClientHttpResponse ? 264 () -> consumeAndCancel(message).then(Mono.empty()) : Mono::empty; 265 } 266 267 private static Flux<DataBuffer> consumeAndCancel(ReactiveHttpInputMessage message) { 268 return message.getBody().takeWhile(buffer -> { 269 DataBufferUtils.release(buffer); 270 return false; 271 }); 272 } 273 274}