001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.core.task.support;
018
019import java.util.concurrent.Callable;
020import java.util.concurrent.Executor;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Future;
023import java.util.concurrent.FutureTask;
024import java.util.concurrent.RejectedExecutionException;
025
026import org.springframework.core.task.AsyncListenableTaskExecutor;
027import org.springframework.core.task.TaskDecorator;
028import org.springframework.core.task.TaskRejectedException;
029import org.springframework.util.Assert;
030import org.springframework.util.concurrent.ListenableFuture;
031import org.springframework.util.concurrent.ListenableFutureTask;
032
033/**
034 * Adapter that takes a JDK {@code java.util.concurrent.Executor} and
035 * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
036 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting
037 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
038 *
039 * @author Juergen Hoeller
040 * @since 3.0
041 * @see java.util.concurrent.Executor
042 * @see java.util.concurrent.ExecutorService
043 * @see java.util.concurrent.Executors
044 */
045public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
046
047        private final Executor concurrentExecutor;
048
049        private TaskDecorator taskDecorator;
050
051
052        /**
053         * Create a new TaskExecutorAdapter,
054         * using the given JDK concurrent executor.
055         * @param concurrentExecutor the JDK concurrent executor to delegate to
056         */
057        public TaskExecutorAdapter(Executor concurrentExecutor) {
058                Assert.notNull(concurrentExecutor, "Executor must not be null");
059                this.concurrentExecutor = concurrentExecutor;
060        }
061
062
063        /**
064         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
065         * about to be executed.
066         * <p>Note that such a decorator is not necessarily being applied to the
067         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
068         * execution callback (which may be a wrapper around the user-supplied task).
069         * <p>The primary use case is to set some execution context around the task's
070         * invocation, or to provide some monitoring/statistics for task execution.
071         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
072         * is limited to plain {@code Runnable} execution via {@code execute} calls.
073         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
074         * {@code FutureTask} which does not propagate any exceptions; you might
075         * have to cast it and call {@code Future#get} to evaluate exceptions.
076         * @since 4.3
077         */
078        public final void setTaskDecorator(TaskDecorator taskDecorator) {
079                this.taskDecorator = taskDecorator;
080        }
081
082
083        /**
084         * Delegates to the specified JDK concurrent executor.
085         * @see java.util.concurrent.Executor#execute(Runnable)
086         */
087        @Override
088        public void execute(Runnable task) {
089                try {
090                        doExecute(this.concurrentExecutor, this.taskDecorator, task);
091                }
092                catch (RejectedExecutionException ex) {
093                        throw new TaskRejectedException(
094                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
095                }
096        }
097
098        @Override
099        public void execute(Runnable task, long startTimeout) {
100                execute(task);
101        }
102
103        @Override
104        public Future<?> submit(Runnable task) {
105                try {
106                        if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
107                                return ((ExecutorService) this.concurrentExecutor).submit(task);
108                        }
109                        else {
110                                FutureTask<Object> future = new FutureTask<Object>(task, null);
111                                doExecute(this.concurrentExecutor, this.taskDecorator, future);
112                                return future;
113                        }
114                }
115                catch (RejectedExecutionException ex) {
116                        throw new TaskRejectedException(
117                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
118                }
119        }
120
121        @Override
122        public <T> Future<T> submit(Callable<T> task) {
123                try {
124                        if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
125                                return ((ExecutorService) this.concurrentExecutor).submit(task);
126                        }
127                        else {
128                                FutureTask<T> future = new FutureTask<T>(task);
129                                doExecute(this.concurrentExecutor, this.taskDecorator, future);
130                                return future;
131                        }
132                }
133                catch (RejectedExecutionException ex) {
134                        throw new TaskRejectedException(
135                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
136                }
137        }
138
139        @Override
140        public ListenableFuture<?> submitListenable(Runnable task) {
141                try {
142                        ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
143                        doExecute(this.concurrentExecutor, this.taskDecorator, future);
144                        return future;
145                }
146                catch (RejectedExecutionException ex) {
147                        throw new TaskRejectedException(
148                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
149                }
150        }
151
152        @Override
153        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
154                try {
155                        ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
156                        doExecute(this.concurrentExecutor, this.taskDecorator, future);
157                        return future;
158                }
159                catch (RejectedExecutionException ex) {
160                        throw new TaskRejectedException(
161                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
162                }
163        }
164
165
166        /**
167         * Actually execute the given {@code Runnable} (which may be a user-supplied task
168         * or a wrapper around a user-supplied task) with the given executor.
169         * @param concurrentExecutor the underlying JDK concurrent executor to delegate to
170         * @param taskDecorator the specified decorator to be applied, if any
171         * @param runnable the runnable to execute
172         * @throws RejectedExecutionException if the given runnable cannot be accepted
173         * @since 4.3
174         */
175        protected void doExecute(Executor concurrentExecutor, TaskDecorator taskDecorator, Runnable runnable)
176                        throws RejectedExecutionException{
177
178                concurrentExecutor.execute(taskDecorator != null ? taskDecorator.decorate(runnable) : runnable);
179        }
180
181}