001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.handler.annotation.reactive; 018 019import java.lang.annotation.Annotation; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.List; 023import java.util.Map; 024import java.util.function.Consumer; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028import org.reactivestreams.Publisher; 029import reactor.core.publisher.Flux; 030import reactor.core.publisher.Mono; 031 032import org.springframework.core.Conventions; 033import org.springframework.core.MethodParameter; 034import org.springframework.core.ReactiveAdapter; 035import org.springframework.core.ReactiveAdapterRegistry; 036import org.springframework.core.ResolvableType; 037import org.springframework.core.annotation.AnnotationUtils; 038import org.springframework.core.codec.Decoder; 039import org.springframework.core.codec.DecodingException; 040import org.springframework.core.io.buffer.DataBuffer; 041import org.springframework.lang.Nullable; 042import org.springframework.messaging.Message; 043import org.springframework.messaging.MessageHeaders; 044import org.springframework.messaging.handler.annotation.Payload; 045import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; 046import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException; 047import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver; 048import org.springframework.util.Assert; 049import org.springframework.util.CollectionUtils; 050import org.springframework.util.MimeType; 051import org.springframework.util.MimeTypeUtils; 052import org.springframework.util.ObjectUtils; 053import org.springframework.util.StringUtils; 054import org.springframework.validation.BeanPropertyBindingResult; 055import org.springframework.validation.SmartValidator; 056import org.springframework.validation.Validator; 057import org.springframework.validation.annotation.Validated; 058 059/** 060 * A resolver to extract and decode the payload of a message using a 061 * {@link Decoder}, where the payload is expected to be a {@link Publisher} of 062 * {@link DataBuffer DataBuffer}. 063 * 064 * <p>Validation is applied if the method argument is annotated with 065 * {@code @javax.validation.Valid} or 066 * {@link org.springframework.validation.annotation.Validated}. Validation 067 * failure results in an {@link MethodArgumentNotValidException}. 068 * 069 * <p>This resolver should be ordered last if {@link #useDefaultResolution} is 070 * set to {@code true} since in that case it supports all types and does not 071 * require the presence of {@link Payload}. 072 * 073 * @author Rossen Stoyanchev 074 * @since 5.2 075 */ 076public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResolver { 077 078 protected final Log logger = LogFactory.getLog(getClass()); 079 080 081 private final List<Decoder<?>> decoders; 082 083 @Nullable 084 private final Validator validator; 085 086 private final ReactiveAdapterRegistry adapterRegistry; 087 088 private final boolean useDefaultResolution; 089 090 091 public PayloadMethodArgumentResolver(List<? extends Decoder<?>> decoders, @Nullable Validator validator, 092 @Nullable ReactiveAdapterRegistry registry, boolean useDefaultResolution) { 093 094 Assert.isTrue(!CollectionUtils.isEmpty(decoders), "At least one Decoder is required"); 095 this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders)); 096 this.validator = validator; 097 this.adapterRegistry = registry != null ? registry : ReactiveAdapterRegistry.getSharedInstance(); 098 this.useDefaultResolution = useDefaultResolution; 099 } 100 101 102 /** 103 * Return a read-only list of the configured decoders. 104 */ 105 public List<Decoder<?>> getDecoders() { 106 return this.decoders; 107 } 108 109 /** 110 * Return the configured validator, if any. 111 */ 112 @Nullable 113 public Validator getValidator() { 114 return this.validator; 115 } 116 117 /** 118 * Return the configured {@link ReactiveAdapterRegistry}. 119 */ 120 public ReactiveAdapterRegistry getAdapterRegistry() { 121 return this.adapterRegistry; 122 } 123 124 /** 125 * Whether this resolver is configured to use default resolution, i.e. 126 * works for any argument type regardless of whether {@code @Payload} is 127 * present or not. 128 */ 129 public boolean isUseDefaultResolution() { 130 return this.useDefaultResolution; 131 } 132 133 134 @Override 135 public boolean supportsParameter(MethodParameter parameter) { 136 return parameter.hasParameterAnnotation(Payload.class) || this.useDefaultResolution; 137 } 138 139 140 /** 141 * Decode the content of the given message payload through a compatible 142 * {@link Decoder}. 143 * 144 * <p>Validation is applied if the method argument is annotated with 145 * {@code @javax.validation.Valid} or 146 * {@link org.springframework.validation.annotation.Validated}. Validation 147 * failure results in an {@link MethodArgumentNotValidException}. 148 * 149 * @param parameter the target method argument that we are decoding to 150 * @param message the message from which the content was extracted 151 * @return a Mono with the result of argument resolution 152 * 153 * @see #extractContent(MethodParameter, Message) 154 * @see #getMimeType(Message) 155 */ 156 @Override 157 public final Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) { 158 159 Payload ann = parameter.getParameterAnnotation(Payload.class); 160 if (ann != null && StringUtils.hasText(ann.expression())) { 161 throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver"); 162 } 163 164 MimeType mimeType = getMimeType(message); 165 mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM; 166 167 Flux<DataBuffer> content = extractContent(parameter, message); 168 return decodeContent(parameter, message, ann == null || ann.required(), content, mimeType); 169 } 170 171 @SuppressWarnings("unchecked") 172 private Flux<DataBuffer> extractContent(MethodParameter parameter, Message<?> message) { 173 Object payload = message.getPayload(); 174 if (payload instanceof DataBuffer) { 175 return Flux.just((DataBuffer) payload); 176 } 177 if (payload instanceof Publisher) { 178 return Flux.from((Publisher<?>) payload).map(value -> { 179 if (value instanceof DataBuffer) { 180 return (DataBuffer) value; 181 } 182 String className = value.getClass().getName(); 183 throw getUnexpectedPayloadError(message, parameter, "Publisher<" + className + ">"); 184 }); 185 } 186 return Flux.error(getUnexpectedPayloadError(message, parameter, payload.getClass().getName())); 187 } 188 189 private MethodArgumentResolutionException getUnexpectedPayloadError( 190 Message<?> message, MethodParameter parameter, String actualType) { 191 192 return new MethodArgumentResolutionException(message, parameter, 193 "Expected DataBuffer or Publisher<DataBuffer> for the Message payload, actual: " + actualType); 194 } 195 196 /** 197 * Return the mime type for the content. By default this method checks the 198 * {@link MessageHeaders#CONTENT_TYPE} header expecting to find a 199 * {@link MimeType} value or a String to parse to a {@link MimeType}. 200 * @param message the input message 201 */ 202 @Nullable 203 protected MimeType getMimeType(Message<?> message) { 204 Object headerValue = message.getHeaders().get(MessageHeaders.CONTENT_TYPE); 205 if (headerValue == null) { 206 return null; 207 } 208 else if (headerValue instanceof String) { 209 return MimeTypeUtils.parseMimeType((String) headerValue); 210 } 211 else if (headerValue instanceof MimeType) { 212 return (MimeType) headerValue; 213 } 214 else { 215 throw new IllegalArgumentException("Unexpected MimeType value: " + headerValue); 216 } 217 } 218 219 private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message, 220 boolean isContentRequired, Flux<DataBuffer> content, MimeType mimeType) { 221 222 ResolvableType targetType = ResolvableType.forMethodParameter(parameter); 223 Class<?> resolvedType = targetType.resolve(); 224 ReactiveAdapter adapter = (resolvedType != null ? getAdapterRegistry().getAdapter(resolvedType) : null); 225 ResolvableType elementType = (adapter != null ? targetType.getGeneric() : targetType); 226 isContentRequired = isContentRequired || (adapter != null && !adapter.supportsEmpty()); 227 Consumer<Object> validator = getValidator(message, parameter); 228 229 Map<String, Object> hints = Collections.emptyMap(); 230 231 for (Decoder<?> decoder : this.decoders) { 232 if (decoder.canDecode(elementType, mimeType)) { 233 if (adapter != null && adapter.isMultiValue()) { 234 Flux<?> flux = content 235 .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) 236 .onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex))); 237 if (isContentRequired) { 238 flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message))); 239 } 240 if (validator != null) { 241 flux = flux.doOnNext(validator); 242 } 243 return Mono.just(adapter.fromPublisher(flux)); 244 } 245 else { 246 // Single-value (with or without reactive type wrapper) 247 Mono<?> mono = content.next() 248 .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) 249 .onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex))); 250 if (isContentRequired) { 251 mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message))); 252 } 253 if (validator != null) { 254 mono = mono.doOnNext(validator); 255 } 256 return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) : Mono.from(mono)); 257 } 258 } 259 } 260 261 return Mono.error(new MethodArgumentResolutionException( 262 message, parameter, "Cannot decode to [" + targetType + "]" + message)); 263 } 264 265 private Throwable handleReadError(MethodParameter parameter, Message<?> message, Throwable ex) { 266 return ex instanceof DecodingException ? 267 new MethodArgumentResolutionException(message, parameter, "Failed to read HTTP message", ex) : ex; 268 } 269 270 private MethodArgumentResolutionException handleMissingBody(MethodParameter param, Message<?> message) { 271 return new MethodArgumentResolutionException(message, param, 272 "Payload content is missing: " + param.getExecutable().toGenericString()); 273 } 274 275 @Nullable 276 private Consumer<Object> getValidator(Message<?> message, MethodParameter parameter) { 277 if (this.validator == null) { 278 return null; 279 } 280 for (Annotation ann : parameter.getParameterAnnotations()) { 281 Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class); 282 if (validatedAnn != null || ann.annotationType().getSimpleName().startsWith("Valid")) { 283 Object hints = (validatedAnn != null ? validatedAnn.value() : AnnotationUtils.getValue(ann)); 284 Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints}); 285 String name = Conventions.getVariableNameForParameter(parameter); 286 return target -> { 287 BeanPropertyBindingResult bindingResult = new BeanPropertyBindingResult(target, name); 288 if (!ObjectUtils.isEmpty(validationHints) && this.validator instanceof SmartValidator) { 289 ((SmartValidator) this.validator).validate(target, bindingResult, validationHints); 290 } 291 else { 292 this.validator.validate(target, bindingResult); 293 } 294 if (bindingResult.hasErrors()) { 295 throw new MethodArgumentNotValidException(message, parameter, bindingResult); 296 } 297 }; 298 } 299 } 300 return null; 301 } 302 303}