001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.scheduling.concurrent; 018 019import java.util.Date; 020import java.util.concurrent.Callable; 021import java.util.concurrent.Executor; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.Future; 024import java.util.concurrent.RejectedExecutionException; 025import java.util.concurrent.RejectedExecutionHandler; 026import java.util.concurrent.ScheduledExecutorService; 027import java.util.concurrent.ScheduledFuture; 028import java.util.concurrent.ScheduledThreadPoolExecutor; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.TimeUnit; 031 032import org.springframework.core.task.AsyncListenableTaskExecutor; 033import org.springframework.core.task.TaskRejectedException; 034import org.springframework.lang.UsesJava7; 035import org.springframework.scheduling.SchedulingTaskExecutor; 036import org.springframework.scheduling.TaskScheduler; 037import org.springframework.scheduling.Trigger; 038import org.springframework.scheduling.support.TaskUtils; 039import org.springframework.util.Assert; 040import org.springframework.util.ClassUtils; 041import org.springframework.util.ErrorHandler; 042import org.springframework.util.concurrent.ListenableFuture; 043import org.springframework.util.concurrent.ListenableFutureTask; 044 045/** 046 * Implementation of Spring's {@link TaskScheduler} interface, wrapping 047 * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}. 048 * 049 * @author Juergen Hoeller 050 * @author Mark Fisher 051 * @since 3.0 052 * @see #setPoolSize 053 * @see #setRemoveOnCancelPolicy 054 * @see #setThreadFactory 055 * @see #setErrorHandler 056 */ 057@SuppressWarnings("serial") 058public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport 059 implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler { 060 061 // ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(boolean) only available on JDK 7+ 062 private static final boolean setRemoveOnCancelPolicyAvailable = 063 ClassUtils.hasMethod(ScheduledThreadPoolExecutor.class, "setRemoveOnCancelPolicy", boolean.class); 064 065 066 private volatile int poolSize = 1; 067 068 private volatile boolean removeOnCancelPolicy = false; 069 070 private volatile ErrorHandler errorHandler; 071 072 private volatile ScheduledExecutorService scheduledExecutor; 073 074 075 /** 076 * Set the ScheduledExecutorService's pool size. 077 * Default is 1. 078 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 079 */ 080 public void setPoolSize(int poolSize) { 081 Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher"); 082 this.poolSize = poolSize; 083 if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) { 084 ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setCorePoolSize(poolSize); 085 } 086 } 087 088 /** 089 * Set the remove-on-cancel mode on {@link ScheduledThreadPoolExecutor} (JDK 7+). 090 * <p>Default is {@code false}. If set to {@code true}, the target executor will be 091 * switched into remove-on-cancel mode (if possible, with a soft fallback otherwise). 092 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 093 */ 094 @UsesJava7 095 public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) { 096 this.removeOnCancelPolicy = removeOnCancelPolicy; 097 if (setRemoveOnCancelPolicyAvailable && this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) { 098 ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(removeOnCancelPolicy); 099 } 100 else if (removeOnCancelPolicy && this.scheduledExecutor != null) { 101 logger.info("Could not apply remove-on-cancel policy - not a Java 7+ ScheduledThreadPoolExecutor"); 102 } 103 } 104 105 /** 106 * Set a custom {@link ErrorHandler} strategy. 107 */ 108 public void setErrorHandler(ErrorHandler errorHandler) { 109 this.errorHandler = errorHandler; 110 } 111 112 113 @UsesJava7 114 @Override 115 protected ExecutorService initializeExecutor( 116 ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 117 118 this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler); 119 120 if (this.removeOnCancelPolicy) { 121 if (setRemoveOnCancelPolicyAvailable && this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) { 122 ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true); 123 } 124 else { 125 logger.info("Could not apply remove-on-cancel policy - not a Java 7+ ScheduledThreadPoolExecutor"); 126 } 127 } 128 129 return this.scheduledExecutor; 130 } 131 132 /** 133 * Create a new {@link ScheduledExecutorService} instance. 134 * <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}. 135 * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances. 136 * @param poolSize the specified pool size 137 * @param threadFactory the ThreadFactory to use 138 * @param rejectedExecutionHandler the RejectedExecutionHandler to use 139 * @return a new ScheduledExecutorService instance 140 * @see #afterPropertiesSet() 141 * @see java.util.concurrent.ScheduledThreadPoolExecutor 142 */ 143 protected ScheduledExecutorService createExecutor( 144 int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 145 146 return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler); 147 } 148 149 /** 150 * Return the underlying ScheduledExecutorService for native access. 151 * @return the underlying ScheduledExecutorService (never {@code null}) 152 * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet 153 */ 154 public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException { 155 Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized"); 156 return this.scheduledExecutor; 157 } 158 159 /** 160 * Return the underlying ScheduledThreadPoolExecutor, if available. 161 * @return the underlying ScheduledExecutorService (never {@code null}) 162 * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet 163 * or if the underlying ScheduledExecutorService isn't a ScheduledThreadPoolExecutor 164 * @see #getScheduledExecutor() 165 */ 166 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException { 167 Assert.state(this.scheduledExecutor instanceof ScheduledThreadPoolExecutor, 168 "No ScheduledThreadPoolExecutor available"); 169 return (ScheduledThreadPoolExecutor) this.scheduledExecutor; 170 } 171 172 /** 173 * Return the current pool size. 174 * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}. 175 * @see #getScheduledThreadPoolExecutor() 176 * @see java.util.concurrent.ScheduledThreadPoolExecutor#getPoolSize() 177 */ 178 public int getPoolSize() { 179 if (this.scheduledExecutor == null) { 180 // Not initialized yet: assume initial pool size. 181 return this.poolSize; 182 } 183 return getScheduledThreadPoolExecutor().getPoolSize(); 184 } 185 186 /** 187 * Return the current setting for the remove-on-cancel mode. 188 * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}. 189 */ 190 @UsesJava7 191 public boolean isRemoveOnCancelPolicy() { 192 if (!setRemoveOnCancelPolicyAvailable) { 193 return false; 194 } 195 if (this.scheduledExecutor == null) { 196 // Not initialized yet: return our setting for the time being. 197 return this.removeOnCancelPolicy; 198 } 199 return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy(); 200 } 201 202 /** 203 * Return the number of currently active threads. 204 * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}. 205 * @see #getScheduledThreadPoolExecutor() 206 * @see java.util.concurrent.ScheduledThreadPoolExecutor#getActiveCount() 207 */ 208 public int getActiveCount() { 209 if (this.scheduledExecutor == null) { 210 // Not initialized yet: assume no active threads. 211 return 0; 212 } 213 return getScheduledThreadPoolExecutor().getActiveCount(); 214 } 215 216 217 // SchedulingTaskExecutor implementation 218 219 @Override 220 public void execute(Runnable task) { 221 Executor executor = getScheduledExecutor(); 222 try { 223 executor.execute(errorHandlingTask(task, false)); 224 } 225 catch (RejectedExecutionException ex) { 226 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 227 } 228 } 229 230 @Override 231 public void execute(Runnable task, long startTimeout) { 232 execute(task); 233 } 234 235 @Override 236 public Future<?> submit(Runnable task) { 237 ExecutorService executor = getScheduledExecutor(); 238 try { 239 return executor.submit(errorHandlingTask(task, false)); 240 } 241 catch (RejectedExecutionException ex) { 242 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 243 } 244 } 245 246 @Override 247 public <T> Future<T> submit(Callable<T> task) { 248 ExecutorService executor = getScheduledExecutor(); 249 try { 250 Callable<T> taskToUse = task; 251 if (this.errorHandler != null) { 252 taskToUse = new DelegatingErrorHandlingCallable<T>(task, this.errorHandler); 253 } 254 return executor.submit(taskToUse); 255 } 256 catch (RejectedExecutionException ex) { 257 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 258 } 259 } 260 261 @Override 262 public ListenableFuture<?> submitListenable(Runnable task) { 263 ExecutorService executor = getScheduledExecutor(); 264 try { 265 ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null); 266 executor.execute(errorHandlingTask(future, false)); 267 return future; 268 } 269 catch (RejectedExecutionException ex) { 270 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 271 } 272 } 273 274 @Override 275 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 276 ExecutorService executor = getScheduledExecutor(); 277 try { 278 ListenableFutureTask<T> future = new ListenableFutureTask<T>(task); 279 executor.execute(errorHandlingTask(future, false)); 280 return future; 281 } 282 catch (RejectedExecutionException ex) { 283 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 284 } 285 } 286 287 @Override 288 public boolean prefersShortLivedTasks() { 289 return true; 290 } 291 292 293 // TaskScheduler implementation 294 295 @Override 296 public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { 297 ScheduledExecutorService executor = getScheduledExecutor(); 298 try { 299 ErrorHandler errorHandler = 300 (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); 301 return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule(); 302 } 303 catch (RejectedExecutionException ex) { 304 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 305 } 306 } 307 308 @Override 309 public ScheduledFuture<?> schedule(Runnable task, Date startTime) { 310 ScheduledExecutorService executor = getScheduledExecutor(); 311 long initialDelay = startTime.getTime() - System.currentTimeMillis(); 312 try { 313 return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS); 314 } 315 catch (RejectedExecutionException ex) { 316 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 317 } 318 } 319 320 @Override 321 public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) { 322 ScheduledExecutorService executor = getScheduledExecutor(); 323 long initialDelay = startTime.getTime() - System.currentTimeMillis(); 324 try { 325 return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS); 326 } 327 catch (RejectedExecutionException ex) { 328 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 329 } 330 } 331 332 @Override 333 public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) { 334 ScheduledExecutorService executor = getScheduledExecutor(); 335 try { 336 return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS); 337 } 338 catch (RejectedExecutionException ex) { 339 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 340 } 341 } 342 343 @Override 344 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) { 345 ScheduledExecutorService executor = getScheduledExecutor(); 346 long initialDelay = startTime.getTime() - System.currentTimeMillis(); 347 try { 348 return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS); 349 } 350 catch (RejectedExecutionException ex) { 351 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 352 } 353 } 354 355 @Override 356 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) { 357 ScheduledExecutorService executor = getScheduledExecutor(); 358 try { 359 return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS); 360 } 361 catch (RejectedExecutionException ex) { 362 throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); 363 } 364 } 365 366 367 private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) { 368 return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask); 369 } 370 371 372 private static class DelegatingErrorHandlingCallable<V> implements Callable<V> { 373 374 private final Callable<V> delegate; 375 376 private final ErrorHandler errorHandler; 377 378 public DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) { 379 this.delegate = delegate; 380 this.errorHandler = errorHandler; 381 } 382 383 @Override 384 public V call() throws Exception { 385 try { 386 return this.delegate.call(); 387 } 388 catch (Throwable t) { 389 this.errorHandler.handleError(t); 390 return null; 391 } 392 } 393 } 394 395}