001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.user; 018 019import java.net.InetAddress; 020import java.net.UnknownHostException; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.Map; 025import java.util.Set; 026import java.util.UUID; 027import java.util.concurrent.ConcurrentHashMap; 028 029import org.springframework.context.ApplicationEvent; 030import org.springframework.context.event.SmartApplicationListener; 031import org.springframework.core.Ordered; 032import org.springframework.lang.Nullable; 033import org.springframework.messaging.Message; 034import org.springframework.messaging.converter.MessageConverter; 035import org.springframework.util.Assert; 036import org.springframework.util.ObjectUtils; 037 038/** 039 * {@code SimpUserRegistry} that looks up users in a "local" user registry as 040 * well as a set of "remote" user registries. The local registry is provided as 041 * a constructor argument while remote registries are updated via broadcasts 042 * handled by {@link UserRegistryMessageHandler} which in turn notifies this 043 * registry when updates are received. 044 * 045 * @author Rossen Stoyanchev 046 * @since 4.2 047 */ 048@SuppressWarnings("serial") 049public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicationListener { 050 051 private final String id; 052 053 private final SimpUserRegistry localRegistry; 054 055 private final Map<String, UserRegistrySnapshot> remoteRegistries = new ConcurrentHashMap<>(); 056 057 private final boolean delegateApplicationEvents; 058 059 /* Cross-server session lookup (e.g. same user connected to multiple servers) */ 060 private final SessionLookup sessionLookup = new SessionLookup(); 061 062 063 /** 064 * Create an instance wrapping the local user registry. 065 */ 066 public MultiServerUserRegistry(SimpUserRegistry localRegistry) { 067 Assert.notNull(localRegistry, "'localRegistry' is required"); 068 this.id = generateId(); 069 this.localRegistry = localRegistry; 070 this.delegateApplicationEvents = this.localRegistry instanceof SmartApplicationListener; 071 } 072 073 private static String generateId() { 074 String host; 075 try { 076 host = InetAddress.getLocalHost().getHostAddress(); 077 } 078 catch (UnknownHostException ex) { 079 host = "unknown"; 080 } 081 return host + '-' + UUID.randomUUID(); 082 } 083 084 085 @Override 086 public int getOrder() { 087 return (this.delegateApplicationEvents ? 088 ((SmartApplicationListener) this.localRegistry).getOrder() : Ordered.LOWEST_PRECEDENCE); 089 } 090 091 092 // SmartApplicationListener methods 093 094 @Override 095 public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) { 096 return (this.delegateApplicationEvents && 097 ((SmartApplicationListener) this.localRegistry).supportsEventType(eventType)); 098 } 099 100 @Override 101 public boolean supportsSourceType(@Nullable Class<?> sourceType) { 102 return (this.delegateApplicationEvents && 103 ((SmartApplicationListener) this.localRegistry).supportsSourceType(sourceType)); 104 } 105 106 @Override 107 public void onApplicationEvent(ApplicationEvent event) { 108 if (this.delegateApplicationEvents) { 109 ((SmartApplicationListener) this.localRegistry).onApplicationEvent(event); 110 } 111 } 112 113 114 // SimpUserRegistry methods 115 116 @Override 117 @Nullable 118 public SimpUser getUser(String userName) { 119 // Prefer remote registries due to cross-server SessionLookup 120 for (UserRegistrySnapshot registry : this.remoteRegistries.values()) { 121 SimpUser user = registry.getUserMap().get(userName); 122 if (user != null) { 123 return user; 124 } 125 } 126 return this.localRegistry.getUser(userName); 127 } 128 129 @Override 130 public Set<SimpUser> getUsers() { 131 // Prefer remote registries due to cross-server SessionLookup 132 Set<SimpUser> result = new HashSet<>(); 133 for (UserRegistrySnapshot registry : this.remoteRegistries.values()) { 134 result.addAll(registry.getUserMap().values()); 135 } 136 result.addAll(this.localRegistry.getUsers()); 137 return result; 138 } 139 140 @Override 141 public int getUserCount() { 142 int userCount = 0; 143 for (UserRegistrySnapshot registry : this.remoteRegistries.values()) { 144 userCount += registry.getUserMap().size(); 145 } 146 userCount += this.localRegistry.getUserCount(); 147 return userCount; 148 } 149 150 @Override 151 public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) { 152 Set<SimpSubscription> result = new HashSet<>(); 153 for (UserRegistrySnapshot registry : this.remoteRegistries.values()) { 154 result.addAll(registry.findSubscriptions(matcher)); 155 } 156 result.addAll(this.localRegistry.findSubscriptions(matcher)); 157 return result; 158 } 159 160 161 // Internal methods for UserRegistryMessageHandler to manage broadcasts 162 163 Object getLocalRegistryDto() { 164 return new UserRegistrySnapshot(this.id, this.localRegistry); 165 } 166 167 void addRemoteRegistryDto(Message<?> message, MessageConverter converter, long expirationPeriod) { 168 UserRegistrySnapshot registry = (UserRegistrySnapshot) converter.fromMessage(message, UserRegistrySnapshot.class); 169 if (registry != null && !registry.getId().equals(this.id)) { 170 registry.init(expirationPeriod, this.sessionLookup); 171 this.remoteRegistries.put(registry.getId(), registry); 172 } 173 } 174 175 void purgeExpiredRegistries() { 176 long now = System.currentTimeMillis(); 177 this.remoteRegistries.entrySet().removeIf(entry -> entry.getValue().isExpired(now)); 178 } 179 180 181 @Override 182 public String toString() { 183 return "local=[" + this.localRegistry + "], remote=" + this.remoteRegistries; 184 } 185 186 187 /** 188 * Holds a copy of a SimpUserRegistry for the purpose of broadcasting to and 189 * receiving broadcasts from other application servers. 190 */ 191 private static class UserRegistrySnapshot { 192 193 private String id = ""; 194 195 private Map<String, TransferSimpUser> users = Collections.emptyMap(); 196 197 private long expirationTime; 198 199 /** 200 * Default constructor for JSON deserialization. 201 */ 202 @SuppressWarnings("unused") 203 public UserRegistrySnapshot() { 204 } 205 206 /** 207 * Constructor to create DTO from a local user registry. 208 */ 209 public UserRegistrySnapshot(String id, SimpUserRegistry registry) { 210 this.id = id; 211 Set<SimpUser> users = registry.getUsers(); 212 this.users = new HashMap<>(users.size()); 213 for (SimpUser user : users) { 214 this.users.put(user.getName(), new TransferSimpUser(user)); 215 } 216 } 217 218 @SuppressWarnings("unused") 219 public void setId(String id) { 220 this.id = id; 221 } 222 223 public String getId() { 224 return this.id; 225 } 226 227 @SuppressWarnings("unused") 228 public void setUserMap(Map<String, TransferSimpUser> users) { 229 this.users = users; 230 } 231 232 public Map<String, TransferSimpUser> getUserMap() { 233 return this.users; 234 } 235 236 public boolean isExpired(long now) { 237 return (now > this.expirationTime); 238 } 239 240 public void init(long expirationPeriod, SessionLookup sessionLookup) { 241 this.expirationTime = System.currentTimeMillis() + expirationPeriod; 242 for (TransferSimpUser user : this.users.values()) { 243 user.afterDeserialization(sessionLookup); 244 } 245 } 246 247 public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) { 248 Set<SimpSubscription> result = new HashSet<>(); 249 for (TransferSimpUser user : this.users.values()) { 250 for (TransferSimpSession session : user.sessions) { 251 for (SimpSubscription subscription : session.subscriptions) { 252 if (matcher.match(subscription)) { 253 result.add(subscription); 254 } 255 } 256 } 257 } 258 return result; 259 } 260 261 @Override 262 public String toString() { 263 return "id=" + this.id + ", users=" + this.users; 264 } 265 } 266 267 268 /** 269 * SimpUser that can be (de)serialized and broadcast to other servers. 270 */ 271 private static class TransferSimpUser implements SimpUser { 272 273 private String name = ""; 274 275 // User sessions from "this" registry only (i.e. one server) 276 private Set<TransferSimpSession> sessions; 277 278 // Cross-server session lookup (e.g. user connected to multiple servers) 279 @Nullable 280 private SessionLookup sessionLookup; 281 282 /** 283 * Default constructor for JSON deserialization. 284 */ 285 @SuppressWarnings("unused") 286 public TransferSimpUser() { 287 this.sessions = new HashSet<>(1); 288 } 289 290 /** 291 * Constructor to create user from a local user. 292 */ 293 public TransferSimpUser(SimpUser user) { 294 this.name = user.getName(); 295 Set<SimpSession> sessions = user.getSessions(); 296 this.sessions = new HashSet<>(sessions.size()); 297 for (SimpSession session : sessions) { 298 this.sessions.add(new TransferSimpSession(session)); 299 } 300 } 301 302 @SuppressWarnings("unused") 303 public void setName(String name) { 304 this.name = name; 305 } 306 307 @Override 308 public String getName() { 309 return this.name; 310 } 311 312 @Override 313 public boolean hasSessions() { 314 if (this.sessionLookup != null) { 315 return !this.sessionLookup.findSessions(getName()).isEmpty(); 316 } 317 return !this.sessions.isEmpty(); 318 } 319 320 @Override 321 @Nullable 322 public SimpSession getSession(String sessionId) { 323 if (this.sessionLookup != null) { 324 return this.sessionLookup.findSessions(getName()).get(sessionId); 325 } 326 for (TransferSimpSession session : this.sessions) { 327 if (session.getId().equals(sessionId)) { 328 return session; 329 } 330 } 331 return null; 332 } 333 334 @SuppressWarnings("unused") 335 public void setSessions(Set<TransferSimpSession> sessions) { 336 this.sessions.addAll(sessions); 337 } 338 339 @Override 340 public Set<SimpSession> getSessions() { 341 if (this.sessionLookup != null) { 342 Map<String, SimpSession> sessions = this.sessionLookup.findSessions(getName()); 343 return new HashSet<>(sessions.values()); 344 } 345 return new HashSet<>(this.sessions); 346 } 347 348 private void afterDeserialization(SessionLookup sessionLookup) { 349 this.sessionLookup = sessionLookup; 350 for (TransferSimpSession session : this.sessions) { 351 session.setUser(this); 352 session.afterDeserialization(); 353 } 354 } 355 356 private void addSessions(Map<String, SimpSession> map) { 357 for (SimpSession session : this.sessions) { 358 map.put(session.getId(), session); 359 } 360 } 361 362 363 @Override 364 public boolean equals(@Nullable Object other) { 365 return (this == other || (other instanceof SimpUser && this.name.equals(((SimpUser) other).getName()))); 366 } 367 368 @Override 369 public int hashCode() { 370 return this.name.hashCode(); 371 } 372 373 @Override 374 public String toString() { 375 return "name=" + this.name + ", sessions=" + this.sessions; 376 } 377 } 378 379 380 /** 381 * SimpSession that can be (de)serialized and broadcast to other servers. 382 */ 383 private static class TransferSimpSession implements SimpSession { 384 385 private String id; 386 387 private TransferSimpUser user; 388 389 private final Set<TransferSimpSubscription> subscriptions; 390 391 /** 392 * Default constructor for JSON deserialization. 393 */ 394 @SuppressWarnings("unused") 395 public TransferSimpSession() { 396 this.id = ""; 397 this.user = new TransferSimpUser(); 398 this.subscriptions = new HashSet<>(4); 399 } 400 401 /** 402 * Constructor to create DTO from the local user session. 403 */ 404 public TransferSimpSession(SimpSession session) { 405 this.id = session.getId(); 406 this.user = new TransferSimpUser(); 407 Set<SimpSubscription> subscriptions = session.getSubscriptions(); 408 this.subscriptions = new HashSet<>(subscriptions.size()); 409 for (SimpSubscription subscription : subscriptions) { 410 this.subscriptions.add(new TransferSimpSubscription(subscription)); 411 } 412 } 413 414 @SuppressWarnings("unused") 415 public void setId(String id) { 416 this.id = id; 417 } 418 419 @Override 420 public String getId() { 421 return this.id; 422 } 423 424 public void setUser(TransferSimpUser user) { 425 this.user = user; 426 } 427 428 @Override 429 public TransferSimpUser getUser() { 430 return this.user; 431 } 432 433 @SuppressWarnings("unused") 434 public void setSubscriptions(Set<TransferSimpSubscription> subscriptions) { 435 this.subscriptions.addAll(subscriptions); 436 } 437 438 @Override 439 public Set<SimpSubscription> getSubscriptions() { 440 return new HashSet<>(this.subscriptions); 441 } 442 443 private void afterDeserialization() { 444 for (TransferSimpSubscription subscription : this.subscriptions) { 445 subscription.setSession(this); 446 } 447 } 448 449 @Override 450 public boolean equals(@Nullable Object other) { 451 return (this == other || (other instanceof SimpSession && getId().equals(((SimpSession) other).getId()))); 452 } 453 454 @Override 455 public int hashCode() { 456 return getId().hashCode(); 457 } 458 459 @Override 460 public String toString() { 461 return "id=" + this.id + ", subscriptions=" + this.subscriptions; 462 } 463 } 464 465 466 /** 467 * SimpSubscription that can be (de)serialized and broadcast to other servers. 468 */ 469 private static class TransferSimpSubscription implements SimpSubscription { 470 471 private String id; 472 473 private TransferSimpSession session; 474 475 private String destination; 476 477 /** 478 * Default constructor for JSON deserialization. 479 */ 480 @SuppressWarnings("unused") 481 public TransferSimpSubscription() { 482 this.id = ""; 483 this.session = new TransferSimpSession(); 484 this.destination = ""; 485 } 486 487 /** 488 * Constructor to create DTO from a local user subscription. 489 */ 490 public TransferSimpSubscription(SimpSubscription subscription) { 491 this.id = subscription.getId(); 492 this.session = new TransferSimpSession(); 493 this.destination = subscription.getDestination(); 494 } 495 496 @SuppressWarnings("unused") 497 public void setId(String id) { 498 this.id = id; 499 } 500 501 @Override 502 public String getId() { 503 return this.id; 504 } 505 506 public void setSession(TransferSimpSession session) { 507 this.session = session; 508 } 509 510 @Override 511 public TransferSimpSession getSession() { 512 return this.session; 513 } 514 515 @SuppressWarnings("unused") 516 public void setDestination(String destination) { 517 this.destination = destination; 518 } 519 520 @Override 521 public String getDestination() { 522 return this.destination; 523 } 524 525 @Override 526 public boolean equals(@Nullable Object other) { 527 if (this == other) { 528 return true; 529 } 530 if (!(other instanceof SimpSubscription)) { 531 return false; 532 } 533 SimpSubscription otherSubscription = (SimpSubscription) other; 534 return (getId().equals(otherSubscription.getId()) && 535 ObjectUtils.nullSafeEquals(getSession(), otherSubscription.getSession())); 536 } 537 538 @Override 539 public int hashCode() { 540 return getId().hashCode() * 31 + ObjectUtils.nullSafeHashCode(getSession()); 541 } 542 543 @Override 544 public String toString() { 545 return "destination=" + this.destination; 546 } 547 } 548 549 550 /** 551 * Helper class to find user sessions across all servers. 552 */ 553 private class SessionLookup { 554 555 public Map<String, SimpSession> findSessions(String userName) { 556 Map<String, SimpSession> map = new HashMap<>(4); 557 SimpUser user = localRegistry.getUser(userName); 558 if (user != null) { 559 for (SimpSession session : user.getSessions()) { 560 map.put(session.getId(), session); 561 } 562 } 563 for (UserRegistrySnapshot registry : remoteRegistries.values()) { 564 TransferSimpUser transferUser = registry.getUserMap().get(userName); 565 if (transferUser != null) { 566 transferUser.addSessions(map); 567 } 568 } 569 return map; 570 } 571 } 572 573}