001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.scheduling.concurrent;
018
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.Callable;
022import java.util.concurrent.Executor;
023import java.util.concurrent.Executors;
024import java.util.concurrent.Future;
025
026import javax.enterprise.concurrent.ManagedExecutors;
027import javax.enterprise.concurrent.ManagedTask;
028
029import org.springframework.core.task.AsyncListenableTaskExecutor;
030import org.springframework.core.task.TaskDecorator;
031import org.springframework.core.task.support.TaskExecutorAdapter;
032import org.springframework.lang.Nullable;
033import org.springframework.scheduling.SchedulingAwareRunnable;
034import org.springframework.scheduling.SchedulingTaskExecutor;
035import org.springframework.util.ClassUtils;
036import org.springframework.util.concurrent.ListenableFuture;
037
038/**
039 * Adapter that takes a {@code java.util.concurrent.Executor} and exposes
040 * a Spring {@link org.springframework.core.task.TaskExecutor} for it.
041 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting
042 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
043 *
044 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
045 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it,
046 * exposing a long-running hint based on {@link SchedulingAwareRunnable} and an identity
047 * name based on the given Runnable/Callable's {@code toString()}. For JSR-236 style
048 * lookup in a Java EE 7 environment, consider using {@link DefaultManagedTaskExecutor}.
049 *
050 * <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows
051 * for defining a {@link java.util.concurrent.ThreadPoolExecutor} in bean style,
052 * exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly.
053 * This is a convenient alternative to a raw ThreadPoolExecutor definition with
054 * a separate definition of the present adapter class.
055 *
056 * @author Juergen Hoeller
057 * @since 2.0
058 * @see java.util.concurrent.Executor
059 * @see java.util.concurrent.ExecutorService
060 * @see java.util.concurrent.ThreadPoolExecutor
061 * @see java.util.concurrent.Executors
062 * @see DefaultManagedTaskExecutor
063 * @see ThreadPoolTaskExecutor
064 */
065public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
066
067        @Nullable
068        private static Class<?> managedExecutorServiceClass;
069
070        static {
071                try {
072                        managedExecutorServiceClass = ClassUtils.forName(
073                                        "javax.enterprise.concurrent.ManagedExecutorService",
074                                        ConcurrentTaskScheduler.class.getClassLoader());
075                }
076                catch (ClassNotFoundException ex) {
077                        // JSR-236 API not available...
078                        managedExecutorServiceClass = null;
079                }
080        }
081
082        private Executor concurrentExecutor;
083
084        private TaskExecutorAdapter adaptedExecutor;
085
086
087        /**
088         * Create a new ConcurrentTaskExecutor, using a single thread executor as default.
089         * @see java.util.concurrent.Executors#newSingleThreadExecutor()
090         */
091        public ConcurrentTaskExecutor() {
092                this.concurrentExecutor = Executors.newSingleThreadExecutor();
093                this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor);
094        }
095
096        /**
097         * Create a new ConcurrentTaskExecutor, using the given {@link java.util.concurrent.Executor}.
098         * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
099         * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
100         * @param executor the {@link java.util.concurrent.Executor} to delegate to
101         */
102        public ConcurrentTaskExecutor(@Nullable Executor executor) {
103                this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor());
104                this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor);
105        }
106
107
108        /**
109         * Specify the {@link java.util.concurrent.Executor} to delegate to.
110         * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService}
111         * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it.
112         */
113        public final void setConcurrentExecutor(@Nullable Executor executor) {
114                this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor());
115                this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor);
116        }
117
118        /**
119         * Return the {@link java.util.concurrent.Executor} that this adapter delegates to.
120         */
121        public final Executor getConcurrentExecutor() {
122                return this.concurrentExecutor;
123        }
124
125        /**
126         * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
127         * about to be executed.
128         * <p>Note that such a decorator is not necessarily being applied to the
129         * user-supplied {@code Runnable}/{@code Callable} but rather to the actual
130         * execution callback (which may be a wrapper around the user-supplied task).
131         * <p>The primary use case is to set some execution context around the task's
132         * invocation, or to provide some monitoring/statistics for task execution.
133         * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations
134         * is limited to plain {@code Runnable} execution via {@code execute} calls.
135         * In case of {@code #submit} calls, the exposed {@code Runnable} will be a
136         * {@code FutureTask} which does not propagate any exceptions; you might
137         * have to cast it and call {@code Future#get} to evaluate exceptions.
138         * @since 4.3
139         */
140        public final void setTaskDecorator(TaskDecorator taskDecorator) {
141                this.adaptedExecutor.setTaskDecorator(taskDecorator);
142        }
143
144
145        @Override
146        public void execute(Runnable task) {
147                this.adaptedExecutor.execute(task);
148        }
149
150        @Override
151        public void execute(Runnable task, long startTimeout) {
152                this.adaptedExecutor.execute(task, startTimeout);
153        }
154
155        @Override
156        public Future<?> submit(Runnable task) {
157                return this.adaptedExecutor.submit(task);
158        }
159
160        @Override
161        public <T> Future<T> submit(Callable<T> task) {
162                return this.adaptedExecutor.submit(task);
163        }
164
165        @Override
166        public ListenableFuture<?> submitListenable(Runnable task) {
167                return this.adaptedExecutor.submitListenable(task);
168        }
169
170        @Override
171        public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
172                return this.adaptedExecutor.submitListenable(task);
173        }
174
175
176        private static TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) {
177                if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) {
178                        return new ManagedTaskExecutorAdapter(concurrentExecutor);
179                }
180                return new TaskExecutorAdapter(concurrentExecutor);
181        }
182
183
184        /**
185         * TaskExecutorAdapter subclass that wraps all provided Runnables and Callables
186         * with a JSR-236 ManagedTask, exposing a long-running hint based on
187         * {@link SchedulingAwareRunnable} and an identity name based on the task's
188         * {@code toString()} representation.
189         */
190        private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter {
191
192                public ManagedTaskExecutorAdapter(Executor concurrentExecutor) {
193                        super(concurrentExecutor);
194                }
195
196                @Override
197                public void execute(Runnable task) {
198                        super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
199                }
200
201                @Override
202                public Future<?> submit(Runnable task) {
203                        return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
204                }
205
206                @Override
207                public <T> Future<T> submit(Callable<T> task) {
208                        return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
209                }
210
211                @Override
212                public ListenableFuture<?> submitListenable(Runnable task) {
213                        return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
214                }
215
216                @Override
217                public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
218                        return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString()));
219                }
220        }
221
222
223        /**
224         * Delegate that wraps a given Runnable/Callable  with a JSR-236 ManagedTask,
225         * exposing a long-running hint based on {@link SchedulingAwareRunnable}
226         * and a given identity name.
227         */
228        protected static class ManagedTaskBuilder {
229
230                public static Runnable buildManagedTask(Runnable task, String identityName) {
231                        Map<String, String> properties;
232                        if (task instanceof SchedulingAwareRunnable) {
233                                properties = new HashMap<>(4);
234                                properties.put(ManagedTask.LONGRUNNING_HINT,
235                                                Boolean.toString(((SchedulingAwareRunnable) task).isLongLived()));
236                        }
237                        else {
238                                properties = new HashMap<>(2);
239                        }
240                        properties.put(ManagedTask.IDENTITY_NAME, identityName);
241                        return ManagedExecutors.managedTask(task, properties, null);
242                }
243
244                public static <T> Callable<T> buildManagedTask(Callable<T> task, String identityName) {
245                        Map<String, String> properties = new HashMap<>(2);
246                        properties.put(ManagedTask.IDENTITY_NAME, identityName);
247                        return ManagedExecutors.managedTask(task, properties, null);
248                }
249        }
250
251}