001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import java.nio.ByteBuffer; 020import java.util.Collections; 021import java.util.List; 022import java.util.Queue; 023import java.util.concurrent.LinkedBlockingQueue; 024 025import org.springframework.lang.Nullable; 026import org.springframework.messaging.Message; 027import org.springframework.util.Assert; 028import org.springframework.util.LinkedMultiValueMap; 029import org.springframework.util.MultiValueMap; 030 031/** 032 * An extension of {@link org.springframework.messaging.simp.stomp.StompDecoder} 033 * that buffers content remaining in the input ByteBuffer after the parent 034 * class has read all (complete) STOMP frames from it. The remaining content 035 * represents an incomplete STOMP frame. When called repeatedly with additional 036 * data, the decode method returns one or more messages or, if there is not 037 * enough data still, continues to buffer. 038 * 039 * <p>A single instance of this decoder can be invoked repeatedly to read all 040 * messages from a single stream (e.g. WebSocket session) as long as decoding 041 * does not fail. If there is an exception, StompDecoder instance should not 042 * be used any more as its internal state is not guaranteed to be consistent. 043 * It is expected that the underlying session is closed at that point. 044 * 045 * @author Rossen Stoyanchev 046 * @since 4.0.3 047 * @see StompDecoder 048 */ 049public class BufferingStompDecoder { 050 051 private final StompDecoder stompDecoder; 052 053 private final int bufferSizeLimit; 054 055 private final Queue<ByteBuffer> chunks = new LinkedBlockingQueue<>(); 056 057 @Nullable 058 private volatile Integer expectedContentLength; 059 060 061 /** 062 * Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}. 063 * @param stompDecoder the target decoder to wrap 064 * @param bufferSizeLimit the buffer size limit 065 */ 066 public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) { 067 Assert.notNull(stompDecoder, "StompDecoder is required"); 068 Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0"); 069 this.stompDecoder = stompDecoder; 070 this.bufferSizeLimit = bufferSizeLimit; 071 } 072 073 074 /** 075 * Return the wrapped {@link StompDecoder}. 076 */ 077 public final StompDecoder getStompDecoder() { 078 return this.stompDecoder; 079 } 080 081 /** 082 * Return the configured buffer size limit. 083 */ 084 public final int getBufferSizeLimit() { 085 return this.bufferSizeLimit; 086 } 087 088 089 /** 090 * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a 091 * list of {@link Message Messages}. 092 * <p>If there was enough data to parse a "content-length" header, then the 093 * value is used to determine how much more data is needed before a new 094 * attempt to decode is made. 095 * <p>If there was not enough data to parse the "content-length", or if there 096 * is "content-length" header, every subsequent call to decode attempts to 097 * parse again with all available data. Therefore the presence of a "content-length" 098 * header helps to optimize the decoding of large messages. 099 * @param newBuffer a buffer containing new data to decode 100 * @return decoded messages or an empty list 101 * @throws StompConversionException raised in case of decoding issues 102 */ 103 public List<Message<byte[]>> decode(ByteBuffer newBuffer) { 104 this.chunks.add(newBuffer); 105 checkBufferLimits(); 106 107 Integer contentLength = this.expectedContentLength; 108 if (contentLength != null && getBufferSize() < contentLength) { 109 return Collections.emptyList(); 110 } 111 112 ByteBuffer bufferToDecode = assembleChunksAndReset(); 113 MultiValueMap<String, String> headers = new LinkedMultiValueMap<>(); 114 List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers); 115 116 if (bufferToDecode.hasRemaining()) { 117 this.chunks.add(bufferToDecode); 118 this.expectedContentLength = StompHeaderAccessor.getContentLength(headers); 119 } 120 121 return messages; 122 } 123 124 private ByteBuffer assembleChunksAndReset() { 125 ByteBuffer result; 126 if (this.chunks.size() == 1) { 127 result = this.chunks.remove(); 128 } 129 else { 130 result = ByteBuffer.allocate(getBufferSize()); 131 for (ByteBuffer partial : this.chunks) { 132 result.put(partial); 133 } 134 result.flip(); 135 } 136 this.chunks.clear(); 137 this.expectedContentLength = null; 138 return result; 139 } 140 141 private void checkBufferLimits() { 142 Integer contentLength = this.expectedContentLength; 143 if (contentLength != null && contentLength > this.bufferSizeLimit) { 144 throw new StompConversionException( 145 "STOMP 'content-length' header value " + this.expectedContentLength + 146 " exceeds configured buffer size limit " + this.bufferSizeLimit); 147 } 148 if (getBufferSize() > this.bufferSizeLimit) { 149 throw new StompConversionException("The configured STOMP buffer size limit of " + 150 this.bufferSizeLimit + " bytes has been exceeded"); 151 } 152 } 153 154 /** 155 * Calculate the current buffer size. 156 */ 157 public int getBufferSize() { 158 int size = 0; 159 for (ByteBuffer buffer : this.chunks) { 160 size = size + buffer.remaining(); 161 } 162 return size; 163 } 164 165 /** 166 * Get the expected content length of the currently buffered, incomplete STOMP frame. 167 */ 168 @Nullable 169 public Integer getExpectedContentLength() { 170 return this.expectedContentLength; 171 } 172 173}