001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.stomp; 018 019import java.nio.charset.Charset; 020import java.nio.charset.StandardCharsets; 021import java.security.Principal; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.springframework.lang.Nullable; 030import org.springframework.messaging.Message; 031import org.springframework.messaging.simp.SimpMessageHeaderAccessor; 032import org.springframework.messaging.simp.SimpMessageType; 033import org.springframework.messaging.support.MessageHeaderAccessor; 034import org.springframework.util.ClassUtils; 035import org.springframework.util.CollectionUtils; 036import org.springframework.util.MimeType; 037import org.springframework.util.MimeTypeUtils; 038import org.springframework.util.StringUtils; 039 040/** 041 * A {@code MessageHeaderAccessor} to use when creating a {@code Message} from 042 * a decoded STOMP frame, or when encoding a {@code Message} to a STOMP frame. 043 * 044 * <p>When created from STOMP frame content, the actual STOMP headers are 045 * stored in the native header sub-map managed by the parent class 046 * {@link org.springframework.messaging.support.NativeMessageHeaderAccessor} 047 * while the parent class {@link SimpMessageHeaderAccessor} manages common 048 * processing headers some of which are based on STOMP headers 049 * (e.g. destination, content-type, etc). 050 * 051 * <p>An instance of this class can also be created by wrapping an existing 052 * {@code Message}. That message may have been created with the more generic 053 * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor} in 054 * which case STOMP headers are created from common processing headers. 055 * In this case it is also necessary to invoke either 056 * {@link #updateStompCommandAsClientMessage()} or 057 * {@link #updateStompCommandAsServerMessage()} if sending a message and 058 * depending on whether a message is sent to a client or the message broker. 059 * 060 * @author Rossen Stoyanchev 061 * @since 4.0 062 */ 063public class StompHeaderAccessor extends SimpMessageHeaderAccessor { 064 065 private static final AtomicLong messageIdCounter = new AtomicLong(); 066 067 private static final long[] DEFAULT_HEARTBEAT = new long[] {0, 0}; 068 069 070 // STOMP header names 071 072 public static final String STOMP_ID_HEADER = "id"; 073 074 public static final String STOMP_HOST_HEADER = "host"; 075 076 public static final String STOMP_ACCEPT_VERSION_HEADER = "accept-version"; 077 078 public static final String STOMP_MESSAGE_ID_HEADER = "message-id"; 079 080 public static final String STOMP_RECEIPT_HEADER = "receipt"; // any client frame except CONNECT 081 082 public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id"; // RECEIPT frame 083 084 public static final String STOMP_SUBSCRIPTION_HEADER = "subscription"; 085 086 public static final String STOMP_VERSION_HEADER = "version"; 087 088 public static final String STOMP_MESSAGE_HEADER = "message"; 089 090 public static final String STOMP_ACK_HEADER = "ack"; 091 092 public static final String STOMP_NACK_HEADER = "nack"; 093 094 public static final String STOMP_LOGIN_HEADER = "login"; 095 096 public static final String STOMP_PASSCODE_HEADER = "passcode"; 097 098 public static final String STOMP_DESTINATION_HEADER = "destination"; 099 100 public static final String STOMP_CONTENT_TYPE_HEADER = "content-type"; 101 102 public static final String STOMP_CONTENT_LENGTH_HEADER = "content-length"; 103 104 public static final String STOMP_HEARTBEAT_HEADER = "heart-beat"; 105 106 // Other header names 107 108 private static final String COMMAND_HEADER = "stompCommand"; 109 110 private static final String CREDENTIALS_HEADER = "stompCredentials"; 111 112 113 /** 114 * A constructor for creating message headers from a parsed STOMP frame. 115 */ 116 StompHeaderAccessor(StompCommand command, @Nullable Map<String, List<String>> externalSourceHeaders) { 117 super(command.getMessageType(), externalSourceHeaders); 118 setHeader(COMMAND_HEADER, command); 119 updateSimpMessageHeadersFromStompHeaders(); 120 } 121 122 /** 123 * A constructor for accessing and modifying existing message headers. 124 * Note that the message headers may not have been created from a STOMP frame 125 * but may have rather originated from using the more generic 126 * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor}. 127 */ 128 StompHeaderAccessor(Message<?> message) { 129 super(message); 130 updateStompHeadersFromSimpMessageHeaders(); 131 } 132 133 StompHeaderAccessor() { 134 super(SimpMessageType.HEARTBEAT, null); 135 } 136 137 138 void updateSimpMessageHeadersFromStompHeaders() { 139 if (getNativeHeaders() == null) { 140 return; 141 } 142 String value = getFirstNativeHeader(STOMP_DESTINATION_HEADER); 143 if (value != null) { 144 super.setDestination(value); 145 } 146 value = getFirstNativeHeader(STOMP_CONTENT_TYPE_HEADER); 147 if (value != null) { 148 super.setContentType(MimeTypeUtils.parseMimeType(value)); 149 } 150 StompCommand command = getCommand(); 151 if (StompCommand.MESSAGE.equals(command)) { 152 value = getFirstNativeHeader(STOMP_SUBSCRIPTION_HEADER); 153 if (value != null) { 154 super.setSubscriptionId(value); 155 } 156 } 157 else if (StompCommand.SUBSCRIBE.equals(command) || StompCommand.UNSUBSCRIBE.equals(command)) { 158 value = getFirstNativeHeader(STOMP_ID_HEADER); 159 if (value != null) { 160 super.setSubscriptionId(value); 161 } 162 } 163 else if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) { 164 protectPasscode(); 165 } 166 } 167 168 void updateStompHeadersFromSimpMessageHeaders() { 169 String destination = getDestination(); 170 if (destination != null) { 171 setNativeHeader(STOMP_DESTINATION_HEADER, destination); 172 } 173 MimeType contentType = getContentType(); 174 if (contentType != null) { 175 setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString()); 176 } 177 trySetStompHeaderForSubscriptionId(); 178 } 179 180 181 @Override 182 protected MessageHeaderAccessor createAccessor(Message<?> message) { 183 return wrap(message); 184 } 185 186 // Redeclared for visibility within simp.stomp 187 @Override 188 @Nullable 189 protected Map<String, List<String>> getNativeHeaders() { 190 return super.getNativeHeaders(); 191 } 192 193 public StompCommand updateStompCommandAsClientMessage() { 194 SimpMessageType messageType = getMessageType(); 195 if (messageType != SimpMessageType.MESSAGE) { 196 throw new IllegalStateException("Unexpected message type " + messageType); 197 } 198 StompCommand command = getCommand(); 199 if (command == null) { 200 command = StompCommand.SEND; 201 setHeader(COMMAND_HEADER, command); 202 } 203 else if (!command.equals(StompCommand.SEND)) { 204 throw new IllegalStateException("Unexpected STOMP command " + command); 205 } 206 return command; 207 } 208 209 public void updateStompCommandAsServerMessage() { 210 SimpMessageType messageType = getMessageType(); 211 if (messageType != SimpMessageType.MESSAGE) { 212 throw new IllegalStateException("Unexpected message type " + messageType); 213 } 214 StompCommand command = getCommand(); 215 if ((command == null) || StompCommand.SEND.equals(command)) { 216 setHeader(COMMAND_HEADER, StompCommand.MESSAGE); 217 } 218 else if (!StompCommand.MESSAGE.equals(command)) { 219 throw new IllegalStateException("Unexpected STOMP command " + command); 220 } 221 trySetStompHeaderForSubscriptionId(); 222 if (getMessageId() == null) { 223 String messageId = getSessionId() + '-' + messageIdCounter.getAndIncrement(); 224 setNativeHeader(STOMP_MESSAGE_ID_HEADER, messageId); 225 } 226 } 227 228 /** 229 * Return the STOMP command, or {@code null} if not yet set. 230 */ 231 @Nullable 232 public StompCommand getCommand() { 233 return (StompCommand) getHeader(COMMAND_HEADER); 234 } 235 236 public boolean isHeartbeat() { 237 return (SimpMessageType.HEARTBEAT == getMessageType()); 238 } 239 240 public long[] getHeartbeat() { 241 String rawValue = getFirstNativeHeader(STOMP_HEARTBEAT_HEADER); 242 String[] rawValues = StringUtils.split(rawValue, ","); 243 if (rawValues == null) { 244 return Arrays.copyOf(DEFAULT_HEARTBEAT, 2); 245 } 246 return new long[] {Long.parseLong(rawValues[0]), Long.parseLong(rawValues[1])}; 247 } 248 249 public void setAcceptVersion(String acceptVersion) { 250 setNativeHeader(STOMP_ACCEPT_VERSION_HEADER, acceptVersion); 251 } 252 253 public Set<String> getAcceptVersion() { 254 String rawValue = getFirstNativeHeader(STOMP_ACCEPT_VERSION_HEADER); 255 return (rawValue != null ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.emptySet()); 256 } 257 258 public void setHost(@Nullable String host) { 259 setNativeHeader(STOMP_HOST_HEADER, host); 260 } 261 262 @Nullable 263 public String getHost() { 264 return getFirstNativeHeader(STOMP_HOST_HEADER); 265 } 266 267 @Override 268 public void setDestination(@Nullable String destination) { 269 super.setDestination(destination); 270 setNativeHeader(STOMP_DESTINATION_HEADER, destination); 271 } 272 273 @Override 274 public void setContentType(MimeType contentType) { 275 super.setContentType(contentType); 276 setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString()); 277 } 278 279 @Override 280 public void setSubscriptionId(@Nullable String subscriptionId) { 281 super.setSubscriptionId(subscriptionId); 282 trySetStompHeaderForSubscriptionId(); 283 } 284 285 private void trySetStompHeaderForSubscriptionId() { 286 String subscriptionId = getSubscriptionId(); 287 if (subscriptionId != null) { 288 StompCommand command = getCommand(); 289 if (command != null && StompCommand.MESSAGE.equals(command)) { 290 setNativeHeader(STOMP_SUBSCRIPTION_HEADER, subscriptionId); 291 } 292 else { 293 SimpMessageType messageType = getMessageType(); 294 if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) { 295 setNativeHeader(STOMP_ID_HEADER, subscriptionId); 296 } 297 } 298 } 299 } 300 301 @Nullable 302 public Integer getContentLength() { 303 String header = getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER); 304 return (header != null ? Integer.valueOf(header) : null); 305 } 306 307 public void setContentLength(int contentLength) { 308 setNativeHeader(STOMP_CONTENT_LENGTH_HEADER, String.valueOf(contentLength)); 309 } 310 311 public void setHeartbeat(long cx, long cy) { 312 setNativeHeader(STOMP_HEARTBEAT_HEADER, cx + "," + cy); 313 } 314 315 public void setAck(@Nullable String ack) { 316 setNativeHeader(STOMP_ACK_HEADER, ack); 317 } 318 319 @Nullable 320 public String getAck() { 321 return getFirstNativeHeader(STOMP_ACK_HEADER); 322 } 323 324 public void setNack(@Nullable String nack) { 325 setNativeHeader(STOMP_NACK_HEADER, nack); 326 } 327 328 @Nullable 329 public String getNack() { 330 return getFirstNativeHeader(STOMP_NACK_HEADER); 331 } 332 333 public void setLogin(@Nullable String login) { 334 setNativeHeader(STOMP_LOGIN_HEADER, login); 335 } 336 337 @Nullable 338 public String getLogin() { 339 return getFirstNativeHeader(STOMP_LOGIN_HEADER); 340 } 341 342 public void setPasscode(@Nullable String passcode) { 343 setNativeHeader(STOMP_PASSCODE_HEADER, passcode); 344 protectPasscode(); 345 } 346 347 private void protectPasscode() { 348 String value = getFirstNativeHeader(STOMP_PASSCODE_HEADER); 349 if (value != null && !"PROTECTED".equals(value)) { 350 setHeader(CREDENTIALS_HEADER, new StompPasscode(value)); 351 setNativeHeader(STOMP_PASSCODE_HEADER, "PROTECTED"); 352 } 353 } 354 355 /** 356 * Return the passcode header value, or {@code null} if not set. 357 */ 358 @Nullable 359 public String getPasscode() { 360 StompPasscode credentials = (StompPasscode) getHeader(CREDENTIALS_HEADER); 361 return (credentials != null ? credentials.passcode : null); 362 } 363 364 public void setReceiptId(@Nullable String receiptId) { 365 setNativeHeader(STOMP_RECEIPT_ID_HEADER, receiptId); 366 } 367 368 @Nullable 369 public String getReceiptId() { 370 return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER); 371 } 372 373 public void setReceipt(@Nullable String receiptId) { 374 setNativeHeader(STOMP_RECEIPT_HEADER, receiptId); 375 } 376 377 @Nullable 378 public String getReceipt() { 379 return getFirstNativeHeader(STOMP_RECEIPT_HEADER); 380 } 381 382 @Nullable 383 public String getMessage() { 384 return getFirstNativeHeader(STOMP_MESSAGE_HEADER); 385 } 386 387 public void setMessage(@Nullable String content) { 388 setNativeHeader(STOMP_MESSAGE_HEADER, content); 389 } 390 391 @Nullable 392 public String getMessageId() { 393 return getFirstNativeHeader(STOMP_MESSAGE_ID_HEADER); 394 } 395 396 public void setMessageId(@Nullable String id) { 397 setNativeHeader(STOMP_MESSAGE_ID_HEADER, id); 398 } 399 400 @Nullable 401 public String getVersion() { 402 return getFirstNativeHeader(STOMP_VERSION_HEADER); 403 } 404 405 public void setVersion(@Nullable String version) { 406 setNativeHeader(STOMP_VERSION_HEADER, version); 407 } 408 409 410 // Logging related 411 412 @Override 413 public String getShortLogMessage(Object payload) { 414 StompCommand command = getCommand(); 415 if (StompCommand.SUBSCRIBE.equals(command)) { 416 return "SUBSCRIBE " + getDestination() + " id=" + getSubscriptionId() + appendSession(); 417 } 418 else if (StompCommand.UNSUBSCRIBE.equals(command)) { 419 return "UNSUBSCRIBE id=" + getSubscriptionId() + appendSession(); 420 } 421 else if (StompCommand.SEND.equals(command)) { 422 return "SEND " + getDestination() + appendSession() + appendPayload(payload); 423 } 424 else if (StompCommand.CONNECT.equals(command)) { 425 Principal user = getUser(); 426 return "CONNECT" + (user != null ? " user=" + user.getName() : "") + appendSession(); 427 } 428 else if (StompCommand.STOMP.equals(command)) { 429 Principal user = getUser(); 430 return "STOMP" + (user != null ? " user=" + user.getName() : "") + appendSession(); 431 } 432 else if (StompCommand.CONNECTED.equals(command)) { 433 return "CONNECTED heart-beat=" + Arrays.toString(getHeartbeat()) + appendSession(); 434 } 435 else if (StompCommand.DISCONNECT.equals(command)) { 436 String receipt = getReceipt(); 437 return "DISCONNECT" + (receipt != null ? " receipt=" + receipt : "") + appendSession(); 438 } 439 else { 440 return getDetailedLogMessage(payload); 441 } 442 } 443 444 @Override 445 public String getDetailedLogMessage(@Nullable Object payload) { 446 if (isHeartbeat()) { 447 String sessionId = getSessionId(); 448 return "heart-beat" + (sessionId != null ? " in session " + sessionId : ""); 449 } 450 StompCommand command = getCommand(); 451 if (command == null) { 452 return super.getDetailedLogMessage(payload); 453 } 454 StringBuilder sb = new StringBuilder(); 455 sb.append(command.name()).append(" "); 456 Map<String, List<String>> nativeHeaders = getNativeHeaders(); 457 if (nativeHeaders != null) { 458 sb.append(nativeHeaders); 459 } 460 sb.append(appendSession()); 461 if (getUser() != null) { 462 sb.append(", user=").append(getUser().getName()); 463 } 464 if (payload != null && command.isBodyAllowed()) { 465 sb.append(appendPayload(payload)); 466 } 467 return sb.toString(); 468 } 469 470 private String appendSession() { 471 return " session=" + getSessionId(); 472 } 473 474 private String appendPayload(Object payload) { 475 if (payload.getClass() != byte[].class) { 476 throw new IllegalStateException( 477 "Expected byte array payload but got: " + ClassUtils.getQualifiedName(payload.getClass())); 478 } 479 byte[] bytes = (byte[]) payload; 480 MimeType mimeType = getContentType(); 481 String contentType = (mimeType != null ? " " + mimeType.toString() : ""); 482 if (bytes.length == 0 || mimeType == null || !isReadableContentType()) { 483 return contentType; 484 } 485 Charset charset = mimeType.getCharset(); 486 charset = (charset != null ? charset : StandardCharsets.UTF_8); 487 return (bytes.length < 80) ? 488 contentType + " payload=" + new String(bytes, charset) : 489 contentType + " payload=" + new String(Arrays.copyOf(bytes, 80), charset) + "...(truncated)"; 490 } 491 492 493 // Static factory methods and accessors 494 495 /** 496 * Create an instance for the given STOMP command. 497 */ 498 public static StompHeaderAccessor create(StompCommand command) { 499 return new StompHeaderAccessor(command, null); 500 } 501 502 /** 503 * Create an instance for the given STOMP command and headers. 504 */ 505 public static StompHeaderAccessor create(StompCommand command, Map<String, List<String>> headers) { 506 return new StompHeaderAccessor(command, headers); 507 } 508 509 /** 510 * Create headers for a heartbeat. While a STOMP heartbeat frame does not 511 * have headers, a session id is needed for processing purposes at a minimum. 512 */ 513 public static StompHeaderAccessor createForHeartbeat() { 514 return new StompHeaderAccessor(); 515 } 516 517 /** 518 * Create an instance from the payload and headers of the given Message. 519 */ 520 public static StompHeaderAccessor wrap(Message<?> message) { 521 return new StompHeaderAccessor(message); 522 } 523 524 /** 525 * Return the STOMP command from the given headers, or {@code null} if not set. 526 */ 527 @Nullable 528 public static StompCommand getCommand(Map<String, Object> headers) { 529 return (StompCommand) headers.get(COMMAND_HEADER); 530 } 531 532 /** 533 * Return the passcode header value, or {@code null} if not set. 534 */ 535 @Nullable 536 public static String getPasscode(Map<String, Object> headers) { 537 StompPasscode credentials = (StompPasscode) headers.get(CREDENTIALS_HEADER); 538 return (credentials != null ? credentials.passcode : null); 539 } 540 541 @Nullable 542 public static Integer getContentLength(Map<String, List<String>> nativeHeaders) { 543 List<String> values = nativeHeaders.get(STOMP_CONTENT_LENGTH_HEADER); 544 return (!CollectionUtils.isEmpty(values) ? Integer.valueOf(values.get(0)) : null); 545 } 546 547 548 private static class StompPasscode { 549 550 private final String passcode; 551 552 public StompPasscode(String passcode) { 553 this.passcode = passcode; 554 } 555 556 @Override 557 public String toString() { 558 return "[PROTECTED]"; 559 } 560 } 561 562}