001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.core.task;
018
019import java.io.Serializable;
020import java.util.concurrent.Callable;
021import java.util.concurrent.Future;
022import java.util.concurrent.FutureTask;
023import java.util.concurrent.ThreadFactory;
024
025import org.springframework.util.Assert;
026import org.springframework.util.ConcurrencyThrottleSupport;
027import org.springframework.util.CustomizableThreadCreator;
028import org.springframework.util.concurrent.ListenableFuture;
029import org.springframework.util.concurrent.ListenableFutureTask;
030
031/**
032 * {@link TaskExecutor} implementation that fires up a new Thread for each task,
033 * executing it asynchronously.
034 *
035 * <p>Supports limiting concurrent threads through the "concurrencyLimit"
036 * bean property. By default, the number of concurrent threads is unlimited.
037 *
038 * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
039 * thread-pooling TaskExecutor implementation instead, in particular for
040 * executing a large number of short-lived tasks.
041 *
042 * @author Juergen Hoeller
043 * @since 2.0
044 * @see #setConcurrencyLimit
045 * @see SyncTaskExecutor
046 * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
047 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
048 */
049@SuppressWarnings("serial")
050public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
051                implements AsyncListenableTaskExecutor, Serializable {
052
053        /**
054         * Permit any number of concurrent invocations: that is, don't throttle concurrency.
055         * @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
056         */
057        public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;
058
059        /**
060         * Switch concurrency 'off': that is, don't allow any concurrent invocations.
061         * @see ConcurrencyThrottleSupport#NO_CONCURRENCY
062         */
063        public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;
064
065
066        /** Internal concurrency throttle used by this executor */
067        private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
068
069        private ThreadFactory threadFactory;
070
071        private TaskDecorator taskDecorator;
072
073
074        /**
075         * Create a new SimpleAsyncTaskExecutor with default thread name prefix.
076         */
077        public SimpleAsyncTaskExecutor() {
078                super();
079        }
080
081        /**
082         * Create a new SimpleAsyncTaskExecutor with the given thread name prefix.
083         * @param threadNamePrefix the prefix to use for the names of newly created threads
084         */
085        public SimpleAsyncTaskExecutor(String threadNamePrefix) {
086                super(threadNamePrefix);
087        }
088
089        /**
090         * Create a new SimpleAsyncTaskExecutor with the given external thread factory.
091         * @param threadFactory the factory to use for creating new Threads
092         */
093        public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
094                this.threadFactory = threadFactory;
095        }
096
097
098        /**
099         * Specify an external factory to use for creating new Threads,
100         * instead of relying on the local properties of this executor.
101         * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference
102         * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism.
103         * @see #setThreadNamePrefix
104         * @see #setThreadPriority
105         */
106        public void setThreadFactory(ThreadFactory threadFactory) {
107                this.threadFactory = threadFactory;
108        }
109
110        /**
111         * Return the external factory to use for creating new Threads, if any.
112         */
113        public final ThreadFactory getThreadFactory() {
114                return this.threadFactory;
115        }
116
117        /**
118         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
119         * about to be executed.
120         * <p>Note that such a decorator is not necessarily being applied to the
121         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
122         * execution callback (which may be a wrapper around the user-supplied task).
123         * <p>The primary use case is to set some execution context around the task's
124         * invocation, or to provide some monitoring/statistics for task execution.
125         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
126         * is limited to plain {@code Runnable} execution via {@code execute} calls.
127         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
128         * {@code FutureTask} which does not propagate any exceptions; you might
129         * have to cast it and call {@code Future#get} to evaluate exceptions.
130         * @since 4.3
131         */
132        public final void setTaskDecorator(TaskDecorator taskDecorator) {
133                this.taskDecorator = taskDecorator;
134        }
135
136        /**
137         * Set the maximum number of parallel accesses allowed.
138         * -1 indicates no concurrency limit at all.
139         * <p>In principle, this limit can be changed at runtime,
140         * although it is generally designed as a config time setting.
141         * NOTE: Do not switch between -1 and any concrete limit at runtime,
142         * as this will lead to inconsistent concurrency counts: A limit
143         * of -1 effectively turns off concurrency counting completely.
144         * @see #UNBOUNDED_CONCURRENCY
145         */
146        public void setConcurrencyLimit(int concurrencyLimit) {
147                this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
148        }
149
150        /**
151         * Return the maximum number of parallel accesses allowed.
152         */
153        public final int getConcurrencyLimit() {
154                return this.concurrencyThrottle.getConcurrencyLimit();
155        }
156
157        /**
158         * Return whether this throttle is currently active.
159         * @return {@code true} if the concurrency limit for this instance is active
160         * @see #getConcurrencyLimit()
161         * @see #setConcurrencyLimit
162         */
163        public final boolean isThrottleActive() {
164                return this.concurrencyThrottle.isThrottleActive();
165        }
166
167
168        /**
169         * Executes the given task, within a concurrency throttle
170         * if configured (through the superclass's settings).
171         * @see #doExecute(Runnable)
172         */
173        @Override
174        public void execute(Runnable task) {
175                execute(task, TIMEOUT_INDEFINITE);
176        }
177
178        /**
179         * Executes the given task, within a concurrency throttle
180         * if configured (through the superclass's settings).
181         * <p>Executes urgent tasks (with 'immediate' timeout) directly,
182         * bypassing the concurrency throttle (if active). All other
183         * tasks are subject to throttling.
184         * @see #TIMEOUT_IMMEDIATE
185         * @see #doExecute(Runnable)
186         */
187        @Override
188        public void execute(Runnable task, long startTimeout) {
189                Assert.notNull(task, "Runnable must not be null");
190                Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
191                if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
192                        this.concurrencyThrottle.beforeAccess();
193                        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
194                }
195                else {
196                        doExecute(taskToUse);
197                }
198        }
199
200        @Override
201        public Future<?> submit(Runnable task) {
202                FutureTask<Object> future = new FutureTask<Object>(task, null);
203                execute(future, TIMEOUT_INDEFINITE);
204                return future;
205        }
206
207        @Override
208        public <T> Future<T> submit(Callable<T> task) {
209                FutureTask<T> future = new FutureTask<T>(task);
210                execute(future, TIMEOUT_INDEFINITE);
211                return future;
212        }
213
214        @Override
215        public ListenableFuture<?> submitListenable(Runnable task) {
216                ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
217                execute(future, TIMEOUT_INDEFINITE);
218                return future;
219        }
220
221        @Override
222        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
223                ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
224                execute(future, TIMEOUT_INDEFINITE);
225                return future;
226        }
227
228        /**
229         * Template method for the actual execution of a task.
230         * <p>The default implementation creates a new Thread and starts it.
231         * @param task the Runnable to execute
232         * @see #setThreadFactory
233         * @see #createThread
234         * @see java.lang.Thread#start()
235         */
236        protected void doExecute(Runnable task) {
237                Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
238                thread.start();
239        }
240
241
242        /**
243         * Subclass of the general ConcurrencyThrottleSupport class,
244         * making {@code beforeAccess()} and {@code afterAccess()}
245         * visible to the surrounding class.
246         */
247        private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
248
249                @Override
250                protected void beforeAccess() {
251                        super.beforeAccess();
252                }
253
254                @Override
255                protected void afterAccess() {
256                        super.afterAccess();
257                }
258        }
259
260
261        /**
262         * This Runnable calls {@code afterAccess()} after the
263         * target Runnable has finished its execution.
264         */
265        private class ConcurrencyThrottlingRunnable implements Runnable {
266
267                private final Runnable target;
268
269                public ConcurrencyThrottlingRunnable(Runnable target) {
270                        this.target = target;
271                }
272
273                @Override
274                public void run() {
275                        try {
276                                this.target.run();
277                        }
278                        finally {
279                                concurrencyThrottle.afterAccess();
280                        }
281                }
282        }
283
284}