001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.handler.invocation.reactive;
018
019import java.lang.reflect.Method;
020import java.util.Collections;
021import java.util.List;
022import java.util.Map;
023
024import kotlin.reflect.KFunction;
025import kotlin.reflect.jvm.ReflectJvmMapping;
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028import org.reactivestreams.Publisher;
029import reactor.core.publisher.Flux;
030import reactor.core.publisher.Mono;
031
032import org.springframework.core.KotlinDetector;
033import org.springframework.core.MethodParameter;
034import org.springframework.core.ReactiveAdapter;
035import org.springframework.core.ReactiveAdapterRegistry;
036import org.springframework.core.ResolvableType;
037import org.springframework.core.codec.Encoder;
038import org.springframework.core.io.buffer.DataBuffer;
039import org.springframework.core.io.buffer.DataBufferFactory;
040import org.springframework.core.io.buffer.DefaultDataBufferFactory;
041import org.springframework.lang.Nullable;
042import org.springframework.messaging.Message;
043import org.springframework.messaging.MessageHeaders;
044import org.springframework.messaging.MessagingException;
045import org.springframework.util.Assert;
046import org.springframework.util.MimeType;
047
048/**
049 * Base class for a return value handler that encodes return values to
050 * {@code Flux<DataBuffer>} through the configured {@link Encoder}s.
051 *
052 * <p>Subclasses must implement the abstract method
053 * {@link #handleEncodedContent} to handle the resulting encoded content.
054 *
055 * <p>This handler should be ordered last since its {@link #supportsReturnType}
056 * returns {@code true} for any method parameter type.
057 *
058 * @author Rossen Stoyanchev
059 * @since 5.2
060 */
061public abstract class AbstractEncoderMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
062
063        private static final ResolvableType VOID_RESOLVABLE_TYPE = ResolvableType.forClass(Void.class);
064
065        private static final ResolvableType OBJECT_RESOLVABLE_TYPE = ResolvableType.forClass(Object.class);
066
067        private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";
068
069
070        protected final Log logger = LogFactory.getLog(getClass());
071
072        private final List<Encoder<?>> encoders;
073
074        private final ReactiveAdapterRegistry adapterRegistry;
075
076        private DataBufferFactory defaultBufferFactory = new DefaultDataBufferFactory();
077
078
079        protected AbstractEncoderMethodReturnValueHandler(List<Encoder<?>> encoders, ReactiveAdapterRegistry registry) {
080                Assert.notEmpty(encoders, "At least one Encoder is required");
081                Assert.notNull(registry, "ReactiveAdapterRegistry is required");
082                this.encoders = Collections.unmodifiableList(encoders);
083                this.adapterRegistry = registry;
084        }
085
086
087        /**
088         * The configured encoders.
089         */
090        public List<Encoder<?>> getEncoders() {
091                return this.encoders;
092        }
093
094        /**
095         * The configured adapter registry.
096         */
097        public ReactiveAdapterRegistry getAdapterRegistry() {
098                return this.adapterRegistry;
099        }
100
101
102        @Override
103        public boolean supportsReturnType(MethodParameter returnType) {
104                // We could check canEncode but we're probably last in order anyway
105                return true;
106        }
107
108        @Override
109        public Mono<Void> handleReturnValue(
110                        @Nullable Object returnValue, MethodParameter returnType, Message<?> message) {
111
112                if (returnValue == null) {
113                        return handleNoContent(returnType, message);
114                }
115
116                DataBufferFactory bufferFactory = (DataBufferFactory) message.getHeaders()
117                                .getOrDefault(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.defaultBufferFactory);
118
119                MimeType mimeType = (MimeType) message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
120                Flux<DataBuffer> encodedContent = encodeContent(
121                                returnValue, returnType, bufferFactory, mimeType, Collections.emptyMap());
122
123                return new ChannelSendOperator<>(encodedContent, publisher ->
124                                handleEncodedContent(Flux.from(publisher), returnType, message));
125        }
126
127        @SuppressWarnings("unchecked")
128        private Flux<DataBuffer> encodeContent(
129                        @Nullable Object content, MethodParameter returnType, DataBufferFactory bufferFactory,
130                        @Nullable MimeType mimeType, Map<String, Object> hints) {
131
132                ResolvableType returnValueType = ResolvableType.forMethodParameter(returnType);
133                ReactiveAdapter adapter = getAdapterRegistry().getAdapter(returnValueType.resolve(), content);
134
135                Publisher<?> publisher;
136                ResolvableType elementType;
137                if (adapter != null) {
138                        publisher = adapter.toPublisher(content);
139                        boolean isUnwrapped = KotlinDetector.isKotlinReflectPresent() &&
140                                        KotlinDetector.isKotlinType(returnType.getContainingClass()) &&
141                                        KotlinDelegate.isSuspend(returnType.getMethod()) &&
142                                        !COROUTINES_FLOW_CLASS_NAME.equals(returnValueType.toClass().getName());
143                        ResolvableType genericType = isUnwrapped ? returnValueType : returnValueType.getGeneric();
144                        elementType = getElementType(adapter, genericType);
145                }
146                else {
147                        publisher = Mono.justOrEmpty(content);
148                        elementType = (returnValueType.toClass() == Object.class && content != null ?
149                                        ResolvableType.forInstance(content) : returnValueType);
150                }
151
152                if (elementType.resolve() == void.class || elementType.resolve() == Void.class) {
153                        return Flux.from(publisher).cast(DataBuffer.class);
154                }
155
156                Encoder<?> encoder = getEncoder(elementType, mimeType);
157                return Flux.from(publisher).map(value ->
158                                encodeValue(value, elementType, encoder, bufferFactory, mimeType, hints));
159        }
160
161        private ResolvableType getElementType(ReactiveAdapter adapter, ResolvableType type) {
162                if (adapter.isNoValue()) {
163                        return VOID_RESOLVABLE_TYPE;
164                }
165                else if (type != ResolvableType.NONE) {
166                        return type;
167                }
168                else {
169                        return OBJECT_RESOLVABLE_TYPE;
170                }
171        }
172
173        @Nullable
174        @SuppressWarnings("unchecked")
175        private <T> Encoder<T> getEncoder(ResolvableType elementType, @Nullable MimeType mimeType) {
176                for (Encoder<?> encoder : getEncoders()) {
177                        if (encoder.canEncode(elementType, mimeType)) {
178                                return (Encoder<T>) encoder;
179                        }
180                }
181                return null;
182        }
183
184        @SuppressWarnings("unchecked")
185        private <T> DataBuffer encodeValue(
186                        Object element, ResolvableType elementType, @Nullable Encoder<T> encoder,
187                        DataBufferFactory bufferFactory, @Nullable MimeType mimeType,
188                        @Nullable Map<String, Object> hints) {
189
190                if (encoder == null) {
191                        encoder = getEncoder(ResolvableType.forInstance(element), mimeType);
192                        if (encoder == null) {
193                                throw new MessagingException(
194                                                "No encoder for " + elementType + ", current value type is " + element.getClass());
195                        }
196                }
197                return encoder.encodeValue((T) element, bufferFactory, elementType, mimeType, hints);
198        }
199
200        /**
201         * Subclasses implement this method to handle encoded values in some way
202         * such as creating and sending messages.
203         * @param encodedContent the encoded content; each {@code DataBuffer}
204         * represents the fully-aggregated, encoded content for one value
205         * (i.e. payload) returned from the HandlerMethod.
206         * @param returnType return type of the handler method that produced the data
207         * @param message the input message handled by the handler method
208         * @return completion {@code Mono<Void>} for the handling
209         */
210        protected abstract Mono<Void> handleEncodedContent(
211                        Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message);
212
213        /**
214         * Invoked for a {@code null} return value, which could mean a void method
215         * or method returning an async type parameterized by void.
216         * @param returnType return type of the handler method that produced the data
217         * @param message the input message handled by the handler method
218         * @return completion {@code Mono<Void>} for the handling
219         */
220        protected abstract Mono<Void> handleNoContent(MethodParameter returnType, Message<?> message);
221
222
223        /**
224         * Inner class to avoid a hard dependency on Kotlin at runtime.
225         */
226        private static class KotlinDelegate {
227
228                static private boolean isSuspend(@Nullable Method method) {
229                        if (method == null) {
230                                return false;
231                        }
232                        KFunction<?> function = ReflectJvmMapping.getKotlinFunction(method);
233                        return (function != null && function.isSuspend());
234                }
235        }
236
237}