/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.codec.protobuf;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Encoder;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.EncoderHttpMessageWriter;
import org.springframework.http.codec.HttpMessageEncoder;
import org.springframework.lang.Nullable;
import org.springframework.util.ConcurrentReferenceHashMap;

/**
 * {@code HttpMessageWriter} that can write a protobuf {@link Message}. It adds
 * the {@code X-Protobuf-Schema} and {@code X-Protobuf-Message} headers to the
 * response and, if a {@link Flux} is serialized, adds a {@code delimited=true}
 * parameter to the content type.
 *
 * <p>For {@code HttpMessageReader}, just use
 * {@code new DecoderHttpMessageReader(new ProtobufDecoder())}.
 *
 * @author Sébastien Deleuze
 * @since 5.1
 * @see ProtobufEncoder
 */
public class ProtobufHttpMessageWriter extends EncoderHttpMessageWriter<Message> {

	private static final String X_PROTOBUF_SCHEMA_HEADER = "X-Protobuf-Schema";

	private static final String X_PROTOBUF_MESSAGE_HEADER = "X-Protobuf-Message";

	/** Cache of the reflectively resolved {@code newBuilder} method per message class. */
	private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();


	/**
	 * Create a new {@code ProtobufHttpMessageWriter} with a default {@link ProtobufEncoder}.
	 */
	public ProtobufHttpMessageWriter() {
		super(new ProtobufEncoder());
	}

	/**
	 * Create a new {@code ProtobufHttpMessageWriter} with the given encoder.
	 * @param encoder the Protobuf message encoder to use
	 */
	public ProtobufHttpMessageWriter(Encoder<Message> encoder) {
		super(encoder);
	}


	/**
	 * Write the given messages, first adding the {@code X-Protobuf-Schema} and
	 * {@code X-Protobuf-Message} headers derived from the descriptor of the
	 * element type.
	 * <p>When {@code inputStream} is a {@link Flux}, the content type is set
	 * before delegating: to the first streaming media type of the encoder if no
	 * media type was given, or otherwise to the given media type with the
	 * {@link ProtobufEncoder#DELIMITED_KEY} parameter added, unless that
	 * parameter already carries {@link ProtobufEncoder#DELIMITED_VALUE}.
	 * @param inputStream the messages to write
	 * @param elementType the type of message; its class must expose a public
	 * no-arg {@code newBuilder} method
	 * @param mediaType the requested content type, if any
	 * @param message the output message to write the headers and body to
	 * @param hints additional hints, passed on unchanged to the superclass
	 * @return the result of the superclass write, or an error {@code Mono}
	 * carrying a {@link DecodingException} if any step here throws
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Mono<Void> write(Publisher<? extends Message> inputStream, ResolvableType elementType,
			@Nullable MediaType mediaType, ReactiveHttpOutputMessage message, Map<String, Object> hints) {

		try {
			Message.Builder builder = getMessageBuilder(elementType.toClass());
			Descriptors.Descriptor descriptor = builder.getDescriptorForType();
			message.getHeaders().add(X_PROTOBUF_SCHEMA_HEADER, descriptor.getFile().getName());
			message.getHeaders().add(X_PROTOBUF_MESSAGE_HEADER, descriptor.getFullName());
			if (inputStream instanceof Flux) {
				if (mediaType == null) {
					message.getHeaders().setContentType(
							((HttpMessageEncoder<?>) getEncoder()).getStreamingMediaTypes().get(0));
				}
				else if (!ProtobufEncoder.DELIMITED_VALUE.equals(
						mediaType.getParameters().get(ProtobufEncoder.DELIMITED_KEY))) {
					// Copy the parameters rather than mutating the caller's media type.
					Map<String, String> parameters = new HashMap<>(mediaType.getParameters());
					parameters.put(ProtobufEncoder.DELIMITED_KEY, ProtobufEncoder.DELIMITED_VALUE);
					message.getHeaders().setContentType(
							new MediaType(mediaType.getType(), mediaType.getSubtype(), parameters));
				}
			}
			return super.write(inputStream, elementType, mediaType, message, hints);
		}
		catch (Exception ex) {
			// The exception type is retained for backward compatibility with callers
			// that already handle DecodingException; only the message was corrected
			// to describe the write path this failure actually occurs on.
			return Mono.error(new DecodingException("Could not write Protobuf message: " + ex.getMessage(), ex));
		}
	}

	/**
	 * Create a new {@code Message.Builder} instance for the given class by
	 * invoking its public no-arg {@code newBuilder} method.
	 * <p>Method lookups are cached in a {@link ConcurrentReferenceHashMap}. Two
	 * threads may resolve the same method concurrently; both store an
	 * equivalent {@code Method}, so no additional locking is applied.
	 * @param clazz the message class to create a builder for
	 * @return the builder returned by {@code newBuilder}
	 * @throws Exception if the method cannot be found or its invocation fails
	 */
	private static Message.Builder getMessageBuilder(Class<?> clazz) throws Exception {
		Method method = methodCache.get(clazz);
		if (method == null) {
			method = clazz.getMethod("newBuilder");
			methodCache.put(clazz, method);
		}
		// newBuilder is a static factory, so there is no receiver to pass.
		return (Message.Builder) method.invoke(null);
	}

}