001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.scheduling.concurrent; 018 019import java.util.concurrent.BlockingQueue; 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.Executors; 022import java.util.concurrent.LinkedBlockingQueue; 023import java.util.concurrent.RejectedExecutionHandler; 024import java.util.concurrent.SynchronousQueue; 025import java.util.concurrent.ThreadFactory; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028 029import org.springframework.beans.factory.DisposableBean; 030import org.springframework.beans.factory.FactoryBean; 031import org.springframework.beans.factory.InitializingBean; 032import org.springframework.lang.Nullable; 033 034/** 035 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor} 036 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", 037 * "queueCapacity" properties) and exposing it as a bean reference of its native 038 * {@link java.util.concurrent.ExecutorService} type. 039 * 040 * <p>The default configuration is a core pool size of 1, with unlimited max pool size 041 * and unlimited queue capacity. This is roughly equivalent to 042 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single 043 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics 044 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling 045 * of threads in the pool to a potentially very high number. Consider also setting a 046 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher 047 * {@link #setCorePoolSize "corePoolSize"} (see also the 048 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling). 049 * 050 * <p>For an alternative, you may set up a {@link ThreadPoolExecutor} instance directly 051 * using constructor injection, or use a factory method definition that points to the 052 * {@link java.util.concurrent.Executors} class. 053 * <b>This is strongly recommended in particular for common {@code @Bean} methods in 054 * configuration classes, where this {@code FactoryBean} variant would force you to 055 * return the {@code FactoryBean} type instead of the actual {@code Executor} type.</b> 056 * 057 * <p>If you need a timing-based {@link java.util.concurrent.ScheduledExecutorService} 058 * instead, consider {@link ScheduledExecutorFactoryBean}. 059 060 * @author Juergen Hoeller 061 * @since 3.0 062 * @see java.util.concurrent.ExecutorService 063 * @see java.util.concurrent.Executors 064 * @see java.util.concurrent.ThreadPoolExecutor 065 */ 066@SuppressWarnings("serial") 067public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport 068 implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean { 069 070 private int corePoolSize = 1; 071 072 private int maxPoolSize = Integer.MAX_VALUE; 073 074 private int keepAliveSeconds = 60; 075 076 private boolean allowCoreThreadTimeOut = false; 077 078 private int queueCapacity = Integer.MAX_VALUE; 079 080 private boolean exposeUnconfigurableExecutor = false; 081 082 @Nullable 083 private ExecutorService exposedExecutor; 084 085 086 /** 087 * Set the ThreadPoolExecutor's core pool size. 088 * Default is 1. 089 */ 090 public void setCorePoolSize(int corePoolSize) { 091 this.corePoolSize = corePoolSize; 092 } 093 094 /** 095 * Set the ThreadPoolExecutor's maximum pool size. 096 * Default is {@code Integer.MAX_VALUE}. 097 */ 098 public void setMaxPoolSize(int maxPoolSize) { 099 this.maxPoolSize = maxPoolSize; 100 } 101 102 /** 103 * Set the ThreadPoolExecutor's keep-alive seconds. 104 * Default is 60. 105 */ 106 public void setKeepAliveSeconds(int keepAliveSeconds) { 107 this.keepAliveSeconds = keepAliveSeconds; 108 } 109 110 /** 111 * Specify whether to allow core threads to time out. This enables dynamic 112 * growing and shrinking even in combination with a non-zero queue (since 113 * the max pool size will only grow once the queue is full). 114 * <p>Default is "false". 115 * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) 116 */ 117 public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { 118 this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; 119 } 120 121 /** 122 * Set the capacity for the ThreadPoolExecutor's BlockingQueue. 123 * Default is {@code Integer.MAX_VALUE}. 124 * <p>Any positive value will lead to a LinkedBlockingQueue instance; 125 * any other value will lead to a SynchronousQueue instance. 126 * @see java.util.concurrent.LinkedBlockingQueue 127 * @see java.util.concurrent.SynchronousQueue 128 */ 129 public void setQueueCapacity(int queueCapacity) { 130 this.queueCapacity = queueCapacity; 131 } 132 133 /** 134 * Specify whether this FactoryBean should expose an unconfigurable 135 * decorator for the created executor. 136 * <p>Default is "false", exposing the raw executor as bean reference. 137 * Switch this flag to "true" to strictly prevent clients from 138 * modifying the executor's configuration. 139 * @see java.util.concurrent.Executors#unconfigurableExecutorService 140 */ 141 public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) { 142 this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor; 143 } 144 145 146 @Override 147 protected ExecutorService initializeExecutor( 148 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 149 150 BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); 151 ThreadPoolExecutor executor = createExecutor(this.corePoolSize, this.maxPoolSize, 152 this.keepAliveSeconds, queue, threadFactory, rejectedExecutionHandler); 153 if (this.allowCoreThreadTimeOut) { 154 executor.allowCoreThreadTimeOut(true); 155 } 156 157 // Wrap executor with an unconfigurable decorator. 158 this.exposedExecutor = (this.exposeUnconfigurableExecutor ? 159 Executors.unconfigurableExecutorService(executor) : executor); 160 161 return executor; 162 } 163 164 /** 165 * Create a new instance of {@link ThreadPoolExecutor} or a subclass thereof. 166 * <p>The default implementation creates a standard {@link ThreadPoolExecutor}. 167 * Can be overridden to provide custom {@link ThreadPoolExecutor} subclasses. 168 * @param corePoolSize the specified core pool size 169 * @param maxPoolSize the specified maximum pool size 170 * @param keepAliveSeconds the specified keep-alive time in seconds 171 * @param queue the BlockingQueue to use 172 * @param threadFactory the ThreadFactory to use 173 * @param rejectedExecutionHandler the RejectedExecutionHandler to use 174 * @return a new ThreadPoolExecutor instance 175 * @see #afterPropertiesSet() 176 */ 177 protected ThreadPoolExecutor createExecutor( 178 int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, 179 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 180 181 return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 182 keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); 183 } 184 185 /** 186 * Create the BlockingQueue to use for the ThreadPoolExecutor. 187 * <p>A LinkedBlockingQueue instance will be created for a positive 188 * capacity value; a SynchronousQueue else. 189 * @param queueCapacity the specified queue capacity 190 * @return the BlockingQueue instance 191 * @see java.util.concurrent.LinkedBlockingQueue 192 * @see java.util.concurrent.SynchronousQueue 193 */ 194 protected BlockingQueue<Runnable> createQueue(int queueCapacity) { 195 if (queueCapacity > 0) { 196 return new LinkedBlockingQueue<>(queueCapacity); 197 } 198 else { 199 return new SynchronousQueue<>(); 200 } 201 } 202 203 204 @Override 205 @Nullable 206 public ExecutorService getObject() { 207 return this.exposedExecutor; 208 } 209 210 @Override 211 public Class<? extends ExecutorService> getObjectType() { 212 return (this.exposedExecutor != null ? this.exposedExecutor.getClass() : ExecutorService.class); 213 } 214 215 @Override 216 public boolean isSingleton() { 217 return true; 218 } 219 220}