001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.codec;
018
019import java.nio.charset.StandardCharsets;
020import java.time.Duration;
021import java.util.Arrays;
022import java.util.Collections;
023import java.util.List;
024import java.util.Map;
025
026import org.reactivestreams.Publisher;
027import reactor.core.publisher.Flux;
028import reactor.core.publisher.Mono;
029
030import org.springframework.core.ResolvableType;
031import org.springframework.core.codec.CodecException;
032import org.springframework.core.codec.Encoder;
033import org.springframework.core.codec.Hints;
034import org.springframework.core.io.buffer.DataBuffer;
035import org.springframework.core.io.buffer.DataBufferFactory;
036import org.springframework.core.io.buffer.DataBufferUtils;
037import org.springframework.core.io.buffer.PooledDataBuffer;
038import org.springframework.http.MediaType;
039import org.springframework.http.ReactiveHttpOutputMessage;
040import org.springframework.http.server.reactive.ServerHttpRequest;
041import org.springframework.http.server.reactive.ServerHttpResponse;
042import org.springframework.lang.Nullable;
043import org.springframework.util.Assert;
044import org.springframework.util.StringUtils;
045
046/**
047 * {@code HttpMessageWriter} for {@code "text/event-stream"} responses.
048 *
049 * @author Sebastien Deleuze
050 * @author Arjen Poutsma
051 * @author Rossen Stoyanchev
052 * @since 5.0
053 */
054public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Object> {
055
056        private static final MediaType DEFAULT_MEDIA_TYPE = new MediaType("text", "event-stream", StandardCharsets.UTF_8);
057
058        private static final List<MediaType> WRITABLE_MEDIA_TYPES = Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
059
060
061        @Nullable
062        private final Encoder<?> encoder;
063
064
065        /**
066         * Constructor without an {@code Encoder}. In this mode only {@code String}
067         * is supported for event data to be encoded.
068         */
069        public ServerSentEventHttpMessageWriter() {
070                this(null);
071        }
072
073        /**
074         * Constructor with JSON {@code Encoder} for encoding objects.
075         * Support for {@code String} event data is built-in.
076         * @param encoder the Encoder to use (may be {@code null})
077         */
078        public ServerSentEventHttpMessageWriter(@Nullable Encoder<?> encoder) {
079                this.encoder = encoder;
080        }
081
082
083        /**
084         * Return the configured {@code Encoder}, if any.
085         */
086        @Nullable
087        public Encoder<?> getEncoder() {
088                return this.encoder;
089        }
090
091        @Override
092        public List<MediaType> getWritableMediaTypes() {
093                return WRITABLE_MEDIA_TYPES;
094        }
095
096
097        @Override
098        public boolean canWrite(ResolvableType elementType, @Nullable MediaType mediaType) {
099                return (mediaType == null || MediaType.TEXT_EVENT_STREAM.includes(mediaType) ||
100                                ServerSentEvent.class.isAssignableFrom(elementType.toClass()));
101        }
102
103        @Override
104        public Mono<Void> write(Publisher<?> input, ResolvableType elementType, @Nullable MediaType mediaType,
105                        ReactiveHttpOutputMessage message, Map<String, Object> hints) {
106
107                mediaType = (mediaType != null && mediaType.getCharset() != null ? mediaType : DEFAULT_MEDIA_TYPE);
108                DataBufferFactory bufferFactory = message.bufferFactory();
109
110                message.getHeaders().setContentType(mediaType);
111                return message.writeAndFlushWith(encode(input, elementType, mediaType, bufferFactory, hints));
112        }
113
114        private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType elementType,
115                        MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
116
117                ResolvableType dataType = (ServerSentEvent.class.isAssignableFrom(elementType.toClass()) ?
118                                elementType.getGeneric() : elementType);
119
120                return Flux.from(input).map(element -> {
121
122                        ServerSentEvent<?> sse = (element instanceof ServerSentEvent ?
123                                        (ServerSentEvent<?>) element : ServerSentEvent.builder().data(element).build());
124
125                        StringBuilder sb = new StringBuilder();
126                        String id = sse.id();
127                        String event = sse.event();
128                        Duration retry = sse.retry();
129                        String comment = sse.comment();
130                        Object data = sse.data();
131                        if (id != null) {
132                                writeField("id", id, sb);
133                        }
134                        if (event != null) {
135                                writeField("event", event, sb);
136                        }
137                        if (retry != null) {
138                                writeField("retry", retry.toMillis(), sb);
139                        }
140                        if (comment != null) {
141                                sb.append(':').append(StringUtils.replace(comment, "\n", "\n:")).append("\n");
142                        }
143                        if (data != null) {
144                                sb.append("data:");
145                        }
146
147                        Flux<DataBuffer> result;
148                        if (data == null) {
149                                result = Flux.just(encodeText(sb + "\n", mediaType, factory));
150                        }
151                        else if (data instanceof String) {
152                                data = StringUtils.replace((String) data, "\n", "\ndata:");
153                                result = Flux.just(encodeText(sb + (String) data + "\n\n", mediaType, factory));
154                        }
155                        else {
156                                result = encodeEvent(sb, data, dataType, mediaType, factory, hints);
157                        }
158
159                        return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
160                });
161        }
162
163        @SuppressWarnings("unchecked")
164        private <T> Flux<DataBuffer> encodeEvent(StringBuilder eventContent, T data, ResolvableType dataType,
165                        MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {
166
167                if (this.encoder == null) {
168                        throw new CodecException("No SSE encoder configured and the data is not String.");
169                }
170                return Flux.just(factory.join(Arrays.asList(
171                                encodeText(eventContent, mediaType, factory),
172                                ((Encoder<T>) this.encoder).encodeValue(data, factory, dataType, mediaType, hints),
173                                encodeText("\n\n", mediaType, factory))));
174        }
175
176        private void writeField(String fieldName, Object fieldValue, StringBuilder sb) {
177                sb.append(fieldName).append(':').append(fieldValue).append("\n");
178        }
179
180        private DataBuffer encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
181                Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
182                byte[] bytes = text.toString().getBytes(mediaType.getCharset());
183                return bufferFactory.wrap(bytes);  // wrapping, not allocating
184        }
185
186        @Override
187        public Mono<Void> write(Publisher<?> input, ResolvableType actualType, ResolvableType elementType,
188                        @Nullable MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response,
189                        Map<String, Object> hints) {
190
191                Map<String, Object> allHints = Hints.merge(hints,
192                                getEncodeHints(actualType, elementType, mediaType, request, response));
193
194                return write(input, elementType, mediaType, response, allHints);
195        }
196
197        private Map<String, Object> getEncodeHints(ResolvableType actualType, ResolvableType elementType,
198                        @Nullable MediaType mediaType, ServerHttpRequest request, ServerHttpResponse response) {
199
200                if (this.encoder instanceof HttpMessageEncoder) {
201                        HttpMessageEncoder<?> encoder = (HttpMessageEncoder<?>) this.encoder;
202                        return encoder.getEncodeHints(actualType, elementType, mediaType, request, response);
203                }
204                return Hints.none();
205        }
206
207}