001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.config; 018 019import org.springframework.lang.Nullable; 020import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; 021import org.springframework.util.Assert; 022 023/** 024 * A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}. 025 * 026 * @author Rossen Stoyanchev 027 * @author Juergen Hoeller 028 * @since 4.0 029 */ 030public class TaskExecutorRegistration { 031 032 private final ThreadPoolTaskExecutor taskExecutor; 033 034 @Nullable 035 private Integer corePoolSize; 036 037 @Nullable 038 private Integer maxPoolSize; 039 040 @Nullable 041 private Integer keepAliveSeconds; 042 043 @Nullable 044 private Integer queueCapacity; 045 046 047 /** 048 * Create a new {@code TaskExecutorRegistration} for a default 049 * {@link ThreadPoolTaskExecutor}. 050 */ 051 public TaskExecutorRegistration() { 052 this.taskExecutor = new ThreadPoolTaskExecutor(); 053 this.taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2); 054 this.taskExecutor.setAllowCoreThreadTimeOut(true); 055 } 056 057 /** 058 * Create a new {@code TaskExecutorRegistration} for a given 059 * {@link ThreadPoolTaskExecutor}. 060 * @param taskExecutor the executor to use 061 */ 062 public TaskExecutorRegistration(ThreadPoolTaskExecutor taskExecutor) { 063 Assert.notNull(taskExecutor, "ThreadPoolTaskExecutor must not be null"); 064 this.taskExecutor = taskExecutor; 065 } 066 067 068 /** 069 * Set the core pool size of the ThreadPoolExecutor. 070 * <p><strong>NOTE:</strong> The core pool size is effectively the max pool size 071 * when an unbounded {@link #queueCapacity(int) queueCapacity} is configured 072 * (the default). This is essentially the "Unbounded queues" strategy as explained 073 * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When 074 * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. 075 * <p>By default this is set to twice the value of 076 * {@link Runtime#availableProcessors()}. In an application where tasks do not 077 * block frequently, the number should be closer to or equal to the number of 078 * available CPUs/cores. 079 */ 080 public TaskExecutorRegistration corePoolSize(int corePoolSize) { 081 this.corePoolSize = corePoolSize; 082 return this; 083 } 084 085 /** 086 * Set the max pool size of the ThreadPoolExecutor. 087 * <p><strong>NOTE:</strong> When an unbounded 088 * {@link #queueCapacity(int) queueCapacity} is configured (the default), the 089 * max pool size is effectively ignored. See the "Unbounded queues" strategy 090 * in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor} for 091 * more details. 092 * <p>By default this is set to {@code Integer.MAX_VALUE}. 093 */ 094 public TaskExecutorRegistration maxPoolSize(int maxPoolSize) { 095 this.maxPoolSize = maxPoolSize; 096 return this; 097 } 098 099 /** 100 * Set the time limit for which threads may remain idle before being terminated. 101 * If there are more than the core number of threads currently in the pool, 102 * after waiting this amount of time without processing a task, excess threads 103 * will be terminated. This overrides any value set in the constructor. 104 * <p>By default this is set to 60. 105 */ 106 public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) { 107 this.keepAliveSeconds = keepAliveSeconds; 108 return this; 109 } 110 111 /** 112 * Set the queue capacity for the ThreadPoolExecutor. 113 * <p><strong>NOTE:</strong> when an unbounded {@code queueCapacity} is configured 114 * (the default), the core pool size is effectively the max pool size. This is 115 * essentially the "Unbounded queues" strategy as explained in 116 * {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When 117 * this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored. 118 * <p>By default this is set to {@code Integer.MAX_VALUE}. 119 */ 120 public TaskExecutorRegistration queueCapacity(int queueCapacity) { 121 this.queueCapacity = queueCapacity; 122 return this; 123 } 124 125 126 protected ThreadPoolTaskExecutor getTaskExecutor() { 127 if (this.corePoolSize != null) { 128 this.taskExecutor.setCorePoolSize(this.corePoolSize); 129 } 130 if (this.maxPoolSize != null) { 131 this.taskExecutor.setMaxPoolSize(this.maxPoolSize); 132 } 133 if (this.keepAliveSeconds != null) { 134 this.taskExecutor.setKeepAliveSeconds(this.keepAliveSeconds); 135 } 136 if (this.queueCapacity != null) { 137 this.taskExecutor.setQueueCapacity(this.queueCapacity); 138 } 139 return this.taskExecutor; 140 } 141 142}