001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.codec; 018 019import java.nio.charset.StandardCharsets; 020import java.time.Duration; 021import java.util.Arrays; 022import java.util.Collections; 023import java.util.List; 024import java.util.Map; 025 026import org.reactivestreams.Publisher; 027import reactor.core.publisher.Flux; 028import reactor.core.publisher.Mono; 029 030import org.springframework.core.ResolvableType; 031import org.springframework.core.codec.CodecException; 032import org.springframework.core.codec.Encoder; 033import org.springframework.core.codec.Hints; 034import org.springframework.core.io.buffer.DataBuffer; 035import org.springframework.core.io.buffer.DataBufferFactory; 036import org.springframework.core.io.buffer.DataBufferUtils; 037import org.springframework.core.io.buffer.PooledDataBuffer; 038import org.springframework.http.MediaType; 039import org.springframework.http.ReactiveHttpOutputMessage; 040import org.springframework.http.server.reactive.ServerHttpRequest; 041import org.springframework.http.server.reactive.ServerHttpResponse; 042import org.springframework.lang.Nullable; 043import org.springframework.util.Assert; 044import org.springframework.util.StringUtils; 045 046/** 047 * {@code HttpMessageWriter} for {@code "text/event-stream"} responses. 048 * 049 * @author Sebastien Deleuze 050 * @author Arjen Poutsma 051 * @author Rossen Stoyanchev 052 * @since 5.0 053 */ 054public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Object> { 055 056 private static final MediaType DEFAULT_MEDIA_TYPE = new MediaType("text", "event-stream", StandardCharsets.UTF_8); 057 058 private static final List<MediaType> WRITABLE_MEDIA_TYPES = Collections.singletonList(MediaType.TEXT_EVENT_STREAM); 059 060 061 @Nullable 062 private final Encoder<?> encoder; 063 064 065 /** 066 * Constructor without an {@code Encoder}. In this mode only {@code String} 067 * is supported for event data to be encoded. 068 */ 069 public ServerSentEventHttpMessageWriter() { 070 this(null); 071 } 072 073 /** 074 * Constructor with JSON {@code Encoder} for encoding objects. 075 * Support for {@code String} event data is built-in. 076 * @param encoder the Encoder to use (may be {@code null}) 077 */ 078 public ServerSentEventHttpMessageWriter(@Nullable Encoder<?> encoder) { 079 this.encoder = encoder; 080 } 081 082 083 /** 084 * Return the configured {@code Encoder}, if any. 085 */ 086 @Nullable 087 public Encoder<?> getEncoder() { 088 return this.encoder; 089 } 090 091 @Override 092 public List<MediaType> getWritableMediaTypes() { 093 return WRITABLE_MEDIA_TYPES; 094 } 095 096 097 @Override 098 public boolean canWrite(ResolvableType elementType, @Nullable MediaType mediaType) { 099 return (mediaType == null || MediaType.TEXT_EVENT_STREAM.includes(mediaType) || 100 ServerSentEvent.class.isAssignableFrom(elementType.toClass())); 101 } 102 103 @Override 104 public Mono<Void> write(Publisher<?> input, ResolvableType elementType, @Nullable MediaType mediaType, 105 ReactiveHttpOutputMessage message, Map<String, Object> hints) { 106 107 mediaType = (mediaType != null && mediaType.getCharset() != null ? mediaType : DEFAULT_MEDIA_TYPE); 108 DataBufferFactory bufferFactory = message.bufferFactory(); 109 110 message.getHeaders().setContentType(mediaType); 111 return message.writeAndFlushWith(encode(input, elementType, mediaType, bufferFactory, hints)); 112 } 113 114 private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType elementType, 115 MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) { 116 117 ResolvableType dataType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ? 118 elementType.getGeneric() : elementType); 119 120 return Flux.from(input).map(element -> { 121 122 ServerSentEvent<?> sse = (element instanceof ServerSentEvent ? 123 (ServerSentEvent<?>) element : ServerSentEvent.builder().data(element).build()); 124 125 StringBuilder sb = new StringBuilder(); 126 String id = sse.id(); 127 String event = sse.event(); 128 Duration retry = sse.retry(); 129 String comment = sse.comment(); 130 Object data = sse.data(); 131 if (id != null) { 132 writeField("id", id, sb); 133 } 134 if (event != null) { 135 writeField("event", event, sb); 136 } 137 if (retry != null) { 138 writeField("retry", retry.toMillis(), sb); 139 } 140 if (comment != null) { 141 sb.append(':').append(StringUtils.replace(comment, "\n", "\n:")).append("\n"); 142 } 143 if (data != null) { 144 sb.append("data:"); 145 } 146 147 Flux<DataBuffer> result; 148 if (data == null) { 149 result = Flux.just(encodeText(sb + "\n", mediaType, factory)); 150 } 151 else if (data instanceof String) { 152 data = StringUtils.replace((String) data, "\n", "\ndata:"); 153 result = Flux.just(encodeText(sb + (String) data + "\n\n", mediaType, factory)); 154 } 155 else { 156 result = encodeEvent(sb, data, dataType, mediaType, factory, hints); 157 } 158 159 return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); 160 }); 161 } 162 163 @SuppressWarnings("unchecked") 164 private <T> Flux<DataBuffer> encodeEvent(StringBuilder eventContent, T data, ResolvableType dataType, 165 MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) { 166 167 if (this.encoder == null) { 168 throw new CodecException("No SSE encoder configured and the data is not String."); 169 } 170 return Flux.just(factory.join(Arrays.asList( 171 encodeText(eventContent, mediaType, factory), 172 ((Encoder<T>) this.encoder).encodeValue(data, factory, dataType, mediaType, hints), 173 encodeText("\n\n", mediaType, factory)))); 174 } 175 176 private void writeField(String fieldName, Object fieldValue, StringBuilder sb) { 177 sb.append(fieldName).append(':').append(fieldValue).append("\n"); 178 } 179 180 private DataBuffer encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) { 181 Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset"); 182 byte[] bytes = text.toString().getBytes(mediaType.getCharset()); 183 return bufferFactory.wrap(bytes); // wrapping, not allocating 184 } 185 186 @Override 187 public Mono<Void> write(Publisher<?> input, ResolvableType actualType, ResolvableType elementType, 188 @Nullable MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response, 189 Map<String, Object> hints) { 190 191 Map<String, Object> allHints = Hints.merge(hints, 192 getEncodeHints(actualType, elementType, mediaType, request, response)); 193 194 return write(input, elementType, mediaType, response, allHints); 195 } 196 197 private Map<String, Object> getEncodeHints(ResolvableType actualType, ResolvableType elementType, 198 @Nullable MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response) { 199 200 if (this.encoder instanceof HttpMessageEncoder) { 201 HttpMessageEncoder<?> encoder = (HttpMessageEncoder<?>) this.encoder; 202 return encoder.getEncodeHints(actualType, elementType, mediaType, request, response); 203 } 204 return Hints.none(); 205 } 206 207}