001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.Map;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.Callable;
022import java.util.concurrent.Executor;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Future;
025import java.util.concurrent.LinkedBlockingQueue;
026import java.util.concurrent.RejectedExecutionException;
027import java.util.concurrent.RejectedExecutionHandler;
028import java.util.concurrent.SynchronousQueue;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032
033import org.springframework.core.task.AsyncListenableTaskExecutor;
034import org.springframework.core.task.TaskDecorator;
035import org.springframework.core.task.TaskRejectedException;
036import org.springframework.lang.Nullable;
037import org.springframework.scheduling.SchedulingTaskExecutor;
038import org.springframework.util.Assert;
039import org.springframework.util.ConcurrentReferenceHashMap;
040import org.springframework.util.concurrent.ListenableFuture;
041import org.springframework.util.concurrent.ListenableFutureTask;
042
043/**
044 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor}
045 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
046 * properties) and exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
047 * This class is also well suited for management and monitoring (e.g. through JMX),
048 * providing several useful attributes: "corePoolSize", "maxPoolSize", "keepAliveSeconds"
049 * (all supporting updates at runtime); "poolSize", "activeCount" (for introspection only).
050 *
051 * <p>The default configuration is a core pool size of 1, with unlimited max pool size
052 * and unlimited queue capacity. This is roughly equivalent to
053 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single
054 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics
055 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling
056 * of threads in the pool to a potentially very high number. Consider also setting a
057 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher
058 * {@link #setCorePoolSize "corePoolSize"} (see also the
059 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling).
060 *
061 * <p><b>NOTE:</b> This class implements Spring's
062 * {@link org.springframework.core.task.TaskExecutor} interface as well as the
063 * {@link java.util.concurrent.Executor} interface, with the former being the primary
064 * interface, the other just serving as secondary convenience. For this reason, the
065 * exception handling follows the TaskExecutor contract rather than the Executor contract,
066 * in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
067 *
068 * <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using
069 * constructor injection, or use a factory method definition that points to the
070 * {@link java.util.concurrent.Executors} class. To expose such a raw Executor as a
071 * Spring {@link org.springframework.core.task.TaskExecutor}, simply wrap it with a
072 * {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter.
073 *
074 * @author Juergen Hoeller
075 * @since 2.0
076 * @see org.springframework.core.task.TaskExecutor
077 * @see java.util.concurrent.ThreadPoolExecutor
078 * @see ThreadPoolExecutorFactoryBean
079 * @see ConcurrentTaskExecutor
080 */
081@SuppressWarnings("serial")
082public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
083                implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
084
085        private final Object poolSizeMonitor = new Object();
086
087        private int corePoolSize = 1;
088
089        private int maxPoolSize = Integer.MAX_VALUE;
090
091        private int keepAliveSeconds = 60;
092
093        private int queueCapacity = Integer.MAX_VALUE;
094
095        private boolean allowCoreThreadTimeOut = false;
096
097        @Nullable
098        private TaskDecorator taskDecorator;
099
100        @Nullable
101        private ThreadPoolExecutor threadPoolExecutor;
102
103        // Runnable decorator to user-level FutureTask, if different
104        private final Map<Runnable, Object> decoratedTaskMap =
105                        new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
106
107
108        /**
109         * Set the ThreadPoolExecutor's core pool size.
110         * Default is 1.
111         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
112         */
113        public void setCorePoolSize(int corePoolSize) {
114                synchronized (this.poolSizeMonitor) {
115                        this.corePoolSize = corePoolSize;
116                        if (this.threadPoolExecutor != null) {
117                                this.threadPoolExecutor.setCorePoolSize(corePoolSize);
118                        }
119                }
120        }
121
122        /**
123         * Return the ThreadPoolExecutor's core pool size.
124         */
125        public int getCorePoolSize() {
126                synchronized (this.poolSizeMonitor) {
127                        return this.corePoolSize;
128                }
129        }
130
131        /**
132         * Set the ThreadPoolExecutor's maximum pool size.
133         * Default is {@code Integer.MAX_VALUE}.
134         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
135         */
136        public void setMaxPoolSize(int maxPoolSize) {
137                synchronized (this.poolSizeMonitor) {
138                        this.maxPoolSize = maxPoolSize;
139                        if (this.threadPoolExecutor != null) {
140                                this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
141                        }
142                }
143        }
144
145        /**
146         * Return the ThreadPoolExecutor's maximum pool size.
147         */
148        public int getMaxPoolSize() {
149                synchronized (this.poolSizeMonitor) {
150                        return this.maxPoolSize;
151                }
152        }
153
154        /**
155         * Set the ThreadPoolExecutor's keep-alive seconds.
156         * Default is 60.
157         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
158         */
159        public void setKeepAliveSeconds(int keepAliveSeconds) {
160                synchronized (this.poolSizeMonitor) {
161                        this.keepAliveSeconds = keepAliveSeconds;
162                        if (this.threadPoolExecutor != null) {
163                                this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
164                        }
165                }
166        }
167
168        /**
169         * Return the ThreadPoolExecutor's keep-alive seconds.
170         */
171        public int getKeepAliveSeconds() {
172                synchronized (this.poolSizeMonitor) {
173                        return this.keepAliveSeconds;
174                }
175        }
176
177        /**
178         * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
179         * Default is {@code Integer.MAX_VALUE}.
180         * <p>Any positive value will lead to a LinkedBlockingQueue instance;
181         * any other value will lead to a SynchronousQueue instance.
182         * @see java.util.concurrent.LinkedBlockingQueue
183         * @see java.util.concurrent.SynchronousQueue
184         */
185        public void setQueueCapacity(int queueCapacity) {
186                this.queueCapacity = queueCapacity;
187        }
188
189        /**
190         * Specify whether to allow core threads to time out. This enables dynamic
191         * growing and shrinking even in combination with a non-zero queue (since
192         * the max pool size will only grow once the queue is full).
193         * <p>Default is "false".
194         * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
195         */
196        public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
197                this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
198        }
199
200        /**
201         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
202         * about to be executed.
203         * <p>Note that such a decorator is not necessarily being applied to the
204         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
205         * execution callback (which may be a wrapper around the user-supplied task).
206         * <p>The primary use case is to set some execution context around the task's
207         * invocation, or to provide some monitoring/statistics for task execution.
208         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
209         * is limited to plain {@code Runnable} execution via {@code execute} calls.
210         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
211         * {@code FutureTask} which does not propagate any exceptions; you might
212         * have to cast it and call {@code Future#get} to evaluate exceptions.
213         * See the {@code ThreadPoolExecutor#afterExecute} javadoc for an example
214         * of how to access exceptions in such a {@code Future} case.
215         * @since 4.3
216         */
217        public void setTaskDecorator(TaskDecorator taskDecorator) {
218                this.taskDecorator = taskDecorator;
219        }
220
221
222        /**
223         * Note: This method exposes an {@link ExecutorService} to its base class
224         * but stores the actual {@link ThreadPoolExecutor} handle internally.
225         * Do not override this method for replacing the executor, rather just for
226         * decorating its {@code ExecutorService} handle or storing custom state.
227         */
228        @Override
229        protected ExecutorService initializeExecutor(
230                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
231
232                BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
233
234                ThreadPoolExecutor executor;
235                if (this.taskDecorator != null) {
236                        executor = new ThreadPoolExecutor(
237                                        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
238                                        queue, threadFactory, rejectedExecutionHandler) {
239                                @Override
240                                public void execute(Runnable command) {
241                                        Runnable decorated = taskDecorator.decorate(command);
242                                        if (decorated != command) {
243                                                decoratedTaskMap.put(decorated, command);
244                                        }
245                                        super.execute(decorated);
246                                }
247                        };
248                }
249                else {
250                        executor = new ThreadPoolExecutor(
251                                        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
252                                        queue, threadFactory, rejectedExecutionHandler);
253
254                }
255
256                if (this.allowCoreThreadTimeOut) {
257                        executor.allowCoreThreadTimeOut(true);
258                }
259
260                this.threadPoolExecutor = executor;
261                return executor;
262        }
263
264        /**
265         * Create the BlockingQueue to use for the ThreadPoolExecutor.
266         * <p>A LinkedBlockingQueue instance will be created for a positive
267         * capacity value; a SynchronousQueue else.
268         * @param queueCapacity the specified queue capacity
269         * @return the BlockingQueue instance
270         * @see java.util.concurrent.LinkedBlockingQueue
271         * @see java.util.concurrent.SynchronousQueue
272         */
273        protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
274                if (queueCapacity > 0) {
275                        return new LinkedBlockingQueue<>(queueCapacity);
276                }
277                else {
278                        return new SynchronousQueue<>();
279                }
280        }
281
282        /**
283         * Return the underlying ThreadPoolExecutor for native access.
284         * @return the underlying ThreadPoolExecutor (never {@code null})
285         * @throws IllegalStateException if the ThreadPoolTaskExecutor hasn't been initialized yet
286         */
287        public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
288                Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
289                return this.threadPoolExecutor;
290        }
291
292        /**
293         * Return the current pool size.
294         * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
295         */
296        public int getPoolSize() {
297                if (this.threadPoolExecutor == null) {
298                        // Not initialized yet: assume core pool size.
299                        return this.corePoolSize;
300                }
301                return this.threadPoolExecutor.getPoolSize();
302        }
303
304        /**
305         * Return the number of currently active threads.
306         * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
307         */
308        public int getActiveCount() {
309                if (this.threadPoolExecutor == null) {
310                        // Not initialized yet: assume no active threads.
311                        return 0;
312                }
313                return this.threadPoolExecutor.getActiveCount();
314        }
315
316
317        @Override
318        public void execute(Runnable task) {
319                Executor executor = getThreadPoolExecutor();
320                try {
321                        executor.execute(task);
322                }
323                catch (RejectedExecutionException ex) {
324                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
325                }
326        }
327
328        @Override
329        public void execute(Runnable task, long startTimeout) {
330                execute(task);
331        }
332
333        @Override
334        public Future<?> submit(Runnable task) {
335                ExecutorService executor = getThreadPoolExecutor();
336                try {
337                        return executor.submit(task);
338                }
339                catch (RejectedExecutionException ex) {
340                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
341                }
342        }
343
344        @Override
345        public <T> Future<T> submit(Callable<T> task) {
346                ExecutorService executor = getThreadPoolExecutor();
347                try {
348                        return executor.submit(task);
349                }
350                catch (RejectedExecutionException ex) {
351                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
352                }
353        }
354
355        @Override
356        public ListenableFuture<?> submitListenable(Runnable task) {
357                ExecutorService executor = getThreadPoolExecutor();
358                try {
359                        ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
360                        executor.execute(future);
361                        return future;
362                }
363                catch (RejectedExecutionException ex) {
364                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
365                }
366        }
367
368        @Override
369        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
370                ExecutorService executor = getThreadPoolExecutor();
371                try {
372                        ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
373                        executor.execute(future);
374                        return future;
375                }
376                catch (RejectedExecutionException ex) {
377                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
378                }
379        }
380
381        @Override
382        protected void cancelRemainingTask(Runnable task) {
383                super.cancelRemainingTask(task);
384                // Cancel associated user-level Future handle as well
385                Object original = this.decoratedTaskMap.get(task);
386                if (original instanceof Future) {
387                        ((Future<?>) original).cancel(true);
388                }
389        }
390
391}