/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.config;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.messaging.StompSubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolHandler;
import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler;

/**
 * A central class for aggregating information about internal state and counters
 * from key infrastructure components of the setup that comes with
 * {@code @EnableWebSocketMessageBroker} for Java config and
 * {@code <websocket:message-broker>} for XML.
 *
 * <p>By default aggregated information is logged every 30 minutes at INFO level.
 * The frequency of logging can be changed via {@link #setLoggingPeriod(long)}.
 *
 * <p>This class is declared as a Spring bean by the above configuration with the
 * name "webSocketMessageBrokerStats" and can be easily exported to JMX, e.g. with
 * the {@link org.springframework.jmx.export.MBeanExporter MBeanExporter}.
 *
 * @author Rossen Stoyanchev
 * @since 4.1
 */
public class WebSocketMessageBrokerStats {

	private static final Log logger = LogFactory.getLog(WebSocketMessageBrokerStats.class);


	@Nullable
	private SubProtocolWebSocketHandler webSocketHandler;

	@Nullable
	private StompSubProtocolHandler stompSubProtocolHandler;

	@Nullable
	private StompBrokerRelayMessageHandler stompBrokerRelay;

	@Nullable
	private TaskExecutor inboundChannelExecutor;

	@Nullable
	private TaskExecutor outboundChannelExecutor;

	@Nullable
	private TaskScheduler sockJsTaskScheduler;

	@Nullable
	private ScheduledFuture<?> loggingTask;

	private long loggingPeriod = TimeUnit.MINUTES.toMillis(30);


	/**
	 * Set the WebSocket handler to report session stats for. The STOMP
	 * sub-protocol handler is looked up from it at the same time.
	 */
	public void setSubProtocolWebSocketHandler(SubProtocolWebSocketHandler webSocketHandler) {
		this.webSocketHandler = webSocketHandler;
		this.stompSubProtocolHandler = initStompSubProtocolHandler();
	}

	/**
	 * Find the STOMP sub-protocol handler of the configured WebSocket handler:
	 * the first registered protocol handler of that type, otherwise the default
	 * protocol handler if it is of that type.
	 * @return the STOMP handler, or {@code null} if no WebSocket handler is set
	 * or none of its handlers is a {@link StompSubProtocolHandler}
	 */
	@Nullable
	private StompSubProtocolHandler initStompSubProtocolHandler() {
		if (this.webSocketHandler == null) {
			return null;
		}
		for (SubProtocolHandler handler : this.webSocketHandler.getProtocolHandlers()) {
			if (handler instanceof StompSubProtocolHandler) {
				return (StompSubProtocolHandler) handler;
			}
		}
		SubProtocolHandler defaultHandler = this.webSocketHandler.getDefaultProtocolHandler();
		// instanceof is false for null, so no separate null check is needed
		if (defaultHandler instanceof StompSubProtocolHandler) {
			return (StompSubProtocolHandler) defaultHandler;
		}
		return null;
	}

	/**
	 * Set the STOMP broker relay to report stats for.
	 */
	public void setStompBrokerRelay(StompBrokerRelayMessageHandler stompBrokerRelay) {
		this.stompBrokerRelay = stompBrokerRelay;
	}

	/**
	 * Set the executor of the client inbound channel to report stats for.
	 */
	public void setInboundChannelExecutor(TaskExecutor inboundChannelExecutor) {
		this.inboundChannelExecutor = inboundChannelExecutor;
	}

	/**
	 * Set the executor of the client outbound channel to report stats for.
	 */
	public void setOutboundChannelExecutor(TaskExecutor outboundChannelExecutor) {
		this.outboundChannelExecutor = outboundChannelExecutor;
	}

	/**
	 * Set the SockJS task scheduler to report stats for. The same scheduler is
	 * used to run the periodic logging task, which is (re)scheduled here with
	 * an initial delay of one minute.
	 */
	public void setSockJsTaskScheduler(TaskScheduler sockJsTaskScheduler) {
		// Cancel a task scheduled by an earlier call; otherwise its handle would be
		// overwritten below and it could never be cancelled via setLoggingPeriod.
		cancelLoggingTask();
		this.sockJsTaskScheduler = sockJsTaskScheduler;
		this.loggingTask = initLoggingTask(TimeUnit.MINUTES.toMillis(1));
	}

	/**
	 * Schedule the periodic logging of {@link #toString()} at INFO level.
	 * @param initialDelay the delay before the first log statement, in milliseconds
	 * @return a handle for the scheduled task, or {@code null} if nothing was
	 * scheduled because no scheduler is set, the logging period is not positive,
	 * or INFO logging is not enabled
	 */
	@Nullable
	private ScheduledFuture<?> initLoggingTask(long initialDelay) {
		if (this.sockJsTaskScheduler == null || this.loggingPeriod <= 0 || !logger.isInfoEnabled()) {
			return null;
		}
		return this.sockJsTaskScheduler.scheduleWithFixedDelay(
				() -> logger.info(toString()),
				Instant.now().plusMillis(initialDelay), Duration.ofMillis(this.loggingPeriod));
	}

