001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.web.socket.config;
018
019import java.time.Duration;
020import java.time.Instant;
021import java.util.concurrent.Executor;
022import java.util.concurrent.ScheduledFuture;
023import java.util.concurrent.TimeUnit;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027
028import org.springframework.core.task.TaskExecutor;
029import org.springframework.lang.Nullable;
030import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
031import org.springframework.scheduling.TaskScheduler;
032import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
033import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
034import org.springframework.web.socket.messaging.StompSubProtocolHandler;
035import org.springframework.web.socket.messaging.SubProtocolHandler;
036import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;
037
038/**
039 * A central class for aggregating information about internal state and counters
040 * from key infrastructure components of the setup that comes with
041 * {@code @EnableWebSocketMessageBroker} for Java config and
042 * {@code <websocket:message-broker>} for XML.
043 *
044 * <p>By default aggregated information is logged every 30 minutes at INFO level.
045 * The frequency of logging can be changed via {@link #setLoggingPeriod(long)}.
046 *
047 * <p>This class is declared as a Spring bean by the above configuration with the
048 * name "webSocketMessageBrokerStats" and can be easily exported to JMX, e.g. with
049 * the {@link org.springframework.jmx.export.MBeanExporter MBeanExporter}.
050 *
051 * @author Rossen Stoyanchev
052 * @since 4.1
053 */
054public class WebSocketMessageBrokerStats {
055
056        private static final Log logger = LogFactory.getLog(WebSocketMessageBrokerStats.class);
057
058
059        @Nullable
060        private SubProtocolWebSocketHandler webSocketHandler;
061
062        @Nullable
063        private StompSubProtocolHandler stompSubProtocolHandler;
064
065        @Nullable
066        private StompBrokerRelayMessageHandler stompBrokerRelay;
067
068        @Nullable
069        private TaskExecutor inboundChannelExecutor;
070
071        @Nullable
072        private TaskExecutor outboundChannelExecutor;
073
074        @Nullable
075        private TaskScheduler sockJsTaskScheduler;
076
077        @Nullable
078        private ScheduledFuture<?> loggingTask;
079
080        private long loggingPeriod = TimeUnit.MINUTES.toMillis(30);
081
082
083        public void setSubProtocolWebSocketHandler(SubProtocolWebSocketHandler webSocketHandler) {
084                this.webSocketHandler = webSocketHandler;
085                this.stompSubProtocolHandler = initStompSubProtocolHandler();
086        }
087
088        @Nullable
089        private StompSubProtocolHandler initStompSubProtocolHandler() {
090                if (this.webSocketHandler == null) {
091                        return null;
092                }
093                for (SubProtocolHandler handler : this.webSocketHandler.getProtocolHandlers()) {
094                        if (handler instanceof StompSubProtocolHandler) {
095                                return (StompSubProtocolHandler) handler;
096                        }
097                }
098                SubProtocolHandler defaultHandler = this.webSocketHandler.getDefaultProtocolHandler();
099                if (defaultHandler != null && defaultHandler instanceof StompSubProtocolHandler) {
100                        return (StompSubProtocolHandler) defaultHandler;
101                }
102                return null;
103        }
104
105        public void setStompBrokerRelay(StompBrokerRelayMessageHandler stompBrokerRelay) {
106                this.stompBrokerRelay = stompBrokerRelay;
107        }
108
109        public void setInboundChannelExecutor(TaskExecutor inboundChannelExecutor) {
110                this.inboundChannelExecutor = inboundChannelExecutor;
111        }
112
113        public void setOutboundChannelExecutor(TaskExecutor outboundChannelExecutor) {
114                this.outboundChannelExecutor = outboundChannelExecutor;
115        }
116
117        public void setSockJsTaskScheduler(TaskScheduler sockJsTaskScheduler) {
118                this.sockJsTaskScheduler = sockJsTaskScheduler;
119                this.loggingTask = initLoggingTask(TimeUnit.MINUTES.toMillis(1));
120        }
121
122        @Nullable
123        private ScheduledFuture<?> initLoggingTask(long initialDelay) {
124                if (this.sockJsTaskScheduler != null && this.loggingPeriod > 0 && logger.isInfoEnabled()) {
125                        return this.sockJsTaskScheduler.scheduleWithFixedDelay(
126                                        () -> logger.info(WebSocketMessageBrokerStats.this.toString()),
127                                        Instant.now().plusMillis(initialDelay), Duration.ofMillis(this.loggingPeriod));
128                }
129                return null;
130        }
131
132        /**
133         * Set the frequency for logging information at INFO level in milliseconds.
134         * If set 0 or less than 0, the logging task is cancelled.
135         * <p>By default this property is set to 30 minutes (30 * 60 * 1000).
136         */
137        public void setLoggingPeriod(long period) {
138                if (this.loggingTask != null) {
139                        this.loggingTask.cancel(true);
140                }
141                this.loggingPeriod = period;
142                this.loggingTask = initLoggingTask(0);
143        }
144
145        /**
146         * Return the configured logging period frequency in milliseconds.
147         */
148        public long getLoggingPeriod() {
149                return this.loggingPeriod;
150        }
151
152        /**
153         * Get stats about WebSocket sessions.
154         */
155        public String getWebSocketSessionStatsInfo() {
156                return (this.webSocketHandler != null ? this.webSocketHandler.getStatsInfo() : "null");
157        }
158
159        /**
160         * Get stats about STOMP-related WebSocket message processing.
161         */
162        public String getStompSubProtocolStatsInfo() {
163                return (this.stompSubProtocolHandler != null ? this.stompSubProtocolHandler.getStatsInfo() : "null");
164        }
165
166        /**
167         * Get stats about STOMP broker relay (when using a full-featured STOMP broker).
168         */
169        public String getStompBrokerRelayStatsInfo() {
170                return (this.stompBrokerRelay != null ? this.stompBrokerRelay.getStatsInfo() : "null");
171        }
172
173        /**
174         * Get stats about the executor processing incoming messages from WebSocket clients.
175         */
176        public String getClientInboundExecutorStatsInfo() {
177                return (this.inboundChannelExecutor != null ?
178                                getExecutorStatsInfo(this.inboundChannelExecutor) : "null");
179        }
180
181        /**
182         * Get stats about the executor processing outgoing messages to WebSocket clients.
183         */
184        public String getClientOutboundExecutorStatsInfo() {
185                return (this.outboundChannelExecutor != null ?
186                                getExecutorStatsInfo(this.outboundChannelExecutor) : "null");
187        }
188
189        /**
190         * Get stats about the SockJS task scheduler.
191         */
192        public String getSockJsTaskSchedulerStatsInfo() {
193                if (this.sockJsTaskScheduler == null) {
194                        return "null";
195                }
196                if (this.sockJsTaskScheduler instanceof ThreadPoolTaskScheduler) {
197                        return getExecutorStatsInfo(((ThreadPoolTaskScheduler) this.sockJsTaskScheduler)
198                                        .getScheduledThreadPoolExecutor());
199                }
200                else {
201                        return "unknown";
202                }
203        }
204
205        private String getExecutorStatsInfo(Executor executor) {
206                executor = executor instanceof ThreadPoolTaskExecutor ?
207                                ((ThreadPoolTaskExecutor) executor).getThreadPoolExecutor() : executor;
208                String str = executor.toString();
209                return str.substring(str.indexOf("pool"), str.length() - 1);
210        }
211
212        @Override
213        public String toString() {
214                return "WebSocketSession[" + getWebSocketSessionStatsInfo() + "]" +
215                                ", stompSubProtocol[" + getStompSubProtocolStatsInfo() + "]" +
216                                ", stompBrokerRelay[" + getStompBrokerRelayStatsInfo() + "]" +
217                                ", inboundChannel[" + getClientInboundExecutorStatsInfo() + "]" +
218                                ", outboundChannel[" + getClientOutboundExecutorStatsInfo() + "]" +
219                                ", sockJsScheduler[" + getSockJsTaskSchedulerStatsInfo() + "]";
220        }
221
222}