/*
 * Copyright 2002-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.simp.stomp;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

/**
 * A wrapper around {@link org.springframework.messaging.simp.stomp.StompDecoder}
 * that buffers content remaining in the input ByteBuffer after the wrapped
 * decoder has read all (complete) STOMP frames from it. The remaining content
 * represents an incomplete STOMP frame. When called repeatedly with additional
 * data, the decode method returns one or more messages or, if there is not
 * enough data still, continues to buffer.
 *
 * <p>A single instance of this decoder can be invoked repeatedly to read all
 * messages from a single stream (e.g. WebSocket session) as long as decoding
 * does not fail. If there is an exception, this decoder instance should not
 * be used any more as its internal state is not guaranteed to be consistent.
 * It is expected that the underlying session is closed at that point.
 *
 * @author Rossen Stoyanchev
 * @since 4.0.3
 * @see StompDecoder
 */
public class BufferingStompDecoder {

	private final StompDecoder stompDecoder;

	private final int bufferSizeLimit;

	/** Buffers received so far that have not yet been fully decoded. */
	private final Queue<ByteBuffer> chunks = new LinkedBlockingQueue<ByteBuffer>();

	/**
	 * The "content-length" obtained from the headers of the last decode attempt
	 * that left data behind, or {@code null} if none is currently known.
	 */
	private volatile Integer expectedContentLength;


	/**
	 * Create a new {@code BufferingStompDecoder} wrapping the given {@code StompDecoder}.
	 * @param stompDecoder the target decoder to wrap
	 * @param bufferSizeLimit the buffer size limit
	 * @throws IllegalArgumentException if {@code stompDecoder} is {@code null}
	 * or {@code bufferSizeLimit} is not greater than 0
	 */
	public BufferingStompDecoder(StompDecoder stompDecoder, int bufferSizeLimit) {
		Assert.notNull(stompDecoder, "StompDecoder is required");
		Assert.isTrue(bufferSizeLimit > 0, "Buffer size limit must be greater than 0");
		this.stompDecoder = stompDecoder;
		this.bufferSizeLimit = bufferSizeLimit;
	}


	/**
	 * Return the wrapped {@link StompDecoder}.
	 */
	public final StompDecoder getStompDecoder() {
		return this.stompDecoder;
	}

	/**
	 * Return the configured buffer size limit.
	 */
	public final int getBufferSizeLimit() {
		return this.bufferSizeLimit;
	}


	/**
	 * Decodes one or more STOMP frames from the given {@code ByteBuffer} into a
	 * list of {@link Message}s.
	 * <p>If there was enough data to parse a "content-length" header, then the
	 * value is used to determine how much more data is needed before a new
	 * attempt to decode is made.
	 * <p>If there was not enough data to parse the "content-length", or if there
	 * is no "content-length" header, every subsequent call to decode attempts to
	 * parse again with all available data. Therefore the presence of a "content-length"
	 * header helps to optimize the decoding of large messages.
	 * @param newBuffer a buffer containing new data to decode
	 * @return decoded messages or an empty list
	 * @throws StompConversionException raised in case of decoding issues
	 */
	public List<Message<byte[]>> decode(ByteBuffer newBuffer) {
		this.chunks.add(newBuffer);
		checkBufferLimits();

		// Read the volatile field once so that the null check and the comparison
		// operate on the same value (no unboxing of a concurrently cleared field).
		Integer contentLength = this.expectedContentLength;
		if (contentLength != null && getBufferSize() < contentLength) {
			// Still waiting for the rest of the announced content: keep buffering.
			return Collections.<Message<byte[]>>emptyList();
		}

		ByteBuffer bufferToDecode = assembleChunksAndReset();
		MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
		List<Message<byte[]>> messages = this.stompDecoder.decode(bufferToDecode, headers);

		if (bufferToDecode.hasRemaining()) {
			// Left-over bytes belong to an incomplete frame: retain them for the
			// next call together with whatever content length was parsed.
			this.chunks.add(bufferToDecode);
			this.expectedContentLength = StompHeaderAccessor.getContentLength(headers);
		}

		return messages;
	}

	/**
	 * Combine all buffered chunks into a single buffer that is ready for reading,
	 * then clear the chunk queue and the expected content length.
	 */
	private ByteBuffer assembleChunksAndReset() {
		ByteBuffer result;
		if (this.chunks.size() == 1) {
			// A single chunk can be handed over as-is, no copy required.
			result = this.chunks.remove();
		}
		else {
			result = ByteBuffer.allocate(getBufferSize());
			for (ByteBuffer partial : this.chunks) {
				result.put(partial);
			}
			// Switch the freshly filled buffer from writing to reading.
			result.flip();
		}
		this.chunks.clear();
		this.expectedContentLength = null;
		return result;
	}

	/**
	 * Verify that neither the expected content length nor the amount of
	 * currently buffered data exceeds the configured buffer size limit.
	 * @throws StompConversionException if either limit is exceeded
	 */
	private void checkBufferLimits() {
		// Single read of the volatile field: the same value is used for the null
		// check, the comparison and the error message.
		Integer contentLength = this.expectedContentLength;
		if (contentLength != null && contentLength > this.bufferSizeLimit) {
			throw new StompConversionException(
					"STOMP 'content-length' header value " + contentLength +
					" exceeds configured buffer size limit " + this.bufferSizeLimit);
		}
		if (getBufferSize() > this.bufferSizeLimit) {
			throw new StompConversionException("The configured STOMP buffer size limit of " +
					this.bufferSizeLimit + " bytes has been exceeded");
		}
	}

	/**
	 * Calculate the current buffer size, i.e. the sum of the remaining bytes
	 * of all buffered chunks.
	 */
	public int getBufferSize() {
		int size = 0;
		for (ByteBuffer buffer : this.chunks) {
			size = size + buffer.remaining();
		}
		return size;
	}

	/**
	 * Get the expected content length of the currently buffered, incomplete STOMP frame.
	 * @return the expected content length, or {@code null} if none is currently known
	 */
	public Integer getExpectedContentLength() {
		return this.expectedContentLength;
	}

}