001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.user;
018
019import java.util.concurrent.ScheduledFuture;
020import java.util.concurrent.TimeUnit;
021
022import org.springframework.context.ApplicationListener;
023import org.springframework.lang.Nullable;
024import org.springframework.messaging.Message;
025import org.springframework.messaging.MessageHandler;
026import org.springframework.messaging.MessagingException;
027import org.springframework.messaging.converter.MessageConverter;
028import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
029import org.springframework.messaging.simp.SimpMessageType;
030import org.springframework.messaging.simp.SimpMessagingTemplate;
031import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
032import org.springframework.scheduling.TaskScheduler;
033import org.springframework.util.Assert;
034
035/**
036 * {@code MessageHandler} that handles user registry broadcasts from other
037 * application servers and periodically broadcasts the content of the local
038 * user registry.
039 *
040 * <p>The aggregated information is maintained in a {@link MultiServerUserRegistry}.
041 *
042 * @author Rossen Stoyanchev
043 * @since 4.2
044 */
045public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener<BrokerAvailabilityEvent> {
046
047        private final MultiServerUserRegistry userRegistry;
048
049        private final SimpMessagingTemplate brokerTemplate;
050
051        private final String broadcastDestination;
052
053        private final TaskScheduler scheduler;
054
055        private final UserRegistryTask schedulerTask = new UserRegistryTask();
056
057        @Nullable
058        private volatile ScheduledFuture<?> scheduledFuture;
059
060        private long registryExpirationPeriod = TimeUnit.SECONDS.toMillis(20);
061
062
063        /**
064         * Constructor.
065         * @param userRegistry the registry with local and remote user registry information
066         * @param brokerTemplate template for broadcasting local registry information
067         * @param broadcastDestination the destination to broadcast to
068         * @param scheduler the task scheduler to use
069         */
070        public UserRegistryMessageHandler(MultiServerUserRegistry userRegistry,
071                        SimpMessagingTemplate brokerTemplate, String broadcastDestination, TaskScheduler scheduler) {
072
073                Assert.notNull(userRegistry, "'userRegistry' is required");
074                Assert.notNull(brokerTemplate, "'brokerTemplate' is required");
075                Assert.hasText(broadcastDestination, "'broadcastDestination' is required");
076                Assert.notNull(scheduler, "'scheduler' is required");
077
078                this.userRegistry = userRegistry;
079                this.brokerTemplate = brokerTemplate;
080                this.broadcastDestination = broadcastDestination;
081                this.scheduler = scheduler;
082        }
083
084
085        /**
086         * Return the configured destination for broadcasting UserRegistry information.
087         */
088        public String getBroadcastDestination() {
089                return this.broadcastDestination;
090        }
091
092        /**
093         * Configure the amount of time (in milliseconds) before a remote user
094         * registry snapshot is considered expired.
095         * <p>By default this is set to 20 seconds (value of 20000).
096         * @param milliseconds the expiration period in milliseconds
097         */
098        @SuppressWarnings("unused")
099        public void setRegistryExpirationPeriod(long milliseconds) {
100                this.registryExpirationPeriod = milliseconds;
101        }
102
103        /**
104         * Return the configured registry expiration period.
105         */
106        public long getRegistryExpirationPeriod() {
107                return this.registryExpirationPeriod;
108        }
109
110
111        @Override
112        public void onApplicationEvent(BrokerAvailabilityEvent event) {
113                if (event.isBrokerAvailable()) {
114                        long delay = getRegistryExpirationPeriod() / 2;
115                        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this.schedulerTask, delay);
116                }
117                else {
118                        ScheduledFuture<?> future = this.scheduledFuture;
119                        if (future != null ){
120                                future.cancel(true);
121                                this.scheduledFuture = null;
122                        }
123                }
124        }
125
126        @Override
127        public void handleMessage(Message<?> message) throws MessagingException {
128                MessageConverter converter = this.brokerTemplate.getMessageConverter();
129                this.userRegistry.addRemoteRegistryDto(message, converter, getRegistryExpirationPeriod());
130        }
131
132
133        private class UserRegistryTask implements Runnable {
134
135                @Override
136                public void run() {
137                        try {
138                                SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
139                                accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
140                                accessor.setLeaveMutable(true);
141                                Object payload = userRegistry.getLocalRegistryDto();
142                                brokerTemplate.convertAndSend(getBroadcastDestination(), payload, accessor.getMessageHeaders());
143                        }
144                        finally {
145                                userRegistry.purgeExpiredRegistries();
146                        }
147                }
148        }
149
150}