001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.stomp;
018
019import java.nio.charset.Charset;
020import java.nio.charset.StandardCharsets;
021import java.security.Principal;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.springframework.lang.Nullable;
030import org.springframework.messaging.Message;
031import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
032import org.springframework.messaging.simp.SimpMessageType;
033import org.springframework.messaging.support.MessageHeaderAccessor;
034import org.springframework.util.ClassUtils;
035import org.springframework.util.CollectionUtils;
036import org.springframework.util.MimeType;
037import org.springframework.util.MimeTypeUtils;
038import org.springframework.util.StringUtils;
039
040/**
041 * A {@code MessageHeaderAccessor} to use when creating a {@code Message} from
042 * a decoded STOMP frame, or when encoding a {@code Message} to a STOMP frame.
043 *
044 * <p>When created from STOMP frame content, the actual STOMP headers are
045 * stored in the native header sub-map managed by the parent class
046 * {@link org.springframework.messaging.support.NativeMessageHeaderAccessor}
047 * while the parent class {@link SimpMessageHeaderAccessor} manages common
048 * processing headers some of which are based on STOMP headers
049 * (e.g. destination, content-type, etc).
050 *
051 * <p>An instance of this class can also be created by wrapping an existing
052 * {@code Message}. That message may have been created with the more generic
053 * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor} in
054 * which case STOMP headers are created from common processing headers.
055 * In this case it is also necessary to invoke either
056 * {@link #updateStompCommandAsClientMessage()} or
057 * {@link #updateStompCommandAsServerMessage()} if sending a message and
058 * depending on whether a message is sent to a client or the message broker.
059 *
060 * @author Rossen Stoyanchev
061 * @since 4.0
062 */
063public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
064
065        private static final AtomicLong messageIdCounter = new AtomicLong();
066
067        private static final long[] DEFAULT_HEARTBEAT = new long[] {0, 0};
068
069
070        // STOMP header names
071
072        public static final String STOMP_ID_HEADER = "id";
073
074        public static final String STOMP_HOST_HEADER = "host";
075
076        public static final String STOMP_ACCEPT_VERSION_HEADER = "accept-version";
077
078        public static final String STOMP_MESSAGE_ID_HEADER = "message-id";
079
080        public static final String STOMP_RECEIPT_HEADER = "receipt"; // any client frame except CONNECT
081
082        public static final String STOMP_RECEIPT_ID_HEADER = "receipt-id"; // RECEIPT frame
083
084        public static final String STOMP_SUBSCRIPTION_HEADER = "subscription";
085
086        public static final String STOMP_VERSION_HEADER = "version";
087
088        public static final String STOMP_MESSAGE_HEADER = "message";
089
090        public static final String STOMP_ACK_HEADER = "ack";
091
092        public static final String STOMP_NACK_HEADER = "nack";
093
094        public static final String STOMP_LOGIN_HEADER = "login";
095
096        public static final String STOMP_PASSCODE_HEADER = "passcode";
097
098        public static final String STOMP_DESTINATION_HEADER = "destination";
099
100        public static final String STOMP_CONTENT_TYPE_HEADER = "content-type";
101
102        public static final String STOMP_CONTENT_LENGTH_HEADER = "content-length";
103
104        public static final String STOMP_HEARTBEAT_HEADER = "heart-beat";
105
106        // Other header names
107
108        private static final String COMMAND_HEADER = "stompCommand";
109
110        private static final String CREDENTIALS_HEADER = "stompCredentials";
111
112
113        /**
114         * A constructor for creating message headers from a parsed STOMP frame.
115         */
116        StompHeaderAccessor(StompCommand command, @Nullable Map<String, List<String>> externalSourceHeaders) {
117                super(command.getMessageType(), externalSourceHeaders);
118                setHeader(COMMAND_HEADER, command);
119                updateSimpMessageHeadersFromStompHeaders();
120        }
121
122        /**
123         * A constructor for accessing and modifying existing message headers.
124         * Note that the message headers may not have been created from a STOMP frame
125         * but may have rather originated from using the more generic
126         * {@link org.springframework.messaging.simp.SimpMessageHeaderAccessor}.
127         */
128        StompHeaderAccessor(Message<?> message) {
129                super(message);
130                updateStompHeadersFromSimpMessageHeaders();
131        }
132
133        StompHeaderAccessor() {
134                super(SimpMessageType.HEARTBEAT, null);
135        }
136
137
138        void updateSimpMessageHeadersFromStompHeaders() {
139                if (getNativeHeaders() == null) {
140                        return;
141                }
142                String value = getFirstNativeHeader(STOMP_DESTINATION_HEADER);
143                if (value != null) {
144                        super.setDestination(value);
145                }
146                value = getFirstNativeHeader(STOMP_CONTENT_TYPE_HEADER);
147                if (value != null) {
148                        super.setContentType(MimeTypeUtils.parseMimeType(value));
149                }
150                StompCommand command = getCommand();
151                if (StompCommand.MESSAGE.equals(command)) {
152                        value = getFirstNativeHeader(STOMP_SUBSCRIPTION_HEADER);
153                        if (value != null) {
154                                super.setSubscriptionId(value);
155                        }
156                }
157                else if (StompCommand.SUBSCRIBE.equals(command) || StompCommand.UNSUBSCRIBE.equals(command)) {
158                        value = getFirstNativeHeader(STOMP_ID_HEADER);
159                        if (value != null) {
160                                super.setSubscriptionId(value);
161                        }
162                }
163                else if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
164                        protectPasscode();
165                }
166        }
167
168        void updateStompHeadersFromSimpMessageHeaders() {
169                String destination = getDestination();
170                if (destination != null) {
171                        setNativeHeader(STOMP_DESTINATION_HEADER, destination);
172                }
173                MimeType contentType = getContentType();
174                if (contentType != null) {
175                        setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString());
176                }
177                trySetStompHeaderForSubscriptionId();
178        }
179
180
181        @Override
182        protected MessageHeaderAccessor createAccessor(Message<?> message) {
183                return wrap(message);
184        }
185
186        // Redeclared for visibility within simp.stomp
187        @Override
188        @Nullable
189        protected Map<String, List<String>> getNativeHeaders() {
190                return super.getNativeHeaders();
191        }
192
193        public StompCommand updateStompCommandAsClientMessage() {
194                SimpMessageType messageType = getMessageType();
195                if (messageType != SimpMessageType.MESSAGE) {
196                        throw new IllegalStateException("Unexpected message type " + messageType);
197                }
198                StompCommand command = getCommand();
199                if (command == null) {
200                        command = StompCommand.SEND;
201                        setHeader(COMMAND_HEADER, command);
202                }
203                else if (!command.equals(StompCommand.SEND)) {
204                        throw new IllegalStateException("Unexpected STOMP command " + command);
205                }
206                return command;
207        }
208
209        public void updateStompCommandAsServerMessage() {
210                SimpMessageType messageType = getMessageType();
211                if (messageType != SimpMessageType.MESSAGE) {
212                        throw new IllegalStateException("Unexpected message type " + messageType);
213                }
214                StompCommand command = getCommand();
215                if ((command == null) || StompCommand.SEND.equals(command)) {
216                        setHeader(COMMAND_HEADER, StompCommand.MESSAGE);
217                }
218                else if (!StompCommand.MESSAGE.equals(command)) {
219                        throw new IllegalStateException("Unexpected STOMP command " + command);
220                }
221                trySetStompHeaderForSubscriptionId();
222                if (getMessageId() == null) {
223                        String messageId = getSessionId() + '-' + messageIdCounter.getAndIncrement();
224                        setNativeHeader(STOMP_MESSAGE_ID_HEADER, messageId);
225                }
226        }
227
228        /**
229         * Return the STOMP command, or {@code null} if not yet set.
230         */
231        @Nullable
232        public StompCommand getCommand() {
233                return (StompCommand) getHeader(COMMAND_HEADER);
234        }
235
236        public boolean isHeartbeat() {
237                return (SimpMessageType.HEARTBEAT == getMessageType());
238        }
239
240        public long[] getHeartbeat() {
241                String rawValue = getFirstNativeHeader(STOMP_HEARTBEAT_HEADER);
242                String[] rawValues = StringUtils.split(rawValue, ",");
243                if (rawValues == null) {
244                        return Arrays.copyOf(DEFAULT_HEARTBEAT, 2);
245                }
246                return new long[] {Long.parseLong(rawValues[0]), Long.parseLong(rawValues[1])};
247        }
248
249        public void setAcceptVersion(String acceptVersion) {
250                setNativeHeader(STOMP_ACCEPT_VERSION_HEADER, acceptVersion);
251        }
252
253        public Set<String> getAcceptVersion() {
254                String rawValue = getFirstNativeHeader(STOMP_ACCEPT_VERSION_HEADER);
255                return (rawValue != null ? StringUtils.commaDelimitedListToSet(rawValue) : Collections.emptySet());
256        }
257
258        public void setHost(@Nullable String host) {
259                setNativeHeader(STOMP_HOST_HEADER, host);
260        }
261
262        @Nullable
263        public String getHost() {
264                return getFirstNativeHeader(STOMP_HOST_HEADER);
265        }
266
267        @Override
268        public void setDestination(@Nullable String destination) {
269                super.setDestination(destination);
270                setNativeHeader(STOMP_DESTINATION_HEADER, destination);
271        }
272
273        @Override
274        public void setContentType(MimeType contentType) {
275                super.setContentType(contentType);
276                setNativeHeader(STOMP_CONTENT_TYPE_HEADER, contentType.toString());
277        }
278
279        @Override
280        public void setSubscriptionId(@Nullable String subscriptionId) {
281                super.setSubscriptionId(subscriptionId);
282                trySetStompHeaderForSubscriptionId();
283        }
284
285        private void trySetStompHeaderForSubscriptionId() {
286                String subscriptionId = getSubscriptionId();
287                if (subscriptionId != null) {
288                        StompCommand command = getCommand();
289                        if (command != null && StompCommand.MESSAGE.equals(command)) {
290                                setNativeHeader(STOMP_SUBSCRIPTION_HEADER, subscriptionId);
291                        }
292                        else {
293                                SimpMessageType messageType = getMessageType();
294                                if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
295                                        setNativeHeader(STOMP_ID_HEADER, subscriptionId);
296                                }
297                        }
298                }
299        }
300
301        @Nullable
302        public Integer getContentLength() {
303                String header = getFirstNativeHeader(STOMP_CONTENT_LENGTH_HEADER);
304                return (header != null ? Integer.valueOf(header) : null);
305        }
306
307        public void setContentLength(int contentLength) {
308                setNativeHeader(STOMP_CONTENT_LENGTH_HEADER, String.valueOf(contentLength));
309        }
310
311        public void setHeartbeat(long cx, long cy) {
312                setNativeHeader(STOMP_HEARTBEAT_HEADER, cx + "," + cy);
313        }
314
315        public void setAck(@Nullable String ack) {
316                setNativeHeader(STOMP_ACK_HEADER, ack);
317        }
318
319        @Nullable
320        public String getAck() {
321                return getFirstNativeHeader(STOMP_ACK_HEADER);
322        }
323
324        public void setNack(@Nullable String nack) {
325                setNativeHeader(STOMP_NACK_HEADER, nack);
326        }
327
328        @Nullable
329        public String getNack() {
330                return getFirstNativeHeader(STOMP_NACK_HEADER);
331        }
332
333        public void setLogin(@Nullable String login) {
334                setNativeHeader(STOMP_LOGIN_HEADER, login);
335        }
336
337        @Nullable
338        public String getLogin() {
339                return getFirstNativeHeader(STOMP_LOGIN_HEADER);
340        }
341
342        public void setPasscode(@Nullable String passcode) {
343                setNativeHeader(STOMP_PASSCODE_HEADER, passcode);
344                protectPasscode();
345        }
346
347        private void protectPasscode() {
348                String value = getFirstNativeHeader(STOMP_PASSCODE_HEADER);
349                if (value != null && !"PROTECTED".equals(value)) {
350                        setHeader(CREDENTIALS_HEADER, new StompPasscode(value));
351                        setNativeHeader(STOMP_PASSCODE_HEADER, "PROTECTED");
352                }
353        }
354
355        /**
356         * Return the passcode header value, or {@code null} if not set.
357         */
358        @Nullable
359        public String getPasscode() {
360                StompPasscode credentials = (StompPasscode) getHeader(CREDENTIALS_HEADER);
361                return (credentials != null ? credentials.passcode : null);
362        }
363
364        public void setReceiptId(@Nullable String receiptId) {
365                setNativeHeader(STOMP_RECEIPT_ID_HEADER, receiptId);
366        }
367
368        @Nullable
369        public String getReceiptId() {
370                return getFirstNativeHeader(STOMP_RECEIPT_ID_HEADER);
371        }
372
373        public void setReceipt(@Nullable String receiptId) {
374                setNativeHeader(STOMP_RECEIPT_HEADER, receiptId);
375        }
376
377        @Nullable
378        public String getReceipt() {
379                return getFirstNativeHeader(STOMP_RECEIPT_HEADER);
380        }
381
382        @Nullable
383        public String getMessage() {
384                return getFirstNativeHeader(STOMP_MESSAGE_HEADER);
385        }
386
387        public void setMessage(@Nullable String content) {
388                setNativeHeader(STOMP_MESSAGE_HEADER, content);
389        }
390
391        @Nullable
392        public String getMessageId() {
393                return getFirstNativeHeader(STOMP_MESSAGE_ID_HEADER);
394        }
395
396        public void setMessageId(@Nullable String id) {
397                setNativeHeader(STOMP_MESSAGE_ID_HEADER, id);
398        }
399
400        @Nullable
401        public String getVersion() {
402                return getFirstNativeHeader(STOMP_VERSION_HEADER);
403        }
404
405        public void setVersion(@Nullable String version) {
406                setNativeHeader(STOMP_VERSION_HEADER, version);
407        }
408
409
410        // Logging related
411
412        @Override
413        public String getShortLogMessage(Object payload) {
414                StompCommand command = getCommand();
415                if (StompCommand.SUBSCRIBE.equals(command)) {
416                        return "SUBSCRIBE " + getDestination() + " id=" + getSubscriptionId() + appendSession();
417                }
418                else if (StompCommand.UNSUBSCRIBE.equals(command)) {
419                        return "UNSUBSCRIBE id=" + getSubscriptionId() + appendSession();
420                }
421                else if (StompCommand.SEND.equals(command)) {
422                        return "SEND " + getDestination() + appendSession() + appendPayload(payload);
423                }
424                else if (StompCommand.CONNECT.equals(command)) {
425                        Principal user = getUser();
426                        return "CONNECT" + (user != null ? " user=" + user.getName() : "") + appendSession();
427                }
428                else if (StompCommand.STOMP.equals(command)) {
429                        Principal user = getUser();
430                        return "STOMP" + (user != null ? " user=" + user.getName() : "") + appendSession();
431                }
432                else if (StompCommand.CONNECTED.equals(command)) {
433                        return "CONNECTED heart-beat=" + Arrays.toString(getHeartbeat()) + appendSession();
434                }
435                else if (StompCommand.DISCONNECT.equals(command)) {
436                        String receipt = getReceipt();
437                        return "DISCONNECT" + (receipt != null ? " receipt=" + receipt : "") + appendSession();
438                }
439                else {
440                        return getDetailedLogMessage(payload);
441                }
442        }
443
444        @Override
445        public String getDetailedLogMessage(@Nullable Object payload) {
446                if (isHeartbeat()) {
447                        String sessionId = getSessionId();
448                        return "heart-beat" + (sessionId != null ? " in session " + sessionId : "");
449                }
450                StompCommand command = getCommand();
451                if (command == null) {
452                        return super.getDetailedLogMessage(payload);
453                }
454                StringBuilder sb = new StringBuilder();
455                sb.append(command.name()).append(" ");
456                Map<String, List<String>> nativeHeaders = getNativeHeaders();
457                if (nativeHeaders != null) {
458                        sb.append(nativeHeaders);
459                }
460                sb.append(appendSession());
461                if (getUser() != null) {
462                        sb.append(", user=").append(getUser().getName());
463                }
464                if (payload != null && command.isBodyAllowed()) {
465                        sb.append(appendPayload(payload));
466                }
467                return sb.toString();
468        }
469
470        private String appendSession() {
471                return " session=" + getSessionId();
472        }
473
474        private String appendPayload(Object payload) {
475                if (payload.getClass() != byte[].class) {
476                        throw new IllegalStateException(
477                                        "Expected byte array payload but got: " + ClassUtils.getQualifiedName(payload.getClass()));
478                }
479                byte[] bytes = (byte[]) payload;
480                MimeType mimeType = getContentType();
481                String contentType = (mimeType != null ? " " + mimeType.toString() : "");
482                if (bytes.length == 0 || mimeType == null || !isReadableContentType()) {
483                        return contentType;
484                }
485                Charset charset = mimeType.getCharset();
486                charset = (charset != null ? charset : StandardCharsets.UTF_8);
487                return (bytes.length < 80) ?
488                                contentType + " payload=" + new String(bytes, charset) :
489                                contentType + " payload=" + new String(Arrays.copyOf(bytes, 80), charset) + "...(truncated)";
490        }
491
492
493        // Static factory methods and accessors
494
495        /**
496         * Create an instance for the given STOMP command.
497         */
498        public static StompHeaderAccessor create(StompCommand command) {
499                return new StompHeaderAccessor(command, null);
500        }
501
502        /**
503         * Create an instance for the given STOMP command and headers.
504         */
505        public static StompHeaderAccessor create(StompCommand command, Map<String, List<String>> headers) {
506                return new StompHeaderAccessor(command, headers);
507        }
508
509        /**
510         * Create headers for a heartbeat. While a STOMP heartbeat frame does not
511         * have headers, a session id is needed for processing purposes at a minimum.
512         */
513        public static StompHeaderAccessor createForHeartbeat() {
514                return new StompHeaderAccessor();
515        }
516
517        /**
518         * Create an instance from the payload and headers of the given Message.
519         */
520        public static StompHeaderAccessor wrap(Message<?> message) {
521                return new StompHeaderAccessor(message);
522        }
523
524        /**
525         * Return the STOMP command from the given headers, or {@code null} if not set.
526         */
527        @Nullable
528        public static StompCommand getCommand(Map<String, Object> headers) {
529                return (StompCommand) headers.get(COMMAND_HEADER);
530        }
531
532        /**
533         * Return the passcode header value, or {@code null} if not set.
534         */
535        @Nullable
536        public static String getPasscode(Map<String, Object> headers) {
537                StompPasscode credentials = (StompPasscode) headers.get(CREDENTIALS_HEADER);
538                return (credentials != null ? credentials.passcode : null);
539        }
540
541        @Nullable
542        public static Integer getContentLength(Map<String, List<String>> nativeHeaders) {
543                List<String> values = nativeHeaders.get(STOMP_CONTENT_LENGTH_HEADER);
544                return (!CollectionUtils.isEmpty(values) ? Integer.valueOf(values.get(0)) : null);
545        }
546
547
548        private static class StompPasscode {
549
550                private final String passcode;
551
552                public StompPasscode(String passcode) {
553                        this.passcode = passcode;
554                }
555
556                @Override
557                public String toString() {
558                        return "[PROTECTED]";
559                }
560        }
561
562}