001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.scheduling.concurrent; 018 019import java.util.concurrent.BlockingQueue; 020import java.util.concurrent.Callable; 021import java.util.concurrent.Executor; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.Future; 024import java.util.concurrent.LinkedBlockingQueue; 025import java.util.concurrent.RejectedExecutionException; 026import java.util.concurrent.RejectedExecutionHandler; 027import java.util.concurrent.SynchronousQueue; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import org.springframework.core.task.AsyncListenableTaskExecutor; 033import org.springframework.core.task.TaskDecorator; 034import org.springframework.core.task.TaskRejectedException; 035import org.springframework.scheduling.SchedulingTaskExecutor; 036import org.springframework.util.Assert; 037import org.springframework.util.concurrent.ListenableFuture; 038import org.springframework.util.concurrent.ListenableFutureTask; 039 040/** 041 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor} 042 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity" 043 * properties) and exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}. 044 * This class is also well suited for management and monitoring (e.g. through JMX), 045 * providing several useful attributes: "corePoolSize", "maxPoolSize", "keepAliveSeconds" 046 * (all supporting updates at runtime); "poolSize", "activeCount" (for introspection only). 047 * 048 * <p>The default configuration is a core pool size of 1, with unlimited max pool size 049 * and unlimited queue capacity. This is roughly equivalent to 050 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single 051 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics 052 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling 053 * of threads in the pool to a potentially very high number. Consider also setting a 054 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher 055 * {@link #setCorePoolSize "corePoolSize"} (see also the 056 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling). 057 * 058 * <p><b>NOTE:</b> This class implements Spring's 059 * {@link org.springframework.core.task.TaskExecutor} interface as well as the 060 * {@link java.util.concurrent.Executor} interface, with the former being the primary 061 * interface, the other just serving as secondary convenience. For this reason, the 062 * exception handling follows the TaskExecutor contract rather than the Executor contract, 063 * in particular regarding the {@link org.springframework.core.task.TaskRejectedException}. 064 * 065 * <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using 066 * constructor injection, or use a factory method definition that points to the 067 * {@link java.util.concurrent.Executors} class. To expose such a raw Executor as a 068 * Spring {@link org.springframework.core.task.TaskExecutor}, simply wrap it with a 069 * {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter. 070 * 071 * @author Juergen Hoeller 072 * @since 2.0 073 * @see org.springframework.core.task.TaskExecutor 074 * @see java.util.concurrent.ThreadPoolExecutor 075 * @see ThreadPoolExecutorFactoryBean 076 * @see ConcurrentTaskExecutor 077 */ 078@SuppressWarnings("serial") 079public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport 080 implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { 081 082 private final Object poolSizeMonitor = new Object(); 083 084 private int corePoolSize = 1; 085 086 private int maxPoolSize = Integer.MAX_VALUE; 087 088 private int keepAliveSeconds = 60; 089 090 private int queueCapacity = Integer.MAX_VALUE; 091 092 private boolean allowCoreThreadTimeOut = false; 093 094 private TaskDecorator taskDecorator; 095 096 private ThreadPoolExecutor threadPoolExecutor; 097 098 099 /** 100 * Set the ThreadPoolExecutor's core pool size. 101 * Default is 1. 102 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 103 */ 104 public void setCorePoolSize(int corePoolSize) { 105 synchronized (this.poolSizeMonitor) { 106 this.corePoolSize = corePoolSize; 107 if (this.threadPoolExecutor != null) { 108 this.threadPoolExecutor.setCorePoolSize(corePoolSize); 109 } 110 } 111 } 112 113 /** 114 * Return the ThreadPoolExecutor's core pool size. 115 */ 116 public int getCorePoolSize() { 117 synchronized (this.poolSizeMonitor) { 118 return this.corePoolSize; 119 } 120 } 121 122 /** 123 * Set the ThreadPoolExecutor's maximum pool size. 124 * Default is {@code Integer.MAX_VALUE}. 125 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 126 */ 127 public void setMaxPoolSize(int maxPoolSize) { 128 synchronized (this.poolSizeMonitor) { 129 this.maxPoolSize = maxPoolSize; 130 if (this.threadPoolExecutor != null) { 131 this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); 132 } 133 } 134 } 135 136 /** 137 * Return the ThreadPoolExecutor's maximum pool size. 138 */ 139 public int getMaxPoolSize() { 140 synchronized (this.poolSizeMonitor) { 141 return this.maxPoolSize; 142 } 143 } 144 145 /** 146 * Set the ThreadPoolExecutor's keep-alive seconds. 147 * Default is 60. 148 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 149 */ 150 public void setKeepAliveSeconds(int keepAliveSeconds) { 151 synchronized (this.poolSizeMonitor) { 152 this.keepAliveSeconds = keepAliveSeconds; 153 if (this.threadPoolExecutor != null) { 154 this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS); 155 } 156 } 157 } 158 159 /** 160 * Return the ThreadPoolExecutor's keep-alive seconds. 161 */ 162 public int getKeepAliveSeconds() { 163 synchronized (this.poolSizeMonitor) { 164 return this.keepAliveSeconds; 165 } 166 } 167 168 /** 169 * Set the capacity for the ThreadPoolExecutor's BlockingQueue. 170 * Default is {@code Integer.MAX_VALUE}. 171 * <p>Any positive value will lead to a LinkedBlockingQueue instance; 172 * any other value will lead to a SynchronousQueue instance. 173 * @see java.util.concurrent.LinkedBlockingQueue 174 * @see java.util.concurrent.SynchronousQueue 175 */ 176 public void setQueueCapacity(int queueCapacity) { 177 this.queueCapacity = queueCapacity; 178 } 179 180 /** 181 * Specify whether to allow core threads to time out. This enables dynamic 182 * growing and shrinking even in combination with a non-zero queue (since 183 * the max pool size will only grow once the queue is full). 184 * <p>Default is "false". 185 * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) 186 */ 187 public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { 188 this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; 189 } 190 191 /** 192 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 193 * about to be executed. 194 * <p>Note that such a decorator is not necessarily being applied to the 195 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 196 * execution callback (which may be a wrapper around the user-supplied task). 197 * <p>The primary use case is to set some execution context around the task's 198 * invocation, or to provide some monitoring/statistics for task execution. 199 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 200 * is limited to plain {@code Runnable} execution via {@code execute} calls. 201 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 202 * {@code FutureTask} which does not propagate any exceptions; you might 203 * have to cast it and call {@code Future#get} to evaluate exceptions. 204 * See the {@code ThreadPoolExecutor#afterExecute} javadoc for an example 205 * of how to access exceptions in such a {@code Future} case. 206 * @since 4.3 207 */ 208 public void setTaskDecorator(TaskDecorator taskDecorator) { 209 this.taskDecorator = taskDecorator; 210 } 211 212 213 /** 214 * Note: This method exposes an {@link ExecutorService} to its base class 215 * but stores the actual {@link ThreadPoolExecutor} handle internally. 216 * Do not override this method for replacing the executor, rather just for 217 * decorating its {@code ExecutorService} handle or storing custom state. 218 */ 219 @Override 220 protected ExecutorService initializeExecutor( 221 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 222 223 BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); 224 225 ThreadPoolExecutor executor; 226 if (this.taskDecorator != null) { 227 executor = new ThreadPoolExecutor( 228 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 229 queue, threadFactory, rejectedExecutionHandler) { 230 @Override 231 public void execute(Runnable command) { 232 super.execute(taskDecorator.decorate(command)); 233 } 234 }; 235 } 236 else { 237 executor = new ThreadPoolExecutor( 238 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 239 queue, threadFactory, rejectedExecutionHandler); 240 241 } 242 243 if (this.allowCoreThreadTimeOut) { 244 executor.allowCoreThreadTimeOut(true); 245 } 246 247 this.threadPoolExecutor = executor; 248 return executor; 249 } 250 251 /** 252 * Create the BlockingQueue to use for the ThreadPoolExecutor. 253 * <p>A LinkedBlockingQueue instance will be created for a positive 254 * capacity value; a SynchronousQueue else. 255 * @param queueCapacity the specified queue capacity 256 * @return the BlockingQueue instance 257 * @see java.util.concurrent.LinkedBlockingQueue 258 * @see java.util.concurrent.SynchronousQueue 259 */ 260 protected BlockingQueue<Runnable> createQueue(int queueCapacity) { 261 if (queueCapacity > 0) { 262 return new LinkedBlockingQueue<Runnable>(queueCapacity); 263 } 264 else { 265 return new SynchronousQueue<Runnable>(); 266 } 267 } 268 269 /** 270 * Return the underlying ThreadPoolExecutor for native access. 271 * @return the underlying ThreadPoolExecutor (never {@code null}) 272 * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet 273 */ 274 public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { 275 Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); 276 return this.threadPoolExecutor; 277 } 278 279 /** 280 * Return the current pool size. 281 * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize() 282 */ 283 public int getPoolSize() { 284 if (this.threadPoolExecutor == null) { 285 // Not initialized yet: assume core pool size. 286 return this.corePoolSize; 287 } 288 return this.threadPoolExecutor.getPoolSize(); 289 } 290 291 /** 292 * Return the number of currently active threads. 293 * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount() 294 */ 295 public int getActiveCount() { 296 if (this.threadPoolExecutor == null) { 297 // Not initialized yet: assume no active threads. 298 return 0; 299 } 300 return this.threadPoolExecutor.getActiveCount(); 301 } 302 303 304 @Override 305 public void execute(Runnable task) { 306 Executor executor = getThreadPoolExecutor(); 307 try { 308 executor.execute(task); 309 } 310 catch (RejectedExecutionException ex) { 311 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 312 } 313 } 314 315 @Override 316 public void execute(Runnable task, long startTimeout) { 317 execute(task); 318 } 319 320 @Override 321 public Future<?> submit(Runnable task) { 322 ExecutorService executor = getThreadPoolExecutor(); 323 try { 324 return executor.submit(task); 325 } 326 catch (RejectedExecutionException ex) { 327 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 328 } 329 } 330 331 @Override 332 public <T> Future<T> submit(Callable<T> task) { 333 ExecutorService executor = getThreadPoolExecutor(); 334 try { 335 return executor.submit(task); 336 } 337 catch (RejectedExecutionException ex) { 338 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 339 } 340 } 341 342 @Override 343 public ListenableFuture<?> submitListenable(Runnable task) { 344 ExecutorService executor = getThreadPoolExecutor(); 345 try { 346 ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null); 347 executor.execute(future); 348 return future; 349 } 350 catch (RejectedExecutionException ex) { 351 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 352 } 353 } 354 355 @Override 356 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 357 ExecutorService executor = getThreadPoolExecutor(); 358 try { 359 ListenableFutureTask<T> future = new ListenableFutureTask<T>(task); 360 executor.execute(future); 361 return future; 362 } 363 catch (RejectedExecutionException ex) { 364 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 365 } 366 } 367 368 /** 369 * This task executor prefers short-lived work units. 370 */ 371 @Override 372 public boolean prefersShortLivedTasks() { 373 return true; 374 } 375 376}