001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.Date;
020import java.util.concurrent.Callable;
021import java.util.concurrent.Executor;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.Future;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.RejectedExecutionHandler;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.ScheduledFuture;
028import java.util.concurrent.ScheduledThreadPoolExecutor;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.TimeUnit;
031
032import org.springframework.core.task.AsyncListenableTaskExecutor;
033import org.springframework.core.task.TaskRejectedException;
034import org.springframework.lang.UsesJava7;
035import org.springframework.scheduling.SchedulingTaskExecutor;
036import org.springframework.scheduling.TaskScheduler;
037import org.springframework.scheduling.Trigger;
038import org.springframework.scheduling.support.TaskUtils;
039import org.springframework.util.Assert;
040import org.springframework.util.ClassUtils;
041import org.springframework.util.ErrorHandler;
042import org.springframework.util.concurrent.ListenableFuture;
043import org.springframework.util.concurrent.ListenableFutureTask;
044
045/**
046 * Implementation of Spring's {@link TaskScheduler} interface, wrapping
047 * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}.
048 *
049 * @author Juergen Hoeller
050 * @author Mark Fisher
051 * @since 3.0
052 * @see #setPoolSize
053 * @see #setRemoveOnCancelPolicy
054 * @see #setThreadFactory
055 * @see #setErrorHandler
056 */
057@SuppressWarnings("serial")
058public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
059                implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
060
061        // ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(boolean) only available on JDK 7+
062        private static final boolean setRemoveOnCancelPolicyAvailable =
063                        ClassUtils.hasMethod(ScheduledThreadPoolExecutor.class, "setRemoveOnCancelPolicy", boolean.class);
064
065
066        private volatile int poolSize = 1;
067
068        private volatile boolean removeOnCancelPolicy = false;
069
070        private volatile ErrorHandler errorHandler;
071
072        private volatile ScheduledExecutorService scheduledExecutor;
073
074
075        /**
076         * Set the ScheduledExecutorService's pool size.
077         * Default is 1.
078         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
079         */
080        public void setPoolSize(int poolSize) {
081                Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
082                this.poolSize = poolSize;
083                if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
084                        ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setCorePoolSize(poolSize);
085                }
086        }
087
088        /**
089         * Set the remove-on-cancel mode on {@link ScheduledThreadPoolExecutor} (JDK 7+).
090         * <p>Default is {@code false}. If set to {@code true}, the target executor will be
091         * switched into remove-on-cancel mode (if possible, with a soft fallback otherwise).
092         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
093         */
094        @UsesJava7
095        public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) {
096                this.removeOnCancelPolicy = removeOnCancelPolicy;
097                if (setRemoveOnCancelPolicyAvailable && this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
098                        ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(removeOnCancelPolicy);
099                }
100                else if (removeOnCancelPolicy && this.scheduledExecutor != null) {
101                        logger.info("Could not apply remove-on-cancel policy - not a Java 7+ ScheduledThreadPoolExecutor");
102                }
103        }
104
105        /**
106         * Set a custom {@link ErrorHandler} strategy.
107         */
108        public void setErrorHandler(ErrorHandler errorHandler) {
109                this.errorHandler = errorHandler;
110        }
111
112
113        @UsesJava7
114        @Override
115        protected ExecutorService initializeExecutor(
116                        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
117
118                this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
119
120                if (this.removeOnCancelPolicy) {
121                        if (setRemoveOnCancelPolicyAvailable && this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
122                                ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
123                        }
124                        else {
125                                logger.info("Could not apply remove-on-cancel policy - not a Java 7+ ScheduledThreadPoolExecutor");
126                        }
127                }
128
129                return this.scheduledExecutor;
130        }
131
132        /**
133         * Create a new {@link ScheduledExecutorService} instance.
134         * <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
135         * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
136         * @param poolSize the specified pool size
137         * @param threadFactory the ThreadFactory to use
138         * @param rejectedExecutionHandler the RejectedExecutionHandler to use
139         * @return a new ScheduledExecutorService instance
140         * @see #afterPropertiesSet()
141         * @see java.util.concurrent.ScheduledThreadPoolExecutor
142         */
143        protected ScheduledExecutorService createExecutor(
144                        int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
145
146                return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
147        }
148
149        /**
150         * Return the underlying ScheduledExecutorService for native access.
151         * @return the underlying ScheduledExecutorService (never {@code null})
152         * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
153         */
154        public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
155                Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
156                return this.scheduledExecutor;
157        }
158
159        /**
160         * Return the underlying ScheduledThreadPoolExecutor, if available.
161         * @return the underlying ScheduledExecutorService (never {@code null})
162         * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
163         * or if the underlying ScheduledExecutorService isn't a ScheduledThreadPoolExecutor
164         * @see #getScheduledExecutor()
165         */
166        public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException {
167                Assert.state(this.scheduledExecutor instanceof ScheduledThreadPoolExecutor,
168                                "No ScheduledThreadPoolExecutor available");
169                return (ScheduledThreadPoolExecutor) this.scheduledExecutor;
170        }
171
172        /**
173         * Return the current pool size.
174         * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
175         * @see #getScheduledThreadPoolExecutor()
176         * @see java.util.concurrent.ScheduledThreadPoolExecutor#getPoolSize()
177         */
178        public int getPoolSize() {
179                if (this.scheduledExecutor == null) {
180                        // Not initialized yet: assume initial pool size.
181                        return this.poolSize;
182                }
183                return getScheduledThreadPoolExecutor().getPoolSize();
184        }
185
186        /**
187         * Return the current setting for the remove-on-cancel mode.
188         * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
189         */
190        @UsesJava7
191        public boolean isRemoveOnCancelPolicy() {
192                if (!setRemoveOnCancelPolicyAvailable) {
193                        return false;
194                }
195                if (this.scheduledExecutor == null) {
196                        // Not initialized yet: return our setting for the time being.
197                        return this.removeOnCancelPolicy;
198                }
199                return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy();
200        }
201
202        /**
203         * Return the number of currently active threads.
204         * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
205         * @see #getScheduledThreadPoolExecutor()
206         * @see java.util.concurrent.ScheduledThreadPoolExecutor#getActiveCount()
207         */
208        public int getActiveCount() {
209                if (this.scheduledExecutor == null) {
210                        // Not initialized yet: assume no active threads.
211                        return 0;
212                }
213                return getScheduledThreadPoolExecutor().getActiveCount();
214        }
215
216
217        // SchedulingTaskExecutor implementation
218
219        @Override
220        public void execute(Runnable task) {
221                Executor executor = getScheduledExecutor();
222                try {
223                        executor.execute(errorHandlingTask(task, false));
224                }
225                catch (RejectedExecutionException ex) {
226                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
227                }
228        }
229
230        @Override
231        public void execute(Runnable task, long startTimeout) {
232                execute(task);
233        }
234
235        @Override
236        public Future<?> submit(Runnable task) {
237                ExecutorService executor = getScheduledExecutor();
238                try {
239                        return executor.submit(errorHandlingTask(task, false));
240                }
241                catch (RejectedExecutionException ex) {
242                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
243                }
244        }
245
246        @Override
247        public <T> Future<T> submit(Callable<T> task) {
248                ExecutorService executor = getScheduledExecutor();
249                try {
250                        Callable<T> taskToUse = task;
251                        if (this.errorHandler != null) {
252                                taskToUse = new DelegatingErrorHandlingCallable<T>(task, this.errorHandler);
253                        }
254                        return executor.submit(taskToUse);
255                }
256                catch (RejectedExecutionException ex) {
257                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
258                }
259        }
260
261        @Override
262        public ListenableFuture<?> submitListenable(Runnable task) {
263                ExecutorService executor = getScheduledExecutor();
264                try {
265                        ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
266                        executor.execute(errorHandlingTask(future, false));
267                        return future;
268                }
269                catch (RejectedExecutionException ex) {
270                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
271                }
272        }
273
274        @Override
275        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
276                ExecutorService executor = getScheduledExecutor();
277                try {
278                        ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
279                        executor.execute(errorHandlingTask(future, false));
280                        return future;
281                }
282                catch (RejectedExecutionException ex) {
283                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
284                }
285        }
286
287        @Override
288        public boolean prefersShortLivedTasks() {
289                return true;
290        }
291
292
293        // TaskScheduler implementation
294
295        @Override
296        public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
297                ScheduledExecutorService executor = getScheduledExecutor();
298                try {
299                        ErrorHandler errorHandler =
300                                        (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
301                        return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
302                }
303                catch (RejectedExecutionException ex) {
304                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
305                }
306        }
307
308        @Override
309        public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
310                ScheduledExecutorService executor = getScheduledExecutor();
311                long initialDelay = startTime.getTime() - System.currentTimeMillis();
312                try {
313                        return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
314                }
315                catch (RejectedExecutionException ex) {
316                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
317                }
318        }
319
320        @Override
321        public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
322                ScheduledExecutorService executor = getScheduledExecutor();
323                long initialDelay = startTime.getTime() - System.currentTimeMillis();
324                try {
325                        return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
326                }
327                catch (RejectedExecutionException ex) {
328                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
329                }
330        }
331
332        @Override
333        public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
334                ScheduledExecutorService executor = getScheduledExecutor();
335                try {
336                        return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
337                }
338                catch (RejectedExecutionException ex) {
339                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
340                }
341        }
342
343        @Override
344        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
345                ScheduledExecutorService executor = getScheduledExecutor();
346                long initialDelay = startTime.getTime() - System.currentTimeMillis();
347                try {
348                        return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
349                }
350                catch (RejectedExecutionException ex) {
351                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
352                }
353        }
354
355        @Override
356        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
357                ScheduledExecutorService executor = getScheduledExecutor();
358                try {
359                        return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
360                }
361                catch (RejectedExecutionException ex) {
362                        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
363                }
364        }
365
366
367        private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
368                return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
369        }
370
371
372        private static class DelegatingErrorHandlingCallable<V> implements Callable<V> {
373
374                private final Callable<V> delegate;
375
376                private final ErrorHandler errorHandler;
377
378                public DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) {
379                        this.delegate = delegate;
380                        this.errorHandler = errorHandler;
381                }
382
383                @Override
384                public V call() throws Exception {
385                        try {
386                                return this.delegate.call();
387                        }
388                        catch (Throwable t) {
389                                this.errorHandler.handleError(t);
390                                return null;
391                        }
392                }
393        }
394
395}