001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.http.codec; 018 019import java.nio.charset.StandardCharsets; 020import java.time.Duration; 021import java.util.Collections; 022import java.util.List; 023import java.util.Map; 024 025import reactor.core.publisher.Flux; 026import reactor.core.publisher.Mono; 027 028import org.springframework.core.ResolvableType; 029import org.springframework.core.codec.CodecException; 030import org.springframework.core.codec.Decoder; 031import org.springframework.core.codec.StringDecoder; 032import org.springframework.core.io.buffer.DataBuffer; 033import org.springframework.core.io.buffer.DataBufferFactory; 034import org.springframework.core.io.buffer.DataBufferLimitException; 035import org.springframework.core.io.buffer.DefaultDataBufferFactory; 036import org.springframework.http.MediaType; 037import org.springframework.http.ReactiveHttpInputMessage; 038import org.springframework.lang.Nullable; 039 040/** 041 * Reader that supports a stream of {@link ServerSentEvent ServerSentEvents} and also plain 042 * {@link Object Objects} which is the same as an {@link ServerSentEvent} with data only. 043 * 044 * @author Sebastien Deleuze 045 * @author Rossen Stoyanchev 046 * @since 5.0 047 */ 048public class ServerSentEventHttpMessageReader implements HttpMessageReader<Object> { 049 050 private static final ResolvableType STRING_TYPE = ResolvableType.forClass(String.class); 051 052 private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); 053 054 055 @Nullable 056 private final Decoder<?> decoder; 057 058 private final StringDecoder lineDecoder = StringDecoder.textPlainOnly(); 059 060 061 /** 062 * Constructor without a {@code Decoder}. In this mode only {@code String} 063 * is supported as the data of an event. 064 */ 065 public ServerSentEventHttpMessageReader() { 066 this(null); 067 } 068 069 /** 070 * Constructor with JSON {@code Decoder} for decoding to Objects. 071 * Support for decoding to {@code String} event data is built-in. 072 */ 073 public ServerSentEventHttpMessageReader(@Nullable Decoder<?> decoder) { 074 this.decoder = decoder; 075 } 076 077 078 /** 079 * Return the configured {@code Decoder}. 080 */ 081 @Nullable 082 public Decoder<?> getDecoder() { 083 return this.decoder; 084 } 085 086 /** 087 * Configure a limit on the maximum number of bytes per SSE event which are 088 * buffered before the event is parsed. 089 * <p>Note that the {@link #getDecoder() data decoder}, if provided, must 090 * also be customized accordingly to raise the limit if necessary in order 091 * to be able to parse the data portion of the event. 092 * <p>By default this is set to 256K. 093 * @param byteCount the max number of bytes to buffer, or -1 for unlimited 094 * @since 5.1.13 095 */ 096 public void setMaxInMemorySize(int byteCount) { 097 this.lineDecoder.setMaxInMemorySize(byteCount); 098 } 099 100 /** 101 * Return the {@link #setMaxInMemorySize configured} byte count limit. 102 * @since 5.1.13 103 */ 104 public int getMaxInMemorySize() { 105 return this.lineDecoder.getMaxInMemorySize(); 106 } 107 108 109 @Override 110 public List<MediaType> getReadableMediaTypes() { 111 return Collections.singletonList(MediaType.TEXT_EVENT_STREAM); 112 } 113 114 @Override 115 public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) { 116 return (MediaType.TEXT_EVENT_STREAM.includes(mediaType) || isServerSentEvent(elementType)); 117 } 118 119 private boolean isServerSentEvent(ResolvableType elementType) { 120 return ServerSentEvent.class.isAssignableFrom(elementType.toClass()); 121 } 122 123 124 @Override 125 public Flux<Object> read( 126 ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) { 127 128 LimitTracker limitTracker = new LimitTracker(); 129 130 boolean shouldWrap = isServerSentEvent(elementType); 131 ResolvableType valueType = (shouldWrap ? elementType.getGeneric() : elementType); 132 133 return this.lineDecoder.decode(message.getBody(), STRING_TYPE, null, hints) 134 .doOnNext(limitTracker::afterLineParsed) 135 .bufferUntil(String::isEmpty) 136 .concatMap(lines -> { 137 Object event = buildEvent(lines, valueType, shouldWrap, hints); 138 return (event != null ? Mono.just(event) : Mono.empty()); 139 }); 140 } 141 142 @Nullable 143 private Object buildEvent(List<String> lines, ResolvableType valueType, boolean shouldWrap, 144 Map<String, Object> hints) { 145 146 ServerSentEvent.Builder<Object> sseBuilder = shouldWrap ? ServerSentEvent.builder() : null; 147 StringBuilder data = null; 148 StringBuilder comment = null; 149 150 for (String line : lines) { 151 if (line.startsWith("data:")) { 152 data = (data != null ? data : new StringBuilder()); 153 data.append(line.substring(5).trim()).append("\n"); 154 } 155 if (shouldWrap) { 156 if (line.startsWith("id:")) { 157 sseBuilder.id(line.substring(3).trim()); 158 } 159 else if (line.startsWith("event:")) { 160 sseBuilder.event(line.substring(6).trim()); 161 } 162 else if (line.startsWith("retry:")) { 163 sseBuilder.retry(Duration.ofMillis(Long.parseLong(line.substring(6).trim()))); 164 } 165 else if (line.startsWith(":")) { 166 comment = (comment != null ? comment : new StringBuilder()); 167 comment.append(line.substring(1).trim()).append("\n"); 168 } 169 } 170 } 171 172 Object decodedData = (data != null ? decodeData(data, valueType, hints) : null); 173 174 if (shouldWrap) { 175 if (comment != null) { 176 sseBuilder.comment(comment.substring(0, comment.length() - 1)); 177 } 178 if (decodedData != null) { 179 sseBuilder.data(decodedData); 180 } 181 return sseBuilder.build(); 182 } 183 else { 184 return decodedData; 185 } 186 } 187 188 @Nullable 189 private Object decodeData(StringBuilder data, ResolvableType dataType, Map<String, Object> hints) { 190 if (String.class == dataType.resolve()) { 191 return data.substring(0, data.length() - 1); 192 } 193 if (this.decoder == null) { 194 throw new CodecException("No SSE decoder configured and the data is not String."); 195 } 196 byte[] bytes = data.toString().getBytes(StandardCharsets.UTF_8); 197 DataBuffer buffer = bufferFactory.wrap(bytes); // wrapping only, no allocation 198 return this.decoder.decode(buffer, dataType, MediaType.TEXT_EVENT_STREAM, hints); 199 } 200 201 @Override 202 public Mono<Object> readMono( 203 ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) { 204 205 // In order of readers, we're ahead of String + "*/*" 206 // If this is called, simply delegate to StringDecoder 207 208 if (elementType.resolve() == String.class) { 209 Flux<DataBuffer> body = message.getBody(); 210 return this.lineDecoder.decodeToMono(body, elementType, null, null).cast(Object.class); 211 } 212 213 return Mono.error(new UnsupportedOperationException( 214 "ServerSentEventHttpMessageReader only supports reading stream of events as a Flux")); 215 } 216 217 218 private class LimitTracker { 219 220 private int accumulated = 0; 221 222 public void afterLineParsed(String line) { 223 if (getMaxInMemorySize() < 0) { 224 return; 225 } 226 if (line.isEmpty()) { 227 this.accumulated = 0; 228 } 229 if (line.length() > Integer.MAX_VALUE - this.accumulated) { 230 raiseLimitException(); 231 } 232 else { 233 this.accumulated += line.length(); 234 if (this.accumulated > getMaxInMemorySize()) { 235 raiseLimitException(); 236 } 237 } 238 } 239 240 private void raiseLimitException() { 241 // Do not release here, it's likely down via doOnDiscard.. 242 throw new DataBufferLimitException("Exceeded limit on max bytes to buffer : " + getMaxInMemorySize()); 243 } 244 } 245 246}