/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.simp.user;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
 * {@code SimpUserRegistry} that looks up users in a "local" user registry as
 * well as a set of "remote" user registries. The local registry is provided as
 * a constructor argument while remote registries are updated via broadcasts
 * handled by {@link UserRegistryMessageHandler} which in turn notifies this
 * registry when updates are received.
 *
 * @author Rossen Stoyanchev
 * @since 4.2
 */
@SuppressWarnings("serial")
public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicationListener {

	/** Identifier of this registry instance: local host address plus a random UUID. */
	private final String id;

	private final SimpUserRegistry localRegistry;

	/** Latest snapshot received per remote registry, keyed by the remote registry id. */
	private final Map<String, UserRegistrySnapshot> remoteRegistries =
			new ConcurrentHashMap<String, UserRegistrySnapshot>();

	/** Whether the local registry is itself a {@code SmartApplicationListener} to forward events to. */
	private final boolean delegateApplicationEvents;

	/* Cross-server session lookup (e.g. same user connected to multiple servers) */
	private final SessionLookup sessionLookup = new SessionLookup();


	/**
	 * Create an instance wrapping the local user registry.
	 * @param localRegistry the registry of users connected to this server (required)
	 */
	public MultiServerUserRegistry(SimpUserRegistry localRegistry) {
		Assert.notNull(localRegistry, "'localRegistry' is required");
		this.id = generateId();
		this.localRegistry = localRegistry;
		this.delegateApplicationEvents = this.localRegistry instanceof SmartApplicationListener;
	}

	/**
	 * Build an id from the local host address and a random UUID. If the local
	 * host cannot be resolved, the literal {@code "unknown"} is used as the
	 * host part.
	 */
	private static String generateId() {
		String host;
		try {
			host = InetAddress.getLocalHost().getHostAddress();
		}
		catch (UnknownHostException ex) {
			host = "unknown";
		}
		return host + '-' + UUID.randomUUID();
	}


	/**
	 * Return the order of the local registry when it is a
	 * {@code SmartApplicationListener}, or {@link Ordered#LOWEST_PRECEDENCE} otherwise.
	 */
	@Override
	public int getOrder() {
		return (this.delegateApplicationEvents ?
				((SmartApplicationListener) this.localRegistry).getOrder() : Ordered.LOWEST_PRECEDENCE);
	}


	// SmartApplicationListener methods

	/**
	 * Delegate to the local registry if it is a {@code SmartApplicationListener};
	 * otherwise no event type is supported.
	 */
	@Override
	public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
		return (this.delegateApplicationEvents &&
				((SmartApplicationListener) this.localRegistry).supportsEventType(eventType));
	}

	/**
	 * Delegate to the local registry if it is a {@code SmartApplicationListener};
	 * otherwise no source type is supported.
	 */
	@Override
	public boolean supportsSourceType(Class<?> sourceType) {
		return (this.delegateApplicationEvents &&
				((SmartApplicationListener) this.localRegistry).supportsSourceType(sourceType));
	}

	/**
	 * Forward the event to the local registry if it is a
	 * {@code SmartApplicationListener}; otherwise the event is ignored.
	 */
	@Override
	public void onApplicationEvent(ApplicationEvent event) {
		if (this.delegateApplicationEvents) {
			((SmartApplicationListener) this.localRegistry).onApplicationEvent(event);
		}
	}


	// SimpUserRegistry methods

	/**
	 * Look up a user by name, checking the remote snapshots first and falling
	 * back on the local registry.
	 * @param userName the name of the user to look up
	 * @return the first match, or whatever the local registry returns if no
	 * remote snapshot contains the user
	 */
	@Override
	public SimpUser getUser(String userName) {
		// Prefer remote registries due to cross-server SessionLookup
		for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
			SimpUser user = registry.getUserMap().get(userName);
			if (user != null) {
				return user;
			}
		}
		return this.localRegistry.getUser(userName);
	}

	/**
	 * Return the users from all remote snapshots together with the local users.
	 * Remote users are added first, so when a remote and a local user compare
	 * as equal the remote instance is the one retained in the set.
	 */
	@Override
	public Set<SimpUser> getUsers() {
		// Prefer remote registries due to cross-server SessionLookup
		Set<SimpUser> result = new HashSet<SimpUser>();
		for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
			result.addAll(registry.getUserMap().values());
		}
		result.addAll(this.localRegistry.getUsers());
		return result;
	}

	/**
	 * Return the sum of the user counts of every remote snapshot and of the
	 * local registry. This is a plain sum per registry: a user name present in
	 * more than one registry contributes once for each of them.
	 */
	@Override
	public int getUserCount() {
		int userCount = 0;
		for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
			userCount += registry.getUserMap().size();
		}
		userCount += this.localRegistry.getUserCount();
		return userCount;
	}

	/**
	 * Collect the subscriptions accepted by the given matcher from all remote
	 * snapshots and from the local registry.
	 * @param matcher the matcher to apply to each subscription
	 * @return a new set with the matching subscriptions
	 */
	@Override
	public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) {
		Set<SimpSubscription> result = new HashSet<SimpSubscription>();
		for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
			result.addAll(registry.findSubscriptions(matcher));
		}
		result.addAll(this.localRegistry.findSubscriptions(matcher));
		return result;
	}


	// Internal methods for UserRegistryMessageHandler to manage broadcasts

	/**
	 * Create a snapshot of the local registry, tagged with this registry's id.
	 */
	Object getLocalRegistryDto() {
		return new UserRegistrySnapshot(this.id, this.localRegistry);
	}

	/**
	 * Convert the message payload to a registry snapshot and store it as the
	 * current state of the corresponding remote registry.
	 * <p>The snapshot is ignored if the conversion yields {@code null}, if it
	 * carries no id (it could not be keyed in the map), or if it carries this
	 * registry's own id (the id used by {@link #getLocalRegistryDto()}).
	 * @param message the message holding the snapshot
	 * @param converter the converter used to extract the snapshot
	 * @param expirationPeriod how long, in milliseconds from now, the snapshot
	 * remains valid
	 */
	void addRemoteRegistryDto(Message<?> message, MessageConverter converter, long expirationPeriod) {
		UserRegistrySnapshot registry =
				(UserRegistrySnapshot) converter.fromMessage(message, UserRegistrySnapshot.class);
		if (registry == null) {
			return;
		}
		String remoteId = registry.getId();
		if (remoteId == null || remoteId.equals(this.id)) {
			return;
		}
		registry.init(expirationPeriod, this.sessionLookup);
		this.remoteRegistries.put(remoteId, registry);
	}

	/**
	 * Remove every remote snapshot whose expiration time lies before the
	 * current time.
	 */
	void purgeExpiredRegistries() {
		long now = System.currentTimeMillis();
		Iterator<Map.Entry<String, UserRegistrySnapshot>> iterator = this.remoteRegistries.entrySet().iterator();
		while (iterator.hasNext()) {
			Map.Entry<String, UserRegistrySnapshot> entry = iterator.next();
			if (entry.getValue().isExpired(now)) {
				iterator.remove();
			}
		}
	}


	@Override
	public String toString() {
		return "local=[" + this.localRegistry + "], remote=" + this.remoteRegistries;
	}


	/**
	 * Holds a copy of a SimpUserRegistry for the purpose of broadcasting to and
	 * receiving broadcasts from other application servers.
	 */
	private static class UserRegistrySnapshot {

		private String id;

		// Never null, so that a payload without a user map yields an empty snapshot
		private Map<String, TransferSimpUser> users = new HashMap<String, TransferSimpUser>();

		private long expirationTime;

		/**
		 * Default constructor for JSON deserialization.
		 */
		@SuppressWarnings("unused")
		public UserRegistrySnapshot() {
		}

		/**
		 * Constructor to create DTO from a local user registry.
		 */
		public UserRegistrySnapshot(String id, SimpUserRegistry registry) {
			this.id = id;
			Set<SimpUser> users = registry.getUsers();
			this.users = new HashMap<String, TransferSimpUser>(users.size());
			for (SimpUser user : users) {
				this.users.put(user.getName(), new TransferSimpUser(user));
			}
		}

		public void setId(String id) {
			this.id = id;
		}

		public String getId() {
			return this.id;
		}

		/**
		 * Set the users of this snapshot, keyed by user name. A {@code null}
		 * argument is treated as an empty map.
		 */
		public void setUserMap(Map<String, TransferSimpUser> users) {
			this.users = (users != null ? users : new HashMap<String, TransferSimpUser>());
		}

		public Map<String, TransferSimpUser> getUserMap() {
			return this.users;
		}

		/**
		 * Whether the given time (in milliseconds) is past the expiration time
		 * computed in {@link #init}.
		 */
		public boolean isExpired(long now) {
			return (now > this.expirationTime);
		}

		/**
		 * Set the expiration time to the current time plus the given period and
		 * prepare each contained user for use after deserialization.
		 */
		public void init(long expirationPeriod, SessionLookup sessionLookup) {
			this.expirationTime = System.currentTimeMillis() + expirationPeriod;
			for (TransferSimpUser user : this.users.values()) {
				user.afterDeserialization(sessionLookup);
			}
		}

		/**
		 * Return the subscriptions, across all users and sessions in this
		 * snapshot, that the given matcher accepts.
		 */
		public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) {
			Set<SimpSubscription> result = new HashSet<SimpSubscription>();
			for (TransferSimpUser user : this.users.values()) {
				for (TransferSimpSession session : user.sessions) {
					for (SimpSubscription subscription : session.subscriptions) {
						if (matcher.match(subscription)) {
							result.add(subscription);
						}
					}
				}
			}
			return result;
		}

		@Override
		public String toString() {
			return "id=" + this.id + ", users=" + this.users;
		}
	}


	/**
	 * SimpUser that can be (de)serialized and broadcast to other servers.
	 */
	private static class TransferSimpUser implements SimpUser {

		private String name;

		// User sessions from "this" registry only (i.e. one server)
		private Set<TransferSimpSession> sessions;

		// Cross-server session lookup (e.g. user connected to multiple servers)
		private SessionLookup sessionLookup;

		/**
		 * Default constructor for JSON deserialization.
		 */
		@SuppressWarnings("unused")
		public TransferSimpUser() {
			this.sessions = new HashSet<TransferSimpSession>(1);
		}

		/**
		 * Constructor to create user from a local user.
		 */
		public TransferSimpUser(SimpUser user) {
			this.name = user.getName();
			Set<SimpSession> sessions = user.getSessions();
			this.sessions = new HashSet<TransferSimpSession>(sessions.size());
			for (SimpSession session : sessions) {
				this.sessions.add(new TransferSimpSession(session));
			}
		}

		public void setName(String name) {
			this.name = name;
		}

		@Override
		public String getName() {
			return this.name;
		}

		/**
		 * Whether the user has any sessions. Once a session lookup has been
		 * assigned, the answer reflects the sessions found through that lookup
		 * rather than only the sessions held by this instance.
		 */
		@Override
		public boolean hasSessions() {
			if (this.sessionLookup != null) {
				return !this.sessionLookup.findSessions(getName()).isEmpty();
			}
			return !this.sessions.isEmpty();
		}

		/**
		 * Find a session by id, through the session lookup if one has been
		 * assigned, or among the sessions held by this instance otherwise.
		 * @return the session, or {@code null} if there is no match
		 */
		@Override
		public SimpSession getSession(String sessionId) {
			if (this.sessionLookup != null) {
				return this.sessionLookup.findSessions(getName()).get(sessionId);
			}
			for (TransferSimpSession session : this.sessions) {
				if (session.getId().equals(sessionId)) {
					return session;
				}
			}
			return null;
		}

		/**
		 * Add the given sessions to the ones already held. A {@code null}
		 * argument is ignored.
		 */
		public void setSessions(Set<TransferSimpSession> sessions) {
			if (sessions != null) {
				this.sessions.addAll(sessions);
			}
		}

		/**
		 * Return a new set with the user's sessions, obtained through the
		 * session lookup if one has been assigned, or copied from the sessions
		 * held by this instance otherwise.
		 */
		@Override
		public Set<SimpSession> getSessions() {
			if (this.sessionLookup != null) {
				Map<String, SimpSession> sessions = this.sessionLookup.findSessions(getName());
				return new HashSet<SimpSession>(sessions.values());
			}
			return new HashSet<SimpSession>(this.sessions);
		}

		/**
		 * Assign the session lookup and restore the back-references from each
		 * session to this user and from each subscription to its session.
		 */
		private void afterDeserialization(SessionLookup sessionLookup) {
			this.sessionLookup = sessionLookup;
			for (TransferSimpSession session : this.sessions) {
				session.setUser(this);
				session.afterDeserialization();
			}
		}

		/**
		 * Put the sessions held by this instance into the given map, keyed by
		 * session id.
		 */
		private void addSessions(Map<String, SimpSession> map) {
			for (SimpSession session : this.sessions) {
				map.put(session.getId(), session);
			}
		}


		/**
		 * Equal to any {@code SimpUser} with the same name. The comparison is
		 * null-safe for an instance whose name has not been set.
		 */
		@Override
		public boolean equals(Object other) {
			return (this == other || (other instanceof SimpUser &&
					ObjectUtils.nullSafeEquals(this.name, ((SimpUser) other).getName())));
		}

		@Override
		public int hashCode() {
			return ObjectUtils.nullSafeHashCode(this.name);
		}

		@Override
		public String toString() {
			return "name=" + this.name + ", sessions=" + this.sessions;
		}
	}


	/**
	 * SimpSession that can be (de)serialized and broadcast to other servers.
	 */
	private static class TransferSimpSession implements SimpSession {

		private String id;

		private TransferSimpUser user;

		private final Set<TransferSimpSubscription> subscriptions;

		/**
		 * Default constructor for JSON deserialization.
		 */
		@SuppressWarnings("unused")
		public TransferSimpSession() {
			this.subscriptions = new HashSet<TransferSimpSubscription>(4);
		}

		/**
		 * Constructor to create DTO from the local user session.
		 */
		public TransferSimpSession(SimpSession session) {
			this.id = session.getId();
			Set<SimpSubscription> subscriptions = session.getSubscriptions();
			this.subscriptions = new HashSet<TransferSimpSubscription>(subscriptions.size());
			for (SimpSubscription subscription : subscriptions) {
				this.subscriptions.add(new TransferSimpSubscription(subscription));
			}
		}

		public void setId(String id) {
			this.id = id;
		}

		@Override
		public String getId() {
			return this.id;
		}

		public void setUser(TransferSimpUser user) {
			this.user = user;
		}

		@Override
		public TransferSimpUser getUser() {
			return this.user;
		}

		/**
		 * Add the given subscriptions to the ones already held. A {@code null}
		 * argument is ignored.
		 */
		public void setSubscriptions(Set<TransferSimpSubscription> subscriptions) {
			if (subscriptions != null) {
				this.subscriptions.addAll(subscriptions);
			}
		}

		/**
		 * Return a new set with the subscriptions of this session.
		 */
		@Override
		public Set<SimpSubscription> getSubscriptions() {
			return new HashSet<SimpSubscription>(this.subscriptions);
		}

		/**
		 * Restore the back-reference from each subscription to this session.
		 */
		private void afterDeserialization() {
			for (TransferSimpSubscription subscription : this.subscriptions) {
				subscription.setSession(this);
			}
		}

		/**
		 * Equal to any {@code SimpSession} with the same id. The comparison is
		 * null-safe for an instance whose id has not been set.
		 */
		@Override
		public boolean equals(Object other) {
			return (this == other || (other instanceof SimpSession &&
					ObjectUtils.nullSafeEquals(this.id, ((SimpSession) other).getId())));
		}

		@Override
		public int hashCode() {
			return ObjectUtils.nullSafeHashCode(this.id);
		}

		@Override
		public String toString() {
			return "id=" + this.id + ", subscriptions=" + this.subscriptions;
		}
	}


	/**
	 * SimpSubscription that can be (de)serialized and broadcast to other servers.
	 */
	private static class TransferSimpSubscription implements SimpSubscription {

		private String id;

		private TransferSimpSession session;

		private String destination;

		/**
		 * Default constructor for JSON deserialization.
		 */
		@SuppressWarnings("unused")
		public TransferSimpSubscription() {
		}

		/**
		 * Constructor to create DTO from a local user subscription.
		 */
		public TransferSimpSubscription(SimpSubscription subscription) {
			this.id = subscription.getId();
			this.destination = subscription.getDestination();
		}

		public void setId(String id) {
			this.id = id;
		}

		@Override
		public String getId() {
			return this.id;
		}

		public void setSession(TransferSimpSession session) {
			this.session = session;
		}

		@Override
		public TransferSimpSession getSession() {
			return this.session;
		}

		public void setDestination(String destination) {
			this.destination = destination;
		}

		@Override
		public String getDestination() {
			return this.destination;
		}

		/**
		 * Equal to any {@code SimpSubscription} with the same id and an equal
		 * session. Both comparisons are null-safe.
		 */
		@Override
		public boolean equals(Object other) {
			if (this == other) {
				return true;
			}
			if (!(other instanceof SimpSubscription)) {
				return false;
			}
			SimpSubscription otherSubscription = (SimpSubscription) other;
			return (ObjectUtils.nullSafeEquals(getId(), otherSubscription.getId()) &&
					ObjectUtils.nullSafeEquals(getSession(), otherSubscription.getSession()));
		}

		@Override
		public int hashCode() {
			return ObjectUtils.nullSafeHashCode(getId()) * 31 + ObjectUtils.nullSafeHashCode(getSession());
		}

		@Override
		public String toString() {
			return "destination=" + this.destination;
		}
	}


	/**
	 * Helper class to find user sessions across all servers.
	 */
	private class SessionLookup {

		/**
		 * Collect the sessions of the given user from the local registry and
		 * from every remote snapshot.
		 * <p>Local sessions are added first; a remote session with the same id
		 * replaces an entry added earlier.
		 * @param userName the name of the user
		 * @return a new map of session id to session, empty if the user is unknown
		 */
		public Map<String, SimpSession> findSessions(String userName) {
			Map<String, SimpSession> map = new HashMap<String, SimpSession>(1);
			SimpUser user = localRegistry.getUser(userName);
			if (user != null) {
				for (SimpSession session : user.getSessions()) {
					map.put(session.getId(), session);
				}
			}
			for (UserRegistrySnapshot registry : remoteRegistries.values()) {
				TransferSimpUser transferUser = registry.getUserMap().get(userName);
				if (transferUser != null) {
					transferUser.addSessions(map);
				}
			}
			return map;
		}
	}

}