/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.Date;
020import java.util.Map;
021import java.util.concurrent.Callable;
022import java.util.concurrent.Executor;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Future;
025import java.util.concurrent.RejectedExecutionException;
026import java.util.concurrent.RejectedExecutionHandler;
027import java.util.concurrent.ScheduledExecutorService;
028import java.util.concurrent.ScheduledFuture;
029import java.util.concurrent.ScheduledThreadPoolExecutor;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.TimeUnit;
032
033import org.springframework.core.task.AsyncListenableTaskExecutor;
034import org.springframework.core.task.TaskRejectedException;
035import org.springframework.lang.Nullable;
036import org.springframework.scheduling.SchedulingTaskExecutor;
037import org.springframework.scheduling.TaskScheduler;
038import org.springframework.scheduling.Trigger;
039import org.springframework.scheduling.support.TaskUtils;
040import org.springframework.util.Assert;
041import org.springframework.util.ConcurrentReferenceHashMap;
042import org.springframework.util.ErrorHandler;
043import org.springframework.util.concurrent.ListenableFuture;
044import org.springframework.util.concurrent.ListenableFutureTask;
045
046/**
047 * Implementation of Spring's {@link TaskScheduler} interface, wrapping
048 * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}.
049 *
050 * @author Juergen Hoeller
051 * @author Mark Fisher
052 * @since 3.0
053 * @see #setPoolSize
054 * @see #setRemoveOnCancelPolicy
055 * @see #setThreadFactory
056 * @see #setErrorHandler
057 */
058@SuppressWarnings("serial")
059public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
060                implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
061
062        private volatile int poolSize = 1;
063
064        private volatile boolean removeOnCancelPolicy = false;
065
066        @Nullable
067        private volatile ErrorHandler errorHandler;
068
069        @Nullable
070        private ScheduledExecutorService scheduledExecutor;
071
072        // Underlying ScheduledFutureTask to user-level ListenableFuture handle, if any
073        private final Map<Object, ListenableFuture<?>> listenableFutureMap =
074                        new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
075
076
077        /**
078         * Set the ScheduledExecutorService's pool size.
079         * Default is 1.
080         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
081         */
082        public void setPoolSize(int poolSize) {
083                Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
084                this.poolSize = poolSize;
085                if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
086                        ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setCorePoolSize(poolSize);
087                }
088        }
089
090        /**
091         * Set the remove-on-cancel mode on {@link ScheduledThreadPoolExecutor}.
092         * <p>Default is {@code false}. If set to {@code true}, the target executor will be
093         * switched into remove-on-cancel mode (if possible, with a soft fallback otherwise).
094         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
095         */
096        public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) {
097                this.removeOnCancelPolicy = removeOnCancelPolicy;
098                if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
099                        ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(removeOnCancelPolicy);
100                }
101                else if (removeOnCancelPolicy && this.scheduledExecutor != null) {
102                        logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
103                }
104        }
105
106        /**
107         * Set a custom {@link ErrorHandler} strategy.
108         */
109        public void setErrorHandler(ErrorHandler errorHandler) {
110                this.errorHandler = errorHandler;
111        }
112
113
114        @Override
115        protected ExecutorService initializeExecutor(
116                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
117
118                this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
119
120                if (this.removeOnCancelPolicy) {
121                        if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
122                                ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
123                        }
124                        else {
125                                logger.debug("Could not apply remove-on-cancel policy - not a ScheduledThreadPoolExecutor");
126                        }
127                }
128
129                return this.scheduledExecutor;
130        }
131
132        /**
133         * Create a new {@link ScheduledExecutorService} instance.
134         * <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
135         * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
136         * @param poolSize the specified pool size
137         * @param threadFactory the ThreadFactory to use
138         * @param rejectedExecutionHandler the RejectedExecutionHandler to use
139         * @return a new ScheduledExecutorService instance
140         * @see #afterPropertiesSet()
141         * @see java.util.concurrent.ScheduledThreadPoolExecutor
142         */
143        protected ScheduledExecutorService createExecutor(
144                        int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
145
146                return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
147        }
148
149        /**
150         * Return the underlying ScheduledExecutorService for native access.
151         * @return the underlying ScheduledExecutorService (never {@code null})
152         * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
153         */
154        public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
155                Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
156                return this.scheduledExecutor;
157        }
158
159        /**
160         * Return the underlying ScheduledThreadPoolExecutor, if available.
161         * @return the underlying ScheduledExecutorService (never {@code null})
162         * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
163         * or if the underlying ScheduledExecutorService isn't a ScheduledThreadPoolExecutor
164         * @see #getScheduledExecutor()
165         */
166        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException {
167                Assert.state(this.scheduledExecutor instanceof ScheduledThreadPoolExecutor,
168                                "No ScheduledThreadPoolExecutor available");
169                return (ScheduledThreadPoolExecutor) this.scheduledExecutor;
170        }
171
172        /**
173         * Return the current pool size.
174         * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
175         * @see #getScheduledThreadPoolExecutor()
176         * @see java.util.concurrent.ScheduledThreadPoolExecutor#getPoolSize()
177         */
178        public int getPoolSize() {
179                if (this.scheduledExecutor == null) {
180                        // Not initialized yet: assume initial pool size.
181                        return this.poolSize;
182                }
183                return getScheduledThreadPoolExecutor().getPoolSize();
184        }
185
186        /**
187         * Return the current setting for the remove-on-cancel mode.
188         * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
189         */
190        public boolean isRemoveOnCancelPolicy() {
191                if (this.scheduledExecutor == null) {
192                        // Not initialized yet: return our setting for the time being.
193                        return this.removeOnCancelPolicy;
194                }
195                return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy();
196        }
197
198        /**
199         * Return the number of currently active threads.
200         * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
201         * @see #getScheduledThreadPoolExecutor()
202         * @see java.util.concurrent.ScheduledThreadPoolExecutor#getActiveCount()
203         */
204        public int getActiveCount() {
205                if (this.scheduledExecutor == null) {
206                        // Not initialized yet: assume no active threads.
207                        return 0;
208                }
209                return getScheduledThreadPoolExecutor().getActiveCount();
210        }
211
212
213        // SchedulingTaskExecutor implementation
214
215        @Override
216        public void execute(Runnable task) {
217                Executor executor = getScheduledExecutor();
218                try {
219                        executor.execute(errorHandlingTask(task, false));
220                }
221                catch (RejectedExecutionException ex) {
222                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
223                }
224        }
225
226        @Override
227        public void execute(Runnable task, long startTimeout) {
228                execute(task);
229        }
230
231        @Override
232        public Future<?> submit(Runnable task) {
233                ExecutorService executor = getScheduledExecutor();
234                try {
235                        return executor.submit(errorHandlingTask(task, false));
236                }
237                catch (RejectedExecutionException ex) {
238                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
239                }
240        }
241
242        @Override
243        public <T> Future<T> submit(Callable<T> task) {
244                ExecutorService executor = getScheduledExecutor();
245                try {
246                        Callable<T> taskToUse = task;
247                        ErrorHandler errorHandler = this.errorHandler;
248                        if (errorHandler != null) {
249                                taskToUse = new DelegatingErrorHandlingCallable<>(task, errorHandler);
250                        }
251                        return executor.submit(taskToUse);
252                }
253                catch (RejectedExecutionException ex) {
254                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
255                }
256        }
257
258        @Override
259        public ListenableFuture<?> submitListenable(Runnable task) {
260                ExecutorService executor = getScheduledExecutor();
261                try {
262                        ListenableFutureTask<Object> listenableFuture = new ListenableFutureTask<>(task, null);
263                        executeAndTrack(executor, listenableFuture);
264                        return listenableFuture;
265                }
266                catch (RejectedExecutionException ex) {
267                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
268                }
269        }
270
271        @Override
272        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
273                ExecutorService executor = getScheduledExecutor();
274                try {
275                        ListenableFutureTask<T> listenableFuture = new ListenableFutureTask<>(task);
276                        executeAndTrack(executor, listenableFuture);
277                        return listenableFuture;
278                }
279                catch (RejectedExecutionException ex) {
280                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
281                }
282        }
283
284        private void executeAndTrack(ExecutorService executor, ListenableFutureTask<?> listenableFuture) {
285                Future<?> scheduledFuture = executor.submit(errorHandlingTask(listenableFuture, false));
286                this.listenableFutureMap.put(scheduledFuture, listenableFuture);
287                listenableFuture.addCallback(result -> this.listenableFutureMap.remove(scheduledFuture),
288                                ex -> this.listenableFutureMap.remove(scheduledFuture));
289        }
290
291        @Override
292        protected void cancelRemainingTask(Runnable task) {
293                super.cancelRemainingTask(task);
294                // Cancel associated user-level ListenableFuture handle as well
295                ListenableFuture<?> listenableFuture = this.listenableFutureMap.get(task);
296                if (listenableFuture != null) {
297                        listenableFuture.cancel(true);
298                }
299        }
300
301
302        // TaskScheduler implementation
303
304        @Override
305        @Nullable
306        public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
307                ScheduledExecutorService executor = getScheduledExecutor();
308                try {
309                        ErrorHandler errorHandler = this.errorHandler;
310                        if (errorHandler == null) {
311                                errorHandler = TaskUtils.getDefaultErrorHandler(true);
312                        }
313                        return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
314                }
315                catch (RejectedExecutionException ex) {
316                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
317                }
318        }
319
320        @Override
321        public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
322                ScheduledExecutorService executor = getScheduledExecutor();
323                long initialDelay = startTime.getTime() - System.currentTimeMillis();
324                try {
325                        return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
326                }
327                catch (RejectedExecutionException ex) {
328                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
329                }
330        }
331
332        @Override
333        public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
334                ScheduledExecutorService executor = getScheduledExecutor();
335                long initialDelay = startTime.getTime() - System.currentTimeMillis();
336                try {
337                        return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
338                }
339                catch (RejectedExecutionException ex) {
340                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
341                }
342        }
343
344        @Override
345        public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
346                ScheduledExecutorService executor = getScheduledExecutor();
347                try {
348                        return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
349                }
350                catch (RejectedExecutionException ex) {
351                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
352                }
353        }
354
355        @Override
356        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
357                ScheduledExecutorService executor = getScheduledExecutor();
358                long initialDelay = startTime.getTime() - System.currentTimeMillis();
359                try {
360                        return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
361                }
362                catch (RejectedExecutionException ex) {
363                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
364                }
365        }
366
367        @Override
368        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
369                ScheduledExecutorService executor = getScheduledExecutor();
370                try {
371                        return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
372                }
373                catch (RejectedExecutionException ex) {
374                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
375                }
376        }
377
378
379        private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
380                return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
381        }
382
383
384        private static class DelegatingErrorHandlingCallable<V> implements Callable<V> {
385
386                private final Callable<V> delegate;
387
388                private final ErrorHandler errorHandler;
389
390                public DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) {
391                        this.delegate = delegate;
392                        this.errorHandler = errorHandler;
393                }
394
395                @Override
396                @Nullable
397                public V call() throws Exception {
398                        try {
399                                return this.delegate.call();
400                        }
401                        catch (Throwable ex) {
402                                this.errorHandler.handleError(ex);
403                                return null;
404                        }
405                }
406        }
407
408}