001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.broker;
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021
022import org.springframework.messaging.Message;
023import org.springframework.messaging.MessageHeaders;
024import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
025import org.springframework.messaging.simp.SimpMessageType;
026import org.springframework.util.CollectionUtils;
027import org.springframework.util.LinkedMultiValueMap;
028import org.springframework.util.MultiValueMap;
029
030/**
031 * Abstract base class for implementations of {@link SubscriptionRegistry} that
032 * looks up information in messages but delegates to abstract methods for the
033 * actual storage and retrieval.
034 *
035 * @author Rossen Stoyanchev
036 * @since 4.0
037 */
038public abstract class AbstractSubscriptionRegistry implements SubscriptionRegistry {
039
040        private static final MultiValueMap<String, String> EMPTY_MAP =
041                        CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<String, String>(0));
042
043        protected final Log logger = LogFactory.getLog(getClass());
044
045
046        @Override
047        public final void registerSubscription(Message<?> message) {
048                MessageHeaders headers = message.getHeaders();
049
050                SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
051                if (!SimpMessageType.SUBSCRIBE.equals(messageType)) {
052                        throw new IllegalArgumentException("Expected SUBSCRIBE: " + message);
053                }
054
055                String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
056                if (sessionId == null) {
057                        if (logger.isErrorEnabled()) {
058                                logger.error("No sessionId in  " + message);
059                        }
060                        return;
061                }
062
063                String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
064                if (subscriptionId == null) {
065                        if (logger.isErrorEnabled()) {
066                                logger.error("No subscriptionId in " + message);
067                        }
068                        return;
069                }
070
071                String destination = SimpMessageHeaderAccessor.getDestination(headers);
072                if (destination == null) {
073                        if (logger.isErrorEnabled()) {
074                                logger.error("No destination in " + message);
075                        }
076                        return;
077                }
078
079                addSubscriptionInternal(sessionId, subscriptionId, destination, message);
080        }
081
082        @Override
083        public final void unregisterSubscription(Message<?> message) {
084                MessageHeaders headers = message.getHeaders();
085
086                SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
087                if (!SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
088                        throw new IllegalArgumentException("Expected UNSUBSCRIBE: " + message);
089                }
090
091                String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
092                if (sessionId == null) {
093                        if (logger.isErrorEnabled()) {
094                                logger.error("No sessionId in " + message);
095                        }
096                        return;
097                }
098
099                String subscriptionId = SimpMessageHeaderAccessor.getSubscriptionId(headers);
100                if (subscriptionId == null) {
101                        if (logger.isErrorEnabled()) {
102                                logger.error("No subscriptionId " + message);
103                        }
104                        return;
105                }
106
107                removeSubscriptionInternal(sessionId, subscriptionId, message);
108        }
109
110        @Override
111        public final MultiValueMap<String, String> findSubscriptions(Message<?> message) {
112                MessageHeaders headers = message.getHeaders();
113
114                SimpMessageType type = SimpMessageHeaderAccessor.getMessageType(headers);
115                if (!SimpMessageType.MESSAGE.equals(type)) {
116                        throw new IllegalArgumentException("Unexpected message type: " + type);
117                }
118
119                String destination = SimpMessageHeaderAccessor.getDestination(headers);
120                if (destination == null) {
121                        if (logger.isErrorEnabled()) {
122                                logger.error("No destination in " + message);
123                        }
124                        return EMPTY_MAP;
125                }
126
127                return findSubscriptionsInternal(destination, message);
128        }
129
130
131        protected abstract void addSubscriptionInternal(
132                        String sessionId, String subscriptionId, String destination, Message<?> message);
133
134        protected abstract void removeSubscriptionInternal(
135                        String sessionId, String subscriptionId, Message<?> message);
136
137        protected abstract MultiValueMap<String, String> findSubscriptionsInternal(
138                        String destination, Message<?> message);
139
140}