001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.scheduling.concurrent; 018 019import java.util.concurrent.BlockingQueue; 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.Executors; 022import java.util.concurrent.LinkedBlockingQueue; 023import java.util.concurrent.RejectedExecutionHandler; 024import java.util.concurrent.SynchronousQueue; 025import java.util.concurrent.ThreadFactory; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028 029import org.springframework.beans.factory.DisposableBean; 030import org.springframework.beans.factory.FactoryBean; 031import org.springframework.beans.factory.InitializingBean; 032 033/** 034 * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor} 035 * in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", 036 * "queueCapacity" properties) and exposing it as a bean reference of its native 037 * {@link java.util.concurrent.ExecutorService} type. 038 * 039 * <p>The default configuration is a core pool size of 1, with unlimited max pool size 040 * and unlimited queue capacity. This is roughly equivalent to 041 * {@link java.util.concurrent.Executors#newSingleThreadExecutor()}, sharing a single 042 * thread for all tasks. Setting {@link #setQueueCapacity "queueCapacity"} to 0 mimics 043 * {@link java.util.concurrent.Executors#newCachedThreadPool()}, with immediate scaling 044 * of threads in the pool to a potentially very high number. Consider also setting a 045 * {@link #setMaxPoolSize "maxPoolSize"} at that point, as well as possibly a higher 046 * {@link #setCorePoolSize "corePoolSize"} (see also the 047 * {@link #setAllowCoreThreadTimeOut "allowCoreThreadTimeOut"} mode of scaling). 048 * 049 * <p>For an alternative, you may set up a {@link ThreadPoolExecutor} instance directly 050 * using constructor injection, or use a factory method definition that points to the 051 * {@link java.util.concurrent.Executors} class. 052 * <b>This is strongly recommended in particular for common {@code @Bean} methods in 053 * configuration classes, where this {@code FactoryBean} variant would force you to 054 * return the {@code FactoryBean} type instead of the actual {@code Executor} type.</b> 055 * 056 * <p>If you need a timing-based {@link java.util.concurrent.ScheduledExecutorService} 057 * instead, consider {@link ScheduledExecutorFactoryBean}. 058 059 * @author Juergen Hoeller 060 * @since 3.0 061 * @see java.util.concurrent.ExecutorService 062 * @see java.util.concurrent.Executors 063 * @see java.util.concurrent.ThreadPoolExecutor 064 */ 065@SuppressWarnings("serial") 066public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport 067 implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean { 068 069 private int corePoolSize = 1; 070 071 private int maxPoolSize = Integer.MAX_VALUE; 072 073 private int keepAliveSeconds = 60; 074 075 private boolean allowCoreThreadTimeOut = false; 076 077 private int queueCapacity = Integer.MAX_VALUE; 078 079 private boolean exposeUnconfigurableExecutor = false; 080 081 private ExecutorService exposedExecutor; 082 083 084 /** 085 * Set the ThreadPoolExecutor's core pool size. 086 * Default is 1. 087 */ 088 public void setCorePoolSize(int corePoolSize) { 089 this.corePoolSize = corePoolSize; 090 } 091 092 /** 093 * Set the ThreadPoolExecutor's maximum pool size. 094 * Default is {@code Integer.MAX_VALUE}. 095 */ 096 public void setMaxPoolSize(int maxPoolSize) { 097 this.maxPoolSize = maxPoolSize; 098 } 099 100 /** 101 * Set the ThreadPoolExecutor's keep-alive seconds. 102 * Default is 60. 103 */ 104 public void setKeepAliveSeconds(int keepAliveSeconds) { 105 this.keepAliveSeconds = keepAliveSeconds; 106 } 107 108 /** 109 * Specify whether to allow core threads to time out. This enables dynamic 110 * growing and shrinking even in combination with a non-zero queue (since 111 * the max pool size will only grow once the queue is full). 112 * <p>Default is "false". 113 * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) 114 */ 115 public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { 116 this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; 117 } 118 119 /** 120 * Set the capacity for the ThreadPoolExecutor's BlockingQueue. 121 * Default is {@code Integer.MAX_VALUE}. 122 * <p>Any positive value will lead to a LinkedBlockingQueue instance; 123 * any other value will lead to a SynchronousQueue instance. 124 * @see java.util.concurrent.LinkedBlockingQueue 125 * @see java.util.concurrent.SynchronousQueue 126 */ 127 public void setQueueCapacity(int queueCapacity) { 128 this.queueCapacity = queueCapacity; 129 } 130 131 /** 132 * Specify whether this FactoryBean should expose an unconfigurable 133 * decorator for the created executor. 134 * <p>Default is "false", exposing the raw executor as bean reference. 135 * Switch this flag to "true" to strictly prevent clients from 136 * modifying the executor's configuration. 137 * @see java.util.concurrent.Executors#unconfigurableExecutorService 138 */ 139 public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) { 140 this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor; 141 } 142 143 144 @Override 145 protected ExecutorService initializeExecutor( 146 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 147 148 BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); 149 ThreadPoolExecutor executor = createExecutor(this.corePoolSize, this.maxPoolSize, 150 this.keepAliveSeconds, queue, threadFactory, rejectedExecutionHandler); 151 if (this.allowCoreThreadTimeOut) { 152 executor.allowCoreThreadTimeOut(true); 153 } 154 155 // Wrap executor with an unconfigurable decorator. 156 this.exposedExecutor = (this.exposeUnconfigurableExecutor ? 157 Executors.unconfigurableExecutorService(executor) : executor); 158 159 return executor; 160 } 161 162 /** 163 * Create a new instance of {@link ThreadPoolExecutor} or a subclass thereof. 164 * <p>The default implementation creates a standard {@link ThreadPoolExecutor}. 165 * Can be overridden to provide custom {@link ThreadPoolExecutor} subclasses. 166 * @param corePoolSize the specified core pool size 167 * @param maxPoolSize the specified maximum pool size 168 * @param keepAliveSeconds the specified keep-alive time in seconds 169 * @param queue the BlockingQueue to use 170 * @param threadFactory the ThreadFactory to use 171 * @param rejectedExecutionHandler the RejectedExecutionHandler to use 172 * @return a new ThreadPoolExecutor instance 173 * @see #afterPropertiesSet() 174 */ 175 protected ThreadPoolExecutor createExecutor( 176 int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, 177 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 178 179 return new ThreadPoolExecutor(corePoolSize, maxPoolSize, 180 keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); 181 } 182 183 /** 184 * Create the BlockingQueue to use for the ThreadPoolExecutor. 185 * <p>A LinkedBlockingQueue instance will be created for a positive 186 * capacity value; a SynchronousQueue else. 187 * @param queueCapacity the specified queue capacity 188 * @return the BlockingQueue instance 189 * @see java.util.concurrent.LinkedBlockingQueue 190 * @see java.util.concurrent.SynchronousQueue 191 */ 192 protected BlockingQueue<Runnable> createQueue(int queueCapacity) { 193 if (queueCapacity > 0) { 194 return new LinkedBlockingQueue<Runnable>(queueCapacity); 195 } 196 else { 197 return new SynchronousQueue<Runnable>(); 198 } 199 } 200 201 202 @Override 203 public ExecutorService getObject() { 204 return this.exposedExecutor; 205 } 206 207 @Override 208 public Class<? extends ExecutorService> getObjectType() { 209 return (this.exposedExecutor != null ? this.exposedExecutor.getClass() : ExecutorService.class); 210 } 211 212 @Override 213 public boolean isSingleton() { 214 return true; 215 } 216 217}