001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.broker;
018
019import java.util.Collection;
020import java.util.HashSet;
021import java.util.LinkedHashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.CopyOnWriteArraySet;
028
029import org.springframework.expression.EvaluationContext;
030import org.springframework.expression.Expression;
031import org.springframework.expression.ExpressionParser;
032import org.springframework.expression.PropertyAccessor;
033import org.springframework.expression.TypedValue;
034import org.springframework.expression.spel.SpelEvaluationException;
035import org.springframework.expression.spel.standard.SpelExpressionParser;
036import org.springframework.expression.spel.support.SimpleEvaluationContext;
037import org.springframework.messaging.Message;
038import org.springframework.messaging.MessageHeaders;
039import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
040import org.springframework.messaging.support.MessageHeaderAccessor;
041import org.springframework.util.AntPathMatcher;
042import org.springframework.util.Assert;
043import org.springframework.util.LinkedMultiValueMap;
044import org.springframework.util.MultiValueMap;
045import org.springframework.util.PathMatcher;
046import org.springframework.util.StringUtils;
047
048/**
049 * Implementation of {@link SubscriptionRegistry} that stores subscriptions
050 * in memory and uses a {@link org.springframework.util.PathMatcher PathMatcher}
051 * for matching destinations.
052 *
053 * <p>As of 4.2, this class supports a {@link #setSelectorHeaderName selector}
054 * header on subscription messages with Spring EL expressions evaluated against
055 * the headers to filter out messages in addition to destination matching.
056 *
057 * @author Rossen Stoyanchev
058 * @author Sebastien Deleuze
059 * @author Juergen Hoeller
060 * @since 4.0
061 */
062public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
063
        /** Default maximum number of entries for the destination cache: 1024. */
        public static final int DEFAULT_CACHE_LIMIT = 1024;

        /**
         * Static evaluation context to reuse. It is built from a single
         * {@link SimpMessageHeaderPropertyAccessor} and is the context passed
         * when selector expressions are evaluated against a message.
         */
        private static final EvaluationContext messageEvalContext =
                        SimpleEvaluationContext.forPropertyAccessors(new SimpMessageHeaderPropertyAccessor()).build();


        /** Matcher applied to subscription destination patterns and message destinations. */
        private PathMatcher pathMatcher = new AntPathMatcher();

        /** Maximum number of destinations kept in the destination cache. */
        private volatile int cacheLimit = DEFAULT_CACHE_LIMIT;

        /** Name of the selector header, or {@code null} when selector support is turned off. */
        private String selectorHeaderName = "selector";

        /**
         * Flipped to {@code true} the first time a selector expression is parsed
         * successfully; nothing in this class resets it. While {@code false},
         * selector filtering is skipped entirely.
         */
        private volatile boolean selectorHeaderInUse = false;

        /** Parser used for selector header values. */
        private final ExpressionParser expressionParser = new SpelExpressionParser();

        /** Cache of previously resolved destinations and their matching subscriptions. */
        private final DestinationCache destinationCache = new DestinationCache();

        /** Subscriptions grouped by session id. */
        private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry();
