001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.Callable;
022import java.util.concurrent.Executor;
023import java.util.concurrent.Executors;
024import java.util.concurrent.Future;
025import javax.enterprise.concurrent.ManagedExecutors;
026import javax.enterprise.concurrent.ManagedTask;
027
028import org.springframework.core.task.AsyncListenableTaskExecutor;
029import org.springframework.core.task.TaskDecorator;
030import org.springframework.core.task.support.TaskExecutorAdapter;
031import org.springframework.scheduling.SchedulingAwareRunnable;
032import org.springframework.scheduling.SchedulingTaskExecutor;
033import org.springframework.util.ClassUtils;
034import org.springframework.util.concurrent.ListenableFuture;
035
036/**
037 * Adapter that takes a {@code java.util.concurrent.Executor} and exposes
038 * a Spring {@link org.springframework.core.task.TaskExecutor} for it.
039 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting
040 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
041 *
042 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
043 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it,
044 * exposing a long-running hint based on {@link SchedulingAwareRunnable} and an identity
045 * name based on the given Runnable/Callable's {@code toString()}. For JSR-236 style
046 * lookup in a Java EE 7 environment, consider using {@link DefaultManagedTaskExecutor}.
047 *
048 * <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows
049 * for defining a {@link java.util.concurrent.ThreadPoolExecutor} in bean style,
050 * exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly.
051 * This is a convenient alternative to a raw ThreadPoolExecutor definition with
052 * a separate definition of the present adapter class.
053 *
054 * @author Juergen Hoeller
055 * @since 2.0
056 * @see java.util.concurrent.Executor
057 * @see java.util.concurrent.ExecutorService
058 * @see java.util.concurrent.ThreadPoolExecutor
059 * @see java.util.concurrent.Executors
060 * @see DefaultManagedTaskExecutor
061 * @see ThreadPoolTaskExecutor
062 */
063public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
064
065        private static Class<?> managedExecutorServiceClass;
066
067        static {
068                try {
069                        managedExecutorServiceClass = ClassUtils.forName(
070                                        "javax.enterprise.concurrent.ManagedExecutorService",
071                                        ConcurrentTaskScheduler.class.getClassLoader());
072                }
073                catch (ClassNotFoundException ex) {
074                        // JSR-236 API not available...
075                        managedExecutorServiceClass = null;
076                }
077        }
078
079        private Executor concurrentExecutor;
080
081        private TaskExecutorAdapter adaptedExecutor;
082
083
084        /**
085         * Create a new ConcurrentTaskExecutor, using a single thread executor as default.
086         * @see java.util.concurrent.Executors#newSingleThreadExecutor()
087         */
088        public ConcurrentTaskExecutor() {
089                setConcurrentExecutor(null);
090        }
091
092        /**
093         * Create a new ConcurrentTaskExecutor, using the given {@link java.util.concurrent.Executor}.
094         * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
095         * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
096         * @param concurrentExecutor the {@link java.util.concurrent.Executor} to delegate to
097         */
098        public ConcurrentTaskExecutor(Executor concurrentExecutor) {
099                setConcurrentExecutor(concurrentExecutor);
100        }
101
102
103        /**
104         * Specify the {@link java.util.concurrent.Executor} to delegate to.
105         * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
106         * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
107         */
108        public final void setConcurrentExecutor(Executor concurrentExecutor) {
109                if (concurrentExecutor != null) {
110                        this.concurrentExecutor = concurrentExecutor;
111                        if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) {
112                                this.adaptedExecutor = new ManagedTaskExecutorAdapter(concurrentExecutor);
113                        }
114                        else {
115                                this.adaptedExecutor = new TaskExecutorAdapter(concurrentExecutor);
116                        }
117                }
118                else {
119                        this.concurrentExecutor = Executors.newSingleThreadExecutor();
120                        this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor);
121                }
122        }
123
124        /**
125         * Return the {@link java.util.concurrent.Executor} that this adapter delegates to.
126         */
127        public final Executor getConcurrentExecutor() {
128                return this.concurrentExecutor;
129        }
130
131        /**
132         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
133         * about to be executed.
134         * <p>Note that such a decorator is not necessarily being applied to the
135         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
136         * execution callback (which may be a wrapper around the user-supplied task).
137         * <p>The primary use case is to set some execution context around the task's
138         * invocation, or to provide some monitoring/statistics for task execution.
139         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
140         * is limited to plain {@code Runnable} execution via {@code execute} calls.
141         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
142         * {@code FutureTask} which does not propagate any exceptions; you might
143         * have to cast it and call {@code Future#get} to evaluate exceptions.
144         * @since 4.3
145         */
146        public final void setTaskDecorator(TaskDecorator taskDecorator) {
147                this.adaptedExecutor.setTaskDecorator(taskDecorator);
148        }
149
150
151        @Override
152        public void execute(Runnable task) {
153                this.adaptedExecutor.execute(task);
154        }
155
156        @Override
157        public void execute(Runnable task, long startTimeout) {
158                this.adaptedExecutor.execute(task, startTimeout);
159        }
160
161        @Override
162        public Future<?> submit(Runnable task) {
163                return this.adaptedExecutor.submit(task);
164        }
165
166        @Override
167        public <T> Future<T> submit(Callable<T> task) {
168                return this.adaptedExecutor.submit(task);
169        }
170
171        @Override
172        public ListenableFuture<?> submitListenable(Runnable task) {
173                return this.adaptedExecutor.submitListenable(task);
174        }
175
176        @Override
177        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
178                return this.adaptedExecutor.submitListenable(task);
179        }
180
181        /**
182         * This task executor prefers short-lived work units.
183         */
184        @Override
185        public boolean prefersShortLivedTasks() {
186                return true;
187        }
188
189
190        /**
191         * TaskExecutorAdapter subclass that wraps all provided Runnables and Callables
192         * with a JSR-236 ManagedTask, exposing a long-running hint based on
193         * {@link SchedulingAwareRunnable} and an identity name based on the task's
194         * {@code toString()} representation.
195         */
196        private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter {
197
198                public ManagedTaskExecutorAdapter(Executor concurrentExecutor) {
199                        super(concurrentExecutor);
200                }
201
202                @Override
203                public void execute(Runnable task) {
204                        super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
205                }
206
207                @Override
208                public Future<?> submit(Runnable task) {
209                        return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
210                }
211
212                @Override
213                public <T> Future<T> submit(Callable<T> task) {
214                        return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
215                }
216
217                @Override
218                public ListenableFuture<?> submitListenable(Runnable task) {
219                        return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
220                }
221
222                @Override
223                public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
224                        return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
225                }
226        }
227
228
229        /**
230         * Delegate that wraps a given Runnable/Callable  with a JSR-236 ManagedTask,
231         * exposing a long-running hint based on {@link SchedulingAwareRunnable}
232         * and a given identity name.
233         */
234        protected static class ManagedTaskBuilder {
235
236                public static Runnable buildManagedTask(Runnable task, String identityName) {
237                        Map<String, String> properties = new HashMap<String, String>(2);
238                        if (task instanceof SchedulingAwareRunnable) {
239                                properties.put(ManagedTask.LONGRUNNING_HINT,
240                                                Boolean.toString(((SchedulingAwareRunnable) task).isLongLived()));
241                        }
242                        properties.put(ManagedTask.IDENTITY_NAME, identityName);
243                        return ManagedExecutors.managedTask(task, properties, null);
244                }
245
246                public static <T> Callable<T> buildManagedTask(Callable<T> task, String identityName) {
247                        Map<String, String> properties = new HashMap<String, String>(1);
248                        properties.put(ManagedTask.IDENTITY_NAME, identityName);
249                        return ManagedExecutors.managedTask(task, properties, null);
250                }
251        }
252
253}