001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.util.concurrent;
018
019import java.util.concurrent.Callable;
020import java.util.concurrent.CompletableFuture;
021import java.util.concurrent.ExecutionException;
022import java.util.concurrent.TimeUnit;
023import java.util.concurrent.TimeoutException;
024
025import org.springframework.lang.Nullable;
026import org.springframework.util.Assert;
027
028/**
029 * A {@link ListenableFuture} whose value can be set via {@link #set(Object)}
030 * or {@link #setException(Throwable)}. It may also get cancelled.
031 *
032 * <p>Inspired by {@code com.google.common.util.concurrent.SettableFuture}.
033 *
034 * @author Mattias Severson
035 * @author Rossen Stoyanchev
036 * @author Juergen Hoeller
037 * @since 4.1
038 * @param <T> the result type returned by this Future's {@code get} method
039 */
040public class SettableListenableFuture<T> implements ListenableFuture<T> {
041
042        private static final Callable<Object> DUMMY_CALLABLE = () -> {
043                throw new IllegalStateException("Should never be called");
044        };
045
046
047        private final SettableTask<T> settableTask = new SettableTask<>();
048
049
050        /**
051         * Set the value of this future. This method will return {@code true} if the
052         * value was set successfully, or {@code false} if the future has already been
053         * set or cancelled.
054         * @param value the value that will be set
055         * @return {@code true} if the value was successfully set, else {@code false}
056         */
057        public boolean set(@Nullable T value) {
058                return this.settableTask.setResultValue(value);
059        }
060
061        /**
062         * Set the exception of this future. This method will return {@code true} if the
063         * exception was set successfully, or {@code false} if the future has already been
064         * set or cancelled.
065         * @param exception the value that will be set
066         * @return {@code true} if the exception was successfully set, else {@code false}
067         */
068        public boolean setException(Throwable exception) {
069                Assert.notNull(exception, "Exception must not be null");
070                return this.settableTask.setExceptionResult(exception);
071        }
072
073
074        @Override
075        public void addCallback(ListenableFutureCallback<? super T> callback) {
076                this.settableTask.addCallback(callback);
077        }
078
079        @Override
080        public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
081                this.settableTask.addCallback(successCallback, failureCallback);
082        }
083
084        @Override
085        public CompletableFuture<T> completable() {
086                return this.settableTask.completable();
087        }
088
089
090        @Override
091        public boolean cancel(boolean mayInterruptIfRunning) {
092                boolean cancelled = this.settableTask.cancel(mayInterruptIfRunning);
093                if (cancelled && mayInterruptIfRunning) {
094                        interruptTask();
095                }
096                return cancelled;
097        }
098
099        @Override
100        public boolean isCancelled() {
101                return this.settableTask.isCancelled();
102        }
103
104        @Override
105        public boolean isDone() {
106                return this.settableTask.isDone();
107        }
108
109        /**
110         * Retrieve the value.
111         * <p>This method returns the value if it has been set via {@link #set(Object)},
112         * throws an {@link java.util.concurrent.ExecutionException} if an exception has
113         * been set via {@link #setException(Throwable)}, or throws a
114         * {@link java.util.concurrent.CancellationException} if the future has been cancelled.
115         * @return the value associated with this future
116         */
117        @Override
118        public T get() throws InterruptedException, ExecutionException {
119                return this.settableTask.get();
120        }
121
122        /**
123         * Retrieve the value.
124         * <p>This method returns the value if it has been set via {@link #set(Object)},
125         * throws an {@link java.util.concurrent.ExecutionException} if an exception has
126         * been set via {@link #setException(Throwable)}, or throws a
127         * {@link java.util.concurrent.CancellationException} if the future has been cancelled.
128         * @param timeout the maximum time to wait
129         * @param unit the unit of the timeout argument
130         * @return the value associated with this future
131         */
132        @Override
133        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
134                return this.settableTask.get(timeout, unit);
135        }
136
137        /**
138         * Subclasses can override this method to implement interruption of the future's
139         * computation. The method is invoked automatically by a successful call to
140         * {@link #cancel(boolean) cancel(true)}.
141         * <p>The default implementation is empty.
142         */
143        protected void interruptTask() {
144        }
145
146
147        private static class SettableTask<T> extends ListenableFutureTask<T> {
148
149                @Nullable
150                private volatile Thread completingThread;
151
152                @SuppressWarnings("unchecked")
153                public SettableTask() {
154                        super((Callable<T>) DUMMY_CALLABLE);
155                }
156
157                public boolean setResultValue(@Nullable T value) {
158                        set(value);
159                        return checkCompletingThread();
160                }
161
162                public boolean setExceptionResult(Throwable exception) {
163                        setException(exception);
164                        return checkCompletingThread();
165                }
166
167                @Override
168                protected void done() {
169                        if (!isCancelled()) {
170                                // Implicitly invoked by set/setException: store current thread for
171                                // determining whether the given result has actually triggered completion
172                                // (since FutureTask.set/setException unfortunately don't expose that)
173                                this.completingThread = Thread.currentThread();
174                        }
175                        super.done();
176                }
177
178                private boolean checkCompletingThread() {
179                        boolean check = (this.completingThread == Thread.currentThread());
180                        if (check) {
181                                this.completingThread = null;  // only first match actually counts
182                        }
183                        return check;
184                }
185        }
186
187}