085
086
087        /**
088         * Specify the {@link PathMatcher} to use.
089         */
090        public void setPathMatcher(PathMatcher pathMatcher) {
091                this.pathMatcher = pathMatcher;
092        }
093
094        /**
095         * Return the configured {@link PathMatcher}.
096         */
097        public PathMatcher getPathMatcher() {
098                return this.pathMatcher;
099        }
100
101        /**
102         * Specify the maximum number of entries for the resolved destination cache.
103         * Default is 1024.
104         */
105        public void setCacheLimit(int cacheLimit) {
106                this.cacheLimit = cacheLimit;
107        }
108
109        /**
110         * Return the maximum number of entries for the resolved destination cache.
111         */
112        public int getCacheLimit() {
113                return this.cacheLimit;
114        }
115
116        /**
117         * Configure the name of a header that a subscription message can have for
118         * the purpose of filtering messages matched to the subscription. The header
119         * value is expected to be a Spring EL boolean expression to be applied to
120         * the headers of messages matched to the subscription.
121         * <p>For example:
122         * <pre>
123         * headers.foo == 'bar'
124         * </pre>
125         * <p>By default this is set to "selector". You can set it to a different
126         * name, or to {@code null} to turn off support for a selector header.
127         * @param selectorHeaderName the name to use for a selector header
128         * @since 4.2
129         */
130        public void setSelectorHeaderName(String selectorHeaderName) {
131                this.selectorHeaderName = (StringUtils.hasText(selectorHeaderName) ? selectorHeaderName : null);
132        }
133
134        /**
135         * Return the name for the selector header name.
136         * @since 4.2
137         */
138        public String getSelectorHeaderName() {
139                return this.selectorHeaderName;
140        }
141
142
143        @Override
144        protected void addSubscriptionInternal(
145                        String sessionId, String subsId, String destination, Message<?> message) {
146
147                Expression expression = getSelectorExpression(message.getHeaders());
148                this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression);
149                this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId);
150        }
151
152        private Expression getSelectorExpression(MessageHeaders headers) {
153                Expression expression = null;
154                if (getSelectorHeaderName() != null) {
155                        String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers);
156                        if (selector != null) {
157                                try {
158                                        expression = this.expressionParser.parseExpression(selector);
159                                        this.selectorHeaderInUse = true;
160                                        if (logger.isTraceEnabled()) {
161                                                logger.trace("Subscription selector: [" + selector + "]");
162                                        }
163                                }
164                                catch (Throwable ex) {
165                                        if (logger.isDebugEnabled()) {
166                                                logger.debug("Failed to parse selector: " + selector, ex);
167                                        }
168                                }
169                        }
170                }
171                return expression;
172        }
173
174        @Override
175        protected void removeSubscriptionInternal(String sessionId, String subsId, Message<?> message) {
176                SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
177                if (info != null) {
178                        String destination = info.removeSubscription(subsId);
179                        if (destination != null) {
180                                this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId);
181                        }
182                }
183        }
184
185        @Override
186        public void unregisterAllSubscriptions(String sessionId) {
187                SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId);
188                if (info != null) {
189                        this.destinationCache.updateAfterRemovedSession(info);
190                }
191        }
192
193        @Override
194        protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) {
195                MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination, message);
196                return filterSubscriptions(result, message);
197        }
198
199        private MultiValueMap<String, String> filterSubscriptions(
200                        MultiValueMap<String, String> allMatches, Message<?> message) {
201
202                if (!this.selectorHeaderInUse) {
203                        return allMatches;
204                }
205                MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size());
206                for (String sessionId : allMatches.keySet()) {
207                        for (String subId : allMatches.get(sessionId)) {
208                                SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId);
209                                if (info == null) {
210                                        continue;
211                                }
212                                Subscription sub = info.getSubscription(subId);
213                                if (sub == null) {
214                                        continue;
215                                }
216                                Expression expression = sub.getSelectorExpression();
217                                if (expression == null) {
218                                        result.add(sessionId, subId);
219                                        continue;
220                                }
221                                try {
222                                        if (Boolean.TRUE.equals(expression.getValue(messageEvalContext, message, Boolean.class))) {
223                                                result.add(sessionId, subId);
224                                        }
225                                }
226                                catch (SpelEvaluationException ex) {
227                                        if (logger.isDebugEnabled()) {
228                                                logger.debug("Failed to evaluate selector: " + ex.getMessage());
229                                        }
230                                }
231                                catch (Throwable ex) {
232                                        logger.debug("Failed to evaluate selector", ex);
233                                }
234                        }
235                }
236                return result;
237        }
238
239        @Override
240        public String toString() {
241                return "DefaultSubscriptionRegistry[" + this.destinationCache + ", " + this.subscriptionRegistry + "]";
242        }
243
244
245        /**
246         * A cache for destinations previously resolved via
247         * {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)}
248         */
249        private class DestinationCache {
250
251                /** Map from destination -> <sessionId, subscriptionId> for fast look-ups */
252                private final Map<String, LinkedMultiValueMap<String, String>> accessCache =
253                                new ConcurrentHashMap<String, LinkedMultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT);
254
255                /** Map from destination -> <sessionId, subscriptionId> with locking */
256                @SuppressWarnings("serial")
257                private final Map<String, LinkedMultiValueMap<String, String>> updateCache =
258                                new LinkedHashMap<String, LinkedMultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) {
259                                        @Override
260                                        protected boolean removeEldestEntry(Map.Entry<String, LinkedMultiValueMap<String, String>> eldest) {
261                                                if (size() > getCacheLimit()) {
262                                                        accessCache.remove(eldest.getKey());
263                                                        return true;
264                                                }
265                                                else {
266                                                        return false;
267                                                }
268                                        }
269                                };
270
271
272                public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) {
273                        LinkedMultiValueMap<String, String> result = this.accessCache.get(destination);
274                        if (result == null) {
275                                synchronized (this.updateCache) {
276                                        result = new LinkedMultiValueMap<String, String>();
277                                        for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) {
278                                                for (String destinationPattern : info.getDestinations()) {
279                                                        if (getPathMatcher().match(destinationPattern, destination)) {
280                                                                for (Subscription subscription : info.getSubscriptions(destinationPattern)) {
281                                                                        result.add(info.sessionId, subscription.getId());
282                                                                }
283                                                        }
284                                                }
285                                        }
286                                        if (!result.isEmpty()) {
287                                                this.updateCache.put(destination, result.deepCopy());
288                                                this.accessCache.put(destination, result);
289                                        }
290                                }
291                        }
292                        return result;
293                }
294
295                public void updateAfterNewSubscription(String destination, String sessionId, String subsId) {
296                        synchronized (this.updateCache) {
297                                for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
298                                        String cachedDestination = entry.getKey();
299                                        if (getPathMatcher().match(destination, cachedDestination)) {
300                                                LinkedMultiValueMap<String, String> subs = entry.getValue();
301                                                // Subscription id's may also be populated via getSubscriptions()
302                                                List<String> subsForSession = subs.get(sessionId);
303                                                if (subsForSession == null || !subsForSession.contains(subsId)) {
304                                                        subs.add(sessionId, subsId);
305                                                        this.accessCache.put(cachedDestination, subs.deepCopy());
306                                                }
307                                        }
308                                }
309                        }
310                }
311
312                public void updateAfterRemovedSubscription(String sessionId, String subsId) {
313                        synchronized (this.updateCache) {
314                                Set<String> destinationsToRemove = new HashSet<String>();
315                                for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
316                                        String destination = entry.getKey();
317                                        LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
318                                        List<String> subscriptions = sessionMap.get(sessionId);
319                                        if (subscriptions != null) {
320                                                subscriptions.remove(subsId);
321                                                if (subscriptions.isEmpty()) {
322                                                        sessionMap.remove(sessionId);
323                                                }
324                                                if (sessionMap.isEmpty()) {
325                                                        destinationsToRemove.add(destination);
326                                                }
327                                                else {
328                                                        this.accessCache.put(destination, sessionMap.deepCopy());
329                                                }
330                                        }
331                                }
332                                for (String destination : destinationsToRemove) {
333                                        this.updateCache.remove(destination);
334                                        this.accessCache.remove(destination);
335                                }
336                        }
337                }
338
339                public void updateAfterRemovedSession(SessionSubscriptionInfo info) {
340                        synchronized (this.updateCache) {
341                                Set<String> destinationsToRemove = new HashSet<String>();
342                                for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) {
343                                        String destination = entry.getKey();
344                                        LinkedMultiValueMap<String, String> sessionMap = entry.getValue();
345                                        if (sessionMap.remove(info.getSessionId()) != null) {
346                                                if (sessionMap.isEmpty()) {
347                                                        destinationsToRemove.add(destination);
348                                                }
349                                                else {
350                                                        this.accessCache.put(destination, sessionMap.deepCopy());
351                                                }
352                                        }
353                                }
354                                for (String destination : destinationsToRemove) {
355                                        this.updateCache.remove(destination);
356                                        this.accessCache.remove(destination);
357                                }
358                        }
359                }
360
361                @Override
362                public String toString() {
363                        return "cache[" + this.accessCache.size() + " destination(s)]";
364                }
365        }
366
367
368        /**
369         * Provide access to session subscriptions by sessionId.
370         */
371        private static class SessionSubscriptionRegistry {
372
373                // sessionId -> SessionSubscriptionInfo
374                private final ConcurrentMap<String, SessionSubscriptionInfo> sessions =
375                                new ConcurrentHashMap<String, SessionSubscriptionInfo>();
376
377                public SessionSubscriptionInfo getSubscriptions(String sessionId) {
378                        return this.sessions.get(sessionId);
379                }
380
381                public Collection<SessionSubscriptionInfo> getAllSubscriptions() {
382                        return this.sessions.values();
383                }
384
385                public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId,
386                                String destination, Expression selectorExpression) {
387
388                        SessionSubscriptionInfo info = this.sessions.get(sessionId);
389                        if (info == null) {
390                                info = new SessionSubscriptionInfo(sessionId);
391                                SessionSubscriptionInfo value = this.sessions.putIfAbsent(sessionId, info);
392                                if (value != null) {
393                                        info = value;
394                                }
395                        }
396                        info.addSubscription(destination, subscriptionId, selectorExpression);
397                        return info;
398                }
399
400                public SessionSubscriptionInfo removeSubscriptions(String sessionId) {
401                        return this.sessions.remove(sessionId);
402                }
403
404                @Override
405                public String toString() {
406                        return "registry[" + this.sessions.size() + " sessions]";
407                }
408        }
409
410
411        /**
412         * Hold subscriptions for a session.
413         */
414        private static class SessionSubscriptionInfo {
415
416                private final String sessionId;
417
418                // destination -> subscriptions
419                private final Map<String, Set<Subscription>> destinationLookup =
420                                new ConcurrentHashMap<String, Set<Subscription>>(4);
421
422                public SessionSubscriptionInfo(String sessionId) {
423                        Assert.notNull(sessionId, "'sessionId' must not be null");
424                        this.sessionId = sessionId;
425                }
426
427                public String getSessionId() {
428                        return this.sessionId;
429                }
430
431                public Set<String> getDestinations() {
432                        return this.destinationLookup.keySet();
433                }
434
435                public Set<Subscription> getSubscriptions(String destination) {
436                        return this.destinationLookup.get(destination);
437                }
438
439                public Subscription getSubscription(String subscriptionId) {
440                        for (Map.Entry<String, Set<DefaultSubscriptionRegistry.Subscription>> destinationEntry : this.destinationLookup.entrySet()) {
441                                Set<Subscription> subs = destinationEntry.getValue();
442                                if (subs != null) {
443                                        for (Subscription sub : subs) {
444                                                if (sub.getId().equalsIgnoreCase(subscriptionId)) {
445                                                        return sub;
446                                                }
447                                        }
448                                }
449                        }
450                        return null;
451                }
452
453                public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) {
454                        Set<Subscription> subs = this.destinationLookup.get(destination);
455                        if (subs == null) {
456                                synchronized (this.destinationLookup) {
457                                        subs = this.destinationLookup.get(destination);
458                                        if (subs == null) {
459                                                subs = new CopyOnWriteArraySet<Subscription>();
460                                                this.destinationLookup.put(destination, subs);
461                                        }
462                                }
463                        }
464                        subs.add(new Subscription(subscriptionId, selectorExpression));
465                }
466
467                public String removeSubscription(String subscriptionId) {
468                        for (Map.Entry<String, Set<DefaultSubscriptionRegistry.Subscription>> destinationEntry : this.destinationLookup.entrySet()) {
469                                Set<Subscription> subs = destinationEntry.getValue();
470                                if (subs != null) {
471                                        for (Subscription sub : subs) {
472                                                if (sub.getId().equals(subscriptionId) && subs.remove(sub)) {
473                                                        synchronized (this.destinationLookup) {
474                                                                if (subs.isEmpty()) {
475                                                                        this.destinationLookup.remove(destinationEntry.getKey());
476                                                                }
477                                                        }
478                                                        return destinationEntry.getKey();
479                                                }
480                                        }
481                                }
482                        }
483                        return null;
484                }
485
486                @Override
487                public String toString() {
488                        return "[sessionId=" + this.sessionId + ", subscriptions=" + this.destinationLookup + "]";
489                }
490        }
491
492
493        private static final class Subscription {
494
495                private final String id;
496
497                private final Expression selectorExpression;
498
499                public Subscription(String id, Expression selector) {
500                        Assert.notNull(id, "Subscription id must not be null");
501                        this.id = id;
502                        this.selectorExpression = selector;
503                }
504
505                public String getId() {
506                        return this.id;
507                }
508
509                public Expression getSelectorExpression() {
510                        return this.selectorExpression;
511                }
512
513                @Override
514                public boolean equals(Object other) {
515                        return (this == other || (other instanceof Subscription && this.id.equals(((Subscription) other).id)));
516                }
517
518                @Override
519                public int hashCode() {
520                        return this.id.hashCode();
521                }
522
523                @Override
524                public String toString() {
525                        return "subscription(id=" + this.id + ")";
526                }
527        }
528
529
530        private static class SimpMessageHeaderPropertyAccessor implements PropertyAccessor {
531
532                @Override
533                public Class<?>[] getSpecificTargetClasses() {
534                        return new Class<?>[] {Message.class, MessageHeaders.class};
535                }
536
537                @Override
538                public boolean canRead(EvaluationContext context, Object target, String name) {
539                        return true;
540                }
541
542                @Override
543                public TypedValue read(EvaluationContext context, Object target, String name) {
544                        Object value;
545                        if (target instanceof Message) {
546                                value = name.equals("headers") ? ((Message) target).getHeaders() : null;
547                        }
548                        else if (target instanceof MessageHeaders) {
549                                MessageHeaders headers = (MessageHeaders) target;
550                                SimpMessageHeaderAccessor accessor =
551                                                MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class);
552                                Assert.state(accessor != null, "No SimpMessageHeaderAccessor");
553                                if ("destination".equalsIgnoreCase(name)) {
554                                        value = accessor.getDestination();
555                                }
556                                else {
557                                        value = accessor.getFirstNativeHeader(name);
558                                        if (value == null) {
559                                                value = headers.get(name);
560                                        }
561                                }
562                        }
563                        else {
564                                // Should never happen...
565                                throw new IllegalStateException("Expected Message or MessageHeaders.");
566                        }
567                        return new TypedValue(value);
568                }
569
570                @Override
571                public boolean canWrite(EvaluationContext context, Object target, String name) {
572                        return false;
573                }
574
575                @Override
576                public void write(EvaluationContext context, Object target, String name, Object value) {
577                }
578        }
579
580}