001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.user;
018
019import java.util.concurrent.ScheduledFuture;
020
021import org.springframework.context.ApplicationListener;
022import org.springframework.messaging.Message;
023import org.springframework.messaging.MessageHandler;
024import org.springframework.messaging.MessagingException;
025import org.springframework.messaging.converter.MessageConverter;
026import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
027import org.springframework.messaging.simp.SimpMessageType;
028import org.springframework.messaging.simp.SimpMessagingTemplate;
029import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
030import org.springframework.scheduling.TaskScheduler;
031import org.springframework.util.Assert;
032
033/**
034 * {@code MessageHandler} that handles user registry broadcasts from other
035 * application servers and periodically broadcasts the content of the local
036 * user registry.
037 *
038 * <p>The aggregated information is maintained in a {@link MultiServerUserRegistry}.
039 *
040 * @author Rossen Stoyanchev
041 * @since 4.2
042 */
043public class UserRegistryMessageHandler implements MessageHandler, ApplicationListener<BrokerAvailabilityEvent> {
044
045        private final MultiServerUserRegistry userRegistry;
046
047        private final SimpMessagingTemplate brokerTemplate;
048
049        private final String broadcastDestination;
050
051        private final TaskScheduler scheduler;
052
053        private final UserRegistryTask schedulerTask = new UserRegistryTask();
054
055        private volatile ScheduledFuture<?> scheduledFuture;
056
057        private long registryExpirationPeriod = 20 * 1000;
058
059
060        /**
061         * Constructor.
062         * @param userRegistry the registry with local and remote user registry information
063         * @param brokerTemplate template for broadcasting local registry information
064         * @param broadcastDestination the destination to broadcast to
065         * @param scheduler the task scheduler to use
066         */
067        public UserRegistryMessageHandler(MultiServerUserRegistry userRegistry,
068                        SimpMessagingTemplate brokerTemplate, String broadcastDestination, TaskScheduler scheduler) {
069
070                Assert.notNull(userRegistry, "'userRegistry' is required");
071                Assert.notNull(brokerTemplate, "'brokerTemplate' is required");
072                Assert.hasText(broadcastDestination, "'broadcastDestination' is required");
073                Assert.notNull(scheduler, "'scheduler' is required");
074
075                this.userRegistry = userRegistry;
076                this.brokerTemplate = brokerTemplate;
077                this.broadcastDestination = broadcastDestination;
078                this.scheduler = scheduler;
079        }
080
081
082        /**
083         * Return the configured destination for broadcasting UserRegistry information.
084         */
085        public String getBroadcastDestination() {
086                return this.broadcastDestination;
087        }
088
089        /**
090         * Configure the amount of time (in milliseconds) before a remote user
091         * registry snapshot is considered expired.
092         * <p>By default this is set to 20 seconds (value of 20000).
093         * @param milliseconds the expiration period in milliseconds
094         */
095        @SuppressWarnings("unused")
096        public void setRegistryExpirationPeriod(long milliseconds) {
097                this.registryExpirationPeriod = milliseconds;
098        }
099
100        /**
101         * Return the configured registry expiration period.
102         */
103        public long getRegistryExpirationPeriod() {
104                return this.registryExpirationPeriod;
105        }
106
107
108        @Override
109        public void onApplicationEvent(BrokerAvailabilityEvent event) {
110                if (event.isBrokerAvailable()) {
111                        long delay = getRegistryExpirationPeriod() / 2;
112                        this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this.schedulerTask, delay);
113                }
114                else {
115                        ScheduledFuture<?> future = this.scheduledFuture;
116                        if (future != null ){
117                                future.cancel(true);
118                                this.scheduledFuture = null;
119                        }
120                }
121        }
122
123        @Override
124        public void handleMessage(Message<?> message) throws MessagingException {
125                MessageConverter converter = this.brokerTemplate.getMessageConverter();
126                this.userRegistry.addRemoteRegistryDto(message, converter, getRegistryExpirationPeriod());
127        }
128
129
130        private class UserRegistryTask implements Runnable {
131
132                @Override
133                public void run() {
134                        try {
135                                SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
136                                accessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true);
137                                accessor.setLeaveMutable(true);
138                                Object payload = userRegistry.getLocalRegistryDto();
139                                brokerTemplate.convertAndSend(getBroadcastDestination(), payload, accessor.getMessageHeaders());
140                        }
141                        finally {
142                                userRegistry.purgeExpiredRegistries();
143                        }
144                }
145        }
146
147}