001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.scheduling.concurrent; 018 019import java.util.Map; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.Callable; 022import java.util.concurrent.Executor; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Future; 025import java.util.concurrent.LinkedBlockingQueue; 026import java.util.concurrent.RejectedExecutionException; 027import java.util.concurrent.RejectedExecutionHandler; 028import java.util.concurrent.SynchronousQueue; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032 033import org.springframework.core.task.AsyncListenableTaskExecutor; 034import org.springframework.core.task.TaskDecorator; 035import org.springframework.core.task.TaskRejectedException; 036import org.springframework.lang.Nullable; 037import org.springframework.scheduling.SchedulingTaskExecutor; 038import org.springframework.util.Assert; 039import org.springframework.util.ConcurrentReferenceHashMap; 040import org.springframework.util.concurrent.ListenableFuture; 041import org.springframework.util.concurrent.ListenableFutureTask; 042 043/** 044 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor} 045 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity" 046 * properties) and exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}. 047 * This class is also well suited for management and monitoring (e.g. through JMX), 048 * providing several useful attributes: "corePoolSize", "maxPoolSize", "keepAliveSeconds" 049 * (all supporting updates at runtime); "poolSize", "activeCount" (for introspection only). 050 * 051 * <p>The default configuration is a core pool size of 1, with unlimited max pool size 052 * and unlimited queue capacity. This is roughly equivalent to 053 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single 054 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics 055 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling 056 * of threads in the pool to a potentially very high number. Consider also setting a 057 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher 058 * {@link #setCorePoolSize "corePoolSize"} (see also the 059 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling). 060 * 061 * <p><b>NOTE:</b> This class implements Spring's 062 * {@link org.springframework.core.task.TaskExecutor} interface as well as the 063 * {@link java.util.concurrent.Executor} interface, with the former being the primary 064 * interface, the other just serving as secondary convenience. For this reason, the 065 * exception handling follows the TaskExecutor contract rather than the Executor contract, 066 * in particular regarding the {@link org.springframework.core.task.TaskRejectedException}. 067 * 068 * <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using 069 * constructor injection, or use a factory method definition that points to the 070 * {@link java.util.concurrent.Executors} class. To expose such a raw Executor as a 071 * Spring {@link org.springframework.core.task.TaskExecutor}, simply wrap it with a 072 * {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter. 073 * 074 * @author Juergen Hoeller 075 * @since 2.0 076 * @see org.springframework.core.task.TaskExecutor 077 * @see java.util.concurrent.ThreadPoolExecutor 078 * @see ThreadPoolExecutorFactoryBean 079 * @see ConcurrentTaskExecutor 080 */ 081@SuppressWarnings("serial") 082public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport 083 implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { 084 085 private final Object poolSizeMonitor = new Object(); 086 087 private int corePoolSize = 1; 088 089 private int maxPoolSize = Integer.MAX_VALUE; 090 091 private int keepAliveSeconds = 60; 092 093 private int queueCapacity = Integer.MAX_VALUE; 094 095 private boolean allowCoreThreadTimeOut = false; 096 097 @Nullable 098 private TaskDecorator taskDecorator; 099 100 @Nullable 101 private ThreadPoolExecutor threadPoolExecutor; 102 103 // Runnable decorator to user-level FutureTask, if different 104 private final Map<Runnable, Object> decoratedTaskMap = 105 new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); 106 107 108 /** 109 * Set the ThreadPoolExecutor's core pool size. 110 * Default is 1. 111 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 112 */ 113 public void setCorePoolSize(int corePoolSize) { 114 synchronized (this.poolSizeMonitor) { 115 this.corePoolSize = corePoolSize; 116 if (this.threadPoolExecutor != null) { 117 this.threadPoolExecutor.setCorePoolSize(corePoolSize); 118 } 119 } 120 } 121 122 /** 123 * Return the ThreadPoolExecutor's core pool size. 124 */ 125 public int getCorePoolSize() { 126 synchronized (this.poolSizeMonitor) { 127 return this.corePoolSize; 128 } 129 } 130 131 /** 132 * Set the ThreadPoolExecutor's maximum pool size. 133 * Default is {@code Integer.MAX_VALUE}. 134 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 135 */ 136 public void setMaxPoolSize(int maxPoolSize) { 137 synchronized (this.poolSizeMonitor) { 138 this.maxPoolSize = maxPoolSize; 139 if (this.threadPoolExecutor != null) { 140 this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); 141 } 142 } 143 } 144 145 /** 146 * Return the ThreadPoolExecutor's maximum pool size. 147 */ 148 public int getMaxPoolSize() { 149 synchronized (this.poolSizeMonitor) { 150 return this.maxPoolSize; 151 } 152 } 153 154 /** 155 * Set the ThreadPoolExecutor's keep-alive seconds. 156 * Default is 60. 157 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 158 */ 159 public void setKeepAliveSeconds(int keepAliveSeconds) { 160 synchronized (this.poolSizeMonitor) { 161 this.keepAliveSeconds = keepAliveSeconds; 162 if (this.threadPoolExecutor != null) { 163 this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS); 164 } 165 } 166 } 167 168 /** 169 * Return the ThreadPoolExecutor's keep-alive seconds. 170 */ 171 public int getKeepAliveSeconds() { 172 synchronized (this.poolSizeMonitor) { 173 return this.keepAliveSeconds; 174 } 175 } 176 177 /** 178 * Set the capacity for the ThreadPoolExecutor's BlockingQueue. 179 * Default is {@code Integer.MAX_VALUE}. 180 * <p>Any positive value will lead to a LinkedBlockingQueue instance; 181 * any other value will lead to a SynchronousQueue instance. 182 * @see java.util.concurrent.LinkedBlockingQueue 183 * @see java.util.concurrent.SynchronousQueue 184 */ 185 public void setQueueCapacity(int queueCapacity) { 186 this.queueCapacity = queueCapacity; 187 } 188 189 /** 190 * Specify whether to allow core threads to time out. This enables dynamic 191 * growing and shrinking even in combination with a non-zero queue (since 192 * the max pool size will only grow once the queue is full). 193 * <p>Default is "false". 194 * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) 195 */ 196 public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { 197 this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; 198 } 199 200 /** 201 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 202 * about to be executed. 203 * <p>Note that such a decorator is not necessarily being applied to the 204 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 205 * execution callback (which may be a wrapper around the user-supplied task). 206 * <p>The primary use case is to set some execution context around the task's 207 * invocation, or to provide some monitoring/statistics for task execution. 208 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 209 * is limited to plain {@code Runnable} execution via {@code execute} calls. 210 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 211 * {@code FutureTask} which does not propagate any exceptions; you might 212 * have to cast it and call {@code Future#get} to evaluate exceptions. 213 * See the {@code ThreadPoolExecutor#afterExecute} javadoc for an example 214 * of how to access exceptions in such a {@code Future} case. 215 * @since 4.3 216 */ 217 public void setTaskDecorator(TaskDecorator taskDecorator) { 218 this.taskDecorator = taskDecorator; 219 } 220 221 222 /** 223 * Note: This method exposes an {@link ExecutorService} to its base class 224 * but stores the actual {@link ThreadPoolExecutor} handle internally. 225 * Do not override this method for replacing the executor, rather just for 226 * decorating its {@code ExecutorService} handle or storing custom state. 227 */ 228 @Override 229 protected ExecutorService initializeExecutor( 230 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 231 232 BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); 233 234 ThreadPoolExecutor executor; 235 if (this.taskDecorator != null) { 236 executor = new ThreadPoolExecutor( 237 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 238 queue, threadFactory, rejectedExecutionHandler) { 239 @Override 240 public void execute(Runnable command) { 241 Runnable decorated = taskDecorator.decorate(command); 242 if (decorated != command) { 243 decoratedTaskMap.put(decorated, command); 244 } 245 super.execute(decorated); 246 } 247 }; 248 } 249 else { 250 executor = new ThreadPoolExecutor( 251 this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 252 queue, threadFactory, rejectedExecutionHandler); 253 254 } 255 256 if (this.allowCoreThreadTimeOut) { 257 executor.allowCoreThreadTimeOut(true); 258 } 259 260 this.threadPoolExecutor = executor; 261 return executor; 262 } 263 264 /** 265 * Create the BlockingQueue to use for the ThreadPoolExecutor. 266 * <p>A LinkedBlockingQueue instance will be created for a positive 267 * capacity value; a SynchronousQueue else. 268 * @param queueCapacity the specified queue capacity 269 * @return the BlockingQueue instance 270 * @see java.util.concurrent.LinkedBlockingQueue 271 * @see java.util.concurrent.SynchronousQueue 272 */ 273 protected BlockingQueue<Runnable> createQueue(int queueCapacity) { 274 if (queueCapacity > 0) { 275 return new LinkedBlockingQueue<>(queueCapacity); 276 } 277 else { 278 return new SynchronousQueue<>(); 279 } 280 } 281 282 /** 283 * Return the underlying ThreadPoolExecutor for native access. 284 * @return the underlying ThreadPoolExecutor (never {@code null}) 285 * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet 286 */ 287 public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { 288 Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); 289 return this.threadPoolExecutor; 290 } 291 292 /** 293 * Return the current pool size. 294 * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize() 295 */ 296 public int getPoolSize() { 297 if (this.threadPoolExecutor == null) { 298 // Not initialized yet: assume core pool size. 299 return this.corePoolSize; 300 } 301 return this.threadPoolExecutor.getPoolSize(); 302 } 303 304 /** 305 * Return the number of currently active threads. 306 * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount() 307 */ 308 public int getActiveCount() { 309 if (this.threadPoolExecutor == null) { 310 // Not initialized yet: assume no active threads. 311 return 0; 312 } 313 return this.threadPoolExecutor.getActiveCount(); 314 } 315 316 317 @Override 318 public void execute(Runnable task) { 319 Executor executor = getThreadPoolExecutor(); 320 try { 321 executor.execute(task); 322 } 323 catch (RejectedExecutionException ex) { 324 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 325 } 326 } 327 328 @Override 329 public void execute(Runnable task, long startTimeout) { 330 execute(task); 331 } 332 333 @Override 334 public Future<?> submit(Runnable task) { 335 ExecutorService executor = getThreadPoolExecutor(); 336 try { 337 return executor.submit(task); 338 } 339 catch (RejectedExecutionException ex) { 340 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 341 } 342 } 343 344 @Override 345 public <T> Future<T> submit(Callable<T> task) { 346 ExecutorService executor = getThreadPoolExecutor(); 347 try { 348 return executor.submit(task); 349 } 350 catch (RejectedExecutionException ex) { 351 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 352 } 353 } 354 355 @Override 356 public ListenableFuture<?> submitListenable(Runnable task) { 357 ExecutorService executor = getThreadPoolExecutor(); 358 try { 359 ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null); 360 executor.execute(future); 361 return future; 362 } 363 catch (RejectedExecutionException ex) { 364 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 365 } 366 } 367 368 @Override 369 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 370 ExecutorService executor = getThreadPoolExecutor(); 371 try { 372 ListenableFutureTask<T> future = new ListenableFutureTask<>(task); 373 executor.execute(future); 374 return future; 375 } 376 catch (RejectedExecutionException ex) { 377 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 378 } 379 } 380 381 @Override 382 protected void cancelRemainingTask(Runnable task) { 383 super.cancelRemainingTask(task); 384 // Cancel associated user-level Future handle as well 385 Object original = this.decoratedTaskMap.get(task); 386 if (original instanceof Future) { 387 ((Future<?>) original).cancel(true); 388 } 389 } 390 391}