001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.broker; 018 019import java.util.Collection; 020import java.util.HashSet; 021import java.util.LinkedHashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentMap; 027import java.util.concurrent.CopyOnWriteArraySet; 028 029import org.springframework.expression.EvaluationContext; 030import org.springframework.expression.Expression; 031import org.springframework.expression.ExpressionParser; 032import org.springframework.expression.PropertyAccessor; 033import org.springframework.expression.TypedValue; 034import org.springframework.expression.spel.SpelEvaluationException; 035import org.springframework.expression.spel.standard.SpelExpressionParser; 036import org.springframework.expression.spel.support.SimpleEvaluationContext; 037import org.springframework.messaging.Message; 038import org.springframework.messaging.MessageHeaders; 039import org.springframework.messaging.simp.SimpMessageHeaderAccessor; 040import org.springframework.messaging.support.MessageHeaderAccessor; 041import org.springframework.util.AntPathMatcher; 042import org.springframework.util.Assert; 043import org.springframework.util.LinkedMultiValueMap; 044import org.springframework.util.MultiValueMap; 045import org.springframework.util.PathMatcher; 046import org.springframework.util.StringUtils; 047 048/** 049 * Implementation of {@link SubscriptionRegistry} that stores subscriptions 050 * in memory and uses a {@link org.springframework.util.PathMatcher PathMatcher} 051 * for matching destinations. 052 * 053 * <p>As of 4.2, this class supports a {@link #setSelectorHeaderName selector} 054 * header on subscription messages with Spring EL expressions evaluated against 055 * the headers to filter out messages in addition to destination matching. 056 * 057 * @author Rossen Stoyanchev 058 * @author Sebastien Deleuze 059 * @author Juergen Hoeller 060 * @since 4.0 061 */ 062public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry { 063 064 /** Default maximum number of entries for the destination cache: 1024 */ 065 public static final int DEFAULT_CACHE_LIMIT = 1024; 066 067 /** Static evaluation context to reuse */ 068 private static final EvaluationContext messageEvalContext = 069 SimpleEvaluationContext.forPropertyAccessors(new SimpMessageHeaderPropertyAccessor()).build(); 070 071 072 private PathMatcher pathMatcher = new AntPathMatcher(); 073 074 private volatile int cacheLimit = DEFAULT_CACHE_LIMIT; 075 076 private String selectorHeaderName = "selector"; 077 078 private volatile boolean selectorHeaderInUse = false; 079 080 private final ExpressionParser expressionParser = new SpelExpressionParser(); 081 082 private final DestinationCache destinationCache = new DestinationCache(); 083 084 private final SessionSubscriptionRegistry subscriptionRegistry = new SessionSubscriptionRegistry(); 085 086 087 /** 088 * Specify the {@link PathMatcher} to use. 089 */ 090 public void setPathMatcher(PathMatcher pathMatcher) { 091 this.pathMatcher = pathMatcher; 092 } 093 094 /** 095 * Return the configured {@link PathMatcher}. 096 */ 097 public PathMatcher getPathMatcher() { 098 return this.pathMatcher; 099 } 100 101 /** 102 * Specify the maximum number of entries for the resolved destination cache. 103 * Default is 1024. 104 */ 105 public void setCacheLimit(int cacheLimit) { 106 this.cacheLimit = cacheLimit; 107 } 108 109 /** 110 * Return the maximum number of entries for the resolved destination cache. 111 */ 112 public int getCacheLimit() { 113 return this.cacheLimit; 114 } 115 116 /** 117 * Configure the name of a header that a subscription message can have for 118 * the purpose of filtering messages matched to the subscription. The header 119 * value is expected to be a Spring EL boolean expression to be applied to 120 * the headers of messages matched to the subscription. 121 * <p>For example: 122 * <pre> 123 * headers.foo == 'bar' 124 * </pre> 125 * <p>By default this is set to "selector". You can set it to a different 126 * name, or to {@code null} to turn off support for a selector header. 127 * @param selectorHeaderName the name to use for a selector header 128 * @since 4.2 129 */ 130 public void setSelectorHeaderName(String selectorHeaderName) { 131 this.selectorHeaderName = (StringUtils.hasText(selectorHeaderName) ? selectorHeaderName : null); 132 } 133 134 /** 135 * Return the name for the selector header name. 136 * @since 4.2 137 */ 138 public String getSelectorHeaderName() { 139 return this.selectorHeaderName; 140 } 141 142 143 @Override 144 protected void addSubscriptionInternal( 145 String sessionId, String subsId, String destination, Message<?> message) { 146 147 Expression expression = getSelectorExpression(message.getHeaders()); 148 this.subscriptionRegistry.addSubscription(sessionId, subsId, destination, expression); 149 this.destinationCache.updateAfterNewSubscription(destination, sessionId, subsId); 150 } 151 152 private Expression getSelectorExpression(MessageHeaders headers) { 153 Expression expression = null; 154 if (getSelectorHeaderName() != null) { 155 String selector = SimpMessageHeaderAccessor.getFirstNativeHeader(getSelectorHeaderName(), headers); 156 if (selector != null) { 157 try { 158 expression = this.expressionParser.parseExpression(selector); 159 this.selectorHeaderInUse = true; 160 if (logger.isTraceEnabled()) { 161 logger.trace("Subscription selector: [" + selector + "]"); 162 } 163 } 164 catch (Throwable ex) { 165 if (logger.isDebugEnabled()) { 166 logger.debug("Failed to parse selector: " + selector, ex); 167 } 168 } 169 } 170 } 171 return expression; 172 } 173 174 @Override 175 protected void removeSubscriptionInternal(String sessionId, String subsId, Message<?> message) { 176 SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId); 177 if (info != null) { 178 String destination = info.removeSubscription(subsId); 179 if (destination != null) { 180 this.destinationCache.updateAfterRemovedSubscription(sessionId, subsId); 181 } 182 } 183 } 184 185 @Override 186 public void unregisterAllSubscriptions(String sessionId) { 187 SessionSubscriptionInfo info = this.subscriptionRegistry.removeSubscriptions(sessionId); 188 if (info != null) { 189 this.destinationCache.updateAfterRemovedSession(info); 190 } 191 } 192 193 @Override 194 protected MultiValueMap<String, String> findSubscriptionsInternal(String destination, Message<?> message) { 195 MultiValueMap<String, String> result = this.destinationCache.getSubscriptions(destination, message); 196 return filterSubscriptions(result, message); 197 } 198 199 private MultiValueMap<String, String> filterSubscriptions( 200 MultiValueMap<String, String> allMatches, Message<?> message) { 201 202 if (!this.selectorHeaderInUse) { 203 return allMatches; 204 } 205 MultiValueMap<String, String> result = new LinkedMultiValueMap<String, String>(allMatches.size()); 206 for (String sessionId : allMatches.keySet()) { 207 for (String subId : allMatches.get(sessionId)) { 208 SessionSubscriptionInfo info = this.subscriptionRegistry.getSubscriptions(sessionId); 209 if (info == null) { 210 continue; 211 } 212 Subscription sub = info.getSubscription(subId); 213 if (sub == null) { 214 continue; 215 } 216 Expression expression = sub.getSelectorExpression(); 217 if (expression == null) { 218 result.add(sessionId, subId); 219 continue; 220 } 221 try { 222 if (Boolean.TRUE.equals(expression.getValue(messageEvalContext, message, Boolean.class))) { 223 result.add(sessionId, subId); 224 } 225 } 226 catch (SpelEvaluationException ex) { 227 if (logger.isDebugEnabled()) { 228 logger.debug("Failed to evaluate selector: " + ex.getMessage()); 229 } 230 } 231 catch (Throwable ex) { 232 logger.debug("Failed to evaluate selector", ex); 233 } 234 } 235 } 236 return result; 237 } 238 239 @Override 240 public String toString() { 241 return "DefaultSubscriptionRegistry[" + this.destinationCache + ", " + this.subscriptionRegistry + "]"; 242 } 243 244 245 /** 246 * A cache for destinations previously resolved via 247 * {@link DefaultSubscriptionRegistry#findSubscriptionsInternal(String, Message)} 248 */ 249 private class DestinationCache { 250 251 /** Map from destination -> <sessionId, subscriptionId> for fast look-ups */ 252 private final Map<String, LinkedMultiValueMap<String, String>> accessCache = 253 new ConcurrentHashMap<String, LinkedMultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT); 254 255 /** Map from destination -> <sessionId, subscriptionId> with locking */ 256 @SuppressWarnings("serial") 257 private final Map<String, LinkedMultiValueMap<String, String>> updateCache = 258 new LinkedHashMap<String, LinkedMultiValueMap<String, String>>(DEFAULT_CACHE_LIMIT, 0.75f, true) { 259 @Override 260 protected boolean removeEldestEntry(Map.Entry<String, LinkedMultiValueMap<String, String>> eldest) { 261 if (size() > getCacheLimit()) { 262 accessCache.remove(eldest.getKey()); 263 return true; 264 } 265 else { 266 return false; 267 } 268 } 269 }; 270 271 272 public LinkedMultiValueMap<String, String> getSubscriptions(String destination, Message<?> message) { 273 LinkedMultiValueMap<String, String> result = this.accessCache.get(destination); 274 if (result == null) { 275 synchronized (this.updateCache) { 276 result = new LinkedMultiValueMap<String, String>(); 277 for (SessionSubscriptionInfo info : subscriptionRegistry.getAllSubscriptions()) { 278 for (String destinationPattern : info.getDestinations()) { 279 if (getPathMatcher().match(destinationPattern, destination)) { 280 for (Subscription subscription : info.getSubscriptions(destinationPattern)) { 281 result.add(info.sessionId, subscription.getId()); 282 } 283 } 284 } 285 } 286 if (!result.isEmpty()) { 287 this.updateCache.put(destination, result.deepCopy()); 288 this.accessCache.put(destination, result); 289 } 290 } 291 } 292 return result; 293 } 294 295 public void updateAfterNewSubscription(String destination, String sessionId, String subsId) { 296 synchronized (this.updateCache) { 297 for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) { 298 String cachedDestination = entry.getKey(); 299 if (getPathMatcher().match(destination, cachedDestination)) { 300 LinkedMultiValueMap<String, String> subs = entry.getValue(); 301 // Subscription id's may also be populated via getSubscriptions() 302 List<String> subsForSession = subs.get(sessionId); 303 if (subsForSession == null || !subsForSession.contains(subsId)) { 304 subs.add(sessionId, subsId); 305 this.accessCache.put(cachedDestination, subs.deepCopy()); 306 } 307 } 308 } 309 } 310 } 311 312 public void updateAfterRemovedSubscription(String sessionId, String subsId) { 313 synchronized (this.updateCache) { 314 Set<String> destinationsToRemove = new HashSet<String>(); 315 for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) { 316 String destination = entry.getKey(); 317 LinkedMultiValueMap<String, String> sessionMap = entry.getValue(); 318 List<String> subscriptions = sessionMap.get(sessionId); 319 if (subscriptions != null) { 320 subscriptions.remove(subsId); 321 if (subscriptions.isEmpty()) { 322 sessionMap.remove(sessionId); 323 } 324 if (sessionMap.isEmpty()) { 325 destinationsToRemove.add(destination); 326 } 327 else { 328 this.accessCache.put(destination, sessionMap.deepCopy()); 329 } 330 } 331 } 332 for (String destination : destinationsToRemove) { 333 this.updateCache.remove(destination); 334 this.accessCache.remove(destination); 335 } 336 } 337 } 338 339 public void updateAfterRemovedSession(SessionSubscriptionInfo info) { 340 synchronized (this.updateCache) { 341 Set<String> destinationsToRemove = new HashSet<String>(); 342 for (Map.Entry<String, LinkedMultiValueMap<String, String>> entry : this.updateCache.entrySet()) { 343 String destination = entry.getKey(); 344 LinkedMultiValueMap<String, String> sessionMap = entry.getValue(); 345 if (sessionMap.remove(info.getSessionId()) != null) { 346 if (sessionMap.isEmpty()) { 347 destinationsToRemove.add(destination); 348 } 349 else { 350 this.accessCache.put(destination, sessionMap.deepCopy()); 351 } 352 } 353 } 354 for (String destination : destinationsToRemove) { 355 this.updateCache.remove(destination); 356 this.accessCache.remove(destination); 357 } 358 } 359 } 360 361 @Override 362 public String toString() { 363 return "cache[" + this.accessCache.size() + " destination(s)]"; 364 } 365 } 366 367 368 /** 369 * Provide access to session subscriptions by sessionId. 370 */ 371 private static class SessionSubscriptionRegistry { 372 373 // sessionId -> SessionSubscriptionInfo 374 private final ConcurrentMap<String, SessionSubscriptionInfo> sessions = 375 new ConcurrentHashMap<String, SessionSubscriptionInfo>(); 376 377 public SessionSubscriptionInfo getSubscriptions(String sessionId) { 378 return this.sessions.get(sessionId); 379 } 380 381 public Collection<SessionSubscriptionInfo> getAllSubscriptions() { 382 return this.sessions.values(); 383 } 384 385 public SessionSubscriptionInfo addSubscription(String sessionId, String subscriptionId, 386 String destination, Expression selectorExpression) { 387 388 SessionSubscriptionInfo info = this.sessions.get(sessionId); 389 if (info == null) { 390 info = new SessionSubscriptionInfo(sessionId); 391 SessionSubscriptionInfo value = this.sessions.putIfAbsent(sessionId, info); 392 if (value != null) { 393 info = value; 394 } 395 } 396 info.addSubscription(destination, subscriptionId, selectorExpression); 397 return info; 398 } 399 400 public SessionSubscriptionInfo removeSubscriptions(String sessionId) { 401 return this.sessions.remove(sessionId); 402 } 403 404 @Override 405 public String toString() { 406 return "registry[" + this.sessions.size() + " sessions]"; 407 } 408 } 409 410 411 /** 412 * Hold subscriptions for a session. 413 */ 414 private static class SessionSubscriptionInfo { 415 416 private final String sessionId; 417 418 // destination -> subscriptions 419 private final Map<String, Set<Subscription>> destinationLookup = 420 new ConcurrentHashMap<String, Set<Subscription>>(4); 421 422 public SessionSubscriptionInfo(String sessionId) { 423 Assert.notNull(sessionId, "'sessionId' must not be null"); 424 this.sessionId = sessionId; 425 } 426 427 public String getSessionId() { 428 return this.sessionId; 429 } 430 431 public Set<String> getDestinations() { 432 return this.destinationLookup.keySet(); 433 } 434 435 public Set<Subscription> getSubscriptions(String destination) { 436 return this.destinationLookup.get(destination); 437 } 438 439 public Subscription getSubscription(String subscriptionId) { 440 for (Map.Entry<String, Set<DefaultSubscriptionRegistry.Subscription>> destinationEntry : this.destinationLookup.entrySet()) { 441 Set<Subscription> subs = destinationEntry.getValue(); 442 if (subs != null) { 443 for (Subscription sub : subs) { 444 if (sub.getId().equalsIgnoreCase(subscriptionId)) { 445 return sub; 446 } 447 } 448 } 449 } 450 return null; 451 } 452 453 public void addSubscription(String destination, String subscriptionId, Expression selectorExpression) { 454 Set<Subscription> subs = this.destinationLookup.get(destination); 455 if (subs == null) { 456 synchronized (this.destinationLookup) { 457 subs = this.destinationLookup.get(destination); 458 if (subs == null) { 459 subs = new CopyOnWriteArraySet<Subscription>(); 460 this.destinationLookup.put(destination, subs); 461 } 462 } 463 } 464 subs.add(new Subscription(subscriptionId, selectorExpression)); 465 } 466 467 public String removeSubscription(String subscriptionId) { 468 for (Map.Entry<String, Set<DefaultSubscriptionRegistry.Subscription>> destinationEntry : this.destinationLookup.entrySet()) { 469 Set<Subscription> subs = destinationEntry.getValue(); 470 if (subs != null) { 471 for (Subscription sub : subs) { 472 if (sub.getId().equals(subscriptionId) && subs.remove(sub)) { 473 synchronized (this.destinationLookup) { 474 if (subs.isEmpty()) { 475 this.destinationLookup.remove(destinationEntry.getKey()); 476 } 477 } 478 return destinationEntry.getKey(); 479 } 480 } 481 } 482 } 483 return null; 484 } 485 486 @Override 487 public String toString() { 488 return "[sessionId=" + this.sessionId + ", subscriptions=" + this.destinationLookup + "]"; 489 } 490 } 491 492 493 private static final class Subscription { 494 495 private final String id; 496 497 private final Expression selectorExpression; 498 499 public Subscription(String id, Expression selector) { 500 Assert.notNull(id, "Subscription id must not be null"); 501 this.id = id; 502 this.selectorExpression = selector; 503 } 504 505 public String getId() { 506 return this.id; 507 } 508 509 public Expression getSelectorExpression() { 510 return this.selectorExpression; 511 } 512 513 @Override 514 public boolean equals(Object other) { 515 return (this == other || (other instanceof Subscription && this.id.equals(((Subscription) other).id))); 516 } 517 518 @Override 519 public int hashCode() { 520 return this.id.hashCode(); 521 } 522 523 @Override 524 public String toString() { 525 return "subscription(id=" + this.id + ")"; 526 } 527 } 528 529 530 private static class SimpMessageHeaderPropertyAccessor implements PropertyAccessor { 531 532 @Override 533 public Class<?>[] getSpecificTargetClasses() { 534 return new Class<?>[] {Message.class, MessageHeaders.class}; 535 } 536 537 @Override 538 public boolean canRead(EvaluationContext context, Object target, String name) { 539 return true; 540 } 541 542 @Override 543 public TypedValue read(EvaluationContext context, Object target, String name) { 544 Object value; 545 if (target instanceof Message) { 546 value = name.equals("headers") ? ((Message) target).getHeaders() : null; 547 } 548 else if (target instanceof MessageHeaders) { 549 MessageHeaders headers = (MessageHeaders) target; 550 SimpMessageHeaderAccessor accessor = 551 MessageHeaderAccessor.getAccessor(headers, SimpMessageHeaderAccessor.class); 552 Assert.state(accessor != null, "No SimpMessageHeaderAccessor"); 553 if ("destination".equalsIgnoreCase(name)) { 554 value = accessor.getDestination(); 555 } 556 else { 557 value = accessor.getFirstNativeHeader(name); 558 if (value == null) { 559 value = headers.get(name); 560 } 561 } 562 } 563 else { 564 // Should never happen... 565 throw new IllegalStateException("Expected Message or MessageHeaders."); 566 } 567 return new TypedValue(value); 568 } 569 570 @Override 571 public boolean canWrite(EvaluationContext context, Object target, String name) { 572 return false; 573 } 574 575 @Override 576 public void write(EvaluationContext context, Object target, String name, Object value) { 577 } 578 } 579 580}