001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.concurrent.BlockingQueue;
020import java.util.concurrent.Callable;
021import java.util.concurrent.Executor;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Future;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.RejectedExecutionException;
026import java.util.concurrent.RejectedExecutionHandler;
027import java.util.concurrent.SynchronousQueue;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.springframework.core.task.AsyncListenableTaskExecutor;
033import org.springframework.core.task.TaskDecorator;
034import org.springframework.core.task.TaskRejectedException;
035import org.springframework.scheduling.SchedulingTaskExecutor;
036import org.springframework.util.Assert;
037import org.springframework.util.concurrent.ListenableFuture;
038import org.springframework.util.concurrent.ListenableFutureTask;
039
040/**
041 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor}
042 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
043 * properties) and exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
044 * This class is also well suited for management and monitoring (e.g. through JMX),
045 * providing several useful attributes: "corePoolSize", "maxPoolSize", "keepAliveSeconds"
046 * (all supporting updates at runtime); "poolSize", "activeCount" (for introspection only).
047 *
048 * <p>The default configuration is a core pool size of 1, with unlimited max pool size
049 * and unlimited queue capacity. This is roughly equivalent to
050 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single
051 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics
052 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling
053 * of threads in the pool to a potentially very high number. Consider also setting a
054 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher
055 * {@link #setCorePoolSize "corePoolSize"} (see also the
056 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling).
057 *
058 * <p><b>NOTE:</b> This class implements Spring's
059 * {@link org.springframework.core.task.TaskExecutor} interface as well as the
060 * {@link java.util.concurrent.Executor} interface, with the former being the primary
061 * interface, the other just serving as secondary convenience. For this reason, the
062 * exception handling follows the TaskExecutor contract rather than the Executor contract,
063 * in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
064 *
065 * <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using
066 * constructor injection, or use a factory method definition that points to the
067 * {@link java.util.concurrent.Executors} class. To expose such a raw Executor as a
068 * Spring {@link org.springframework.core.task.TaskExecutor}, simply wrap it with a
069 * {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter.
070 *
071 * @author Juergen Hoeller
072 * @since 2.0
073 * @see org.springframework.core.task.TaskExecutor
074 * @see java.util.concurrent.ThreadPoolExecutor
075 * @see ThreadPoolExecutorFactoryBean
076 * @see ConcurrentTaskExecutor
077 */
078@SuppressWarnings("serial")
079public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
080                implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
081
082        private final Object poolSizeMonitor = new Object();
083
084        private int corePoolSize = 1;
085
086        private int maxPoolSize = Integer.MAX_VALUE;
087
088        private int keepAliveSeconds = 60;
089
090        private int queueCapacity = Integer.MAX_VALUE;
091
092        private boolean allowCoreThreadTimeOut = false;
093
094        private TaskDecorator taskDecorator;
095
096        private ThreadPoolExecutor threadPoolExecutor;
097
098
099        /**
100         * Set the ThreadPoolExecutor's core pool size.
101         * Default is 1.
102         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
103         */
104        public void setCorePoolSize(int corePoolSize) {
105                synchronized (this.poolSizeMonitor) {
106                        this.corePoolSize = corePoolSize;
107                        if (this.threadPoolExecutor != null) {
108                                this.threadPoolExecutor.setCorePoolSize(corePoolSize);
109                        }
110                }
111        }
112
113        /**
114         * Return the ThreadPoolExecutor's core pool size.
115         */
116        public int getCorePoolSize() {
117                synchronized (this.poolSizeMonitor) {
118                        return this.corePoolSize;
119                }
120        }
121
122        /**
123         * Set the ThreadPoolExecutor's maximum pool size.
124         * Default is {@code Integer.MAX_VALUE}.
125         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
126         */
127        public void setMaxPoolSize(int maxPoolSize) {
128                synchronized (this.poolSizeMonitor) {
129                        this.maxPoolSize = maxPoolSize;
130                        if (this.threadPoolExecutor != null) {
131                                this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
132                        }
133                }
134        }
135
136        /**
137         * Return the ThreadPoolExecutor's maximum pool size.
138         */
139        public int getMaxPoolSize() {
140                synchronized (this.poolSizeMonitor) {
141                        return this.maxPoolSize;
142                }
143        }
144
145        /**
146         * Set the ThreadPoolExecutor's keep-alive seconds.
147         * Default is 60.
148         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
149         */
150        public void setKeepAliveSeconds(int keepAliveSeconds) {
151                synchronized (this.poolSizeMonitor) {
152                        this.keepAliveSeconds = keepAliveSeconds;
153                        if (this.threadPoolExecutor != null) {
154                                this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
155                        }
156                }
157        }
158
159        /**
160         * Return the ThreadPoolExecutor's keep-alive seconds.
161         */
162        public int getKeepAliveSeconds() {
163                synchronized (this.poolSizeMonitor) {
164                        return this.keepAliveSeconds;
165                }
166        }
167
168        /**
169         * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
170         * Default is {@code Integer.MAX_VALUE}.
171         * <p>Any positive value will lead to a LinkedBlockingQueue instance;
172         * any other value will lead to a SynchronousQueue instance.
173         * @see java.util.concurrent.LinkedBlockingQueue
174         * @see java.util.concurrent.SynchronousQueue
175         */
176        public void setQueueCapacity(int queueCapacity) {
177                this.queueCapacity = queueCapacity;
178        }
179
180        /**
181         * Specify whether to allow core threads to time out. This enables dynamic
182         * growing and shrinking even in combination with a non-zero queue (since
183         * the max pool size will only grow once the queue is full).
184         * <p>Default is "false".
185         * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
186         */
187        public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
188                this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
189        }
190
191        /**
192         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
193         * about to be executed.
194         * <p>Note that such a decorator is not necessarily being applied to the
195         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
196         * execution callback (which may be a wrapper around the user-supplied task).
197         * <p>The primary use case is to set some execution context around the task's
198         * invocation, or to provide some monitoring/statistics for task execution.
199         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
200         * is limited to plain {@code Runnable} execution via {@code execute} calls.
201         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
202         * {@code FutureTask} which does not propagate any exceptions; you might
203         * have to cast it and call {@code Future#get} to evaluate exceptions.
204         * See the {@code ThreadPoolExecutor#afterExecute} javadoc for an example
205         * of how to access exceptions in such a {@code Future} case.
206         * @since 4.3
207         */
208        public void setTaskDecorator(TaskDecorator taskDecorator) {
209                this.taskDecorator = taskDecorator;
210        }
211
212
213        /**
214         * Note: This method exposes an {@link ExecutorService} to its base class
215         * but stores the actual {@link ThreadPoolExecutor} handle internally.
216         * Do not override this method for replacing the executor, rather just for
217         * decorating its {@code ExecutorService} handle or storing custom state.
218         */
219        @Override
220        protected ExecutorService initializeExecutor(
221                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
222
223                BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
224
225                ThreadPoolExecutor executor;
226                if (this.taskDecorator != null) {
227                        executor = new ThreadPoolExecutor(
228                                        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
229                                        queue, threadFactory, rejectedExecutionHandler) {
230                                @Override
231                                public void execute(Runnable command) {
232                                        super.execute(taskDecorator.decorate(command));
233                                }
234                        };
235                }
236                else {
237                        executor = new ThreadPoolExecutor(
238                                        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
239                                        queue, threadFactory, rejectedExecutionHandler);
240
241                }
242
243                if (this.allowCoreThreadTimeOut) {
244                        executor.allowCoreThreadTimeOut(true);
245                }
246
247                this.threadPoolExecutor = executor;
248                return executor;
249        }
250
251        /**
252         * Create the BlockingQueue to use for the ThreadPoolExecutor.
253         * <p>A LinkedBlockingQueue instance will be created for a positive
254         * capacity value; a SynchronousQueue else.
255         * @param queueCapacity the specified queue capacity
256         * @return the BlockingQueue instance
257         * @see java.util.concurrent.LinkedBlockingQueue
258         * @see java.util.concurrent.SynchronousQueue
259         */
260        protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
261                if (queueCapacity > 0) {
262                        return new LinkedBlockingQueue<Runnable>(queueCapacity);
263                }
264                else {
265                        return new SynchronousQueue<Runnable>();
266                }
267        }
268
269        /**
270         * Return the underlying ThreadPoolExecutor for native access.
271         * @return the underlying ThreadPoolExecutor (never {@code null})
272         * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
273         */
274        public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
275                Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
276                return this.threadPoolExecutor;
277        }
278
279        /**
280         * Return the current pool size.
281         * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
282         */
283        public int getPoolSize() {
284                if (this.threadPoolExecutor == null) {
285                        // Not initialized yet: assume core pool size.
286                        return this.corePoolSize;
287                }
288                return this.threadPoolExecutor.getPoolSize();
289        }
290
291        /**
292         * Return the number of currently active threads.
293         * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
294         */
295        public int getActiveCount() {
296                if (this.threadPoolExecutor == null) {
297                        // Not initialized yet: assume no active threads.
298                        return 0;
299                }
300                return this.threadPoolExecutor.getActiveCount();
301        }
302
303
304        @Override
305        public void execute(Runnable task) {
306                Executor executor = getThreadPoolExecutor();
307                try {
308                        executor.execute(task);
309                }
310                catch (RejectedExecutionException ex) {
311                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
312                }
313        }
314
315        @Override
316        public void execute(Runnable task, long startTimeout) {
317                execute(task);
318        }
319
320        @Override
321        public Future<?> submit(Runnable task) {
322                ExecutorService executor = getThreadPoolExecutor();
323                try {
324                        return executor.submit(task);
325                }
326                catch (RejectedExecutionException ex) {
327                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
328                }
329        }
330
331        @Override
332        public <T> Future<T> submit(Callable<T> task) {
333                ExecutorService executor = getThreadPoolExecutor();
334                try {
335                        return executor.submit(task);
336                }
337                catch (RejectedExecutionException ex) {
338                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
339                }
340        }
341
342        @Override
343        public ListenableFuture<?> submitListenable(Runnable task) {
344                ExecutorService executor = getThreadPoolExecutor();
345                try {
346                        ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
347                        executor.execute(future);
348                        return future;
349                }
350                catch (RejectedExecutionException ex) {
351                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
352                }
353        }
354
355        @Override
356        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
357                ExecutorService executor = getThreadPoolExecutor();
358                try {
359                        ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
360                        executor.execute(future);
361                        return future;
362                }
363                catch (RejectedExecutionException ex) {
364                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
365                }
366        }
367
368        /**
369         * This task executor prefers short-lived work units.
370         */
371        @Override
372        public boolean prefersShortLivedTasks() {
373                return true;
374        }
375
376}