001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.util.concurrent; 018 019import java.util.concurrent.Callable; 020import java.util.concurrent.CompletableFuture; 021import java.util.concurrent.ExecutionException; 022import java.util.concurrent.FutureTask; 023 024import org.springframework.lang.Nullable; 025 026/** 027 * Extension of {@link FutureTask} that implements {@link ListenableFuture}. 028 * 029 * @author Arjen Poutsma 030 * @since 4.0 031 * @param <T> the result type returned by this Future's {@code get} method 032 */ 033public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> { 034 035 private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<>(); 036 037 038 /** 039 * Create a new {@code ListenableFutureTask} that will, upon running, 040 * execute the given {@link Callable}. 041 * @param callable the callable task 042 */ 043 public ListenableFutureTask(Callable<T> callable) { 044 super(callable); 045 } 046 047 /** 048 * Create a {@code ListenableFutureTask} that will, upon running, 049 * execute the given {@link Runnable}, and arrange that {@link #get()} 050 * will return the given result on successful completion. 051 * @param runnable the runnable task 052 * @param result the result to return on successful completion 053 */ 054 public ListenableFutureTask(Runnable runnable, @Nullable T result) { 055 super(runnable, result); 056 } 057 058 059 @Override 060 public void addCallback(ListenableFutureCallback<? super T> callback) { 061 this.callbacks.addCallback(callback); 062 } 063 064 @Override 065 public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { 066 this.callbacks.addSuccessCallback(successCallback); 067 this.callbacks.addFailureCallback(failureCallback); 068 } 069 070 @Override 071 public CompletableFuture<T> completable() { 072 CompletableFuture<T> completable = new DelegatingCompletableFuture<>(this); 073 this.callbacks.addSuccessCallback(completable::complete); 074 this.callbacks.addFailureCallback(completable::completeExceptionally); 075 return completable; 076 } 077 078 079 @Override 080 protected void done() { 081 Throwable cause; 082 try { 083 T result = get(); 084 this.callbacks.success(result); 085 return; 086 } 087 catch (InterruptedException ex) { 088 Thread.currentThread().interrupt(); 089 return; 090 } 091 catch (ExecutionException ex) { 092 cause = ex.getCause(); 093 if (cause == null) { 094 cause = ex; 095 } 096 } 097 catch (Throwable ex) { 098 cause = ex; 099 } 100 this.callbacks.failure(cause); 101 } 102 103}