001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.concurrent.BlockingQueue;
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.Executors;
022import java.util.concurrent.LinkedBlockingQueue;
023import java.util.concurrent.RejectedExecutionHandler;
024import java.util.concurrent.SynchronousQueue;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029import org.springframework.beans.factory.DisposableBean;
030import org.springframework.beans.factory.FactoryBean;
031import org.springframework.beans.factory.InitializingBean;
032import org.springframework.lang.Nullable;
033
034/**
035 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor}
036 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds",
037 * "queueCapacity" properties) and exposing it as a bean reference of its native
038 * {@link java.util.concurrent.ExecutorService} type.
039 *
040 * <p>The default configuration is a core pool size of 1, with unlimited max pool size
041 * and unlimited queue capacity. This is roughly equivalent to
042 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single
043 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics
044 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling
045 * of threads in the pool to a potentially very high number. Consider also setting a
046 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher
047 * {@link #setCorePoolSize "corePoolSize"} (see also the
048 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling).
049 *
050 * <p>For an alternative, you may set up a {@link ThreadPoolExecutor} instance directly
051 * using constructor injection, or use a factory method definition that points to the
052 * {@link java.util.concurrent.Executors} class.
053 * <b>This is strongly recommended in particular for common {@code @Bean} methods in
054 * configuration classes, where this {@code FactoryBean} variant would force you to
055 * return the {@code FactoryBean} type instead of the actual {@code Executor} type.</b>
056 *
057 * <p>If you need a timing-based {@link java.util.concurrent.ScheduledExecutorService}
058 * instead, consider {@link ScheduledExecutorFactoryBean}.
059
060 * @author Juergen Hoeller
061 * @since 3.0
062 * @see java.util.concurrent.ExecutorService
063 * @see java.util.concurrent.Executors
064 * @see java.util.concurrent.ThreadPoolExecutor
065 */
066@SuppressWarnings("serial")
067public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport
068                implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
069
070        private int corePoolSize = 1;
071
072        private int maxPoolSize = Integer.MAX_VALUE;
073
074        private int keepAliveSeconds = 60;
075
076        private boolean allowCoreThreadTimeOut = false;
077
078        private int queueCapacity = Integer.MAX_VALUE;
079
080        private boolean exposeUnconfigurableExecutor = false;
081
082        @Nullable
083        private ExecutorService exposedExecutor;
084
085
086        /**
087         * Set the ThreadPoolExecutor's core pool size.
088         * Default is 1.
089         */
090        public void setCorePoolSize(int corePoolSize) {
091                this.corePoolSize = corePoolSize;
092        }
093
094        /**
095         * Set the ThreadPoolExecutor's maximum pool size.
096         * Default is {@code Integer.MAX_VALUE}.
097         */
098        public void setMaxPoolSize(int maxPoolSize) {
099                this.maxPoolSize = maxPoolSize;
100        }
101
102        /**
103         * Set the ThreadPoolExecutor's keep-alive seconds.
104         * Default is 60.
105         */
106        public void setKeepAliveSeconds(int keepAliveSeconds) {
107                this.keepAliveSeconds = keepAliveSeconds;
108        }
109
110        /**
111         * Specify whether to allow core threads to time out. This enables dynamic
112         * growing and shrinking even in combination with a non-zero queue (since
113         * the max pool size will only grow once the queue is full).
114         * <p>Default is "false".
115         * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
116         */
117        public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
118                this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
119        }
120
121        /**
122         * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
123         * Default is {@code Integer.MAX_VALUE}.
124         * <p>Any positive value will lead to a LinkedBlockingQueue instance;
125         * any other value will lead to a SynchronousQueue instance.
126         * @see java.util.concurrent.LinkedBlockingQueue
127         * @see java.util.concurrent.SynchronousQueue
128         */
129        public void setQueueCapacity(int queueCapacity) {
130                this.queueCapacity = queueCapacity;
131        }
132
133        /**
134         * Specify whether this FactoryBean should expose an unconfigurable
135         * decorator for the created executor.
136         * <p>Default is "false", exposing the raw executor as bean reference.
137         * Switch this flag to "true" to strictly prevent clients from
138         * modifying the executor's configuration.
139         * @see java.util.concurrent.Executors#unconfigurableExecutorService
140         */
141        public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) {
142                this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor;
143        }
144
145
146        @Override
147        protected ExecutorService initializeExecutor(
148                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
149
150                BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
151                ThreadPoolExecutor executor  = createExecutor(this.corePoolSize, this.maxPoolSize,
152                                this.keepAliveSeconds, queue, threadFactory, rejectedExecutionHandler);
153                if (this.allowCoreThreadTimeOut) {
154                        executor.allowCoreThreadTimeOut(true);
155                }
156
157                // Wrap executor with an unconfigurable decorator.
158                this.exposedExecutor = (this.exposeUnconfigurableExecutor ?
159                                Executors.unconfigurableExecutorService(executor) : executor);
160
161                return executor;
162        }
163
164        /**
165         * Create a new instance of {@link ThreadPoolExecutor} or a subclass thereof.
166         * <p>The default implementation creates a standard {@link ThreadPoolExecutor}.
167         * Can be overridden to provide custom {@link ThreadPoolExecutor} subclasses.
168         * @param corePoolSize the specified core pool size
169         * @param maxPoolSize the specified maximum pool size
170         * @param keepAliveSeconds the specified keep-alive time in seconds
171         * @param queue the BlockingQueue to use
172         * @param threadFactory the ThreadFactory to use
173         * @param rejectedExecutionHandler the RejectedExecutionHandler to use
174         * @return a new ThreadPoolExecutor instance
175         * @see #afterPropertiesSet()
176         */
177        protected ThreadPoolExecutor createExecutor(
178                        int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue,
179                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
180
181                return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
182                                keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
183        }
184
185        /**
186         * Create the BlockingQueue to use for the ThreadPoolExecutor.
187         * <p>A LinkedBlockingQueue instance will be created for a positive
188         * capacity value; a SynchronousQueue else.
189         * @param queueCapacity the specified queue capacity
190         * @return the BlockingQueue instance
191         * @see java.util.concurrent.LinkedBlockingQueue
192         * @see java.util.concurrent.SynchronousQueue
193         */
194        protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
195                if (queueCapacity > 0) {
196                        return new LinkedBlockingQueue<>(queueCapacity);
197                }
198                else {
199                        return new SynchronousQueue<>();
200                }
201        }
202
203
204        @Override
205        @Nullable
206        public ExecutorService getObject() {
207                return this.exposedExecutor;
208        }
209
210        @Override
211        public Class<? extends ExecutorService> getObjectType() {
212                return (this.exposedExecutor != null ? this.exposedExecutor.getClass() : ExecutorService.class);
213        }
214
215        @Override
216        public boolean isSingleton() {
217                return true;
218        }
219
220}