001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.util.concurrent;
018
019import java.util.LinkedList;
020import java.util.Queue;
021
022import org.springframework.util.Assert;
023
024/**
025 * Helper class for {@link ListenableFuture} implementations that maintains a
026 * of success and failure callbacks and helps to notify them.
027 *
028 * <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
029 *
030 * @author Arjen Poutsma
031 * @author Sebastien Deleuze
032 * @author Rossen Stoyanchev
033 * @since 4.0
034 */
035public class ListenableFutureCallbackRegistry<T> {
036
037        private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();
038
039        private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();
040
041        private State state = State.NEW;
042
043        private Object result = null;
044
045        private final Object mutex = new Object();
046
047
048        /**
049         * Add the given callback to this registry.
050         * @param callback the callback to add
051         */
052        public void addCallback(ListenableFutureCallback<? super T> callback) {
053                Assert.notNull(callback, "'callback' must not be null");
054                synchronized (this.mutex) {
055                        switch (this.state) {
056                                case NEW:
057                                        this.successCallbacks.add(callback);
058                                        this.failureCallbacks.add(callback);
059                                        break;
060                                case SUCCESS:
061                                        notifySuccess(callback);
062                                        break;
063                                case FAILURE:
064                                        notifyFailure(callback);
065                                        break;
066                        }
067                }
068        }
069
070        @SuppressWarnings("unchecked")
071        private void notifySuccess(SuccessCallback<? super T> callback) {
072                try {
073                        callback.onSuccess((T) this.result);
074                }
075                catch (Throwable ex) {
076                        // Ignore
077                }
078        }
079
080        private void notifyFailure(FailureCallback callback) {
081                try {
082                        callback.onFailure((Throwable) this.result);
083                }
084                catch (Throwable ex) {
085                        // Ignore
086                }
087        }
088
089        /**
090         * Add the given success callback to this registry.
091         * @param callback the success callback to add
092         * @since 4.1
093         */
094        @SuppressWarnings("unchecked")
095        public void addSuccessCallback(SuccessCallback<? super T> callback) {
096                Assert.notNull(callback, "'callback' must not be null");
097                synchronized (this.mutex) {
098                        switch (this.state) {
099                                case NEW:
100                                        this.successCallbacks.add(callback);
101                                        break;
102                                case SUCCESS:
103                                        notifySuccess(callback);
104                                        break;
105                        }
106                }
107        }
108
109        /**
110         * Add the given failure callback to this registry.
111         * @param callback the failure callback to add
112         * @since 4.1
113         */
114        public void addFailureCallback(FailureCallback callback) {
115                Assert.notNull(callback, "'callback' must not be null");
116                synchronized (this.mutex) {
117                        switch (this.state) {
118                                case NEW:
119                                        this.failureCallbacks.add(callback);
120                                        break;
121                                case FAILURE:
122                                        notifyFailure(callback);
123                                        break;
124                        }
125                }
126        }
127
128        /**
129         * Trigger a {@link ListenableFutureCallback#onSuccess(Object)} call on all
130         * added callbacks with the given result.
131         * @param result the result to trigger the callbacks with
132         */
133        public void success(T result) {
134                synchronized (this.mutex) {
135                        this.state = State.SUCCESS;
136                        this.result = result;
137                        SuccessCallback<? super T> callback;
138                        while ((callback = this.successCallbacks.poll()) != null) {
139                                notifySuccess(callback);
140                        }
141                }
142        }
143
144        /**
145         * Trigger a {@link ListenableFutureCallback#onFailure(Throwable)} call on all
146         * added callbacks with the given {@code Throwable}.
147         * @param ex the exception to trigger the callbacks with
148         */
149        public void failure(Throwable ex) {
150                synchronized (this.mutex) {
151                        this.state = State.FAILURE;
152                        this.result = ex;
153                        FailureCallback callback;
154                        while ((callback = this.failureCallbacks.poll()) != null) {
155                                notifyFailure(callback);
156                        }
157                }
158        }
159
160
161        private enum State {NEW, SUCCESS, FAILURE}
162
163}