001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.handler.annotation.reactive;
018
019import java.lang.annotation.Annotation;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024import java.util.function.Consumer;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.reactivestreams.Publisher;
029import reactor.core.publisher.Flux;
030import reactor.core.publisher.Mono;
031
032import org.springframework.core.Conventions;
033import org.springframework.core.MethodParameter;
034import org.springframework.core.ReactiveAdapter;
035import org.springframework.core.ReactiveAdapterRegistry;
036import org.springframework.core.ResolvableType;
037import org.springframework.core.annotation.AnnotationUtils;
038import org.springframework.core.codec.Decoder;
039import org.springframework.core.codec.DecodingException;
040import org.springframework.core.io.buffer.DataBuffer;
041import org.springframework.lang.Nullable;
042import org.springframework.messaging.Message;
043import org.springframework.messaging.MessageHeaders;
044import org.springframework.messaging.handler.annotation.Payload;
045import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
046import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
047import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
048import org.springframework.util.Assert;
049import org.springframework.util.CollectionUtils;
050import org.springframework.util.MimeType;
051import org.springframework.util.MimeTypeUtils;
052import org.springframework.util.ObjectUtils;
053import org.springframework.util.StringUtils;
054import org.springframework.validation.BeanPropertyBindingResult;
055import org.springframework.validation.SmartValidator;
056import org.springframework.validation.Validator;
057import org.springframework.validation.annotation.Validated;
058
059/**
060 * A resolver to extract and decode the payload of a message using a
061 * {@link Decoder}, where the payload is expected to be a {@link Publisher} of
062 * {@link DataBuffer DataBuffer}.
063 *
064 * <p>Validation is applied if the method argument is annotated with
065 * {@code @javax.validation.Valid} or
066 * {@link org.springframework.validation.annotation.Validated}. Validation
067 * failure results in an {@link MethodArgumentNotValidException}.
068 *
069 * <p>This resolver should be ordered last if {@link #useDefaultResolution} is
070 * set to {@code true} since in that case it supports all types and does not
071 * require the presence of {@link Payload}.
072 *
073 * @author Rossen Stoyanchev
074 * @since 5.2
075 */
076public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResolver {
077
078        protected final Log logger = LogFactory.getLog(getClass());
079
080
081        private final List<Decoder<?>> decoders;
082
083        @Nullable
084        private final Validator validator;
085
086        private final ReactiveAdapterRegistry adapterRegistry;
087
088        private final boolean useDefaultResolution;
089
090
091        public PayloadMethodArgumentResolver(List<? extends Decoder<?>> decoders, @Nullable Validator validator,
092                        @Nullable ReactiveAdapterRegistry registry, boolean useDefaultResolution) {
093
094                Assert.isTrue(!CollectionUtils.isEmpty(decoders), "At least one Decoder is required");
095                this.decoders = Collections.unmodifiableList(new ArrayList<>(decoders));
096                this.validator = validator;
097                this.adapterRegistry = registry != null ? registry : ReactiveAdapterRegistry.getSharedInstance();
098                this.useDefaultResolution = useDefaultResolution;
099        }
100
101
102        /**
103         * Return a read-only list of the configured decoders.
104         */
105        public List<Decoder<?>> getDecoders() {
106                return this.decoders;
107        }
108
109        /**
110         * Return the configured validator, if any.
111         */
112        @Nullable
113        public Validator getValidator() {
114                return this.validator;
115        }
116
117        /**
118         * Return the configured {@link ReactiveAdapterRegistry}.
119         */
120        public ReactiveAdapterRegistry getAdapterRegistry() {
121                return this.adapterRegistry;
122        }
123
124        /**
125         * Whether this resolver is configured to use default resolution, i.e.
126         * works for any argument type regardless of whether {@code @Payload} is
127         * present or not.
128         */
129        public boolean isUseDefaultResolution() {
130                return this.useDefaultResolution;
131        }
132
133
134        @Override
135        public boolean supportsParameter(MethodParameter parameter) {
136                return parameter.hasParameterAnnotation(Payload.class) || this.useDefaultResolution;
137        }
138
139
140        /**
141         * Decode the content of the given message payload through a compatible
142         * {@link Decoder}.
143         *
144         * <p>Validation is applied if the method argument is annotated with
145         * {@code @javax.validation.Valid} or
146         * {@link org.springframework.validation.annotation.Validated}. Validation
147         * failure results in an {@link MethodArgumentNotValidException}.
148         *
149         * @param parameter the target method argument that we are decoding to
150         * @param message the message from which the content was extracted
151         * @return a Mono with the result of argument resolution
152         *
153         * @see #extractContent(MethodParameter, Message)
154         * @see #getMimeType(Message)
155         */
156        @Override
157        public final Mono<Object> resolveArgument(MethodParameter parameter, Message<?> message) {
158
159                Payload ann = parameter.getParameterAnnotation(Payload.class);
160                if (ann != null && StringUtils.hasText(ann.expression())) {
161                        throw new IllegalStateException("@Payload SpEL expressions not supported by this resolver");
162                }
163
164                MimeType mimeType = getMimeType(message);
165                mimeType = mimeType != null ? mimeType : MimeTypeUtils.APPLICATION_OCTET_STREAM;
166
167                Flux<DataBuffer> content = extractContent(parameter, message);
168                return decodeContent(parameter, message, ann == null || ann.required(), content, mimeType);
169        }
170
171        @SuppressWarnings("unchecked")
172        private Flux<DataBuffer> extractContent(MethodParameter parameter, Message<?> message) {
173                Object payload = message.getPayload();
174                if (payload instanceof DataBuffer) {
175                        return Flux.just((DataBuffer) payload);
176                }
177                if (payload instanceof Publisher) {
178                        return Flux.from((Publisher<?>) payload).map(value -> {
179                                if (value instanceof DataBuffer) {
180                                        return (DataBuffer) value;
181                                }
182                                String className = value.getClass().getName();
183                                throw getUnexpectedPayloadError(message, parameter, "Publisher<" + className + ">");
184                        });
185                }
186                return Flux.error(getUnexpectedPayloadError(message, parameter, payload.getClass().getName()));
187        }
188
189        private MethodArgumentResolutionException getUnexpectedPayloadError(
190                        Message<?> message, MethodParameter parameter, String actualType) {
191
192                return new MethodArgumentResolutionException(message, parameter,
193                                "Expected DataBuffer or Publisher<DataBuffer> for the Message payload, actual: " + actualType);
194        }
195
196        /**
197         * Return the mime type for the content. By default this method checks the
198         * {@link MessageHeaders#CONTENT_TYPE} header expecting to find a
199         * {@link MimeType} value or a String to parse to a {@link MimeType}.
200         * @param message the input message
201         */
202        @Nullable
203        protected MimeType getMimeType(Message<?> message) {
204                Object headerValue = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
205                if (headerValue == null) {
206                        return null;
207                }
208                else if (headerValue instanceof String) {
209                        return MimeTypeUtils.parseMimeType((String) headerValue);
210                }
211                else if (headerValue instanceof MimeType) {
212                        return (MimeType) headerValue;
213                }
214                else {
215                        throw new IllegalArgumentException("Unexpected MimeType value: " + headerValue);
216                }
217        }
218
219        private Mono<Object> decodeContent(MethodParameter parameter, Message<?> message,
220                        boolean isContentRequired, Flux<DataBuffer> content, MimeType mimeType) {
221
222                ResolvableType targetType = ResolvableType.forMethodParameter(parameter);
223                Class<?> resolvedType = targetType.resolve();
224                ReactiveAdapter adapter = (resolvedType != null ? getAdapterRegistry().getAdapter(resolvedType) : null);
225                ResolvableType elementType = (adapter != null ? targetType.getGeneric() : targetType);
226                isContentRequired = isContentRequired || (adapter != null && !adapter.supportsEmpty());
227                Consumer<Object> validator = getValidator(message, parameter);
228
229                Map<String, Object> hints = Collections.emptyMap();
230
231                for (Decoder<?> decoder : this.decoders) {
232                        if (decoder.canDecode(elementType, mimeType)) {
233                                if (adapter != null && adapter.isMultiValue()) {
234                                        Flux<?> flux = content
235                                                        .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
236                                                        .onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
237                                        if (isContentRequired) {
238                                                flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
239                                        }
240                                        if (validator != null) {
241                                                flux = flux.doOnNext(validator);
242                                        }
243                                        return Mono.just(adapter.fromPublisher(flux));
244                                }
245                                else {
246                                        // Single-value (with or without reactive type wrapper)
247                                        Mono<?> mono = content.next()
248                                                        .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
249                                                        .onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
250                                        if (isContentRequired) {
251                                                mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
252                                        }
253                                        if (validator != null) {
254                                                mono = mono.doOnNext(validator);
255                                        }
256                                        return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) : Mono.from(mono));
257                                }
258                        }
259                }
260
261                return Mono.error(new MethodArgumentResolutionException(
262                                message, parameter, "Cannot decode to [" + targetType + "]" + message));
263        }
264
265        private Throwable handleReadError(MethodParameter parameter, Message<?> message, Throwable ex) {
266                return ex instanceof DecodingException ?
267                                new MethodArgumentResolutionException(message, parameter, "Failed to read HTTP message", ex) : ex;
268        }
269
270        private MethodArgumentResolutionException handleMissingBody(MethodParameter param, Message<?> message) {
271                return new MethodArgumentResolutionException(message, param,
272                                "Payload content is missing: " + param.getExecutable().toGenericString());
273        }
274
275        @Nullable
276        private Consumer<Object> getValidator(Message<?> message, MethodParameter parameter) {
277                if (this.validator == null) {
278                        return null;
279                }
280                for (Annotation ann : parameter.getParameterAnnotations()) {
281                        Validated validatedAnn = AnnotationUtils.getAnnotation(ann, Validated.class);
282                        if (validatedAnn != null || ann.annotationType().getSimpleName().startsWith("Valid")) {
283                                Object hints = (validatedAnn != null ? validatedAnn.value() : AnnotationUtils.getValue(ann));
284                                Object[] validationHints = (hints instanceof Object[] ? (Object[]) hints : new Object[] {hints});
285                                String name = Conventions.getVariableNameForParameter(parameter);
286                                return target -> {
287                                        BeanPropertyBindingResult bindingResult = new BeanPropertyBindingResult(target, name);
288                                        if (!ObjectUtils.isEmpty(validationHints) && this.validator instanceof SmartValidator) {
289                                                ((SmartValidator) this.validator).validate(target, bindingResult, validationHints);
290                                        }
291                                        else {
292
293                                        }
294                                        if (bindingResult.hasErrors()) {
295                                                throw new MethodArgumentNotValidException(message, parameter, bindingResult);
296                                        }
297                                };
298                        }
299                }
300                return null;
301        }
302
303}