001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.core.task; 018 019import java.io.Serializable; 020import java.util.concurrent.Callable; 021import java.util.concurrent.Future; 022import java.util.concurrent.FutureTask; 023import java.util.concurrent.ThreadFactory; 024 025import org.springframework.lang.Nullable; 026import org.springframework.util.Assert; 027import org.springframework.util.ConcurrencyThrottleSupport; 028import org.springframework.util.CustomizableThreadCreator; 029import org.springframework.util.concurrent.ListenableFuture; 030import org.springframework.util.concurrent.ListenableFutureTask; 031 032/** 033 * {@link TaskExecutor} implementation that fires up a new Thread for each task, 034 * executing it asynchronously. 035 * 036 * <p>Supports limiting concurrent threads through the "concurrencyLimit" 037 * bean property. By default, the number of concurrent threads is unlimited. 038 * 039 * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a 040 * thread-pooling TaskExecutor implementation instead, in particular for 041 * executing a large number of short-lived tasks. 042 * 043 * @author Juergen Hoeller 044 * @since 2.0 045 * @see #setConcurrencyLimit 046 * @see SyncTaskExecutor 047 * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 048 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor 049 */ 050@SuppressWarnings("serial") 051public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator 052 implements AsyncListenableTaskExecutor, Serializable { 053 054 /** 055 * Permit any number of concurrent invocations: that is, don't throttle concurrency. 056 * @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY 057 */ 058 public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY; 059 060 /** 061 * Switch concurrency 'off': that is, don't allow any concurrent invocations. 062 * @see ConcurrencyThrottleSupport#NO_CONCURRENCY 063 */ 064 public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY; 065 066 067 /** Internal concurrency throttle used by this executor. */ 068 private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter(); 069 070 @Nullable 071 private ThreadFactory threadFactory; 072 073 @Nullable 074 private TaskDecorator taskDecorator; 075 076 077 /** 078 * Create a new SimpleAsyncTaskExecutor with default thread name prefix. 079 */ 080 public SimpleAsyncTaskExecutor() { 081 super(); 082 } 083 084 /** 085 * Create a new SimpleAsyncTaskExecutor with the given thread name prefix. 086 * @param threadNamePrefix the prefix to use for the names of newly created threads 087 */ 088 public SimpleAsyncTaskExecutor(String threadNamePrefix) { 089 super(threadNamePrefix); 090 } 091 092 /** 093 * Create a new SimpleAsyncTaskExecutor with the given external thread factory. 094 * @param threadFactory the factory to use for creating new Threads 095 */ 096 public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) { 097 this.threadFactory = threadFactory; 098 } 099 100 101 /** 102 * Specify an external factory to use for creating new Threads, 103 * instead of relying on the local properties of this executor. 104 * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference 105 * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism. 106 * @see #setThreadNamePrefix 107 * @see #setThreadPriority 108 */ 109 public void setThreadFactory(@Nullable ThreadFactory threadFactory) { 110 this.threadFactory = threadFactory; 111 } 112 113 /** 114 * Return the external factory to use for creating new Threads, if any. 115 */ 116 @Nullable 117 public final ThreadFactory getThreadFactory() { 118 return this.threadFactory; 119 } 120 121 /** 122 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 123 * about to be executed. 124 * <p>Note that such a decorator is not necessarily being applied to the 125 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 126 * execution callback (which may be a wrapper around the user-supplied task). 127 * <p>The primary use case is to set some execution context around the task's 128 * invocation, or to provide some monitoring/statistics for task execution. 129 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 130 * is limited to plain {@code Runnable} execution via {@code execute} calls. 131 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 132 * {@code FutureTask} which does not propagate any exceptions; you might 133 * have to cast it and call {@code Future#get} to evaluate exceptions. 134 * @since 4.3 135 */ 136 public final void setTaskDecorator(TaskDecorator taskDecorator) { 137 this.taskDecorator = taskDecorator; 138 } 139 140 /** 141 * Set the maximum number of parallel accesses allowed. 142 * -1 indicates no concurrency limit at all. 143 * <p>In principle, this limit can be changed at runtime, 144 * although it is generally designed as a config time setting. 145 * NOTE: Do not switch between -1 and any concrete limit at runtime, 146 * as this will lead to inconsistent concurrency counts: A limit 147 * of -1 effectively turns off concurrency counting completely. 148 * @see #UNBOUNDED_CONCURRENCY 149 */ 150 public void setConcurrencyLimit(int concurrencyLimit) { 151 this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit); 152 } 153 154 /** 155 * Return the maximum number of parallel accesses allowed. 156 */ 157 public final int getConcurrencyLimit() { 158 return this.concurrencyThrottle.getConcurrencyLimit(); 159 } 160 161 /** 162 * Return whether this throttle is currently active. 163 * @return {@code true} if the concurrency limit for this instance is active 164 * @see #getConcurrencyLimit() 165 * @see #setConcurrencyLimit 166 */ 167 public final boolean isThrottleActive() { 168 return this.concurrencyThrottle.isThrottleActive(); 169 } 170 171 172 /** 173 * Executes the given task, within a concurrency throttle 174 * if configured (through the superclass's settings). 175 * @see #doExecute(Runnable) 176 */ 177 @Override 178 public void execute(Runnable task) { 179 execute(task, TIMEOUT_INDEFINITE); 180 } 181 182 /** 183 * Executes the given task, within a concurrency throttle 184 * if configured (through the superclass's settings). 185 * <p>Executes urgent tasks (with 'immediate' timeout) directly, 186 * bypassing the concurrency throttle (if active). All other 187 * tasks are subject to throttling. 188 * @see #TIMEOUT_IMMEDIATE 189 * @see #doExecute(Runnable) 190 */ 191 @Override 192 public void execute(Runnable task, long startTimeout) { 193 Assert.notNull(task, "Runnable must not be null"); 194 Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task); 195 if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { 196 this.concurrencyThrottle.beforeAccess(); 197 doExecute(new ConcurrencyThrottlingRunnable(taskToUse)); 198 } 199 else { 200 doExecute(taskToUse); 201 } 202 } 203 204 @Override 205 public Future<?> submit(Runnable task) { 206 FutureTask<Object> future = new FutureTask<>(task, null); 207 execute(future, TIMEOUT_INDEFINITE); 208 return future; 209 } 210 211 @Override 212 public <T> Future<T> submit(Callable<T> task) { 213 FutureTask<T> future = new FutureTask<>(task); 214 execute(future, TIMEOUT_INDEFINITE); 215 return future; 216 } 217 218 @Override 219 public ListenableFuture<?> submitListenable(Runnable task) { 220 ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null); 221 execute(future, TIMEOUT_INDEFINITE); 222 return future; 223 } 224 225 @Override 226 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 227 ListenableFutureTask<T> future = new ListenableFutureTask<>(task); 228 execute(future, TIMEOUT_INDEFINITE); 229 return future; 230 } 231 232 /** 233 * Template method for the actual execution of a task. 234 * <p>The default implementation creates a new Thread and starts it. 235 * @param task the Runnable to execute 236 * @see #setThreadFactory 237 * @see #createThread 238 * @see java.lang.Thread#start() 239 */ 240 protected void doExecute(Runnable task) { 241 Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); 242 thread.start(); 243 } 244 245 246 /** 247 * Subclass of the general ConcurrencyThrottleSupport class, 248 * making {@code beforeAccess()} and {@code afterAccess()} 249 * visible to the surrounding class. 250 */ 251 private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport { 252 253 @Override 254 protected void beforeAccess() { 255 super.beforeAccess(); 256 } 257 258 @Override 259 protected void afterAccess() { 260 super.afterAccess(); 261 } 262 } 263 264 265 /** 266 * This Runnable calls {@code afterAccess()} after the 267 * target Runnable has finished its execution. 268 */ 269 private class ConcurrencyThrottlingRunnable implements Runnable { 270 271 private final Runnable target; 272 273 public ConcurrencyThrottlingRunnable(Runnable target) { 274 this.target = target; 275 } 276 277 @Override 278 public void run() { 279 try { 280 this.target.run(); 281 } 282 finally { 283 concurrencyThrottle.afterAccess(); 284 } 285 } 286 } 287 288}