001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.support; 018 019import java.nio.charset.Charset; 020import java.nio.charset.StandardCharsets; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027import java.util.UUID; 028 029import org.springframework.lang.Nullable; 030import org.springframework.messaging.Message; 031import org.springframework.messaging.MessageChannel; 032import org.springframework.messaging.MessageHeaders; 033import org.springframework.util.Assert; 034import org.springframework.util.IdGenerator; 035import org.springframework.util.MimeType; 036import org.springframework.util.MimeTypeUtils; 037import org.springframework.util.ObjectUtils; 038import org.springframework.util.PatternMatchUtils; 039import org.springframework.util.StringUtils; 040 041/** 042 * Wrapper around {@link MessageHeaders} that provides extra features such as 043 * strongly typed accessors for specific headers, the ability to leave headers 044 * in a {@link Message} mutable, and the option to suppress automatic generation 045 * of {@link MessageHeaders#ID id} and {@link MessageHeaders#TIMESTAMP 046 * timesteamp} headers. Sub-classes such as {@link NativeMessageHeaderAccessor} 047 * and others provide support for managing processing vs external source headers 048 * as well as protocol specific headers. 049 * 050 * <p>Below is a workflow to initialize headers via {@code MessageHeaderAccessor}, 051 * or one of its sub-classes, then create a {@link Message}, and then re-obtain 052 * the accessor possibly from a different component: 053 * <pre class="code"> 054 * // Create a message with headers 055 * MessageHeaderAccessor accessor = new MessageHeaderAccessor(); 056 * accessor.setHeader("foo", "bar"); 057 * MessageHeaders headers = accessor.getMessageHeaders(); 058 * Message message = MessageBuilder.createMessage("payload", headers); 059 * 060 * // Later on 061 * MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message); 062 * Assert.notNull(accessor, "No MessageHeaderAccessor"); 063 * </pre> 064 * 065 * <p>In order for the above to work, all participating components must use 066 * {@code MessageHeaders} to create, access, or modify headers, or otherwise 067 * {@link MessageHeaderAccessor#getAccessor(Message, Class)} will return null. 068 * Below is a workflow that shows how headers are created and left mutable, 069 * then modified possibly by a different component, and finally made immutable 070 * perhaps before the possibility of being accessed on a different thread: 071 * <pre class="code"> 072 * // Create a message with mutable headers 073 * MessageHeaderAccessor accessor = new MessageHeaderAccessor(); 074 * accessor.setHeader("foo", "bar"); 075 * accessor.setLeaveMutable(true); 076 * MessageHeaders headers = accessor.getMessageHeaders(); 077 * Message message = MessageBuilder.createMessage("payload", headers); 078 * 079 * // Later on 080 * MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message); 081 * if (accessor.isMutable()) { 082 * // It's mutable, just change the headers 083 * accessor.setHeader("bar", "baz"); 084 * } 085 * else { 086 * // It's not, so get a mutable copy, change and re-create 087 * accessor = MessageHeaderAccessor.getMutableAccessor(message); 088 * accessor.setHeader("bar", "baz"); 089 * accessor.setLeaveMutable(true); // leave mutable again or not? 090 * message = MessageBuilder.createMessage(message.getPayload(), accessor); 091 * } 092 * 093 * // Make the accessor immutable 094 * MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message); 095 * accessor.setImmutable(); 096 * </pre> 097 * 098 * @author Rossen Stoyanchev 099 * @author Juergen Hoeller 100 * @since 4.0 101 */ 102public class MessageHeaderAccessor { 103 104 /** 105 * The default charset used for headers. 106 */ 107 public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; 108 109 private static final MimeType[] READABLE_MIME_TYPES = new MimeType[] { 110 MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_XML, 111 new MimeType("text", "*"), new MimeType("application", "*+json"), new MimeType("application", "*+xml") 112 }; 113 114 115 private final MutableMessageHeaders headers; 116 117 private boolean leaveMutable = false; 118 119 private boolean modified = false; 120 121 private boolean enableTimestamp = false; 122 123 @Nullable 124 private IdGenerator idGenerator; 125 126 127 /** 128 * A constructor to create new headers. 129 */ 130 public MessageHeaderAccessor() { 131 this(null); 132 } 133 134 /** 135 * A constructor accepting the headers of an existing message to copy. 136 * @param message a message to copy the headers from, or {@code null} if none 137 */ 138 public MessageHeaderAccessor(@Nullable Message<?> message) { 139 this.headers = new MutableMessageHeaders(message != null ? message.getHeaders() : null); 140 } 141 142 143 /** 144 * Build a 'nested' accessor for the given message. 145 * @param message the message to build a new accessor for 146 * @return the nested accessor (typically a specific subclass) 147 */ 148 protected MessageHeaderAccessor createAccessor(Message<?> message) { 149 return new MessageHeaderAccessor(message); 150 } 151 152 153 // Configuration properties 154 155 /** 156 * By default when {@link #getMessageHeaders()} is called, {@code "this"} 157 * {@code MessageHeaderAccessor} instance can no longer be used to modify the 158 * underlying message headers and the returned {@code MessageHeaders} is immutable. 159 * <p>However when this is set to {@code true}, the returned (underlying) 160 * {@code MessageHeaders} instance remains mutable. To make further modifications 161 * continue to use the same accessor instance or re-obtain it via:<br> 162 * {@link MessageHeaderAccessor#getAccessor(Message, Class) 163 * MessageHeaderAccessor.getAccessor(Message, Class)} 164 * <p>When modifications are complete use {@link #setImmutable()} to prevent 165 * further changes. The intended use case for this mechanism is initialization 166 * of a Message within a single thread. 167 * <p>By default this is set to {@code false}. 168 * @since 4.1 169 */ 170 public void setLeaveMutable(boolean leaveMutable) { 171 Assert.state(this.headers.isMutable(), "Already immutable"); 172 this.leaveMutable = leaveMutable; 173 } 174 175 /** 176 * By default when {@link #getMessageHeaders()} is called, {@code "this"} 177 * {@code MessageHeaderAccessor} instance can no longer be used to modify the 178 * underlying message headers. However if {@link #setLeaveMutable(boolean)} 179 * is used, this method is necessary to indicate explicitly when the 180 * {@code MessageHeaders} instance should no longer be modified. 181 * @since 4.1 182 */ 183 public void setImmutable() { 184 this.headers.setImmutable(); 185 } 186 187 /** 188 * Whether the underlying headers can still be modified. 189 * @since 4.1 190 */ 191 public boolean isMutable() { 192 return this.headers.isMutable(); 193 } 194 195 /** 196 * Mark the underlying message headers as modified. 197 * @param modified typically {@code true}, or {@code false} to reset the flag 198 * @since 4.1 199 */ 200 protected void setModified(boolean modified) { 201 this.modified = modified; 202 } 203 204 /** 205 * Check whether the underlying message headers have been marked as modified. 206 * @return {@code true} if the flag has been set, {@code false} otherwise 207 */ 208 public boolean isModified() { 209 return this.modified; 210 } 211 212 /** 213 * A package private mechanism to enables the automatic addition of the 214 * {@link org.springframework.messaging.MessageHeaders#TIMESTAMP} header. 215 * <p>By default, this property is set to {@code false}. 216 * @see IdTimestampMessageHeaderInitializer 217 */ 218 void setEnableTimestamp(boolean enableTimestamp) { 219 this.enableTimestamp = enableTimestamp; 220 } 221 222 /** 223 * A package-private mechanism to configure the IdGenerator strategy to use. 224 * <p>By default this property is not set in which case the default IdGenerator 225 * in {@link org.springframework.messaging.MessageHeaders} is used. 226 * @see IdTimestampMessageHeaderInitializer 227 */ 228 void setIdGenerator(IdGenerator idGenerator) { 229 this.idGenerator = idGenerator; 230 } 231 232 233 // Accessors for the resulting MessageHeaders 234 235 /** 236 * Return the underlying {@code MessageHeaders} instance. 237 * <p>Unless {@link #setLeaveMutable(boolean)} was set to {@code true}, after 238 * this call, the headers are immutable and this accessor can no longer 239 * modify them. 240 * <p>This method always returns the same {@code MessageHeaders} instance if 241 * invoked multiples times. To obtain a copy of the underlying headers, use 242 * {@link #toMessageHeaders()} or {@link #toMap()} instead. 243 * @since 4.1 244 */ 245 public MessageHeaders getMessageHeaders() { 246 if (!this.leaveMutable) { 247 setImmutable(); 248 } 249 return this.headers; 250 } 251 252 /** 253 * Return a copy of the underlying header values as a {@link MessageHeaders} object. 254 * <p>This method can be invoked many times, with modifications in between 255 * where each new call returns a fresh copy of the current header values. 256 * @since 4.1 257 */ 258 public MessageHeaders toMessageHeaders() { 259 return new MessageHeaders(this.headers); 260 } 261 262 /** 263 * Return a copy of the underlying header values as a plain {@link Map} object. 264 * <p>This method can be invoked many times, with modifications in between 265 * where each new call returns a fresh copy of the current header values. 266 */ 267 public Map<String, Object> toMap() { 268 return new HashMap<>(this.headers); 269 } 270 271 272 // Generic header accessors 273 274 /** 275 * Retrieve the value for the header with the given name. 276 * @param headerName the name of the header 277 * @return the associated value, or {@code null} if none found 278 */ 279 @Nullable 280 public Object getHeader(String headerName) { 281 return this.headers.get(headerName); 282 } 283 284 /** 285 * Set the value for the given header name. 286 * <p>If the provided value is {@code null}, the header will be removed. 287 */ 288 public void setHeader(String name, @Nullable Object value) { 289 if (isReadOnly(name)) { 290 throw new IllegalArgumentException("'" + name + "' header is read-only"); 291 } 292 verifyType(name, value); 293 if (value != null) { 294 // Modify header if necessary 295 if (!ObjectUtils.nullSafeEquals(value, getHeader(name))) { 296 this.modified = true; 297 this.headers.getRawHeaders().put(name, value); 298 } 299 } 300 else { 301 // Remove header if available 302 if (this.headers.containsKey(name)) { 303 this.modified = true; 304 this.headers.getRawHeaders().remove(name); 305 } 306 } 307 } 308 309 protected void verifyType(@Nullable String headerName, @Nullable Object headerValue) { 310 if (headerName != null && headerValue != null) { 311 if (MessageHeaders.ERROR_CHANNEL.equals(headerName) || 312 MessageHeaders.REPLY_CHANNEL.endsWith(headerName)) { 313 if (!(headerValue instanceof MessageChannel || headerValue instanceof String)) { 314 throw new IllegalArgumentException( 315 "'" + headerName + "' header value must be a MessageChannel or String"); 316 } 317 } 318 } 319 } 320 321 /** 322 * Set the value for the given header name only if the header name is not 323 * already associated with a value. 324 */ 325 public void setHeaderIfAbsent(String name, Object value) { 326 if (getHeader(name) == null) { 327 setHeader(name, value); 328 } 329 } 330 331 /** 332 * Remove the value for the given header name. 333 */ 334 public void removeHeader(String headerName) { 335 if (StringUtils.hasLength(headerName) && !isReadOnly(headerName)) { 336 setHeader(headerName, null); 337 } 338 } 339 340 /** 341 * Removes all headers provided via array of 'headerPatterns'. 342 * <p>As the name suggests, array may contain simple matching patterns for header 343 * names. Supported pattern styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy". 344 */ 345 public void removeHeaders(String... headerPatterns) { 346 List<String> headersToRemove = new ArrayList<>(); 347 for (String pattern : headerPatterns) { 348 if (StringUtils.hasLength(pattern)){ 349 if (pattern.contains("*")){ 350 headersToRemove.addAll(getMatchingHeaderNames(pattern, this.headers)); 351 } 352 else { 353 headersToRemove.add(pattern); 354 } 355 } 356 } 357 for (String headerToRemove : headersToRemove) { 358 removeHeader(headerToRemove); 359 } 360 } 361 362 private List<String> getMatchingHeaderNames(String pattern, @Nullable Map<String, Object> headers) { 363 if (headers == null) { 364 return Collections.emptyList(); 365 } 366 List<String> matchingHeaderNames = new ArrayList<>(); 367 for (String key : headers.keySet()) { 368 if (PatternMatchUtils.simpleMatch(pattern, key)) { 369 matchingHeaderNames.add(key); 370 } 371 } 372 return matchingHeaderNames; 373 } 374 375 /** 376 * Copy the name-value pairs from the provided Map. 377 * <p>This operation will overwrite any existing values. Use 378 * {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values. 379 */ 380 public void copyHeaders(@Nullable Map<String, ?> headersToCopy) { 381 if (headersToCopy == null || this.headers == headersToCopy) { 382 return; 383 } 384 headersToCopy.forEach((key, value) -> { 385 if (!isReadOnly(key)) { 386 setHeader(key, value); 387 } 388 }); 389 } 390 391 /** 392 * Copy the name-value pairs from the provided Map. 393 * <p>This operation will <em>not</em> overwrite any existing values. 394 */ 395 public void copyHeadersIfAbsent(@Nullable Map<String, ?> headersToCopy) { 396 if (headersToCopy == null || this.headers == headersToCopy) { 397 return; 398 } 399 headersToCopy.forEach((key, value) -> { 400 if (!isReadOnly(key)) { 401 setHeaderIfAbsent(key, value); 402 } 403 }); 404 } 405 406 protected boolean isReadOnly(String headerName) { 407 return (MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName)); 408 } 409 410 411 // Specific header accessors 412 413 @Nullable 414 public UUID getId() { 415 Object value = getHeader(MessageHeaders.ID); 416 if (value == null) { 417 return null; 418 } 419 return (value instanceof UUID ? (UUID) value : UUID.fromString(value.toString())); 420 } 421 422 @Nullable 423 public Long getTimestamp() { 424 Object value = getHeader(MessageHeaders.TIMESTAMP); 425 if (value == null) { 426 return null; 427 } 428 return (value instanceof Long ? (Long) value : Long.parseLong(value.toString())); 429 } 430 431 public void setContentType(MimeType contentType) { 432 setHeader(MessageHeaders.CONTENT_TYPE, contentType); 433 } 434 435 @Nullable 436 public MimeType getContentType() { 437 Object value = getHeader(MessageHeaders.CONTENT_TYPE); 438 if (value == null) { 439 return null; 440 } 441 return (value instanceof MimeType ? (MimeType) value : MimeType.valueOf(value.toString())); 442 } 443 444 private Charset getCharset() { 445 MimeType contentType = getContentType(); 446 Charset charset = (contentType != null ? contentType.getCharset() : null); 447 return (charset != null ? charset : DEFAULT_CHARSET); 448 } 449 450 public void setReplyChannelName(String replyChannelName) { 451 setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName); 452 } 453 454 public void setReplyChannel(MessageChannel replyChannel) { 455 setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel); 456 } 457 458 @Nullable 459 public Object getReplyChannel() { 460 return getHeader(MessageHeaders.REPLY_CHANNEL); 461 } 462 463 public void setErrorChannelName(String errorChannelName) { 464 setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName); 465 } 466 467 public void setErrorChannel(MessageChannel errorChannel) { 468 setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel); 469 } 470 471 @Nullable 472 public Object getErrorChannel() { 473 return getHeader(MessageHeaders.ERROR_CHANNEL); 474 } 475 476 477 // Log message stuff 478 479 /** 480 * Return a concise message for logging purposes. 481 * @param payload the payload that corresponds to the headers. 482 * @return the message 483 */ 484 public String getShortLogMessage(Object payload) { 485 return "headers=" + this.headers.toString() + getShortPayloadLogMessage(payload); 486 } 487 488 /** 489 * Return a more detailed message for logging purposes. 490 * @param payload the payload that corresponds to the headers. 491 * @return the message 492 */ 493 public String getDetailedLogMessage(@Nullable Object payload) { 494 return "headers=" + this.headers.toString() + getDetailedPayloadLogMessage(payload); 495 } 496 497 protected String getShortPayloadLogMessage(Object payload) { 498 if (payload instanceof String) { 499 String payloadText = (String) payload; 500 return (payloadText.length() < 80) ? 501 " payload=" + payloadText : 502 " payload=" + payloadText.substring(0, 80) + "...(truncated)"; 503 } 504 else if (payload instanceof byte[]) { 505 byte[] bytes = (byte[]) payload; 506 if (isReadableContentType()) { 507 return (bytes.length < 80) ? 508 " payload=" + new String(bytes, getCharset()) : 509 " payload=" + new String(Arrays.copyOf(bytes, 80), getCharset()) + "...(truncated)"; 510 } 511 else { 512 return " payload=byte[" + bytes.length + "]"; 513 } 514 } 515 else { 516 String payloadText = payload.toString(); 517 return (payloadText.length() < 80) ? 518 " payload=" + payloadText : 519 " payload=" + ObjectUtils.identityToString(payload); 520 } 521 } 522 523 protected String getDetailedPayloadLogMessage(@Nullable Object payload) { 524 if (payload instanceof String) { 525 return " payload=" + payload; 526 } 527 else if (payload instanceof byte[]) { 528 byte[] bytes = (byte[]) payload; 529 if (isReadableContentType()) { 530 return " payload=" + new String(bytes, getCharset()); 531 } 532 else { 533 return " payload=byte[" + bytes.length + "]"; 534 } 535 } 536 else { 537 return " payload=" + payload; 538 } 539 } 540 541 protected boolean isReadableContentType() { 542 MimeType contentType = getContentType(); 543 for (MimeType mimeType : READABLE_MIME_TYPES) { 544 if (mimeType.includes(contentType)) { 545 return true; 546 } 547 } 548 return false; 549 } 550 551 @Override 552 public String toString() { 553 return getClass().getSimpleName() + " [headers=" + this.headers + "]"; 554 } 555 556 557 // Static factory methods 558 559 /** 560 * Return the original {@code MessageHeaderAccessor} used to create the headers 561 * of the given {@code Message}, or {@code null} if that's not available or if 562 * its type does not match the required type. 563 * <p>This is for cases where the existence of an accessor is strongly expected 564 * (followed up with an assertion) or where an accessor will be created otherwise. 565 * @param message the message to get an accessor for 566 * @return an accessor instance of the specified type, or {@code null} if none 567 * @since 5.1.19 568 */ 569 @Nullable 570 public static MessageHeaderAccessor getAccessor(Message<?> message) { 571 return getAccessor(message.getHeaders(), null); 572 } 573 574 /** 575 * Return the original {@code MessageHeaderAccessor} used to create the headers 576 * of the given {@code Message}, or {@code null} if that's not available or if 577 * its type does not match the required type. 578 * <p>This is for cases where the existence of an accessor is strongly expected 579 * (followed up with an assertion) or where an accessor will be created otherwise. 580 * @param message the message to get an accessor for 581 * @param requiredType the required accessor type (or {@code null} for any) 582 * @return an accessor instance of the specified type, or {@code null} if none 583 * @since 4.1 584 */ 585 @Nullable 586 public static <T extends MessageHeaderAccessor> T getAccessor(Message<?> message, @Nullable Class<T> requiredType) { 587 return getAccessor(message.getHeaders(), requiredType); 588 } 589 590 /** 591 * A variation of {@link #getAccessor(org.springframework.messaging.Message, Class)} 592 * with a {@code MessageHeaders} instance instead of a {@code Message}. 593 * <p>This is for cases when a full message may not have been created yet. 594 * @param messageHeaders the message headers to get an accessor for 595 * @param requiredType the required accessor type (or {@code null} for any) 596 * @return an accessor instance of the specified type, or {@code null} if none 597 * @since 4.1 598 */ 599 @SuppressWarnings("unchecked") 600 @Nullable 601 public static <T extends MessageHeaderAccessor> T getAccessor( 602 MessageHeaders messageHeaders, @Nullable Class<T> requiredType) { 603 604 if (messageHeaders instanceof MutableMessageHeaders) { 605 MutableMessageHeaders mutableHeaders = (MutableMessageHeaders) messageHeaders; 606 MessageHeaderAccessor headerAccessor = mutableHeaders.getAccessor(); 607 if (requiredType == null || requiredType.isInstance(headerAccessor)) { 608 return (T) headerAccessor; 609 } 610 } 611 return null; 612 } 613 614 /** 615 * Return a mutable {@code MessageHeaderAccessor} for the given message attempting 616 * to match the type of accessor used to create the message headers, or otherwise 617 * wrapping the message with a {@code MessageHeaderAccessor} instance. 618 * <p>This is for cases where a header needs to be updated in generic code 619 * while preserving the accessor type for downstream processing. 620 * @return an accessor of the required type (never {@code null}) 621 * @since 4.1 622 */ 623 public static MessageHeaderAccessor getMutableAccessor(Message<?> message) { 624 if (message.getHeaders() instanceof MutableMessageHeaders) { 625 MutableMessageHeaders mutableHeaders = (MutableMessageHeaders) message.getHeaders(); 626 MessageHeaderAccessor accessor = mutableHeaders.getAccessor(); 627 return (accessor.isMutable() ? accessor : accessor.createAccessor(message)); 628 } 629 return new MessageHeaderAccessor(message); 630 } 631 632 633 /** 634 * Extension of {@link MessageHeaders} that helps to preserve the link to 635 * the outer {@link MessageHeaderAccessor} instance that created it as well 636 * as keeps track of whether headers are still mutable. 637 */ 638 @SuppressWarnings("serial") 639 private class MutableMessageHeaders extends MessageHeaders { 640 641 private boolean mutable = true; 642 643 public MutableMessageHeaders(@Nullable Map<String, Object> headers) { 644 super(headers, MessageHeaders.ID_VALUE_NONE, -1L); 645 } 646 647 @Override 648 public Map<String, Object> getRawHeaders() { 649 Assert.state(this.mutable, "Already immutable"); 650 return super.getRawHeaders(); 651 } 652 653 public void setImmutable() { 654 if (!this.mutable) { 655 return; 656 } 657 658 if (getId() == null) { 659 IdGenerator idGenerator = (MessageHeaderAccessor.this.idGenerator != null ? 660 MessageHeaderAccessor.this.idGenerator : MessageHeaders.getIdGenerator()); 661 UUID id = idGenerator.generateId(); 662 if (id != MessageHeaders.ID_VALUE_NONE) { 663 getRawHeaders().put(ID, id); 664 } 665 } 666 667 if (getTimestamp() == null) { 668 if (MessageHeaderAccessor.this.enableTimestamp) { 669 getRawHeaders().put(TIMESTAMP, System.currentTimeMillis()); 670 } 671 } 672 673 this.mutable = false; 674 } 675 676 public boolean isMutable() { 677 return this.mutable; 678 } 679 680 public MessageHeaderAccessor getAccessor() { 681 return MessageHeaderAccessor.this; 682 } 683 684 protected Object writeReplace() { 685 // Serialize as regular MessageHeaders (without MessageHeaderAccessor reference) 686 return new MessageHeaders(this); 687 } 688 } 689 690}