001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.http.codec;
018
019import java.nio.charset.StandardCharsets;
020import java.time.Duration;
021import java.util.Collections;
022import java.util.List;
023import java.util.Map;
024
025import reactor.core.publisher.Flux;
026import reactor.core.publisher.Mono;
027
028import org.springframework.core.ResolvableType;
029import org.springframework.core.codec.CodecException;
030import org.springframework.core.codec.Decoder;
031import org.springframework.core.codec.StringDecoder;
032import org.springframework.core.io.buffer.DataBuffer;
033import org.springframework.core.io.buffer.DataBufferFactory;
034import org.springframework.core.io.buffer.DataBufferLimitException;
035import org.springframework.core.io.buffer.DefaultDataBufferFactory;
036import org.springframework.http.MediaType;
037import org.springframework.http.ReactiveHttpInputMessage;
038import org.springframework.lang.Nullable;
039
040/**
041 * Reader that supports a stream of {@link ServerSentEvent ServerSentEvents} and also plain
042 * {@link Object Objects} which is the same as an {@link ServerSentEvent} with data only.
043 *
044 * @author Sebastien Deleuze
045 * @author Rossen Stoyanchev
046 * @since 5.0
047 */
048public class ServerSentEventHttpMessageReader implements HttpMessageReader<Object> {
049
050        private static final ResolvableType STRING_TYPE = ResolvableType.forClass(String.class);
051
052        private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
053
054
055        @Nullable
056        private final Decoder<?> decoder;
057
058        private final StringDecoder lineDecoder = StringDecoder.textPlainOnly();
059
060
061        /**
062         * Constructor without a {@code Decoder}. In this mode only {@code String}
063         * is supported as the data of an event.
064         */
065        public ServerSentEventHttpMessageReader() {
066                this(null);
067        }
068
069        /**
070         * Constructor with JSON {@code Decoder} for decoding to Objects.
071         * Support for decoding to {@code String} event data is built-in.
072         */
073        public ServerSentEventHttpMessageReader(@Nullable Decoder<?> decoder) {
074                this.decoder = decoder;
075        }
076
077
078        /**
079         * Return the configured {@code Decoder}.
080         */
081        @Nullable
082        public Decoder<?> getDecoder() {
083                return this.decoder;
084        }
085
086        /**
087         * Configure a limit on the maximum number of bytes per SSE event which are
088         * buffered before the event is parsed.
089         * <p>Note that the {@link #getDecoder() data decoder}, if provided, must
090         * also be customized accordingly to raise the limit if necessary in order
091         * to be able to parse the data portion of the event.
092         * <p>By default this is set to 256K.
093         * @param byteCount the max number of bytes to buffer, or -1 for unlimited
094         * @since 5.1.13
095         */
096        public void setMaxInMemorySize(int byteCount) {
097                this.lineDecoder.setMaxInMemorySize(byteCount);
098        }
099
100        /**
101         * Return the {@link #setMaxInMemorySize configured} byte count limit.
102         * @since 5.1.13
103         */
104        public int getMaxInMemorySize() {
105                return this.lineDecoder.getMaxInMemorySize();
106        }
107
108
109        @Override
110        public List<MediaType> getReadableMediaTypes() {
111                return Collections.singletonList(MediaType.TEXT_EVENT_STREAM);
112        }
113
114        @Override
115        public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
116                return (MediaType.TEXT_EVENT_STREAM.includes(mediaType) || isServerSentEvent(elementType));
117        }
118
119        private boolean isServerSentEvent(ResolvableType elementType) {
120                return ServerSentEvent.class.isAssignableFrom(elementType.toClass());
121        }
122
123
124        @Override
125        public Flux<Object> read(
126                        ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
127
128                LimitTracker limitTracker = new LimitTracker();
129
130                boolean shouldWrap = isServerSentEvent(elementType);
131                ResolvableType valueType = (shouldWrap ? elementType.getGeneric() : elementType);
132
133                return this.lineDecoder.decode(message.getBody(), STRING_TYPE, null, hints)
134                                .doOnNext(limitTracker::afterLineParsed)
135                                .bufferUntil(String::isEmpty)
136                                .concatMap(lines -> {
137                                        Object event = buildEvent(lines, valueType, shouldWrap, hints);
138                                        return (event != null ? Mono.just(event) : Mono.empty());
139                                });
140        }
141
142        @Nullable
143        private Object buildEvent(List<String> lines, ResolvableType valueType, boolean shouldWrap,
144                        Map<String, Object> hints) {
145
146                ServerSentEvent.Builder<Object> sseBuilder = shouldWrap ? ServerSentEvent.builder() : null;
147                StringBuilder data = null;
148                StringBuilder comment = null;
149
150                for (String line : lines) {
151                        if (line.startsWith("data:")) {
152                                data = (data != null ? data : new StringBuilder());
153                                data.append(line.substring(5).trim()).append("\n");
154                        }
155                        if (shouldWrap) {
156                                if (line.startsWith("id:")) {
157                                        sseBuilder.id(line.substring(3).trim());
158                                }
159                                else if (line.startsWith("event:")) {
160                                        sseBuilder.event(line.substring(6).trim());
161                                }
162                                else if (line.startsWith("retry:")) {
163                                        sseBuilder.retry(Duration.ofMillis(Long.parseLong(line.substring(6).trim())));
164                                }
165                                else if (line.startsWith(":")) {
166                                        comment = (comment != null ? comment : new StringBuilder());
167                                        comment.append(line.substring(1).trim()).append("\n");
168                                }
169                        }
170                }
171
172                Object decodedData = (data != null ? decodeData(data, valueType, hints) : null);
173
174                if (shouldWrap) {
175                        if (comment != null) {
176                                sseBuilder.comment(comment.substring(0, comment.length() - 1));
177                        }
178                        if (decodedData != null) {
179                                sseBuilder.data(decodedData);
180                        }
181                        return sseBuilder.build();
182                }
183                else {
184                        return decodedData;
185                }
186        }
187
188        @Nullable
189        private Object decodeData(StringBuilder data, ResolvableType dataType, Map<String, Object> hints) {
190                if (String.class == dataType.resolve()) {
191                        return data.substring(0, data.length() - 1);
192                }
193                if (this.decoder == null) {
194                        throw new CodecException("No SSE decoder configured and the data is not String.");
195                }
196                byte[] bytes = data.toString().getBytes(StandardCharsets.UTF_8);
197                DataBuffer buffer = bufferFactory.wrap(bytes);  // wrapping only, no allocation
198                return this.decoder.decode(buffer, dataType, MediaType.TEXT_EVENT_STREAM, hints);
199        }
200
201        @Override
202        public Mono<Object> readMono(
203                        ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) {
204
205                // In order of readers, we're ahead of String + "*/*"
206                // If this is called, simply delegate to StringDecoder
207
208                if (elementType.resolve() == String.class) {
209                        Flux<DataBuffer> body = message.getBody();
210                        return this.lineDecoder.decodeToMono(body, elementType, null, null).cast(Object.class);
211                }
212
213                return Mono.error(new UnsupportedOperationException(
214                                "ServerSentEventHttpMessageReader only supports reading stream of events as a Flux"));
215        }
216
217
218        private class LimitTracker {
219
220                private int accumulated = 0;
221
222                public void afterLineParsed(String line) {
223                        if (getMaxInMemorySize() < 0) {
224                                return;
225                        }
226                        if (line.isEmpty()) {
227                                this.accumulated = 0;
228                        }
229                        if (line.length() > Integer.MAX_VALUE - this.accumulated) {
230                                raiseLimitException();
231                        }
232                        else {
233                                this.accumulated += line.length();
234                                if (this.accumulated > getMaxInMemorySize()) {
235                                        raiseLimitException();
236                                }
237                        }
238                }
239
240                private void raiseLimitException() {
241                        // Do not release here, it's likely down via doOnDiscard..
242                        throw new DataBufferLimitException("Exceeded limit on max bytes to buffer : " + getMaxInMemorySize());
243                }
244        }
245
246}