001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.user;
018
019import java.net.InetAddress;
020import java.net.UnknownHostException;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.Set;
026import java.util.UUID;
027import java.util.concurrent.ConcurrentHashMap;
028
029import org.springframework.context.ApplicationEvent;
030import org.springframework.context.event.SmartApplicationListener;
031import org.springframework.core.Ordered;
032import org.springframework.messaging.Message;
033import org.springframework.messaging.converter.MessageConverter;
034import org.springframework.util.Assert;
035import org.springframework.util.ObjectUtils;
036
037/**
038 * {@code SimpUserRegistry} that looks up users in a "local" user registry as
039 * well as a set of "remote" user registries. The local registry is provided as
040 * a constructor argument while remote registries are updated via broadcasts
041 * handled by {@link UserRegistryMessageHandler} which in turn notifies this
042 * registry when updates are received.
043 *
044 * @author Rossen Stoyanchev
045 * @since 4.2
046 */
047@SuppressWarnings("serial")
048public class MultiServerUserRegistry implements SimpUserRegistry, SmartApplicationListener {
049
050        private final String id;
051
052        private final SimpUserRegistry localRegistry;
053
054        private final Map<String, UserRegistrySnapshot> remoteRegistries = new ConcurrentHashMap<String, UserRegistrySnapshot>();
055
056        private final boolean delegateApplicationEvents;
057
058        /* Cross-server session lookup (e.g. same user connected to multiple servers) */
059        private final SessionLookup sessionLookup = new SessionLookup();
060
061
062        /**
063         * Create an instance wrapping the local user registry.
064         */
065        public MultiServerUserRegistry(SimpUserRegistry localRegistry) {
066                Assert.notNull(localRegistry, "'localRegistry' is required");
067                this.id = generateId();
068                this.localRegistry = localRegistry;
069                this.delegateApplicationEvents = this.localRegistry instanceof SmartApplicationListener;
070        }
071
072        private static String generateId() {
073                String host;
074                try {
075                        host = InetAddress.getLocalHost().getHostAddress();
076                }
077                catch (UnknownHostException ex) {
078                        host = "unknown";
079                }
080                return host + '-' + UUID.randomUUID();
081        }
082
083
084        @Override
085        public int getOrder() {
086                return (this.delegateApplicationEvents ?
087                                ((SmartApplicationListener) this.localRegistry).getOrder() : Ordered.LOWEST_PRECEDENCE);
088        }
089
090
091        // SmartApplicationListener methods
092
093        @Override
094        public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
095                return (this.delegateApplicationEvents &&
096                                ((SmartApplicationListener) this.localRegistry).supportsEventType(eventType));
097        }
098
099        @Override
100        public boolean supportsSourceType(Class<?> sourceType) {
101                return (this.delegateApplicationEvents &&
102                                ((SmartApplicationListener) this.localRegistry).supportsSourceType(sourceType));
103        }
104
105        @Override
106        public void onApplicationEvent(ApplicationEvent event) {
107                if (this.delegateApplicationEvents) {
108                        ((SmartApplicationListener) this.localRegistry).onApplicationEvent(event);
109                }
110        }
111
112
113        // SimpUserRegistry methods
114
115        @Override
116        public SimpUser getUser(String userName) {
117                // Prefer remote registries due to cross-server SessionLookup
118                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
119                        SimpUser user = registry.getUserMap().get(userName);
120                        if (user != null) {
121                                return user;
122                        }
123                }
124                return this.localRegistry.getUser(userName);
125        }
126
127        @Override
128        public Set<SimpUser> getUsers() {
129                // Prefer remote registries due to cross-server SessionLookup
130                Set<SimpUser> result = new HashSet<SimpUser>();
131                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
132                        result.addAll(registry.getUserMap().values());
133                }
134                result.addAll(this.localRegistry.getUsers());
135                return result;
136        }
137
138        @Override
139        public int getUserCount() {
140                int userCount = 0;
141                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
142                        userCount += registry.getUserMap().size();
143                }
144                userCount += this.localRegistry.getUserCount();
145                return userCount;
146        }
147
148        @Override
149        public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) {
150                Set<SimpSubscription> result = new HashSet<SimpSubscription>();
151                for (UserRegistrySnapshot registry : this.remoteRegistries.values()) {
152                        result.addAll(registry.findSubscriptions(matcher));
153                }
154                result.addAll(this.localRegistry.findSubscriptions(matcher));
155                return result;
156        }
157
158
159        // Internal methods for UserRegistryMessageHandler to manage broadcasts
160
161        Object getLocalRegistryDto() {
162                return new UserRegistrySnapshot(this.id, this.localRegistry);
163        }
164
165        void addRemoteRegistryDto(Message<?> message, MessageConverter converter, long expirationPeriod) {
166                UserRegistrySnapshot registry = (UserRegistrySnapshot) converter.fromMessage(message, UserRegistrySnapshot.class);
167                if (registry != null && !registry.getId().equals(this.id)) {
168                        registry.init(expirationPeriod, this.sessionLookup);
169                        this.remoteRegistries.put(registry.getId(), registry);
170                }
171        }
172
173        void purgeExpiredRegistries() {
174                long now = System.currentTimeMillis();
175                Iterator<Map.Entry<String, UserRegistrySnapshot>> iterator = this.remoteRegistries.entrySet().iterator();
176                while (iterator.hasNext()) {
177                        Map.Entry<String, UserRegistrySnapshot> entry = iterator.next();
178                        if (entry.getValue().isExpired(now)) {
179                                iterator.remove();
180                        }
181                }
182        }
183
184
185        @Override
186        public String toString() {
187                return "local=[" + this.localRegistry + "], remote=" + this.remoteRegistries;
188        }
189
190
191        /**
192         * Holds a copy of a SimpUserRegistry for the purpose of broadcasting to and
193         * receiving broadcasts from other application servers.
194         */
195        private static class UserRegistrySnapshot {
196
197                private String id;
198
199                private Map<String, TransferSimpUser> users;
200
201                private long expirationTime;
202
203                /**
204                 * Default constructor for JSON deserialization.
205                 */
206                @SuppressWarnings("unused")
207                public UserRegistrySnapshot() {
208                }
209
210                /**
211                 * Constructor to create DTO from a local user registry.
212                 */
213                public UserRegistrySnapshot(String id, SimpUserRegistry registry) {
214                        this.id = id;
215                        Set<SimpUser> users = registry.getUsers();
216                        this.users = new HashMap<String, TransferSimpUser>(users.size());
217                        for (SimpUser user : users) {
218                                this.users.put(user.getName(), new TransferSimpUser(user));
219                        }
220                }
221
222                public void setId(String id) {
223                        this.id = id;
224                }
225
226                public String getId() {
227                        return this.id;
228                }
229
230                public void setUserMap(Map<String, TransferSimpUser> users) {
231                        this.users = users;
232                }
233
234                public Map<String, TransferSimpUser> getUserMap() {
235                        return this.users;
236                }
237
238                public boolean isExpired(long now) {
239                        return (now > this.expirationTime);
240                }
241
242                public void init(long expirationPeriod, SessionLookup sessionLookup) {
243                        this.expirationTime = System.currentTimeMillis() + expirationPeriod;
244                        for (TransferSimpUser user : this.users.values()) {
245                                user.afterDeserialization(sessionLookup);
246                        }
247                }
248
249                public Set<SimpSubscription> findSubscriptions(SimpSubscriptionMatcher matcher) {
250                        Set<SimpSubscription> result = new HashSet<SimpSubscription>();
251                        for (TransferSimpUser user : this.users.values()) {
252                                for (TransferSimpSession session : user.sessions) {
253                                        for (SimpSubscription subscription : session.subscriptions) {
254                                                if (matcher.match(subscription)) {
255                                                        result.add(subscription);
256                                                }
257                                        }
258                                }
259                        }
260                        return result;
261                }
262
263                @Override
264                public String toString() {
265                        return "id=" + this.id + ", users=" + this.users;
266                }
267        }
268
269
270        /**
271         * SimpUser that can be (de)serialized and broadcast to other servers.
272         */
273        private static class TransferSimpUser implements SimpUser {
274
275                private String name;
276
277                // User sessions from "this" registry only (i.e. one server)
278                private Set<TransferSimpSession> sessions;
279
280                // Cross-server session lookup (e.g. user connected to multiple servers)
281                private SessionLookup sessionLookup;
282
283                /**
284                 * Default constructor for JSON deserialization.
285                 */
286                @SuppressWarnings("unused")
287                public TransferSimpUser() {
288                        this.sessions = new HashSet<TransferSimpSession>(1);
289                }
290
291                /**
292                 * Constructor to create user from a local user.
293                 */
294                public TransferSimpUser(SimpUser user) {
295                        this.name = user.getName();
296                        Set<SimpSession> sessions = user.getSessions();
297                        this.sessions = new HashSet<TransferSimpSession>(sessions.size());
298                        for (SimpSession session : sessions) {
299                                this.sessions.add(new TransferSimpSession(session));
300                        }
301                }
302
303                public void setName(String name) {
304                        this.name = name;
305                }
306
307                @Override
308                public String getName() {
309                        return this.name;
310                }
311
312                @Override
313                public boolean hasSessions() {
314                        if (this.sessionLookup != null) {
315                                return !this.sessionLookup.findSessions(getName()).isEmpty();
316                        }
317                        return !this.sessions.isEmpty();
318                }
319
320                @Override
321                public SimpSession getSession(String sessionId) {
322                        if (this.sessionLookup != null) {
323                                return this.sessionLookup.findSessions(getName()).get(sessionId);
324                        }
325                        for (TransferSimpSession session : this.sessions) {
326                                if (session.getId().equals(sessionId)) {
327                                        return session;
328                                }
329                        }
330                        return null;
331                }
332
333                public void setSessions(Set<TransferSimpSession> sessions) {
334                        this.sessions.addAll(sessions);
335                }
336
337                @Override
338                public Set<SimpSession> getSessions() {
339                        if (this.sessionLookup != null) {
340                                Map<String, SimpSession> sessions = this.sessionLookup.findSessions(getName());
341                                return new HashSet<SimpSession>(sessions.values());
342                        }
343                        return new HashSet<SimpSession>(this.sessions);
344                }
345
346                private void afterDeserialization(SessionLookup sessionLookup) {
347                        this.sessionLookup = sessionLookup;
348                        for (TransferSimpSession session : this.sessions) {
349                                session.setUser(this);
350                                session.afterDeserialization();
351                        }
352                }
353
354                private void addSessions(Map<String, SimpSession> map) {
355                        for (SimpSession session : this.sessions) {
356                                map.put(session.getId(), session);
357                        }
358                }
359
360
361                @Override
362                public boolean equals(Object other) {
363                        return (this == other || (other instanceof SimpUser && this.name.equals(((SimpUser) other).getName())));
364                }
365
366                @Override
367                public int hashCode() {
368                        return this.name.hashCode();
369                }
370
371                @Override
372                public String toString() {
373                        return "name=" + this.name + ", sessions=" + this.sessions;
374                }
375        }
376
377
378        /**
379         * SimpSession that can be (de)serialized and broadcast to other servers.
380         */
381        private static class TransferSimpSession implements SimpSession {
382
383                private String id;
384
385                private TransferSimpUser user;
386
387                private final Set<TransferSimpSubscription> subscriptions;
388
389                /**
390                 * Default constructor for JSON deserialization.
391                 */
392                @SuppressWarnings("unused")
393                public TransferSimpSession() {
394                        this.subscriptions = new HashSet<TransferSimpSubscription>(4);
395                }
396
397                /**
398                 * Constructor to create DTO from the local user session.
399                 */
400                public TransferSimpSession(SimpSession session) {
401                        this.id = session.getId();
402                        Set<SimpSubscription> subscriptions = session.getSubscriptions();
403                        this.subscriptions = new HashSet<TransferSimpSubscription>(subscriptions.size());
404                        for (SimpSubscription subscription : subscriptions) {
405                                this.subscriptions.add(new TransferSimpSubscription(subscription));
406                        }
407                }
408
409                public void setId(String id) {
410                        this.id = id;
411                }
412
413                @Override
414                public String getId() {
415                        return this.id;
416                }
417
418                public void setUser(TransferSimpUser user) {
419                        this.user = user;
420                }
421
422                @Override
423                public TransferSimpUser getUser() {
424                        return this.user;
425                }
426
427                public void setSubscriptions(Set<TransferSimpSubscription> subscriptions) {
428                        this.subscriptions.addAll(subscriptions);
429                }
430
431                @Override
432                public Set<SimpSubscription> getSubscriptions() {
433                        return new HashSet<SimpSubscription>(this.subscriptions);
434                }
435
436                private void afterDeserialization() {
437                        for (TransferSimpSubscription subscription : this.subscriptions) {
438                                subscription.setSession(this);
439                        }
440                }
441
442                @Override
443                public boolean equals(Object other) {
444                        return (this == other || (other instanceof SimpSession && this.id.equals(((SimpSession) other).getId())));
445                }
446
447                @Override
448                public int hashCode() {
449                        return this.id.hashCode();
450                }
451
452                @Override
453                public String toString() {
454                        return "id=" + this.id + ", subscriptions=" + this.subscriptions;
455                }
456        }
457
458
459        /**
460         * SimpSubscription that can be (de)serialized and broadcast to other servers.
461         */
462        private static class TransferSimpSubscription implements SimpSubscription {
463
464                private String id;
465
466                private TransferSimpSession session;
467
468                private String destination;
469
470                /**
471                 * Default constructor for JSON deserialization.
472                 */
473                @SuppressWarnings("unused")
474                public TransferSimpSubscription() {
475                }
476
477                /**
478                 * Constructor to create DTO from a local user subscription.
479                 */
480                public TransferSimpSubscription(SimpSubscription subscription) {
481                        this.id = subscription.getId();
482                        this.destination = subscription.getDestination();
483                }
484
485                public void setId(String id) {
486                        this.id = id;
487                }
488
489                @Override
490                public String getId() {
491                        return this.id;
492                }
493
494                public void setSession(TransferSimpSession session) {
495                        this.session = session;
496                }
497
498                @Override
499                public TransferSimpSession getSession() {
500                        return this.session;
501                }
502
503                public void setDestination(String destination) {
504                        this.destination = destination;
505                }
506
507                @Override
508                public String getDestination() {
509                        return this.destination;
510                }
511
512                @Override
513                public boolean equals(Object other) {
514                        if (this == other) {
515                                return true;
516                        }
517                        if (!(other instanceof SimpSubscription)) {
518                                return false;
519                        }
520                        SimpSubscription otherSubscription = (SimpSubscription) other;
521                        return (getId().equals(otherSubscription.getId()) &&
522                                        ObjectUtils.nullSafeEquals(getSession(), otherSubscription.getSession()));
523                }
524
525                @Override
526                public int hashCode() {
527                        return getId().hashCode() * 31 + ObjectUtils.nullSafeHashCode(getSession());
528                }
529
530                @Override
531                public String toString() {
532                        return "destination=" + this.destination;
533                }
534        }
535
536
537        /**
538         * Helper class to find user sessions across all servers.
539         */
540        private class SessionLookup {
541
542                public Map<String, SimpSession> findSessions(String userName) {
543                        Map<String, SimpSession> map = new HashMap<String, SimpSession>(1);
544                        SimpUser user = localRegistry.getUser(userName);
545                        if (user != null) {
546                                for (SimpSession session : user.getSessions()) {
547                                        map.put(session.getId(), session);
548                                }
549                        }
550                        for (UserRegistrySnapshot registry : remoteRegistries.values()) {
551                                TransferSimpUser transferUser = registry.getUserMap().get(userName);
552                                if (transferUser != null) {
553                                        transferUser.addSessions(map);
554                                }
555                        }
556                        return map;
557                }
558        }
559
560}