001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.scheduling.concurrent; 018 019import java.util.HashMap; 020import java.util.Map; 021import java.util.concurrent.Callable; 022import java.util.concurrent.Executor; 023import java.util.concurrent.Executors; 024import java.util.concurrent.Future; 025import javax.enterprise.concurrent.ManagedExecutors; 026import javax.enterprise.concurrent.ManagedTask; 027 028import org.springframework.core.task.AsyncListenableTaskExecutor; 029import org.springframework.core.task.TaskDecorator; 030import org.springframework.core.task.support.TaskExecutorAdapter; 031import org.springframework.scheduling.SchedulingAwareRunnable; 032import org.springframework.scheduling.SchedulingTaskExecutor; 033import org.springframework.util.ClassUtils; 034import org.springframework.util.concurrent.ListenableFuture; 035 036/** 037 * Adapter that takes a {@code java.util.concurrent.Executor} and exposes 038 * a Spring {@link org.springframework.core.task.TaskExecutor} for it. 039 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting 040 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. 041 * 042 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} 043 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it, 044 * exposing a long-running hint based on {@link SchedulingAwareRunnable} and an identity 045 * name based on the given Runnable/Callable's {@code toString()}. For JSR-236 style 046 * lookup in a Java EE 7 environment, consider using {@link DefaultManagedTaskExecutor}. 047 * 048 * <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows 049 * for defining a {@link java.util.concurrent.ThreadPoolExecutor} in bean style, 050 * exposing it as a Spring {@link org.springframework.core.task.TaskExecutor} directly. 051 * This is a convenient alternative to a raw ThreadPoolExecutor definition with 052 * a separate definition of the present adapter class. 053 * 054 * @author Juergen Hoeller 055 * @since 2.0 056 * @see java.util.concurrent.Executor 057 * @see java.util.concurrent.ExecutorService 058 * @see java.util.concurrent.ThreadPoolExecutor 059 * @see java.util.concurrent.Executors 060 * @see DefaultManagedTaskExecutor 061 * @see ThreadPoolTaskExecutor 062 */ 063public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { 064 065 private static Class<?> managedExecutorServiceClass; 066 067 static { 068 try { 069 managedExecutorServiceClass = ClassUtils.forName( 070 "javax.enterprise.concurrent.ManagedExecutorService", 071 ConcurrentTaskScheduler.class.getClassLoader()); 072 } 073 catch (ClassNotFoundException ex) { 074 // JSR-236 API not available... 075 managedExecutorServiceClass = null; 076 } 077 } 078 079 private Executor concurrentExecutor; 080 081 private TaskExecutorAdapter adaptedExecutor; 082 083 084 /** 085 * Create a new ConcurrentTaskExecutor, using a single thread executor as default. 086 * @see java.util.concurrent.Executors#newSingleThreadExecutor() 087 */ 088 public ConcurrentTaskExecutor() { 089 setConcurrentExecutor(null); 090 } 091 092 /** 093 * Create a new ConcurrentTaskExecutor, using the given {@link java.util.concurrent.Executor}. 094 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} 095 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. 096 * @param concurrentExecutor the {@link java.util.concurrent.Executor} to delegate to 097 */ 098 public ConcurrentTaskExecutor(Executor concurrentExecutor) { 099 setConcurrentExecutor(concurrentExecutor); 100 } 101 102 103 /** 104 * Specify the {@link java.util.concurrent.Executor} to delegate to. 105 * <p>Autodetects a JSR-236 {@link javax.enterprise.concurrent.ManagedExecutorService} 106 * in order to expose {@link javax.enterprise.concurrent.ManagedTask} adapters for it. 107 */ 108 public final void setConcurrentExecutor(Executor concurrentExecutor) { 109 if (concurrentExecutor != null) { 110 this.concurrentExecutor = concurrentExecutor; 111 if (managedExecutorServiceClass != null && managedExecutorServiceClass.isInstance(concurrentExecutor)) { 112 this.adaptedExecutor = new ManagedTaskExecutorAdapter(concurrentExecutor); 113 } 114 else { 115 this.adaptedExecutor = new TaskExecutorAdapter(concurrentExecutor); 116 } 117 } 118 else { 119 this.concurrentExecutor = Executors.newSingleThreadExecutor(); 120 this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor); 121 } 122 } 123 124 /** 125 * Return the {@link java.util.concurrent.Executor} that this adapter delegates to. 126 */ 127 public final Executor getConcurrentExecutor() { 128 return this.concurrentExecutor; 129 } 130 131 /** 132 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 133 * about to be executed. 134 * <p>Note that such a decorator is not necessarily being applied to the 135 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 136 * execution callback (which may be a wrapper around the user-supplied task). 137 * <p>The primary use case is to set some execution context around the task's 138 * invocation, or to provide some monitoring/statistics for task execution. 139 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 140 * is limited to plain {@code Runnable} execution via {@code execute} calls. 141 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 142 * {@code FutureTask} which does not propagate any exceptions; you might 143 * have to cast it and call {@code Future#get} to evaluate exceptions. 144 * @since 4.3 145 */ 146 public final void setTaskDecorator(TaskDecorator taskDecorator) { 147 this.adaptedExecutor.setTaskDecorator(taskDecorator); 148 } 149 150 151 @Override 152 public void execute(Runnable task) { 153 this.adaptedExecutor.execute(task); 154 } 155 156 @Override 157 public void execute(Runnable task, long startTimeout) { 158 this.adaptedExecutor.execute(task, startTimeout); 159 } 160 161 @Override 162 public Future<?> submit(Runnable task) { 163 return this.adaptedExecutor.submit(task); 164 } 165 166 @Override 167 public <T> Future<T> submit(Callable<T> task) { 168 return this.adaptedExecutor.submit(task); 169 } 170 171 @Override 172 public ListenableFuture<?> submitListenable(Runnable task) { 173 return this.adaptedExecutor.submitListenable(task); 174 } 175 176 @Override 177 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 178 return this.adaptedExecutor.submitListenable(task); 179 } 180 181 /** 182 * This task executor prefers short-lived work units. 183 */ 184 @Override 185 public boolean prefersShortLivedTasks() { 186 return true; 187 } 188 189 190 /** 191 * TaskExecutorAdapter subclass that wraps all provided Runnables and Callables 192 * with a JSR-236 ManagedTask, exposing a long-running hint based on 193 * {@link SchedulingAwareRunnable} and an identity name based on the task's 194 * {@code toString()} representation. 195 */ 196 private static class ManagedTaskExecutorAdapter extends TaskExecutorAdapter { 197 198 public ManagedTaskExecutorAdapter(Executor concurrentExecutor) { 199 super(concurrentExecutor); 200 } 201 202 @Override 203 public void execute(Runnable task) { 204 super.execute(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 205 } 206 207 @Override 208 public Future<?> submit(Runnable task) { 209 return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 210 } 211 212 @Override 213 public <T> Future<T> submit(Callable<T> task) { 214 return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 215 } 216 217 @Override 218 public ListenableFuture<?> submitListenable(Runnable task) { 219 return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 220 } 221 222 @Override 223 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 224 return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); 225 } 226 } 227 228 229 /** 230 * Delegate that wraps a given Runnable/Callable with a JSR-236 ManagedTask, 231 * exposing a long-running hint based on {@link SchedulingAwareRunnable} 232 * and a given identity name. 233 */ 234 protected static class ManagedTaskBuilder { 235 236 public static Runnable buildManagedTask(Runnable task, String identityName) { 237 Map<String, String> properties = new HashMap<String, String>(2); 238 if (task instanceof SchedulingAwareRunnable) { 239 properties.put(ManagedTask.LONGRUNNING_HINT, 240 Boolean.toString(((SchedulingAwareRunnable) task).isLongLived())); 241 } 242 properties.put(ManagedTask.IDENTITY_NAME, identityName); 243 return ManagedExecutors.managedTask(task, properties, null); 244 } 245 246 public static <T> Callable<T> buildManagedTask(Callable<T> task, String identityName) { 247 Map<String, String> properties = new HashMap<String, String>(1); 248 properties.put(ManagedTask.IDENTITY_NAME, identityName); 249 return ManagedExecutors.managedTask(task, properties, null); 250 } 251 } 252 253}