001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.core.task.support;
018
019import java.util.concurrent.Callable;
020import java.util.concurrent.Executor;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.Future;
023import java.util.concurrent.FutureTask;
024import java.util.concurrent.RejectedExecutionException;
025
026import org.springframework.core.task.AsyncListenableTaskExecutor;
027import org.springframework.core.task.TaskDecorator;
028import org.springframework.core.task.TaskRejectedException;
029import org.springframework.lang.Nullable;
030import org.springframework.util.Assert;
031import org.springframework.util.concurrent.ListenableFuture;
032import org.springframework.util.concurrent.ListenableFutureTask;
033
034/**
035 * Adapter that takes a JDK {@code java.util.concurrent.Executor} and
036 * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
037 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting
038 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
039 *
040 * @author Juergen Hoeller
041 * @since 3.0
042 * @see java.util.concurrent.Executor
043 * @see java.util.concurrent.ExecutorService
044 * @see java.util.concurrent.Executors
045 */
046public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
047
048        private final Executor concurrentExecutor;
049
050        @Nullable
051        private TaskDecorator taskDecorator;
052
053
054        /**
055         * Create a new TaskExecutorAdapter,
056         * using the given JDK concurrent executor.
057         * @param concurrentExecutor the JDK concurrent executor to delegate to
058         */
059        public TaskExecutorAdapter(Executor concurrentExecutor) {
060                Assert.notNull(concurrentExecutor, "Executor must not be null");
061                this.concurrentExecutor = concurrentExecutor;
062        }
063
064
065        /**
066         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
067         * about to be executed.
068         * <p>Note that such a decorator is not necessarily being applied to the
069         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
070         * execution callback (which may be a wrapper around the user-supplied task).
071         * <p>The primary use case is to set some execution context around the task's
072         * invocation, or to provide some monitoring/statistics for task execution.
073         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
074         * is limited to plain {@code Runnable} execution via {@code execute} calls.
075         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
076         * {@code FutureTask} which does not propagate any exceptions; you might
077         * have to cast it and call {@code Future#get} to evaluate exceptions.
078         * @since 4.3
079         */
080        public final void setTaskDecorator(TaskDecorator taskDecorator) {
081                this.taskDecorator = taskDecorator;
082        }
083
084
085        /**
086         * Delegates to the specified JDK concurrent executor.
087         * @see java.util.concurrent.Executor#execute(Runnable)
088         */
089        @Override
090        public void execute(Runnable task) {
091                try {
092                        doExecute(this.concurrentExecutor, this.taskDecorator, task);
093                }
094                catch (RejectedExecutionException ex) {
095                        throw new TaskRejectedException(
096                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
097                }
098        }
099
100        @Override
101        public void execute(Runnable task, long startTimeout) {
102                execute(task);
103        }
104
105        @Override
106        public Future<?> submit(Runnable task) {
107                try {
108                        if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
109                                return ((ExecutorService) this.concurrentExecutor).submit(task);
110                        }
111                        else {
112                                FutureTask<Object> future = new FutureTask<>(task, null);
113                                doExecute(this.concurrentExecutor, this.taskDecorator, future);
114                                return future;
115                        }
116                }
117                catch (RejectedExecutionException ex) {
118                        throw new TaskRejectedException(
119                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
120                }
121        }
122
123        @Override
124        public <T> Future<T> submit(Callable<T> task) {
125                try {
126                        if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
127                                return ((ExecutorService) this.concurrentExecutor).submit(task);
128                        }
129                        else {
130                                FutureTask<T> future = new FutureTask<>(task);
131                                doExecute(this.concurrentExecutor, this.taskDecorator, future);
132                                return future;
133                        }
134                }
135                catch (RejectedExecutionException ex) {
136                        throw new TaskRejectedException(
137                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
138                }
139        }
140
141        @Override
142        public ListenableFuture<?> submitListenable(Runnable task) {
143                try {
144                        ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
145                        doExecute(this.concurrentExecutor, this.taskDecorator, future);
146                        return future;
147                }
148                catch (RejectedExecutionException ex) {
149                        throw new TaskRejectedException(
150                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
151                }
152        }
153
154        @Override
155        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
156                try {
157                        ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
158                        doExecute(this.concurrentExecutor, this.taskDecorator, future);
159                        return future;
160                }
161                catch (RejectedExecutionException ex) {
162                        throw new TaskRejectedException(
163                                        "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
164                }
165        }
166
167
168        /**
169         * Actually execute the given {@code Runnable} (which may be a user-supplied task
170         * or a wrapper around a user-supplied task) with the given executor.
171         * @param concurrentExecutor the underlying JDK concurrent executor to delegate to
172         * @param taskDecorator the specified decorator to be applied, if any
173         * @param runnable the runnable to execute
174         * @throws RejectedExecutionException if the given runnable cannot be accepted
175         * @since 4.3
176         */
177        protected void doExecute(Executor concurrentExecutor, @Nullable TaskDecorator taskDecorator, Runnable runnable)
178                        throws RejectedExecutionException{
179
180                concurrentExecutor.execute(taskDecorator != null ? taskDecorator.decorate(runnable) : runnable);
181        }
182
183}