001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.codec.protobuf;
018
019import java.lang.reflect.Method;
020import java.util.HashMap;
021import java.util.Map;
022import java.util.concurrent.ConcurrentMap;
023
024import com.google.protobuf.Descriptors;
025import com.google.protobuf.Message;
026import org.reactivestreams.Publisher;
027import reactor.core.publisher.Flux;
028import reactor.core.publisher.Mono;
029
030import org.springframework.core.ResolvableType;
031import org.springframework.core.codec.DecodingException;
032import org.springframework.core.codec.Encoder;
033import org.springframework.http.MediaType;
034import org.springframework.http.ReactiveHttpOutputMessage;
035import org.springframework.http.codec.EncoderHttpMessageWriter;
036import org.springframework.http.codec.HttpMessageEncoder;
037import org.springframework.lang.Nullable;
038import org.springframework.util.ConcurrentReferenceHashMap;
039
040/**
 * {@code HttpMessageWriter} that can write a protobuf {@link Message} and adds
 * the {@code X-Protobuf-Schema} and {@code X-Protobuf-Message} headers. A
 * {@code delimited=true} parameter is added to the content type if a flux is serialized.
044 *
045 * <p>For {@code HttpMessageReader}, just use
046 * {@code new DecoderHttpMessageReader(new ProtobufDecoder())}.
047 *
 * @author Sébastien Deleuze
049 * @since 5.1
050 * @see ProtobufEncoder
051 */
052public class ProtobufHttpMessageWriter extends EncoderHttpMessageWriter<Message> {
053
054        private static final String X_PROTOBUF_SCHEMA_HEADER = "X-Protobuf-Schema";
055
056        private static final String X_PROTOBUF_MESSAGE_HEADER = "X-Protobuf-Message";
057
058        private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();
059
060
061        /**
062         * Create a new {@code ProtobufHttpMessageWriter} with a default {@link ProtobufEncoder}.
063         */
064        public ProtobufHttpMessageWriter() {
065                super(new ProtobufEncoder());
066        }
067
068        /**
069         * Create a new {@code ProtobufHttpMessageWriter} with the given encoder.
070         * @param encoder the Protobuf message encoder to use
071         */
072        public ProtobufHttpMessageWriter(Encoder<Message> encoder) {
073                super(encoder);
074        }
075
076
077        @SuppressWarnings("unchecked")
078        @Override
079        public Mono<Void> write(Publisher<? extends Message> inputStream, ResolvableType elementType,
080                        @Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {
081
082                try {
083                        Message.Builder builder = getMessageBuilder(elementType.toClass());
084                        Descriptors.Descriptor descriptor = builder.getDescriptorForType();
085                        message.getHeaders().add(X_PROTOBUF_SCHEMA_HEADER, descriptor.getFile().getName());
086                        message.getHeaders().add(X_PROTOBUF_MESSAGE_HEADER, descriptor.getFullName());
087                        if (inputStream instanceof Flux) {
088                                if (mediaType == null) {
089                                        message.getHeaders().setContentType(((HttpMessageEncoder<?>)getEncoder()).getStreamingMediaTypes().get(0));
090                                }
091                                else if (!ProtobufEncoder.DELIMITED_VALUE.equals(mediaType.getParameters().get(ProtobufEncoder.DELIMITED_KEY))) {
092                                        Map<String, String> parameters = new HashMap<>(mediaType.getParameters());
093                                        parameters.put(ProtobufEncoder.DELIMITED_KEY, ProtobufEncoder.DELIMITED_VALUE);
094                                        message.getHeaders().setContentType(new MediaType(mediaType.getType(), mediaType.getSubtype(), parameters));
095                                }
096                        }
097                        return super.write(inputStream, elementType, mediaType, message, hints);
098                }
099                catch (Exception ex) {
100                        return Mono.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex));
101                }
102        }
103
104        /**
105         * Create a new {@code Message.Builder} instance for the given class.
106         * <p>This method uses a ConcurrentHashMap for caching method lookups.
107         */
108        private static Message.Builder getMessageBuilder(Class<?> clazz) throws Exception {
109                Method method = methodCache.get(clazz);
110                if (method == null) {
111                        method = clazz.getMethod("newBuilder");
112                        methodCache.put(clazz, method);
113                }
114                return (Message.Builder) method.invoke(clazz);
115        }
116
117}