	/**
	 * Cancel the current logging task, if any, and clear the handle.
	 */
	private void cancelLoggingTask() {
		if (this.loggingTask != null) {
			this.loggingTask.cancel(true);
			this.loggingTask = null;
		}
	}

	/**
	 * Set the frequency for logging information at INFO level in milliseconds.
	 * If set 0 or less than 0, the logging task is cancelled.
	 * <p>By default this property is set to 30 minutes (30 * 60 * 1000).
	 */
	public void setLoggingPeriod(long period) {
		cancelLoggingTask();
		this.loggingPeriod = period;
		this.loggingTask = initLoggingTask(0);
	}

	/**
	 * Return the configured logging period frequency in milliseconds.
	 */
	public long getLoggingPeriod() {
		return this.loggingPeriod;
	}

	/**
	 * Get stats about WebSocket sessions.
	 * @return the stats, or the text "null" if no WebSocket handler is set
	 */
	public String getWebSocketSessionStatsInfo() {
		return (this.webSocketHandler != null ? this.webSocketHandler.getStatsInfo() : "null");
	}

	/**
	 * Get stats about STOMP-related WebSocket message processing.
	 * @return the stats, or the text "null" if no STOMP sub-protocol handler was found
	 */
	public String getStompSubProtocolStatsInfo() {
		return (this.stompSubProtocolHandler != null ? this.stompSubProtocolHandler.getStatsInfo() : "null");
	}

	/**
	 * Get stats about STOMP broker relay (when using a full-featured STOMP broker).
	 * @return the stats, or the text "null" if no broker relay is set
	 */
	public String getStompBrokerRelayStatsInfo() {
		return (this.stompBrokerRelay != null ? this.stompBrokerRelay.getStatsInfo() : "null");
	}

	/**
	 * Get stats about the executor processing incoming messages from WebSocket clients.
	 * @return the stats, the text "null" if no executor is set, or "unknown" if
	 * the executor is not backed by a {@link ThreadPoolExecutor}
	 */
	public String getClientInboundExecutorStatsInfo() {
		return (this.inboundChannelExecutor != null ?
				getExecutorStatsInfo(this.inboundChannelExecutor) : "null");
	}

	/**
	 * Get stats about the executor processing outgoing messages to WebSocket clients.
	 * @return the stats, the text "null" if no executor is set, or "unknown" if
	 * the executor is not backed by a {@link ThreadPoolExecutor}
	 */
	public String getClientOutboundExecutorStatsInfo() {
		return (this.outboundChannelExecutor != null ?
				getExecutorStatsInfo(this.outboundChannelExecutor) : "null");
	}

	/**
	 * Get stats about the SockJS task scheduler.
	 * @return the stats, the text "null" if no scheduler is set, or "unknown" if
	 * the scheduler is not a {@link ThreadPoolTaskScheduler}
	 */
	public String getSockJsTaskSchedulerStatsInfo() {
		if (this.sockJsTaskScheduler == null) {
			return "null";
		}
		if (this.sockJsTaskScheduler instanceof ThreadPoolTaskScheduler) {
			return getExecutorStatsInfo(((ThreadPoolTaskScheduler) this.sockJsTaskScheduler)
					.getScheduledThreadPoolExecutor());
		}
		return "unknown";
	}

	/**
	 * Extract the pool statistics from the {@code toString()} text of a thread
	 * pool executor. A {@link ThreadPoolTaskExecutor} is unwrapped to its
	 * underlying executor first.
	 * @param executor the executor to describe
	 * @return the text from the first occurrence of "pool" up to, but excluding,
	 * the last character, or "unknown" if the executor is not a
	 * {@link ThreadPoolExecutor} or its text does not contain "pool"
	 */
	private String getExecutorStatsInfo(Executor executor) {
		Executor target = executor;
		if (target instanceof ThreadPoolTaskExecutor) {
			target = ((ThreadPoolTaskExecutor) target).getThreadPoolExecutor();
		}
		if (target instanceof ThreadPoolExecutor) {
			// Assumes ThreadPoolExecutor.toString() ends with text along the lines of
			// "pool size = #, active threads = #, queued tasks = #, completed tasks = #]".
			String description = target.toString();
			int poolIndex = description.indexOf("pool");
			if (poolIndex != -1) {
				// (length - 1) drops the trailing character, expected to be the closing "]"
				return description.substring(poolIndex, description.length() - 1);
			}
		}
		// Any other executor has no pool statistics to report; previously this
		// case failed with a StringIndexOutOfBoundsException.
		return "unknown";
	}

	@Override
	public String toString() {
		return "WebSocketSession[" + getWebSocketSessionStatsInfo() + "]" +
				", stompSubProtocol[" + getStompSubProtocolStatsInfo() + "]" +
				", stompBrokerRelay[" + getStompBrokerRelayStatsInfo() + "]" +
				", inboundChannel[" + getClientInboundExecutorStatsInfo() + "]" +
				", outboundChannel[" + getClientOutboundExecutorStatsInfo() + "]" +
				", sockJsScheduler[" + getSockJsTaskSchedulerStatsInfo() + "]";
	}

}