001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.concurrent.BlockingQueue;
020import java.util.concurrent.ExecutorService;
021import java.util.concurrent.Executors;
022import java.util.concurrent.LinkedBlockingQueue;
023import java.util.concurrent.RejectedExecutionHandler;
024import java.util.concurrent.SynchronousQueue;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029import org.springframework.beans.factory.DisposableBean;
030import org.springframework.beans.factory.FactoryBean;
031import org.springframework.beans.factory.InitializingBean;
032
033/**
034 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor}
035 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds",
036 * "queueCapacity" properties) and exposing it as a bean reference of its native
037 * {@link java.util.concurrent.ExecutorService} type.
038 *
039 * <p>The default configuration is a core pool size of 1, with unlimited max pool size
040 * and unlimited queue capacity. This is roughly equivalent to
041 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single
042 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics
043 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling
044 * of threads in the pool to a potentially very high number. Consider also setting a
045 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher
046 * {@link #setCorePoolSize "corePoolSize"} (see also the
047 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling).
048 *
049 * <p>For an alternative, you may set up a {@link ThreadPoolExecutor} instance directly
050 * using constructor injection, or use a factory method definition that points to the
051 * {@link java.util.concurrent.Executors} class.
052 * <b>This is strongly recommended in particular for common {@code @Bean} methods in
053 * configuration classes, where this {@code FactoryBean} variant would force you to
054 * return the {@code FactoryBean} type instead of the actual {@code Executor} type.</b>
055 *
056 * <p>If you need a timing-based {@link java.util.concurrent.ScheduledExecutorService}
057 * instead, consider {@link ScheduledExecutorFactoryBean}.
058
059 * @author Juergen Hoeller
060 * @since 3.0
061 * @see java.util.concurrent.ExecutorService
062 * @see java.util.concurrent.Executors
063 * @see java.util.concurrent.ThreadPoolExecutor
064 */
065@SuppressWarnings("serial")
066public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport
067                implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
068
069        private int corePoolSize = 1;
070
071        private int maxPoolSize = Integer.MAX_VALUE;
072
073        private int keepAliveSeconds = 60;
074
075        private boolean allowCoreThreadTimeOut = false;
076
077        private int queueCapacity = Integer.MAX_VALUE;
078
079        private boolean exposeUnconfigurableExecutor = false;
080
081        private ExecutorService exposedExecutor;
082
083
084        /**
085         * Set the ThreadPoolExecutor's core pool size.
086         * Default is 1.
087         */
088        public void setCorePoolSize(int corePoolSize) {
089                this.corePoolSize = corePoolSize;
090        }
091
092        /**
093         * Set the ThreadPoolExecutor's maximum pool size.
094         * Default is {@code Integer.MAX_VALUE}.
095         */
096        public void setMaxPoolSize(int maxPoolSize) {
097                this.maxPoolSize = maxPoolSize;
098        }
099
100        /**
101         * Set the ThreadPoolExecutor's keep-alive seconds.
102         * Default is 60.
103         */
104        public void setKeepAliveSeconds(int keepAliveSeconds) {
105                this.keepAliveSeconds = keepAliveSeconds;
106        }
107
108        /**
109         * Specify whether to allow core threads to time out. This enables dynamic
110         * growing and shrinking even in combination with a non-zero queue (since
111         * the max pool size will only grow once the queue is full).
112         * <p>Default is "false".
113         * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
114         */
115        public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
116                this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
117        }
118
119        /**
120         * Set the capacity for the ThreadPoolExecutor's BlockingQueue.
121         * Default is {@code Integer.MAX_VALUE}.
122         * <p>Any positive value will lead to a LinkedBlockingQueue instance;
123         * any other value will lead to a SynchronousQueue instance.
124         * @see java.util.concurrent.LinkedBlockingQueue
125         * @see java.util.concurrent.SynchronousQueue
126         */
127        public void setQueueCapacity(int queueCapacity) {
128                this.queueCapacity = queueCapacity;
129        }
130
131        /**
132         * Specify whether this FactoryBean should expose an unconfigurable
133         * decorator for the created executor.
134         * <p>Default is "false", exposing the raw executor as bean reference.
135         * Switch this flag to "true" to strictly prevent clients from
136         * modifying the executor's configuration.
137         * @see java.util.concurrent.Executors#unconfigurableExecutorService
138         */
139        public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) {
140                this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor;
141        }
142
143
144        @Override
145        protected ExecutorService initializeExecutor(
146                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
147
148                BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
149                ThreadPoolExecutor executor  = createExecutor(this.corePoolSize, this.maxPoolSize,
150                                this.keepAliveSeconds, queue, threadFactory, rejectedExecutionHandler);
151                if (this.allowCoreThreadTimeOut) {
152                        executor.allowCoreThreadTimeOut(true);
153                }
154
155                // Wrap executor with an unconfigurable decorator.
156                this.exposedExecutor = (this.exposeUnconfigurableExecutor ?
157                                Executors.unconfigurableExecutorService(executor) : executor);
158
159                return executor;
160        }
161
162        /**
163         * Create a new instance of {@link ThreadPoolExecutor} or a subclass thereof.
164         * <p>The default implementation creates a standard {@link ThreadPoolExecutor}.
165         * Can be overridden to provide custom {@link ThreadPoolExecutor} subclasses.
166         * @param corePoolSize the specified core pool size
167         * @param maxPoolSize the specified maximum pool size
168         * @param keepAliveSeconds the specified keep-alive time in seconds
169         * @param queue the BlockingQueue to use
170         * @param threadFactory the ThreadFactory to use
171         * @param rejectedExecutionHandler the RejectedExecutionHandler to use
172         * @return a new ThreadPoolExecutor instance
173         * @see #afterPropertiesSet()
174         */
175        protected ThreadPoolExecutor createExecutor(
176                        int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue,
177                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
178
179                return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
180                                keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
181        }
182
183        /**
184         * Create the BlockingQueue to use for the ThreadPoolExecutor.
185         * <p>A LinkedBlockingQueue instance will be created for a positive
186         * capacity value; a SynchronousQueue else.
187         * @param queueCapacity the specified queue capacity
188         * @return the BlockingQueue instance
189         * @see java.util.concurrent.LinkedBlockingQueue
190         * @see java.util.concurrent.SynchronousQueue
191         */
192        protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
193                if (queueCapacity > 0) {
194                        return new LinkedBlockingQueue<Runnable>(queueCapacity);
195                }
196                else {
197                        return new SynchronousQueue<Runnable>();
198                }
199        }
200
201
202        @Override
203        public ExecutorService getObject() {
204                return this.exposedExecutor;
205        }
206
207        @Override
208        public Class<? extends ExecutorService> getObjectType() {
209                return (this.exposedExecutor != null ? this.exposedExecutor.getClass() : ExecutorService.class);
210        }
211
212        @Override
213        public boolean isSingleton() {
214                return true;
215        }
216
217}