001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.user;
018
019import java.net.InetAddress;
020import java.net.UnknownHostException;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Map;
025import java.util.Set;
026import java.util.UUID;
027import java.util.concurrent.ConcurrentHashMap;
028
029import org.springframework.context.ApplicationEvent;
030import org.springframework.context.event.SmartApplicationListener;
031import org.springframework.core.Ordered;
032import org.springframework.lang.Nullable;
033import org.springframework.messaging.Message;
034import org.springframework.messaging.converter.MessageConverter;
035import org.springframework.util.Assert;
036import org.springframework.util.ObjectUtils;
037
038/**
039 * {@code SimpUserRegistry} that looks up users in a "local" user registry as
040 * well as a set of "remote" user registries. The local registry is provided as
041 * a constructor argument while remote registries are updated via broadcasts
042 * handled by {@link UserRegistryMessageHandler} which in turn notifies this
043 * registry when updates are received.
044 *
045 * @author Rossen Stoyanchev
046 * @since 4.2
047 */
048@SuppressWarnings("serial")
049public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicationListener {
050
051        private final String id;
052
053        private final SimpUserRegistry localRegistry;
054
055        private final Map<String, UserRegistrySnapshot> remoteRegistries = new ConcurrentHashMap<>();
056
057        private final boolean delegateApplicationEvents;
058
059        /* Cross-server session lookup (e.g. same user connected to multiple servers) */
060        private final SessionLookup sessionLookup = new SessionLookup();
061
062
063        /**
064         * Create an instance wrapping the local user registry.
065         */
066        public MultiServerUserRegistry(SimpUserRegistry localRegistry) {
067                Assert.notNull(localRegistry, "'localRegistry' is required");
068                this.id = generateId();
069                this.localRegistry = localRegistry;
070                this.delegateApplicationEvents = this.localRegistry instanceof SmartApplicationListener;
071        }
072
073        private static String generateId() {
074                String host;
075                try {
076                        host = InetAddress.getLocalHost().getHostAddress();
077                }
078                catch (UnknownHostException ex) {
079                        host = "unknown";
080                }
081                return host + '-' + UUID.randomUUID();
082        }
083
084
085        @Override
086        public int getOrder() {
087                return (this.delegateApplicationEvents ?
088                                ((SmartApplicationListener) this.localRegistry).getOrder() : Ordered.LOWEST_PRECEDENCE);
089        }
090
091
092        // SmartApplicationListener methods
093
094        @Override
095        public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
096                return (this.delegateApplicationEvents &&
097                                ((SmartApplicationListener) this.localRegistry).supportsEventType(eventType));
098        }
099
100        @Override
101        public boolean supportsSourceType(@Nullable Class<?> sourceType) {
102                return (this.delegateApplicationEvents &&
103                                ((SmartApplicationListener) this.localRegistry).supportsSourceType(sourceType));
104        }
105
106        @Override
107        public void onApplicationEvent(ApplicationEvent event) {
108                if (this.delegateApplicationEvents) {
109                        ((SmartApplicationListener) this.localRegistry).onApplicationEvent(event);
110                }
111        }
112
113
114        // SimpUserRegistry methods
115
116        @Override
117        @Nullable
118        public SimpUser getUser(String userName) {
119                // Prefer remote registries due to cross-server SessionLookup
120                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
121                        SimpUser user = registry.getUserMap().get(userName);
122                        if (user != null) {
123                                return user;
124                        }
125                }
126                return this.localRegistry.getUser(userName);
127        }
128
129        @Override
130        public Set<SimpUser> getUsers() {
131                // Prefer remote registries due to cross-server SessionLookup
132                Set<SimpUser> result = new HashSet<>();
133                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
134                        result.addAll(registry.getUserMap().values());
135                }
136                result.addAll(this.localRegistry.getUsers());
137                return result;
138        }
139
140        @Override
141        public int getUserCount() {
142                int userCount = 0;
143                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
144                        userCount += registry.getUserMap().size();
145                }
146                userCount += this.localRegistry.getUserCount();
147                return userCount;
148        }
149
150        @Override
151        public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) {
152                Set<SimpSubscription> result = new HashSet<>();
153                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
154                        result.addAll(registry.findSubscriptions(matcher));
155                }
156                result.addAll(this.localRegistry.findSubscriptions(matcher));
157                return result;
158        }
159
160
161        // Internal methods for UserRegistryMessageHandler to manage broadcasts
162
163        Object getLocalRegistryDto() {
164                return new UserRegistrySnapshot(this.id, this.localRegistry);
165        }
166
167        void addRemoteRegistryDto(Message<?> message, MessageConverter converter, long expirationPeriod) {
168                UserRegistrySnapshot registry = (UserRegistrySnapshot) converter.fromMessage(message, UserRegistrySnapshot.class);
169                if (registry != null && !registry.getId().equals(this.id)) {
170                        registry.init(expirationPeriod, this.sessionLookup);
171                        this.remoteRegistries.put(registry.getId(), registry);
172                }
173        }
174
175        void purgeExpiredRegistries() {
176                long now = System.currentTimeMillis();
177                this.remoteRegistries.entrySet().removeIf(entry -> entry.getValue().isExpired(now));
178        }
179
180
181        @Override
182        public String toString() {
183                return "local=[" + this.localRegistry + "], remote=" + this.remoteRegistries;
184        }
185
186
187        /**
188         * Holds a copy of a SimpUserRegistry for the purpose of broadcasting to and
189         * receiving broadcasts from other application servers.
190         */
191        private static class UserRegistrySnapshot {
192
193                private String id = "";
194
195                private Map<String, TransferSimpUser> users = Collections.emptyMap();
196
197                private long expirationTime;
198
199                /**
200                 * Default constructor for JSON deserialization.
201                 */
202                @SuppressWarnings("unused")
203                public UserRegistrySnapshot() {
204                }
205
206                /**
207                 * Constructor to create DTO from a local user registry.
208                 */
209                public UserRegistrySnapshot(String id, SimpUserRegistry registry) {
210                        this.id = id;
211                        Set<SimpUser> users = registry.getUsers();
212                        this.users = new HashMap<>(users.size());
213                        for (SimpUser user : users) {
214                                this.users.put(user.getName(), new TransferSimpUser(user));
215                        }
216                }
217
218                @SuppressWarnings("unused")
219                public void setId(String id) {
220                        this.id = id;
221                }
222
223                public String getId() {
224                        return this.id;
225                }
226
227                @SuppressWarnings("unused")
228                public void setUserMap(Map<String, TransferSimpUser> users) {
229                        this.users = users;
230                }
231
232                public Map<String, TransferSimpUser> getUserMap() {
233                        return this.users;
234                }
235
236                public boolean isExpired(long now) {
237                        return (now > this.expirationTime);
238                }
239
240                public void init(long expirationPeriod, SessionLookup sessionLookup) {
241                        this.expirationTime = System.currentTimeMillis() + expirationPeriod;
242                        for (TransferSimpUser user : this.users.values()) {
243                                user.afterDeserialization(sessionLookup);
244                        }
245                }
246
247                public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) {
248                        Set<SimpSubscription> result = new HashSet<>();
249                        for (TransferSimpUser user : this.users.values()) {
250                                for (TransferSimpSession session : user.sessions) {
251                                        for (SimpSubscription subscription : session.subscriptions) {
252                                                if (matcher.match(subscription)) {
253                                                        result.add(subscription);
254                                                }
255                                        }
256                                }
257                        }
258                        return result;
259                }
260
261                @Override
262                public String toString() {
263                        return "id=" + this.id + ", users=" + this.users;
264                }
265        }
266
267
268        /**
269         * SimpUser that can be (de)serialized and broadcast to other servers.
270         */
271        private static class TransferSimpUser implements SimpUser {
272
273                private String name = "";
274
275                // User sessions from "this" registry only (i.e. one server)
276                private Set<TransferSimpSession> sessions;
277
278                // Cross-server session lookup (e.g. user connected to multiple servers)
279                @Nullable
280                private SessionLookup sessionLookup;
281
282                /**
283                 * Default constructor for JSON deserialization.
284                 */
285                @SuppressWarnings("unused")
286                public TransferSimpUser() {
287                        this.sessions = new HashSet<>(1);
288                }
289
290                /**
291                 * Constructor to create user from a local user.
292                 */
293                public TransferSimpUser(SimpUser user) {
294                        this.name = user.getName();
295                        Set<SimpSession> sessions = user.getSessions();
296                        this.sessions = new HashSet<>(sessions.size());
297                        for (SimpSession session : sessions) {
298                                this.sessions.add(new TransferSimpSession(session));
299                        }
300                }
301
302                @SuppressWarnings("unused")
303                public void setName(String name) {
304                        this.name = name;
305                }
306
307                @Override
308                public String getName() {
309                        return this.name;
310                }
311
312                @Override
313                public boolean hasSessions() {
314                        if (this.sessionLookup != null) {
315                                return !this.sessionLookup.findSessions(getName()).isEmpty();
316                        }
317                        return !this.sessions.isEmpty();
318                }
319
320                @Override
321                @Nullable
322                public SimpSession getSession(String sessionId) {
323                        if (this.sessionLookup != null) {
324                                return this.sessionLookup.findSessions(getName()).get(sessionId);
325                        }
326                        for (TransferSimpSession session : this.sessions) {
327                                if (session.getId().equals(sessionId)) {
328                                        return session;
329                                }
330                        }
331                        return null;
332                }
333
334                @SuppressWarnings("unused")
335                public void setSessions(Set<TransferSimpSession> sessions) {
336                        this.sessions.addAll(sessions);
337                }
338
339                @Override
340                public Set<SimpSession> getSessions() {
341                        if (this.sessionLookup != null) {
342                                Map<String, SimpSession> sessions = this.sessionLookup.findSessions(getName());
343                                return new HashSet<>(sessions.values());
344                        }
345                        return new HashSet<>(this.sessions);
346                }
347
348                private void afterDeserialization(SessionLookup sessionLookup) {
349                        this.sessionLookup = sessionLookup;
350                        for (TransferSimpSession session : this.sessions) {
351                                session.setUser(this);
352                                session.afterDeserialization();
353                        }
354                }
355
356                private void addSessions(Map<String, SimpSession> map) {
357                        for (SimpSession session : this.sessions) {
358                                map.put(session.getId(), session);
359                        }
360                }
361
362
363                @Override
364                public boolean equals(@Nullable Object other) {
365                        return (this == other || (other instanceof SimpUser && this.name.equals(((SimpUser) other).getName())));
366                }
367
368                @Override
369                public int hashCode() {
370                        return this.name.hashCode();
371                }
372
373                @Override
374                public String toString() {
375                        return "name=" + this.name + ", sessions=" + this.sessions;
376                }
377        }
378
379
380        /**
381         * SimpSession that can be (de)serialized and broadcast to other servers.
382         */
383        private static class TransferSimpSession implements SimpSession {
384
385                private String id;
386
387                private TransferSimpUser user;
388
389                private final Set<TransferSimpSubscription> subscriptions;
390
391                /**
392                 * Default constructor for JSON deserialization.
393                 */
394                @SuppressWarnings("unused")
395                public TransferSimpSession() {
396                        this.id = "";
397                        this.user = new TransferSimpUser();
398                        this.subscriptions = new HashSet<>(4);
399                }
400
401                /**
402                 * Constructor to create DTO from the local user session.
403                 */
404                public TransferSimpSession(SimpSession session) {
405                        this.id = session.getId();
406                        this.user = new TransferSimpUser();
407                        Set<SimpSubscription> subscriptions = session.getSubscriptions();
408                        this.subscriptions = new HashSet<>(subscriptions.size());
409                        for (SimpSubscription subscription : subscriptions) {
410                                this.subscriptions.add(new TransferSimpSubscription(subscription));
411                        }
412                }
413
414                @SuppressWarnings("unused")
415                public void setId(String id) {
416                        this.id = id;
417                }
418
419                @Override
420                public String getId() {
421                        return this.id;
422                }
423
424                public void setUser(TransferSimpUser user) {
425                        this.user = user;
426                }
427
428                @Override
429                public TransferSimpUser getUser() {
430                        return this.user;
431                }
432
433                @SuppressWarnings("unused")
434                public void setSubscriptions(Set<TransferSimpSubscription> subscriptions) {
435                        this.subscriptions.addAll(subscriptions);
436                }
437
438                @Override
439                public Set<SimpSubscription> getSubscriptions() {
440                        return new HashSet<>(this.subscriptions);
441                }
442
443                private void afterDeserialization() {
444                        for (TransferSimpSubscription subscription : this.subscriptions) {
445                                subscription.setSession(this);
446                        }
447                }
448
449                @Override
450                public boolean equals(@Nullable Object other) {
451                        return (this == other || (other instanceof SimpSession && getId().equals(((SimpSession) other).getId())));
452                }
453
454                @Override
455                public int hashCode() {
456                        return getId().hashCode();
457                }
458
459                @Override
460                public String toString() {
461                        return "id=" + this.id + ", subscriptions=" + this.subscriptions;
462                }
463        }
464
465
466        /**
467         * SimpSubscription that can be (de)serialized and broadcast to other servers.
468         */
469        private static class TransferSimpSubscription implements SimpSubscription {
470
471                private String id;
472
473                private TransferSimpSession session;
474
475                private String destination;
476
477                /**
478                 * Default constructor for JSON deserialization.
479                 */
480                @SuppressWarnings("unused")
481                public TransferSimpSubscription() {
482                        this.id = "";
483                        this.session = new TransferSimpSession();
484                        this.destination = "";
485                }
486
487                /**
488                 * Constructor to create DTO from a local user subscription.
489                 */
490                public TransferSimpSubscription(SimpSubscription subscription) {
491                        this.id = subscription.getId();
492                        this.session = new TransferSimpSession();
493                        this.destination = subscription.getDestination();
494                }
495
496                @SuppressWarnings("unused")
497                public void setId(String id) {
498                        this.id = id;
499                }
500
501                @Override
502                public String getId() {
503                        return this.id;
504                }
505
506                public void setSession(TransferSimpSession session) {
507                        this.session = session;
508                }
509
510                @Override
511                public TransferSimpSession getSession() {
512                        return this.session;
513                }
514
515                @SuppressWarnings("unused")
516                public void setDestination(String destination) {
517                        this.destination = destination;
518                }
519
520                @Override
521                public String getDestination() {
522                        return this.destination;
523                }
524
525                @Override
526                public boolean equals(@Nullable Object other) {
527                        if (this == other) {
528                                return true;
529                        }
530                        if (!(other instanceof SimpSubscription)) {
531                                return false;
532                        }
533                        SimpSubscription otherSubscription = (SimpSubscription) other;
534                        return (getId().equals(otherSubscription.getId()) &&
535                                        ObjectUtils.nullSafeEquals(getSession(), otherSubscription.getSession()));
536                }
537
538                @Override
539                public int hashCode() {
540                        return getId().hashCode() * 31 + ObjectUtils.nullSafeHashCode(getSession());
541                }
542
543                @Override
544                public String toString() {
545                        return "destination=" + this.destination;
546                }
547        }
548
549
550        /**
551         * Helper class to find user sessions across all servers.
552         */
553        private class SessionLookup {
554
555                public Map<String, SimpSession> findSessions(String userName) {
556                        Map<String, SimpSession> map = new HashMap<>(4);
557                        SimpUser user = localRegistry.getUser(userName);
558                        if (user != null) {
559                                for (SimpSession session : user.getSessions()) {
560                                        map.put(session.getId(), session);
561                                }
562                        }
563                        for (UserRegistrySnapshot registry : remoteRegistries.values()) {
564                                TransferSimpUser transferUser = registry.getUserMap().get(userName);
565                                if (transferUser != null) {
566                                        transferUser.addSessions(map);
567                                }
568                        }
569                        return map;
570                }
571        }
572
573}