001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.core.task; 018 019import java.io.Serializable; 020import java.util.concurrent.Callable; 021import java.util.concurrent.Future; 022import java.util.concurrent.FutureTask; 023import java.util.concurrent.ThreadFactory; 024 025import org.springframework.util.Assert; 026import org.springframework.util.ConcurrencyThrottleSupport; 027import org.springframework.util.CustomizableThreadCreator; 028import org.springframework.util.concurrent.ListenableFuture; 029import org.springframework.util.concurrent.ListenableFutureTask; 030 031/** 032 * {@link TaskExecutor} implementation that fires up a new Thread for each task, 033 * executing it asynchronously. 034 * 035 * <p>Supports limiting concurrent threads through the "concurrencyLimit" 036 * bean property. By default, the number of concurrent threads is unlimited. 037 * 038 * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a 039 * thread-pooling TaskExecutor implementation instead, in particular for 040 * executing a large number of short-lived tasks. 041 * 042 * @author Juergen Hoeller 043 * @since 2.0 044 * @see #setConcurrencyLimit 045 * @see SyncTaskExecutor 046 * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 047 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor 048 */ 049@SuppressWarnings("serial") 050public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator 051 implements AsyncListenableTaskExecutor, Serializable { 052 053 /** 054 * Permit any number of concurrent invocations: that is, don't throttle concurrency. 055 * @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY 056 */ 057 public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY; 058 059 /** 060 * Switch concurrency 'off': that is, don't allow any concurrent invocations. 061 * @see ConcurrencyThrottleSupport#NO_CONCURRENCY 062 */ 063 public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY; 064 065 066 /** Internal concurrency throttle used by this executor */ 067 private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter(); 068 069 private ThreadFactory threadFactory; 070 071 private TaskDecorator taskDecorator; 072 073 074 /** 075 * Create a new SimpleAsyncTaskExecutor with default thread name prefix. 076 */ 077 public SimpleAsyncTaskExecutor() { 078 super(); 079 } 080 081 /** 082 * Create a new SimpleAsyncTaskExecutor with the given thread name prefix. 083 * @param threadNamePrefix the prefix to use for the names of newly created threads 084 */ 085 public SimpleAsyncTaskExecutor(String threadNamePrefix) { 086 super(threadNamePrefix); 087 } 088 089 /** 090 * Create a new SimpleAsyncTaskExecutor with the given external thread factory. 091 * @param threadFactory the factory to use for creating new Threads 092 */ 093 public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) { 094 this.threadFactory = threadFactory; 095 } 096 097 098 /** 099 * Specify an external factory to use for creating new Threads, 100 * instead of relying on the local properties of this executor. 101 * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference 102 * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism. 103 * @see #setThreadNamePrefix 104 * @see #setThreadPriority 105 */ 106 public void setThreadFactory(ThreadFactory threadFactory) { 107 this.threadFactory = threadFactory; 108 } 109 110 /** 111 * Return the external factory to use for creating new Threads, if any. 112 */ 113 public final ThreadFactory getThreadFactory() { 114 return this.threadFactory; 115 } 116 117 /** 118 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 119 * about to be executed. 120 * <p>Note that such a decorator is not necessarily being applied to the 121 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 122 * execution callback (which may be a wrapper around the user-supplied task). 123 * <p>The primary use case is to set some execution context around the task's 124 * invocation, or to provide some monitoring/statistics for task execution. 125 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 126 * is limited to plain {@code Runnable} execution via {@code execute} calls. 127 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 128 * {@code FutureTask} which does not propagate any exceptions; you might 129 * have to cast it and call {@code Future#get} to evaluate exceptions. 130 * @since 4.3 131 */ 132 public final void setTaskDecorator(TaskDecorator taskDecorator) { 133 this.taskDecorator = taskDecorator; 134 } 135 136 /** 137 * Set the maximum number of parallel accesses allowed. 138 * -1 indicates no concurrency limit at all. 139 * <p>In principle, this limit can be changed at runtime, 140 * although it is generally designed as a config time setting. 141 * NOTE: Do not switch between -1 and any concrete limit at runtime, 142 * as this will lead to inconsistent concurrency counts: A limit 143 * of -1 effectively turns off concurrency counting completely. 144 * @see #UNBOUNDED_CONCURRENCY 145 */ 146 public void setConcurrencyLimit(int concurrencyLimit) { 147 this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit); 148 } 149 150 /** 151 * Return the maximum number of parallel accesses allowed. 152 */ 153 public final int getConcurrencyLimit() { 154 return this.concurrencyThrottle.getConcurrencyLimit(); 155 } 156 157 /** 158 * Return whether this throttle is currently active. 159 * @return {@code true} if the concurrency limit for this instance is active 160 * @see #getConcurrencyLimit() 161 * @see #setConcurrencyLimit 162 */ 163 public final boolean isThrottleActive() { 164 return this.concurrencyThrottle.isThrottleActive(); 165 } 166 167 168 /** 169 * Executes the given task, within a concurrency throttle 170 * if configured (through the superclass's settings). 171 * @see #doExecute(Runnable) 172 */ 173 @Override 174 public void execute(Runnable task) { 175 execute(task, TIMEOUT_INDEFINITE); 176 } 177 178 /** 179 * Executes the given task, within a concurrency throttle 180 * if configured (through the superclass's settings). 181 * <p>Executes urgent tasks (with 'immediate' timeout) directly, 182 * bypassing the concurrency throttle (if active). All other 183 * tasks are subject to throttling. 184 * @see #TIMEOUT_IMMEDIATE 185 * @see #doExecute(Runnable) 186 */ 187 @Override 188 public void execute(Runnable task, long startTimeout) { 189 Assert.notNull(task, "Runnable must not be null"); 190 Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task); 191 if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { 192 this.concurrencyThrottle.beforeAccess(); 193 doExecute(new ConcurrencyThrottlingRunnable(taskToUse)); 194 } 195 else { 196 doExecute(taskToUse); 197 } 198 } 199 200 @Override 201 public Future<?> submit(Runnable task) { 202 FutureTask<Object> future = new FutureTask<Object>(task, null); 203 execute(future, TIMEOUT_INDEFINITE); 204 return future; 205 } 206 207 @Override 208 public <T> Future<T> submit(Callable<T> task) { 209 FutureTask<T> future = new FutureTask<T>(task); 210 execute(future, TIMEOUT_INDEFINITE); 211 return future; 212 } 213 214 @Override 215 public ListenableFuture<?> submitListenable(Runnable task) { 216 ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null); 217 execute(future, TIMEOUT_INDEFINITE); 218 return future; 219 } 220 221 @Override 222 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 223 ListenableFutureTask<T> future = new ListenableFutureTask<T>(task); 224 execute(future, TIMEOUT_INDEFINITE); 225 return future; 226 } 227 228 /** 229 * Template method for the actual execution of a task. 230 * <p>The default implementation creates a new Thread and starts it. 231 * @param task the Runnable to execute 232 * @see #setThreadFactory 233 * @see #createThread 234 * @see java.lang.Thread#start() 235 */ 236 protected void doExecute(Runnable task) { 237 Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); 238 thread.start(); 239 } 240 241 242 /** 243 * Subclass of the general ConcurrencyThrottleSupport class, 244 * making {@code beforeAccess()} and {@code afterAccess()} 245 * visible to the surrounding class. 246 */ 247 private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport { 248 249 @Override 250 protected void beforeAccess() { 251 super.beforeAccess(); 252 } 253 254 @Override 255 protected void afterAccess() { 256 super.afterAccess(); 257 } 258 } 259 260 261 /** 262 * This Runnable calls {@code afterAccess()} after the 263 * target Runnable has finished its execution. 264 */ 265 private class ConcurrencyThrottlingRunnable implements Runnable { 266 267 private final Runnable target; 268 269 public ConcurrencyThrottlingRunnable(Runnable target) { 270 this.target = target; 271 } 272 273 @Override 274 public void run() { 275 try { 276 this.target.run(); 277 } 278 finally { 279 concurrencyThrottle.afterAccess(); 280 } 281 } 282 } 283 284}