/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.scheduling.concurrent;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.Assert;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.ErrorHandler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;

/**
 * Spring {@link TaskScheduler} implementation backed by a
 * {@link java.util.concurrent.ScheduledExecutorService}, which by default
 * is a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}.
 *
 * <p>Besides scheduling, this class also acts as a plain task executor
 * ({@link SchedulingTaskExecutor} / {@link AsyncListenableTaskExecutor}).
 * Rejections by the underlying executor are rethrown as
 * {@link TaskRejectedException}.
 *
 * @author Juergen Hoeller
 * @author Mark Fisher
 * @since 3.0
 * @see #setPoolSize
 * @see #setRemoveOnCancelPolicy
 * @see #setThreadFactory
 * @see #setErrorHandler
 */
@SuppressWarnings("serial")
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {

	private volatile int poolSize = 1;

	private volatile boolean removeOnCancelPolicy = false;

	@Nullable
	private volatile ErrorHandler errorHandler;

	@Nullable
	private ScheduledExecutorService scheduledExecutor;

	// Maps the executor-level Future returned by submit(...) to the ListenableFuture
	// handed out to the caller (weak references, so entries do not pin either side)
	private final Map<Object, ListenableFuture<?>> listenableFutureMap =
			new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);


	/**
	 * Configure the pool size of the underlying executor. The default is 1.
	 * <p>If an executor has already been created and it is a
	 * {@link ScheduledThreadPoolExecutor}, the new value is applied to it
	 * immediately as its core pool size.
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 * @param poolSize the pool size to use (must be 1 or higher)
	 */
	public void setPoolSize(int poolSize) {
		Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
		this.poolSize = poolSize;
		ScheduledExecutorService current = this.scheduledExecutor;
		if (current instanceof ScheduledThreadPoolExecutor) {
			((ScheduledThreadPoolExecutor) current).setCorePoolSize(poolSize);
		}
	}

	/**
	 * Configure the remove-on-cancel mode of the underlying
	 * {@link ScheduledThreadPoolExecutor}. The default is {@code false}.
	 * <p>If an executor has already been created, the flag is forwarded to it
	 * when it is a {@link ScheduledThreadPoolExecutor}; for any other executor
	 * type, a request to enable the mode is only logged at debug level.
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 * @param removeOnCancelPolicy whether to switch on remove-on-cancel mode
	 */
	public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) {
		this.removeOnCancelPolicy = removeOnCancelPolicy;
		ScheduledExecutorService current = this.scheduledExecutor;
		if (current instanceof ScheduledThreadPoolExecutor) {
			((ScheduledThreadPoolExecutor) current).setRemoveOnCancelPolicy(removeOnCancelPolicy);
			return;
		}
		// Nothing to report while uninitialized or when the mode is being switched off
		if (removeOnCancelPolicy && current != null) {
			logRemoveOnCancelNotApplicable();
		}
	}

	/**
	 * Configure a custom {@link ErrorHandler} strategy that submitted and
	 * scheduled tasks get decorated with.
	 * @param errorHandler the error handler to use
	 */
	public void setErrorHandler(ErrorHandler errorHandler) {
		this.errorHandler = errorHandler;
	}


	/**
	 * Build the underlying executor through {@link #createExecutor} and apply
	 * the remove-on-cancel setting to it if that mode has been requested.
	 * @param threadFactory the ThreadFactory to use
	 * @param rejectedExecutionHandler the RejectedExecutionHandler to use
	 * @return the newly created executor
	 */
	@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		ScheduledExecutorService created = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
		this.scheduledExecutor = created;

		if (this.removeOnCancelPolicy) {
			if (created instanceof ScheduledThreadPoolExecutor) {
				((ScheduledThreadPoolExecutor) created).setRemoveOnCancelPolicy(true);
			}
			else {
				logRemoveOnCancelNotApplicable();
			}
		}

		return created;
	}

	/**
	 * Factory method for the {@link ScheduledExecutorService} to wrap.
	 * <p>This implementation returns a {@link ScheduledThreadPoolExecutor};
	 * subclasses may override it to supply a different
	 * {@link ScheduledExecutorService}.
	 * @param poolSize the specified pool size
	 * @param threadFactory the ThreadFactory to use
	 * @param rejectedExecutionHandler the RejectedExecutionHandler to use
	 * @return a new ScheduledExecutorService instance
	 * @see #afterPropertiesSet()
	 * @see java.util.concurrent.ScheduledThreadPoolExecutor
	 */
	protected ScheduledExecutorService createExecutor(
			int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
	}

	/**
	 * Expose the underlying ScheduledExecutorService for native access.
	 * @return the underlying ScheduledExecutorService (never {@code null})
	 * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
	 */
	public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
		ScheduledExecutorService executor = this.scheduledExecutor;
		Assert.state(executor != null, "ThreadPoolTaskScheduler not initialized");
		return executor;
	}

	/**
	 * Expose the underlying executor as a ScheduledThreadPoolExecutor, if it is one.
	 * @return the underlying ScheduledThreadPoolExecutor (never {@code null})
	 * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
	 * or if the underlying ScheduledExecutorService isn't a ScheduledThreadPoolExecutor
	 * @see #getScheduledExecutor()
	 */
	public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException {
		ScheduledExecutorService executor = this.scheduledExecutor;
		Assert.state(executor instanceof ScheduledThreadPoolExecutor,
				"No ScheduledThreadPoolExecutor available");
		return (ScheduledThreadPoolExecutor) executor;
	}

	/**
	 * Return the current pool size.
	 * <p>Before initialization this is the configured pool size; afterwards the
	 * value is read from the underlying {@link ScheduledThreadPoolExecutor},
	 * which is then required.
	 * @see #getScheduledThreadPoolExecutor()
	 * @see java.util.concurrent.ScheduledThreadPoolExecutor#getPoolSize()
	 */
	public int getPoolSize() {
		return (this.scheduledExecutor != null ?
				getScheduledThreadPoolExecutor().getPoolSize() : this.poolSize);
	}

	/**
	 * Return the current setting for the remove-on-cancel mode.
	 * <p>Before initialization this is the locally configured flag; afterwards
	 * the value is read from the underlying {@link ScheduledThreadPoolExecutor},
	 * which is then required.
	 */
	public boolean isRemoveOnCancelPolicy() {
		return (this.scheduledExecutor != null ?
				getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy() : this.removeOnCancelPolicy);
	}

	/**
	 * Return the number of currently active threads.
	 * <p>Before initialization this is 0; afterwards the value is read from the
	 * underlying {@link ScheduledThreadPoolExecutor}, which is then required.
	 * @see #getScheduledThreadPoolExecutor()
	 * @see java.util.concurrent.ScheduledThreadPoolExecutor#getActiveCount()
	 */
	public int getActiveCount() {
		return (this.scheduledExecutor != null ?
				getScheduledThreadPoolExecutor().getActiveCount() : 0);
	}


	// SchedulingTaskExecutor implementation

	@Override
	public void execute(Runnable task) {
		Executor executor = getScheduledExecutor();
		try {
			Runnable decorated = errorHandlingTask(task, false);
			executor.execute(decorated);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public void execute(Runnable task, long startTimeout) {
		// The start timeout is not taken into account here
		execute(task);
	}

	@Override
	public Future<?> submit(Runnable task) {
		ExecutorService executor = getScheduledExecutor();
		try {
			Runnable decorated = errorHandlingTask(task, false);
			return executor.submit(decorated);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public <T> Future<T> submit(Callable<T> task) {
		ExecutorService executor = getScheduledExecutor();
		try {
			// Only wrap the callable when an ErrorHandler has been configured
			ErrorHandler handler = this.errorHandler;
			Callable<T> effectiveTask =
					(handler != null ? new DelegatingErrorHandlingCallable<>(task, handler) : task);
			return executor.submit(effectiveTask);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public ListenableFuture<?> submitListenable(Runnable task) {
		ExecutorService executor = getScheduledExecutor();
		try {
			ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
			executeAndTrack(executor, future);
			return future;
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
		ExecutorService executor = getScheduledExecutor();
		try {
			ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
			executeAndTrack(executor, future);
			return future;
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	/**
	 * Submit the given future task to the executor and remember which
	 * executor-level Future belongs to it, dropping that association again
	 * once the task has completed (successfully or not).
	 */
	private void executeAndTrack(ExecutorService executor, ListenableFutureTask<?> listenableFuture) {
		Future<?> submitted = executor.submit(errorHandlingTask(listenableFuture, false));
		this.listenableFutureMap.put(submitted, listenableFuture);
		Runnable untrack = () -> this.listenableFutureMap.remove(submitted);
		listenableFuture.addCallback(result -> untrack.run(), ex -> untrack.run());
	}

	@Override
	protected void cancelRemainingTask(Runnable task) {
		super.cancelRemainingTask(task);
		// If a user-level ListenableFuture handle is tracked for this task, cancel it too
		ListenableFuture<?> userHandle = this.listenableFutureMap.get(task);
		if (userHandle != null) {
			userHandle.cancel(true);
		}
	}


	// TaskScheduler implementation

	@Override
	@Nullable
	public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
		ScheduledExecutorService executor = getScheduledExecutor();
		try {
			// Fall back to the default handler for repeating tasks if none is configured
			ErrorHandler configured = this.errorHandler;
			ErrorHandler handler = (configured != null ? configured : TaskUtils.getDefaultErrorHandler(true));
			ReschedulingRunnable rescheduling = new ReschedulingRunnable(task, trigger, executor, handler);
			return rescheduling.schedule();
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
		ScheduledExecutorService executor = getScheduledExecutor();
		long initialDelay = millisUntil(startTime);
		try {
			Runnable decorated = errorHandlingTask(task, false);
			return executor.schedule(decorated, initialDelay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
		ScheduledExecutorService executor = getScheduledExecutor();
		long initialDelay = millisUntil(startTime);
		try {
			Runnable decorated = errorHandlingTask(task, true);
			return executor.scheduleAtFixedRate(decorated, initialDelay, period, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
		ScheduledExecutorService executor = getScheduledExecutor();
		try {
			Runnable decorated = errorHandlingTask(task, true);
			return executor.scheduleAtFixedRate(decorated, 0, period, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
		ScheduledExecutorService executor = getScheduledExecutor();
		long initialDelay = millisUntil(startTime);
		try {
			Runnable decorated = errorHandlingTask(task, true);
			return executor.scheduleWithFixedDelay(decorated, initialDelay, delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}

	@Override
	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
		ScheduledExecutorService executor = getScheduledExecutor();
		try {
			Runnable decorated = errorHandlingTask(task, true);
			return executor.scheduleWithFixedDelay(decorated, 0, delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException(rejectionMessage(executor, task), ex);
		}
	}


	/**
	 * Decorate the given task with this scheduler's (possibly {@code null})
	 * ErrorHandler via {@link TaskUtils#decorateTaskWithErrorHandler}.
	 */
	private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
		return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
	}

	/**
	 * Emit the debug message for a remove-on-cancel request that cannot be
	 * honored because the executor is not a ScheduledThreadPoolExecutor.
	 */
	private void logRemoveOnCancelNotApplicable() {
		logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
	}

	/**
	 * Build the message used for every {@link TaskRejectedException} raised by this class.
	 */
	private static String rejectionMessage(Executor executor, Object task) {
		return "Executor [" + executor + "] did not accept task: " + task;
	}

	/**
	 * Compute the number of milliseconds between the current system time and
	 * the given start time (negative if the start time has already passed).
	 */
	private static long millisUntil(Date startTime) {
		return startTime.getTime() - System.currentTimeMillis();
	}


	/**
	 * Callable wrapper that hands any Throwable raised by the delegate to an
	 * {@link ErrorHandler} and yields {@code null} as the result in that case.
	 */
	private static class DelegatingErrorHandlingCallable<V> implements Callable<V> {

		private final Callable<V> delegate;

		private final ErrorHandler errorHandler;

		public DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) {
			this.delegate = delegate;
			this.errorHandler = errorHandler;
		}

		@Override
		@Nullable
		public V call() throws Exception {
			V result = null;
			try {
				result = this.delegate.call();
			}
			catch (Throwable ex) {
				// Leave the result as null; anything thrown by the handler itself propagates
				this.errorHandler.handleError(ex);
			}
			return result;
		}
	}

}