001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.util.concurrent;
018
019import java.util.LinkedList;
020import java.util.Queue;
021
022import org.springframework.lang.Nullable;
023import org.springframework.util.Assert;
024
025/**
026 * Helper class for {@link ListenableFuture} implementations that maintains a
027 * of success and failure callbacks and helps to notify them.
028 *
029 * <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
030 *
031 * @author Arjen Poutsma
032 * @author Sebastien Deleuze
033 * @author Rossen Stoyanchev
034 * @since 4.0
035 * @param <T> the callback result type
036 */
037public class ListenableFutureCallbackRegistry<T> {
038
039        private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<>();
040
041        private final Queue<FailureCallback> failureCallbacks = new LinkedList<>();
042
043        private State state = State.NEW;
044
045        @Nullable
046        private Object result;
047
048        private final Object mutex = new Object();
049
050
051        /**
052         * Add the given callback to this registry.
053         * @param callback the callback to add
054         */
055        public void addCallback(ListenableFutureCallback<? super T> callback) {
056                Assert.notNull(callback, "'callback' must not be null");
057                synchronized (this.mutex) {
058                        switch (this.state) {
059                                case NEW:
060                                        this.successCallbacks.add(callback);
061                                        this.failureCallbacks.add(callback);
062                                        break;
063                                case SUCCESS:
064                                        notifySuccess(callback);
065                                        break;
066                                case FAILURE:
067                                        notifyFailure(callback);
068                                        break;
069                        }
070                }
071        }
072
073        @SuppressWarnings("unchecked")
074        private void notifySuccess(SuccessCallback<? super T> callback) {
075                try {
076                        callback.onSuccess((T) this.result);
077                }
078                catch (Throwable ex) {
079                        // Ignore
080                }
081        }
082
083        private void notifyFailure(FailureCallback callback) {
084                Assert.state(this.result instanceof Throwable, "No Throwable result for failure state");
085                try {
086                        callback.onFailure((Throwable) this.result);
087                }
088                catch (Throwable ex) {
089                        // Ignore
090                }
091        }
092
093        /**
094         * Add the given success callback to this registry.
095         * @param callback the success callback to add
096         * @since 4.1
097         */
098        public void addSuccessCallback(SuccessCallback<? super T> callback) {
099                Assert.notNull(callback, "'callback' must not be null");
100                synchronized (this.mutex) {
101                        switch (this.state) {
102                                case NEW:
103                                        this.successCallbacks.add(callback);
104                                        break;
105                                case SUCCESS:
106                                        notifySuccess(callback);
107                                        break;
108                        }
109                }
110        }
111
112        /**
113         * Add the given failure callback to this registry.
114         * @param callback the failure callback to add
115         * @since 4.1
116         */
117        public void addFailureCallback(FailureCallback callback) {
118                Assert.notNull(callback, "'callback' must not be null");
119                synchronized (this.mutex) {
120                        switch (this.state) {
121                                case NEW:
122                                        this.failureCallbacks.add(callback);
123                                        break;
124                                case FAILURE:
125                                        notifyFailure(callback);
126                                        break;
127                        }
128                }
129        }
130
131        /**
132         * Trigger a {@link ListenableFutureCallback#onSuccess(Object)} call on all
133         * added callbacks with the given result.
134         * @param result the result to trigger the callbacks with
135         */
136        public void success(@Nullable T result) {
137                synchronized (this.mutex) {
138                        this.state = State.SUCCESS;
139                        this.result = result;
140                        SuccessCallback<? super T> callback;
141                        while ((callback = this.successCallbacks.poll()) != null) {
142                                notifySuccess(callback);
143                        }
144                }
145        }
146
147        /**
148         * Trigger a {@link ListenableFutureCallback#onFailure(Throwable)} call on all
149         * added callbacks with the given {@code Throwable}.
150         * @param ex the exception to trigger the callbacks with
151         */
152        public void failure(Throwable ex) {
153                synchronized (this.mutex) {
154                        this.state = State.FAILURE;
155                        this.result = ex;
156                        FailureCallback callback;
157                        while ((callback = this.failureCallbacks.poll()) != null) {
158                                notifyFailure(callback);
159                        }
160                }
161        }
162
163
164        private enum State {NEW, SUCCESS, FAILURE}
165
166}