001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.scheduling.concurrent; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.concurrent.Callable; 022import java.util.concurrent.Executor; 023import java.util.concurrent.Executors; 024import java.util.concurrent.Future; 025 026import javax.enterprise.concurrent.ManagedExecutors; 027import javax.enterprise.concurrent.ManagedTask; 028 029import org.springframework.core.task.AsyncListenableTaskExecutor; 030import org.springframework.core.task.TaskDecorator; 031import org.springframework.core.task.support.TaskExecutorAdapter; 032import org.springframework.lang.Nullable; 033import org.springframework.scheduling.SchedulingAwareRunnable; 034import org.springframework.scheduling.SchedulingTaskExecutor; 035import org.springframework.util.ClassUtils; 036import org.springframework.util.concurrent.ListenableFuture; 037 038/** 039 * Adapter that takes a {@code java.util.concurrent.Executor} and exposes 040 * a Spring {@link org.springframework.core.task.TaskExecutor} for it. 041 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting 042 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. 043 * 044 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} 045 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it, 046 * exposing a long-running hint based on {@link SchedulingAwareRunnable} and an identity 047 * name based on the given Runnable/Callable's {@code toString()}. For JSR-236 style 048 * lookup in a Java EE 7 environment, consider using {@link DefaultManagedTaskExecutor}. 049 * 050 * <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows 051 * for defining a {@link java.util.concurrent.ThreadPoolExecutor} in bean style, 052 * exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly. 053 * This is a convenient alternative to a raw ThreadPoolExecutor definition with 054 * a separate definition of the present adapter class. 055 * 056 * @author Juergen Hoeller 057 * @since 2.0 058 * @see java.util.concurrent.Executor 059 * @see java.util.concurrent.ExecutorService 060 * @see java.util.concurrent.ThreadPoolExecutor 061 * @see java.util.concurrent.Executors 062 * @see DefaultManagedTaskExecutor 063 * @see ThreadPoolTaskExecutor 064 */ 065public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { 066 067 @Nullable 068 private static Class<?> managedExecutorServiceClass; 069 070 static { 071 try { 072 managedExecutorServiceClass = ClassUtils.forName( 073 "javax.enterprise.concurrent.ManagedExecutorService", 074 ConcurrentTaskScheduler.class.getClassLoader()); 075 } 076 catch (ClassNotFoundException ex) { 077 // JSR-236 API not available... 078 managedExecutorServiceClass = null; 079 } 080 } 081 082 private Executor concurrentExecutor; 083 084 private TaskExecutorAdapter adaptedExecutor; 085 086 087 /** 088 * Create a new ConcurrentTaskExecutor, using a single thread executor as default. 089 * @see java.util.concurrent.Executors#newSingleThreadExecutor() 090 */ 091 public ConcurrentTaskExecutor() { 092 this.concurrentExecutor = Executors.newSingleThreadExecutor(); 093 this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor); 094 } 095 096 /** 097 * Create a new ConcurrentTaskExecutor, using the given {@link java.util.concurrent.Executor}. 098 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} 099 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. 100 * @param executor the {@link java.util.concurrent.Executor} to delegate to 101 */ 102 public ConcurrentTaskExecutor(@Nullable Executor executor) { 103 this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor()); 104 this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor); 105 } 106 107 108 /** 109 * Specify the {@link java.util.concurrent.Executor} to delegate to. 110 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} 111 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. 112 */ 113 public final void setConcurrentExecutor(@Nullable Executor executor) { 114 this.concurrentExecutor = (executor != null ? executor : Executors.newSingleThreadExecutor()); 115 this.adaptedExecutor = getAdaptedExecutor(this.concurrentExecutor); 116 } 117 118 /** 119 * Return the {@link java.util.concurrent.Executor} that this adapter delegates to. 120 */ 121 public final Executor getConcurrentExecutor() { 122 return this.concurrentExecutor; 123 } 124 125 /** 126 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 127 * about to be executed. 128 * <p>Note that such a decorator is not necessarily being applied to the 129 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 130 * execution callback (which may be a wrapper around the user-supplied task). 131 * <p>The primary use case is to set some execution context around the task's 132 * invocation, or to provide some monitoring/statistics for task execution. 133 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 134 * is limited to plain {@code Runnable} execution via {@code execute} calls. 135 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 136 * {@code FutureTask} which does not propagate any exceptions; you might 137 * have to cast it and call {@code Future#get} to evaluate exceptions. 138 * @since 4.3 139 */ 140 public final void setTaskDecorator(TaskDecorator taskDecorator) { 141 this.adaptedExecutor.setTaskDecorator(taskDecorator); 142 } 143 144 145 @Override 146 public void execute(Runnable task) { 147 this.adaptedExecutor.execute(task); 148 } 149 150 @Override 151 public void execute(Runnable task, long startTimeout) { 152 this.adaptedExecutor.execute(task, startTimeout); 153 } 154 155 @Override 156 public Future<?> submit(Runnable task) { 157 return this.adaptedExecutor.submit(task); 158 } 159 160 @Override 161 public <T> Future<T> submit(Callable<T> task) { 162 return this.adaptedExecutor.submit(task); 163 } 164 165 @Override 166 public ListenableFuture<?> submitListenable(Runnable task) { 167 return this.adaptedExecutor.submitListenable(task); 168 } 169 170 @Override 171 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 172 return this.adaptedExecutor.submitListenable(task); 173 } 174 175 176 private static TaskExecutorAdapter getAdaptedExecutor(Executor concurrentExecutor) { 177 if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) { 178 return new ManagedTaskExecutorAdapter(concurrentExecutor); 179 } 180 return new TaskExecutorAdapter(concurrentExecutor); 181 } 182 183 184 /** 185 * TaskExecutorAdapter subclass that wraps all provided Runnables and Callables 186 * with a JSR-236 ManagedTask, exposing a long-running hint based on 187 * {@link SchedulingAwareRunnable} and an identity name based on the task's 188 * {@code toString()} representation. 189 */ 190 private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter { 191 192 public ManagedTaskExecutorAdapter(Executor concurrentExecutor) { 193 super(concurrentExecutor); 194 } 195 196 @Override 197 public void execute(Runnable task) { 198 super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 199 } 200 201 @Override 202 public Future<?> submit(Runnable task) { 203 return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 204 } 205 206 @Override 207 public <T> Future<T> submit(Callable<T> task) { 208 return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 209 } 210 211 @Override 212 public ListenableFuture<?> submitListenable(Runnable task) { 213 return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 214 } 215 216 @Override 217 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 218 return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 219 } 220 } 221 222 223 /** 224 * Delegate that wraps a given Runnable/Callable with a JSR-236 ManagedTask, 225 * exposing a long-running hint based on {@link SchedulingAwareRunnable} 226 * and a given identity name. 227 */ 228 protected static class ManagedTaskBuilder { 229 230 public static Runnable buildManagedTask(Runnable task, String identityName) { 231 Map<String, String> properties; 232 if (task instanceof SchedulingAwareRunnable) { 233 properties = new HashMap<>(4); 234 properties.put(ManagedTask.LONGRUNNING_HINT, 235 Boolean.toString(((SchedulingAwareRunnable) task).isLongLived())); 236 } 237 else { 238 properties = new HashMap<>(2); 239 } 240 properties.put(ManagedTask.IDENTITY_NAME, identityName); 241 return ManagedExecutors.managedTask(task, properties, null); 242 } 243 244 public static <T> Callable<T> buildManagedTask(Callable<T> task, String identityName) { 245 Map<String, String> properties = new HashMap<>(2); 246 properties.put(ManagedTask.IDENTITY_NAME, identityName); 247 return ManagedExecutors.managedTask(task, properties, null); 248 } 249 } 250 251}