001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.concurrent.ExecutorService;
020import java.util.concurrent.Future;
021import java.util.concurrent.RejectedExecutionHandler;
022import java.util.concurrent.RunnableFuture;
023import java.util.concurrent.ThreadFactory;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029
030import org.springframework.beans.factory.BeanNameAware;
031import org.springframework.beans.factory.DisposableBean;
032import org.springframework.beans.factory.InitializingBean;
033import org.springframework.lang.Nullable;
034
035/**
036 * Base class for setting up a {@link java.util.concurrent.ExecutorService}
037 * (typically a {@link java.util.concurrent.ThreadPoolExecutor} or
038 * {@link java.util.concurrent.ScheduledThreadPoolExecutor}).
039 * Defines common configuration settings and common lifecycle handling.
040 *
041 * @author Juergen Hoeller
042 * @since 3.0
043 * @see java.util.concurrent.ExecutorService
044 * @see java.util.concurrent.Executors
045 * @see java.util.concurrent.ThreadPoolExecutor
046 * @see java.util.concurrent.ScheduledThreadPoolExecutor
047 */
048@SuppressWarnings("serial")
049public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
050                implements BeanNameAware, InitializingBean, DisposableBean {
051
052        protected final Log logger = LogFactory.getLog(getClass());
053
054        private ThreadFactory threadFactory = this;
055
056        private boolean threadNamePrefixSet = false;
057
058        private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
059
060        private boolean waitForTasksToCompleteOnShutdown = false;
061
062        private long awaitTerminationMillis = 0;
063
064        @Nullable
065        private String beanName;
066
067        @Nullable
068        private ExecutorService executor;
069
070
071        /**
072         * Set the ThreadFactory to use for the ExecutorService's thread pool.
073         * Default is the underlying ExecutorService's default thread factory.
074         * <p>In a Java EE 7 or other managed environment with JSR-236 support,
075         * consider specifying a JNDI-located ManagedThreadFactory: by default,
076         * to be found at "java:comp/DefaultManagedThreadFactory".
077         * Use the "jee:jndi-lookup" namespace element in XML or the programmatic
078         * {@link org.springframework.jndi.JndiLocatorDelegate} for convenient lookup.
079         * Alternatively, consider using Spring's {@link DefaultManagedAwareThreadFactory}
080         * with its fallback to local threads in case of no managed thread factory found.
081         * @see java.util.concurrent.Executors#defaultThreadFactory()
082         * @see javax.enterprise.concurrent.ManagedThreadFactory
083         * @see DefaultManagedAwareThreadFactory
084         */
085        public void setThreadFactory(@Nullable ThreadFactory threadFactory) {
086                this.threadFactory = (threadFactory != null ? threadFactory : this);
087        }
088
089        @Override
090        public void setThreadNamePrefix(@Nullable String threadNamePrefix) {
091                super.setThreadNamePrefix(threadNamePrefix);
092                this.threadNamePrefixSet = true;
093        }
094
095        /**
096         * Set the RejectedExecutionHandler to use for the ExecutorService.
097         * Default is the ExecutorService's default abort policy.
098         * @see java.util.concurrent.ThreadPoolExecutor.AbortPolicy
099         */
100        public void setRejectedExecutionHandler(@Nullable RejectedExecutionHandler rejectedExecutionHandler) {
101                this.rejectedExecutionHandler =
102                                (rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
103        }
104
105        /**
106         * Set whether to wait for scheduled tasks to complete on shutdown,
107         * not interrupting running tasks and executing all tasks in the queue.
108         * <p>Default is "false", shutting down immediately through interrupting
109         * ongoing tasks and clearing the queue. Switch this flag to "true" if you
110         * prefer fully completed tasks at the expense of a longer shutdown phase.
111         * <p>Note that Spring's container shutdown continues while ongoing tasks
112         * are being completed. If you want this executor to block and wait for the
113         * termination of tasks before the rest of the container continues to shut
114         * down - e.g. in order to keep up other resources that your tasks may need -,
115         * set the {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"}
116         * property instead of or in addition to this property.
117         * @see java.util.concurrent.ExecutorService#shutdown()
118         * @see java.util.concurrent.ExecutorService#shutdownNow()
119         */
120        public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
121                this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
122        }
123
124        /**
125         * Set the maximum number of seconds that this executor is supposed to block
126         * on shutdown in order to wait for remaining tasks to complete their execution
127         * before the rest of the container continues to shut down. This is particularly
128         * useful if your remaining tasks are likely to need access to other resources
129         * that are also managed by the container.
130         * <p>By default, this executor won't wait for the termination of tasks at all.
131         * It will either shut down immediately, interrupting ongoing tasks and clearing
132         * the remaining task queue - or, if the
133         * {@link #setWaitForTasksToCompleteOnShutdown "waitForTasksToCompleteOnShutdown"}
134         * flag has been set to {@code true}, it will continue to fully execute all
135         * ongoing tasks as well as all remaining tasks in the queue, in parallel to
136         * the rest of the container shutting down.
137         * <p>In either case, if you specify an await-termination period using this property,
138         * this executor will wait for the given time (max) for the termination of tasks.
139         * As a rule of thumb, specify a significantly higher timeout here if you set
140         * "waitForTasksToCompleteOnShutdown" to {@code true} at the same time,
141         * since all remaining tasks in the queue will still get executed - in contrast
142         * to the default shutdown behavior where it's just about waiting for currently
143         * executing tasks that aren't reacting to thread interruption.
144         * @see #setAwaitTerminationMillis
145         * @see java.util.concurrent.ExecutorService#shutdown()
146         * @see java.util.concurrent.ExecutorService#awaitTermination
147         */
148        public void setAwaitTerminationSeconds(int awaitTerminationSeconds) {
149                this.awaitTerminationMillis = awaitTerminationSeconds * 1000L;
150        }
151
152        /**
153         * Variant of {@link #setAwaitTerminationSeconds} with millisecond precision.
154         * @since 5.2.4
155         * @see #setAwaitTerminationSeconds
156         */
157        public void setAwaitTerminationMillis(long awaitTerminationMillis) {
158                this.awaitTerminationMillis = awaitTerminationMillis;
159        }
160
161        @Override
162        public void setBeanName(String name) {
163                this.beanName = name;
164        }
165
166
167        /**
168         * Calls {@code initialize()} after the container applied all property values.
169         * @see #initialize()
170         */
171        @Override
172        public void afterPropertiesSet() {
173                initialize();
174        }
175
176        /**
177         * Set up the ExecutorService.
178         */
179        public void initialize() {
180                if (logger.isInfoEnabled()) {
181                        logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
182                }
183                if (!this.threadNamePrefixSet && this.beanName != null) {
184                        setThreadNamePrefix(this.beanName + "-");
185                }
186                this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
187        }
188
189        /**
190         * Create the target {@link java.util.concurrent.ExecutorService} instance.
191         * Called by {@code afterPropertiesSet}.
192         * @param threadFactory the ThreadFactory to use
193         * @param rejectedExecutionHandler the RejectedExecutionHandler to use
194         * @return a new ExecutorService instance
195         * @see #afterPropertiesSet()
196         */
197        protected abstract ExecutorService initializeExecutor(
198                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
199
200
201        /**
202         * Calls {@code shutdown} when the BeanFactory destroys
203         * the task executor instance.
204         * @see #shutdown()
205         */
206        @Override
207        public void destroy() {
208                shutdown();
209        }
210
211        /**
212         * Perform a shutdown on the underlying ExecutorService.
213         * @see java.util.concurrent.ExecutorService#shutdown()
214         * @see java.util.concurrent.ExecutorService#shutdownNow()
215         */
216        public void shutdown() {
217                if (logger.isInfoEnabled()) {
218                        logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
219                }
220                if (this.executor != null) {
221                        if (this.waitForTasksToCompleteOnShutdown) {
222                                this.executor.shutdown();
223                        }
224                        else {
225                                for (Runnable remainingTask : this.executor.shutdownNow()) {
226                                        cancelRemainingTask(remainingTask);
227                                }
228                        }
229                        awaitTerminationIfNecessary(this.executor);
230                }
231        }
232
233        /**
234         * Cancel the given remaining task which never commended execution,
235         * as returned from {@link ExecutorService#shutdownNow()}.
236         * @param task the task to cancel (typically a {@link RunnableFuture})
237         * @since 5.0.5
238         * @see #shutdown()
239         * @see RunnableFuture#cancel(boolean)
240         */
241        protected void cancelRemainingTask(Runnable task) {
242                if (task instanceof Future) {
243                        ((Future<?>) task).cancel(true);
244                }
245        }
246
247        /**
248         * Wait for the executor to terminate, according to the value of the
249         * {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property.
250         */
251        private void awaitTerminationIfNecessary(ExecutorService executor) {
252                if (this.awaitTerminationMillis > 0) {
253                        try {
254                                if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) {
255                                        if (logger.isWarnEnabled()) {
256                                                logger.warn("Timed out while waiting for executor" +
257                                                                (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
258                                        }
259                                }
260                        }
261                        catch (InterruptedException ex) {
262                                if (logger.isWarnEnabled()) {
263                                        logger.warn("Interrupted while waiting for executor" +
264                                                        (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
265                                }
266                                Thread.currentThread().interrupt();
267                        }
268                }
269        }
270
271}