001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.util.concurrent; 018 019import java.util.LinkedList; 020import java.util.Queue; 021 022import org.springframework.util.Assert; 023 024/** 025 * Helper class for {@link ListenableFuture} implementations that maintains a 026 * of success and failure callbacks and helps to notify them. 027 * 028 * <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}. 029 * 030 * @author Arjen Poutsma 031 * @author Sebastien Deleuze 032 * @author Rossen Stoyanchev 033 * @since 4.0 034 */ 035public class ListenableFutureCallbackRegistry<T> { 036 037 private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>(); 038 039 private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>(); 040 041 private State state = State.NEW; 042 043 private Object result = null; 044 045 private final Object mutex = new Object(); 046 047 048 /** 049 * Add the given callback to this registry. 050 * @param callback the callback to add 051 */ 052 public void addCallback(ListenableFutureCallback<? super T> callback) { 053 Assert.notNull(callback, "'callback' must not be null"); 054 synchronized (this.mutex) { 055 switch (this.state) { 056 case NEW: 057 this.successCallbacks.add(callback); 058 this.failureCallbacks.add(callback); 059 break; 060 case SUCCESS: 061 notifySuccess(callback); 062 break; 063 case FAILURE: 064 notifyFailure(callback); 065 break; 066 } 067 } 068 } 069 070 @SuppressWarnings("unchecked") 071 private void notifySuccess(SuccessCallback<? super T> callback) { 072 try { 073 callback.onSuccess((T) this.result); 074 } 075 catch (Throwable ex) { 076 // Ignore 077 } 078 } 079 080 private void notifyFailure(FailureCallback callback) { 081 try { 082 callback.onFailure((Throwable) this.result); 083 } 084 catch (Throwable ex) { 085 // Ignore 086 } 087 } 088 089 /** 090 * Add the given success callback to this registry. 091 * @param callback the success callback to add 092 * @since 4.1 093 */ 094 @SuppressWarnings("unchecked") 095 public void addSuccessCallback(SuccessCallback<? super T> callback) { 096 Assert.notNull(callback, "'callback' must not be null"); 097 synchronized (this.mutex) { 098 switch (this.state) { 099 case NEW: 100 this.successCallbacks.add(callback); 101 break; 102 case SUCCESS: 103 notifySuccess(callback); 104 break; 105 } 106 } 107 } 108 109 /** 110 * Add the given failure callback to this registry. 111 * @param callback the failure callback to add 112 * @since 4.1 113 */ 114 public void addFailureCallback(FailureCallback callback) { 115 Assert.notNull(callback, "'callback' must not be null"); 116 synchronized (this.mutex) { 117 switch (this.state) { 118 case NEW: 119 this.failureCallbacks.add(callback); 120 break; 121 case FAILURE: 122 notifyFailure(callback); 123 break; 124 } 125 } 126 } 127 128 /** 129 * Trigger a {@link ListenableFutureCallback#onSuccess(Object)} call on all 130 * added callbacks with the given result. 131 * @param result the result to trigger the callbacks with 132 */ 133 public void success(T result) { 134 synchronized (this.mutex) { 135 this.state = State.SUCCESS; 136 this.result = result; 137 SuccessCallback<? super T> callback; 138 while ((callback = this.successCallbacks.poll()) != null) { 139 notifySuccess(callback); 140 } 141 } 142 } 143 144 /** 145 * Trigger a {@link ListenableFutureCallback#onFailure(Throwable)} call on all 146 * added callbacks with the given {@code Throwable}. 147 * @param ex the exception to trigger the callbacks with 148 */ 149 public void failure(Throwable ex) { 150 synchronized (this.mutex) { 151 this.state = State.FAILURE; 152 this.result = ex; 153 FailureCallback callback; 154 while ((callback = this.failureCallbacks.poll()) != null) { 155 notifyFailure(callback); 156 } 157 } 158 } 159 160 161 private enum State {NEW, SUCCESS, FAILURE} 162 163}