/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.handler.invocation.reactive;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import kotlin.reflect.KFunction;
import kotlin.reflect.jvm.ReflectJvmMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.KotlinDetector;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;

/**
 * Base class for a return value handler that encodes return values to
 * {@code Flux<DataBuffer>} through the configured {@link Encoder}s.
 *
 * <p>Subclasses must implement the abstract method
 * {@link #handleEncodedContent} to handle the resulting encoded content.
 *
 * <p>This handler should be ordered last since its {@link #supportsReturnType}
 * returns {@code true} for any method parameter type.
 *
 * @author Rossen Stoyanchev
 * @since 5.2
 */
public abstract class AbstractEncoderMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

	private static final ResolvableType VOID_RESOLVABLE_TYPE = ResolvableType.forClass(Void.class);

	private static final ResolvableType OBJECT_RESOLVABLE_TYPE = ResolvableType.forClass(Object.class);

	private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow";


	protected final Log logger = LogFactory.getLog(getClass());

	private final List<Encoder<?>> encoders;

	private final ReactiveAdapterRegistry adapterRegistry;

	// Used when the message does not carry a DATA_BUFFER_FACTORY_HEADER value.
	private final DataBufferFactory defaultBufferFactory = new DefaultDataBufferFactory();


	/**
	 * Create a handler with the given encoders and adapter registry.
	 * @param encoders the encoders to choose from; must not be empty
	 * @param registry the registry used to adapt return values to a {@link Publisher};
	 * must not be {@code null}
	 * @throws IllegalArgumentException if either argument fails its assertion
	 */
	protected AbstractEncoderMethodReturnValueHandler(List<Encoder<?>> encoders, ReactiveAdapterRegistry registry) {
		Assert.notEmpty(encoders, "At least one Encoder is required");
		Assert.notNull(registry, "ReactiveAdapterRegistry is required");
		// Read-only view over the given list (not a copy).
		this.encoders = Collections.unmodifiableList(encoders);
		this.adapterRegistry = registry;
	}


	/**
	 * The configured encoders.
	 */
	public List<Encoder<?>> getEncoders() {
		return this.encoders;
	}

	/**
	 * The configured adapter registry.
	 */
	public ReactiveAdapterRegistry getAdapterRegistry() {
		return this.adapterRegistry;
	}


	/**
	 * Always returns {@code true}; no {@code canEncode} check is made here.
	 */
	@Override
	public boolean supportsReturnType(MethodParameter returnType) {
		// We could check canEncode but we're probably last in order anyway
		return true;
	}

	/**
	 * Encode the return value and pass the encoded content to
	 * {@link #handleEncodedContent}, or delegate to {@link #handleNoContent}
	 * if the return value is {@code null}.
	 * <p>The {@link DataBufferFactory} is taken from the
	 * {@link HandlerMethodReturnValueHandler#DATA_BUFFER_FACTORY_HEADER} header
	 * of the input message, falling back on a default factory. The target
	 * MIME type is taken from the {@link MessageHeaders#CONTENT_TYPE} header.
	 */
	@Override
	public Mono<Void> handleReturnValue(
			@Nullable Object returnValue, MethodParameter returnType, Message<?> message) {

		if (returnValue == null) {
			return handleNoContent(returnType, message);
		}

		DataBufferFactory bufferFactory = (DataBufferFactory) message.getHeaders()
				.getOrDefault(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, this.defaultBufferFactory);

		MimeType mimeType = getContentType(message);
		Flux<DataBuffer> encodedContent = encodeContent(
				returnValue, returnType, bufferFactory, mimeType, Collections.emptyMap());

		return new ChannelSendOperator<>(encodedContent, publisher ->
				handleEncodedContent(Flux.from(publisher), returnType, message));
	}

	/**
	 * Read the {@link MessageHeaders#CONTENT_TYPE} header of the given message.
	 * <p>The header value is used as-is when it is a {@link MimeType}; any other
	 * non-null value (e.g. a {@code String}) is parsed from its
	 * {@code toString()} form instead of failing with a {@code ClassCastException}.
	 * @param message the message to inspect
	 * @return the content type, or {@code null} if the header is absent
	 */
	@Nullable
	private static MimeType getContentType(Message<?> message) {
		Object value = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
		if (value == null) {
			return null;
		}
		return (value instanceof MimeType ? (MimeType) value : MimeType.valueOf(value.toString()));
	}

	/**
	 * Adapt the given content to a {@link Publisher}, determine its element
	 * type, and encode each element to a {@link DataBuffer}.
	 * @param content the value returned from the handler method
	 * @param returnType the declared return type of the handler method
	 * @param bufferFactory the factory passed to the encoder
	 * @param mimeType the target MIME type, possibly {@code null}
	 * @param hints encoding hints passed to the encoder
	 * @return one {@code DataBuffer} per element emitted by the adapted publisher
	 */
	@SuppressWarnings("unchecked")
	private Flux<DataBuffer> encodeContent(
			@Nullable Object content, MethodParameter returnType, DataBufferFactory bufferFactory,
			@Nullable MimeType mimeType, Map<String, Object> hints) {

		ResolvableType returnValueType = ResolvableType.forMethodParameter(returnType);
		ReactiveAdapter adapter = getAdapterRegistry().getAdapter(returnValueType.resolve(), content);

		Publisher<?> publisher;
		ResolvableType elementType;
		if (adapter != null) {
			publisher = adapter.toPublisher(content);
			// For a Kotlin suspending function whose declared return type is not
			// kotlinx.coroutines.flow.Flow, the declared type itself is taken as
			// the element type rather than its first generic parameter.
			boolean isUnwrapped = KotlinDetector.isKotlinReflectPresent() &&
					KotlinDetector.isKotlinType(returnType.getContainingClass()) &&
					KotlinDelegate.isSuspend(returnType.getMethod()) &&
					!COROUTINES_FLOW_CLASS_NAME.equals(returnValueType.toClass().getName());
			ResolvableType genericType = isUnwrapped ? returnValueType : returnValueType.getGeneric();
			elementType = getElementType(adapter, genericType);
		}
		else {
			publisher = Mono.justOrEmpty(content);
			// A return type declared as Object says nothing useful to an encoder:
			// fall back on the runtime type of the actual value.
			elementType = (returnValueType.toClass() == Object.class && content != null ?
					ResolvableType.forInstance(content) : returnValueType);
		}

		// Void element type: skip encoder lookup and pass the publisher through.
		if (elementType.resolve() == void.class || elementType.resolve() == Void.class) {
			return Flux.from(publisher).cast(DataBuffer.class);
		}

		// May be null here; encodeValue then looks an encoder up per element.
		Encoder<?> encoder = getEncoder(elementType, mimeType);
		return Flux.from(publisher).map(value ->
				encodeValue(value, elementType, encoder, bufferFactory, mimeType, hints));
	}

	/**
	 * Pick the element type to encode: {@code Void} for a no-value adapter,
	 * the given type if it is not {@link ResolvableType#NONE}, or
	 * {@code Object} otherwise.
	 */
	private ResolvableType getElementType(ReactiveAdapter adapter, ResolvableType type) {
		if (adapter.isNoValue()) {
			return VOID_RESOLVABLE_TYPE;
		}
		else if (type != ResolvableType.NONE) {
			return type;
		}
		else {
			return OBJECT_RESOLVABLE_TYPE;
		}
	}

	/**
	 * Return the first configured encoder whose {@code canEncode} accepts the
	 * given element type and MIME type, or {@code null} if there is none.
	 */
	@Nullable
	@SuppressWarnings("unchecked")
	private <T> Encoder<T> getEncoder(ResolvableType elementType, @Nullable MimeType mimeType) {
		for (Encoder<?> encoder : getEncoders()) {
			if (encoder.canEncode(elementType, mimeType)) {
				return (Encoder<T>) encoder;
			}
		}
		return null;
	}

	/**
	 * Encode a single element to a {@link DataBuffer}.
	 * <p>If no encoder was resolved up front, one is looked up based on the
	 * runtime type of the element.
	 * @throws MessagingException if no encoder can be found for the element
	 */
	@SuppressWarnings("unchecked")
	private <T> DataBuffer encodeValue(
			Object element, ResolvableType elementType, @Nullable Encoder<T> encoder,
			DataBufferFactory bufferFactory, @Nullable MimeType mimeType,
			@Nullable Map<String, Object> hints) {

		if (encoder == null) {
			encoder = getEncoder(ResolvableType.forInstance(element), mimeType);
			if (encoder == null) {
				throw new MessagingException(
						"No encoder for " + elementType + ", current value type is " + element.getClass());
			}
		}
		return encoder.encodeValue((T) element, bufferFactory, elementType, mimeType, hints);
	}

	/**
	 * Subclasses implement this method to handle encoded values in some way
	 * such as creating and sending messages.
	 * @param encodedContent the encoded content; each {@code DataBuffer}
	 * represents the fully-aggregated, encoded content for one value
	 * (i.e. payload) returned from the HandlerMethod.
	 * @param returnType return type of the handler method that produced the data
	 * @param message the input message handled by the handler method
	 * @return completion {@code Mono<Void>} for the handling
	 */
	protected abstract Mono<Void> handleEncodedContent(
			Flux<DataBuffer> encodedContent, MethodParameter returnType, Message<?> message);

	/**
	 * Invoked for a {@code null} return value, which could mean a void method
	 * or method returning an async type parameterized by void.
	 * @param returnType return type of the handler method that produced the data
	 * @param message the input message handled by the handler method
	 * @return completion {@code Mono<Void>} for the handling
	 */
	protected abstract Mono<Void> handleNoContent(MethodParameter returnType, Message<?> message);


	/**
	 * Inner class to avoid a hard dependency on Kotlin at runtime.
	 */
	private static class KotlinDelegate {

		/**
		 * Whether the given method maps to a Kotlin function that is
		 * {@code suspend}; {@code false} for a {@code null} method or one
		 * with no Kotlin function mapping.
		 */
		private static boolean isSuspend(@Nullable Method method) {
			if (method == null) {
				return false;
			}
			KFunction<?> function = ReflectJvmMapping.getKotlinFunction(method);
			return (function != null && function.isSuspend());
		}
	}

}