001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.core.task;
018
019import java.io.Serializable;
020import java.util.concurrent.Callable;
021import java.util.concurrent.Future;
022import java.util.concurrent.FutureTask;
023import java.util.concurrent.ThreadFactory;
024
025import org.springframework.lang.Nullable;
026import org.springframework.util.Assert;
027import org.springframework.util.ConcurrencyThrottleSupport;
028import org.springframework.util.CustomizableThreadCreator;
029import org.springframework.util.concurrent.ListenableFuture;
030import org.springframework.util.concurrent.ListenableFutureTask;
031
032/**
033 * {@link TaskExecutor} implementation that fires up a new Thread for each task,
034 * executing it asynchronously.
035 *
036 * <p>Supports limiting concurrent threads through the "concurrencyLimit"
037 * bean property. By default, the number of concurrent threads is unlimited.
038 *
039 * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
040 * thread-pooling TaskExecutor implementation instead, in particular for
041 * executing a large number of short-lived tasks.
042 *
043 * @author Juergen Hoeller
044 * @since 2.0
045 * @see #setConcurrencyLimit
046 * @see SyncTaskExecutor
047 * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
048 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
049 */
050@SuppressWarnings("serial")
051public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
052                implements AsyncListenableTaskExecutor, Serializable {
053
054        /**
055         * Permit any number of concurrent invocations: that is, don't throttle concurrency.
056         * @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
057         */
058        public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;
059
060        /**
061         * Switch concurrency 'off': that is, don't allow any concurrent invocations.
062         * @see ConcurrencyThrottleSupport#NO_CONCURRENCY
063         */
064        public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;
065
066
067        /** Internal concurrency throttle used by this executor. */
068        private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();
069
070        @Nullable
071        private ThreadFactory threadFactory;
072
073        @Nullable
074        private TaskDecorator taskDecorator;
075
076
077        /**
078         * Create a new SimpleAsyncTaskExecutor with default thread name prefix.
079         */
080        public SimpleAsyncTaskExecutor() {
081                super();
082        }
083
084        /**
085         * Create a new SimpleAsyncTaskExecutor with the given thread name prefix.
086         * @param threadNamePrefix the prefix to use for the names of newly created threads
087         */
088        public SimpleAsyncTaskExecutor(String threadNamePrefix) {
089                super(threadNamePrefix);
090        }
091
092        /**
093         * Create a new SimpleAsyncTaskExecutor with the given external thread factory.
094         * @param threadFactory the factory to use for creating new Threads
095         */
096        public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
097                this.threadFactory = threadFactory;
098        }
099
100
101        /**
102         * Specify an external factory to use for creating new Threads,
103         * instead of relying on the local properties of this executor.
104         * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference
105         * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism.
106         * @see #setThreadNamePrefix
107         * @see #setThreadPriority
108         */
109        public void setThreadFactory(@Nullable ThreadFactory threadFactory) {
110                this.threadFactory = threadFactory;
111        }
112
113        /**
114         * Return the external factory to use for creating new Threads, if any.
115         */
116        @Nullable
117        public final ThreadFactory getThreadFactory() {
118                return this.threadFactory;
119        }
120
121        /**
122         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
123         * about to be executed.
124         * <p>Note that such a decorator is not necessarily being applied to the
125         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
126         * execution callback (which may be a wrapper around the user-supplied task).
127         * <p>The primary use case is to set some execution context around the task's
128         * invocation, or to provide some monitoring/statistics for task execution.
129         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
130         * is limited to plain {@code Runnable} execution via {@code execute} calls.
131         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
132         * {@code FutureTask} which does not propagate any exceptions; you might
133         * have to cast it and call {@code Future#get} to evaluate exceptions.
134         * @since 4.3
135         */
136        public final void setTaskDecorator(TaskDecorator taskDecorator) {
137                this.taskDecorator = taskDecorator;
138        }
139
140        /**
141         * Set the maximum number of parallel accesses allowed.
142         * -1 indicates no concurrency limit at all.
143         * <p>In principle, this limit can be changed at runtime,
144         * although it is generally designed as a config time setting.
145         * NOTE: Do not switch between -1 and any concrete limit at runtime,
146         * as this will lead to inconsistent concurrency counts: A limit
147         * of -1 effectively turns off concurrency counting completely.
148         * @see #UNBOUNDED_CONCURRENCY
149         */
150        public void setConcurrencyLimit(int concurrencyLimit) {
151                this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
152        }
153
154        /**
155         * Return the maximum number of parallel accesses allowed.
156         */
157        public final int getConcurrencyLimit() {
158                return this.concurrencyThrottle.getConcurrencyLimit();
159        }
160
161        /**
162         * Return whether this throttle is currently active.
163         * @return {@code true} if the concurrency limit for this instance is active
164         * @see #getConcurrencyLimit()
165         * @see #setConcurrencyLimit
166         */
167        public final boolean isThrottleActive() {
168                return this.concurrencyThrottle.isThrottleActive();
169        }
170
171
172        /**
173         * Executes the given task, within a concurrency throttle
174         * if configured (through the superclass's settings).
175         * @see #doExecute(Runnable)
176         */
177        @Override
178        public void execute(Runnable task) {
179                execute(task, TIMEOUT_INDEFINITE);
180        }
181
182        /**
183         * Executes the given task, within a concurrency throttle
184         * if configured (through the superclass's settings).
185         * <p>Executes urgent tasks (with 'immediate' timeout) directly,
186         * bypassing the concurrency throttle (if active). All other
187         * tasks are subject to throttling.
188         * @see #TIMEOUT_IMMEDIATE
189         * @see #doExecute(Runnable)
190         */
191        @Override
192        public void execute(Runnable task, long startTimeout) {
193                Assert.notNull(task, "Runnable must not be null");
194                Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
195                if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
196                        this.concurrencyThrottle.beforeAccess();
197                        doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
198                }
199                else {
200                        doExecute(taskToUse);
201                }
202        }
203
204        @Override
205        public Future<?> submit(Runnable task) {
206                FutureTask<Object> future = new FutureTask<>(task, null);
207                execute(future, TIMEOUT_INDEFINITE);
208                return future;
209        }
210
211        @Override
212        public <T> Future<T> submit(Callable<T> task) {
213                FutureTask<T> future = new FutureTask<>(task);
214                execute(future, TIMEOUT_INDEFINITE);
215                return future;
216        }
217
218        @Override
219        public ListenableFuture<?> submitListenable(Runnable task) {
220                ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
221                execute(future, TIMEOUT_INDEFINITE);
222                return future;
223        }
224
225        @Override
226        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
227                ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
228                execute(future, TIMEOUT_INDEFINITE);
229                return future;
230        }
231
232        /**
233         * Template method for the actual execution of a task.
234         * <p>The default implementation creates a new Thread and starts it.
235         * @param task the Runnable to execute
236         * @see #setThreadFactory
237         * @see #createThread
238         * @see java.lang.Thread#start()
239         */
240        protected void doExecute(Runnable task) {
241                Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
242                thread.start();
243        }
244
245
246        /**
247         * Subclass of the general ConcurrencyThrottleSupport class,
248         * making {@code beforeAccess()} and {@code afterAccess()}
249         * visible to the surrounding class.
250         */
251        private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
252
253                @Override
254                protected void beforeAccess() {
255                        super.beforeAccess();
256                }
257
258                @Override
259                protected void afterAccess() {
260                        super.afterAccess();
261                }
262        }
263
264
265        /**
266         * This Runnable calls {@code afterAccess()} after the
267         * target Runnable has finished its execution.
268         */
269        private class ConcurrencyThrottlingRunnable implements Runnable {
270
271                private final Runnable target;
272
273                public ConcurrencyThrottlingRunnable(Runnable target) {
274                        this.target = target;
275                }
276
277                @Override
278                public void run() {
279                        try {
280                                this.target.run();
281                        }
282                        finally {
283                                concurrencyThrottle.afterAccess();
284                        }
285                }
286        }
287
288}