001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp; 018 019import java.security.Principal; 020import java.util.List; 021import java.util.Map; 022 023import org.springframework.messaging.Message; 024import org.springframework.messaging.support.IdTimestampMessageHeaderInitializer; 025import org.springframework.messaging.support.MessageHeaderAccessor; 026import org.springframework.messaging.support.NativeMessageHeaderAccessor; 027import org.springframework.util.Assert; 028import org.springframework.util.CollectionUtils; 029 030/** 031 * A base class for working with message headers in simple messaging protocols that 032 * support basic messaging patterns. Provides uniform access to specific values common 033 * across protocols such as a destination, message type (e.g. publish, subscribe, etc), 034 * session id, and others. 035 * 036 * <p>Use one of the static factory method in this class, then call getters and setters, 037 * and at the end if necessary call {@link #toMap()} to obtain the updated headers. 038 * 039 * @author Rossen Stoyanchev 040 * @since 4.0 041 */ 042public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor { 043 044 private static final IdTimestampMessageHeaderInitializer headerInitializer; 045 046 static { 047 headerInitializer = new IdTimestampMessageHeaderInitializer(); 048 headerInitializer.setDisableIdGeneration(); 049 headerInitializer.setEnableTimestamp(false); 050 } 051 052 // SiMP header names 053 054 public static final String DESTINATION_HEADER = "simpDestination"; 055 056 public static final String MESSAGE_TYPE_HEADER = "simpMessageType"; 057 058 public static final String SESSION_ID_HEADER = "simpSessionId"; 059 060 public static final String SESSION_ATTRIBUTES = "simpSessionAttributes"; 061 062 public static final String SUBSCRIPTION_ID_HEADER = "simpSubscriptionId"; 063 064 public static final String USER_HEADER = "simpUser"; 065 066 public static final String CONNECT_MESSAGE_HEADER = "simpConnectMessage"; 067 068 public static final String DISCONNECT_MESSAGE_HEADER = "simpDisconnectMessage"; 069 070 public static final String HEART_BEAT_HEADER = "simpHeartbeat"; 071 072 073 /** 074 * A header for internal use with "user" destinations where we need to 075 * restore the destination prior to sending messages to clients. 076 */ 077 public static final String ORIGINAL_DESTINATION = "simpOrigDestination"; 078 079 /** 080 * A header that indicates to the broker that the sender will ignore errors. 081 * The header is simply checked for presence or absence. 082 */ 083 public static final String IGNORE_ERROR = "simpIgnoreError"; 084 085 086 /** 087 * A constructor for creating new message headers. 088 * This constructor is protected. See factory methods in this and sub-classes. 089 */ 090 protected SimpMessageHeaderAccessor(SimpMessageType messageType, Map<String, List<String>> externalSourceHeaders) { 091 super(externalSourceHeaders); 092 Assert.notNull(messageType, "MessageType must not be null"); 093 setHeader(MESSAGE_TYPE_HEADER, messageType); 094 headerInitializer.initHeaders(this); 095 } 096 097 /** 098 * A constructor for accessing and modifying existing message headers. This 099 * constructor is protected. See factory methods in this and sub-classes. 100 */ 101 protected SimpMessageHeaderAccessor(Message<?> message) { 102 super(message); 103 headerInitializer.initHeaders(this); 104 } 105 106 107 @Override 108 protected MessageHeaderAccessor createAccessor(Message<?> message) { 109 return wrap(message); 110 } 111 112 public void setMessageTypeIfNotSet(SimpMessageType messageType) { 113 if (getMessageType() == null) { 114 setHeader(MESSAGE_TYPE_HEADER, messageType); 115 } 116 } 117 118 public SimpMessageType getMessageType() { 119 return (SimpMessageType) getHeader(MESSAGE_TYPE_HEADER); 120 } 121 122 public void setDestination(String destination) { 123 Assert.notNull(destination, "Destination must not be null"); 124 setHeader(DESTINATION_HEADER, destination); 125 } 126 127 public String getDestination() { 128 return (String) getHeader(DESTINATION_HEADER); 129 } 130 131 public void setSubscriptionId(String subscriptionId) { 132 setHeader(SUBSCRIPTION_ID_HEADER, subscriptionId); 133 } 134 135 public String getSubscriptionId() { 136 return (String) getHeader(SUBSCRIPTION_ID_HEADER); 137 } 138 139 public void setSessionId(String sessionId) { 140 setHeader(SESSION_ID_HEADER, sessionId); 141 } 142 143 /** 144 * @return the id of the current session 145 */ 146 public String getSessionId() { 147 return (String) getHeader(SESSION_ID_HEADER); 148 } 149 150 /** 151 * A static alternative for access to the session attributes header. 152 */ 153 public void setSessionAttributes(Map<String, Object> attributes) { 154 setHeader(SESSION_ATTRIBUTES, attributes); 155 } 156 157 /** 158 * Return the attributes associated with the current session. 159 */ 160 @SuppressWarnings("unchecked") 161 public Map<String, Object> getSessionAttributes() { 162 return (Map<String, Object>) getHeader(SESSION_ATTRIBUTES); 163 } 164 165 public void setUser(Principal principal) { 166 setHeader(USER_HEADER, principal); 167 } 168 169 /** 170 * Return the user associated with the current session. 171 */ 172 public Principal getUser() { 173 return (Principal) getHeader(USER_HEADER); 174 } 175 176 @Override 177 public String getShortLogMessage(Object payload) { 178 if (getMessageType() == null) { 179 return super.getDetailedLogMessage(payload); 180 } 181 StringBuilder sb = getBaseLogMessage(); 182 if (!CollectionUtils.isEmpty(getSessionAttributes())) { 183 sb.append(" attributes[").append(getSessionAttributes().size()).append("]"); 184 } 185 sb.append(getShortPayloadLogMessage(payload)); 186 return sb.toString(); 187 } 188 189 @SuppressWarnings("unchecked") 190 @Override 191 public String getDetailedLogMessage(Object payload) { 192 if (getMessageType() == null) { 193 return super.getDetailedLogMessage(payload); 194 } 195 StringBuilder sb = getBaseLogMessage(); 196 if (!CollectionUtils.isEmpty(getSessionAttributes())) { 197 sb.append(" attributes=").append(getSessionAttributes()); 198 } 199 if (!CollectionUtils.isEmpty((Map<String, List<String>>) getHeader(NATIVE_HEADERS))) { 200 sb.append(" nativeHeaders=").append(getHeader(NATIVE_HEADERS)); 201 } 202 sb.append(getDetailedPayloadLogMessage(payload)); 203 return sb.toString(); 204 } 205 206 private StringBuilder getBaseLogMessage() { 207 StringBuilder sb = new StringBuilder(); 208 SimpMessageType messageType = getMessageType(); 209 sb.append(messageType != null ? messageType.name() : SimpMessageType.OTHER); 210 String destination = getDestination(); 211 if (destination != null) { 212 sb.append(" destination=").append(destination); 213 } 214 String subscriptionId = getSubscriptionId(); 215 if (subscriptionId != null) { 216 sb.append(" subscriptionId=").append(subscriptionId); 217 } 218 sb.append(" session=").append(getSessionId()); 219 Principal user = getUser(); 220 if (user != null) { 221 sb.append(" user=").append(user.getName()); 222 } 223 return sb; 224 } 225 226 227 // Static factory methods and accessors 228 229 /** 230 * Create an instance with 231 * {@link org.springframework.messaging.simp.SimpMessageType} {@code MESSAGE}. 232 */ 233 public static SimpMessageHeaderAccessor create() { 234 return new SimpMessageHeaderAccessor(SimpMessageType.MESSAGE, null); 235 } 236 237 /** 238 * Create an instance with the given 239 * {@link org.springframework.messaging.simp.SimpMessageType}. 240 */ 241 public static SimpMessageHeaderAccessor create(SimpMessageType messageType) { 242 return new SimpMessageHeaderAccessor(messageType, null); 243 } 244 245 /** 246 * Create an instance from the payload and headers of the given Message. 247 */ 248 public static SimpMessageHeaderAccessor wrap(Message<?> message) { 249 return new SimpMessageHeaderAccessor(message); 250 } 251 252 public static SimpMessageType getMessageType(Map<String, Object> headers) { 253 return (SimpMessageType) headers.get(MESSAGE_TYPE_HEADER); 254 } 255 256 public static String getDestination(Map<String, Object> headers) { 257 return (String) headers.get(DESTINATION_HEADER); 258 } 259 260 public static String getSubscriptionId(Map<String, Object> headers) { 261 return (String) headers.get(SUBSCRIPTION_ID_HEADER); 262 } 263 264 public static String getSessionId(Map<String, Object> headers) { 265 return (String) headers.get(SESSION_ID_HEADER); 266 } 267 268 @SuppressWarnings("unchecked") 269 public static Map<String, Object> getSessionAttributes(Map<String, Object> headers) { 270 return (Map<String, Object>) headers.get(SESSION_ATTRIBUTES); 271 } 272 273 public static Principal getUser(Map<String, Object> headers) { 274 return (Principal) headers.get(USER_HEADER); 275 } 276 277 public static long[] getHeartbeat(Map<String, Object> headers) { 278 return (long[]) headers.get(HEART_BEAT_HEADER); 279 } 280 281}