001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.core.task.support; 018 019import java.util.concurrent.Callable; 020import java.util.concurrent.Executor; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.Future; 023import java.util.concurrent.FutureTask; 024import java.util.concurrent.RejectedExecutionException; 025 026import org.springframework.core.task.AsyncListenableTaskExecutor; 027import org.springframework.core.task.TaskDecorator; 028import org.springframework.core.task.TaskRejectedException; 029import org.springframework.lang.Nullable; 030import org.springframework.util.Assert; 031import org.springframework.util.concurrent.ListenableFuture; 032import org.springframework.util.concurrent.ListenableFutureTask; 033 034/** 035 * Adapter that takes a JDK {@code java.util.concurrent.Executor} and 036 * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it. 037 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting 038 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. 039 * 040 * @author Juergen Hoeller 041 * @since 3.0 042 * @see java.util.concurrent.Executor 043 * @see java.util.concurrent.ExecutorService 044 * @see java.util.concurrent.Executors 045 */ 046public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { 047 048 private final Executor concurrentExecutor; 049 050 @Nullable 051 private TaskDecorator taskDecorator; 052 053 054 /** 055 * Create a new TaskExecutorAdapter, 056 * using the given JDK concurrent executor. 057 * @param concurrentExecutor the JDK concurrent executor to delegate to 058 */ 059 public TaskExecutorAdapter(Executor concurrentExecutor) { 060 Assert.notNull(concurrentExecutor, "Executor must not be null"); 061 this.concurrentExecutor = concurrentExecutor; 062 } 063 064 065 /** 066 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 067 * about to be executed. 068 * <p>Note that such a decorator is not necessarily being applied to the 069 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 070 * execution callback (which may be a wrapper around the user-supplied task). 071 * <p>The primary use case is to set some execution context around the task's 072 * invocation, or to provide some monitoring/statistics for task execution. 073 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 074 * is limited to plain {@code Runnable} execution via {@code execute} calls. 075 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 076 * {@code FutureTask} which does not propagate any exceptions; you might 077 * have to cast it and call {@code Future#get} to evaluate exceptions. 078 * @since 4.3 079 */ 080 public final void setTaskDecorator(TaskDecorator taskDecorator) { 081 this.taskDecorator = taskDecorator; 082 } 083 084 085 /** 086 * Delegates to the specified JDK concurrent executor. 087 * @see java.util.concurrent.Executor#execute(Runnable) 088 */ 089 @Override 090 public void execute(Runnable task) { 091 try { 092 doExecute(this.concurrentExecutor, this.taskDecorator, task); 093 } 094 catch (RejectedExecutionException ex) { 095 throw new TaskRejectedException( 096 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 097 } 098 } 099 100 @Override 101 public void execute(Runnable task, long startTimeout) { 102 execute(task); 103 } 104 105 @Override 106 public Future<?> submit(Runnable task) { 107 try { 108 if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) { 109 return ((ExecutorService) this.concurrentExecutor).submit(task); 110 } 111 else { 112 FutureTask<Object> future = new FutureTask<>(task, null); 113 doExecute(this.concurrentExecutor, this.taskDecorator, future); 114 return future; 115 } 116 } 117 catch (RejectedExecutionException ex) { 118 throw new TaskRejectedException( 119 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 120 } 121 } 122 123 @Override 124 public <T> Future<T> submit(Callable<T> task) { 125 try { 126 if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) { 127 return ((ExecutorService) this.concurrentExecutor).submit(task); 128 } 129 else { 130 FutureTask<T> future = new FutureTask<>(task); 131 doExecute(this.concurrentExecutor, this.taskDecorator, future); 132 return future; 133 } 134 } 135 catch (RejectedExecutionException ex) { 136 throw new TaskRejectedException( 137 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 138 } 139 } 140 141 @Override 142 public ListenableFuture<?> submitListenable(Runnable task) { 143 try { 144 ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null); 145 doExecute(this.concurrentExecutor, this.taskDecorator, future); 146 return future; 147 } 148 catch (RejectedExecutionException ex) { 149 throw new TaskRejectedException( 150 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 151 } 152 } 153 154 @Override 155 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 156 try { 157 ListenableFutureTask<T> future = new ListenableFutureTask<>(task); 158 doExecute(this.concurrentExecutor, this.taskDecorator, future); 159 return future; 160 } 161 catch (RejectedExecutionException ex) { 162 throw new TaskRejectedException( 163 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 164 } 165 } 166 167 168 /** 169 * Actually execute the given {@code Runnable} (which may be a user-supplied task 170 * or a wrapper around a user-supplied task) with the given executor. 171 * @param concurrentExecutor the underlying JDK concurrent executor to delegate to 172 * @param taskDecorator the specified decorator to be applied, if any 173 * @param runnable the runnable to execute 174 * @throws RejectedExecutionException if the given runnable cannot be accepted 175 * @since 4.3 176 */ 177 protected void doExecute(Executor concurrentExecutor, @Nullable TaskDecorator taskDecorator, Runnable runnable) 178 throws RejectedExecutionException{ 179 180 concurrentExecutor.execute(taskDecorator != null ? taskDecorator.decorate(runnable) : runnable); 181 } 182 183}