001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.core.task.support; 018 019import java.util.concurrent.Callable; 020import java.util.concurrent.Executor; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.Future; 023import java.util.concurrent.FutureTask; 024import java.util.concurrent.RejectedExecutionException; 025 026import org.springframework.core.task.AsyncListenableTaskExecutor; 027import org.springframework.core.task.TaskDecorator; 028import org.springframework.core.task.TaskRejectedException; 029import org.springframework.util.Assert; 030import org.springframework.util.concurrent.ListenableFuture; 031import org.springframework.util.concurrent.ListenableFutureTask; 032 033/** 034 * Adapter that takes a JDK {@code java.util.concurrent.Executor} and 035 * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it. 036 * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting 037 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. 038 * 039 * @author Juergen Hoeller 040 * @since 3.0 041 * @see java.util.concurrent.Executor 042 * @see java.util.concurrent.ExecutorService 043 * @see java.util.concurrent.Executors 044 */ 045public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { 046 047 private final Executor concurrentExecutor; 048 049 private TaskDecorator taskDecorator; 050 051 052 /** 053 * Create a new TaskExecutorAdapter, 054 * using the given JDK concurrent executor. 055 * @param concurrentExecutor the JDK concurrent executor to delegate to 056 */ 057 public TaskExecutorAdapter(Executor concurrentExecutor) { 058 Assert.notNull(concurrentExecutor, "Executor must not be null"); 059 this.concurrentExecutor = concurrentExecutor; 060 } 061 062 063 /** 064 * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} 065 * about to be executed. 066 * <p>Note that such a decorator is not necessarily being applied to the 067 * user-supplied {@code Runnable}/{@code Callable} but rather to the actual 068 * execution callback (which may be a wrapper around the user-supplied task). 069 * <p>The primary use case is to set some execution context around the task's 070 * invocation, or to provide some monitoring/statistics for task execution. 071 * <p><b>NOTE:</b> Exception handling in {@code TaskDecorator} implementations 072 * is limited to plain {@code Runnable} execution via {@code execute} calls. 073 * In case of {@code #submit} calls, the exposed {@code Runnable} will be a 074 * {@code FutureTask} which does not propagate any exceptions; you might 075 * have to cast it and call {@code Future#get} to evaluate exceptions. 076 * @since 4.3 077 */ 078 public final void setTaskDecorator(TaskDecorator taskDecorator) { 079 this.taskDecorator = taskDecorator; 080 } 081 082 083 /** 084 * Delegates to the specified JDK concurrent executor. 085 * @see java.util.concurrent.Executor#execute(Runnable) 086 */ 087 @Override 088 public void execute(Runnable task) { 089 try { 090 doExecute(this.concurrentExecutor, this.taskDecorator, task); 091 } 092 catch (RejectedExecutionException ex) { 093 throw new TaskRejectedException( 094 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 095 } 096 } 097 098 @Override 099 public void execute(Runnable task, long startTimeout) { 100 execute(task); 101 } 102 103 @Override 104 public Future<?> submit(Runnable task) { 105 try { 106 if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) { 107 return ((ExecutorService) this.concurrentExecutor).submit(task); 108 } 109 else { 110 FutureTask<Object> future = new FutureTask<Object>(task, null); 111 doExecute(this.concurrentExecutor, this.taskDecorator, future); 112 return future; 113 } 114 } 115 catch (RejectedExecutionException ex) { 116 throw new TaskRejectedException( 117 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 118 } 119 } 120 121 @Override 122 public <T> Future<T> submit(Callable<T> task) { 123 try { 124 if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) { 125 return ((ExecutorService) this.concurrentExecutor).submit(task); 126 } 127 else { 128 FutureTask<T> future = new FutureTask<T>(task); 129 doExecute(this.concurrentExecutor, this.taskDecorator, future); 130 return future; 131 } 132 } 133 catch (RejectedExecutionException ex) { 134 throw new TaskRejectedException( 135 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 136 } 137 } 138 139 @Override 140 public ListenableFuture<?> submitListenable(Runnable task) { 141 try { 142 ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null); 143 doExecute(this.concurrentExecutor, this.taskDecorator, future); 144 return future; 145 } 146 catch (RejectedExecutionException ex) { 147 throw new TaskRejectedException( 148 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 149 } 150 } 151 152 @Override 153 public <T> ListenableFuture<T> submitListenable(Callable<T> task) { 154 try { 155 ListenableFutureTask<T> future = new ListenableFutureTask<T>(task); 156 doExecute(this.concurrentExecutor, this.taskDecorator, future); 157 return future; 158 } 159 catch (RejectedExecutionException ex) { 160 throw new TaskRejectedException( 161 "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); 162 } 163 } 164 165 166 /** 167 * Actually execute the given {@code Runnable} (which may be a user-supplied task 168 * or a wrapper around a user-supplied task) with the given executor. 169 * @param concurrentExecutor the underlying JDK concurrent executor to delegate to 170 * @param taskDecorator the specified decorator to be applied, if any 171 * @param runnable the runnable to execute 172 * @throws RejectedExecutionException if the given runnable cannot be accepted 173 * @since 4.3 174 */ 175 protected void doExecute(Executor concurrentExecutor, TaskDecorator taskDecorator, Runnable runnable) 176 throws RejectedExecutionException{ 177 178 concurrentExecutor.execute(taskDecorator != null ? taskDecorator.decorate(runnable) : runnable); 179 } 180 181}