/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.util.concurrent;

import java.util.LinkedList;
import java.util.Queue;

import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * Helper class for {@link ListenableFuture} implementations that maintains a
 * list of success and failure callbacks and helps to notify them.
 *
 * <p>Callbacks registered while the registry is still pending are queued and
 * invoked in registration order once {@link #success(Object)} or
 * {@link #failure(Throwable)} is called. Callbacks registered after completion
 * are invoked immediately, on the registering thread, if they match the
 * recorded outcome; otherwise they are silently dropped.
 *
 * <p>All state is guarded by a private monitor, and callbacks are invoked while
 * that monitor is held. Anything thrown by a callback is swallowed so that one
 * misbehaving callback cannot prevent the remaining ones from being notified.
 *
 * <p>A registry is expected to be completed once. On completion, callbacks that
 * were queued for the opposite outcome are discarded so that they are not
 * retained for the lifetime of the registry.
 *
 * <p>Inspired by {@code com.google.common.util.concurrent.ExecutionList}.
 *
 * @author Arjen Poutsma
 * @author Sebastien Deleuze
 * @author Rossen Stoyanchev
 * @since 4.0
 * @param <T> the callback result type
 */
public class ListenableFutureCallbackRegistry<T> {

	/** Success callbacks queued while the state is still {@code NEW}. */
	private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<>();

	/** Failure callbacks queued while the state is still {@code NEW}. */
	private final Queue<FailureCallback> failureCallbacks = new LinkedList<>();

	private State state = State.NEW;

	/** The success value, or the {@code Throwable} once in the {@code FAILURE} state. */
	@Nullable
	private Object result;

	/** Monitor guarding the queues, {@link #state} and {@link #result}. */
	private final Object mutex = new Object();


	/**
	 * Add the given callback to this registry.
	 * <p>If the registry has already been completed, the matching method of the
	 * callback is invoked immediately instead of the callback being queued.
	 * @param callback the callback to add
	 */
	public void addCallback(ListenableFutureCallback<? super T> callback) {
		Assert.notNull(callback, "'callback' must not be null");
		synchronized (this.mutex) {
			switch (this.state) {
				case NEW:
					// Outcome not known yet: queue for both possibilities
					this.successCallbacks.add(callback);
					this.failureCallbacks.add(callback);
					break;
				case SUCCESS:
					notifySuccess(callback);
					break;
				case FAILURE:
					notifyFailure(callback);
					break;
			}
		}
	}

	/**
	 * Invoke the given success callback with the stored result,
	 * swallowing anything the callback throws.
	 */
	@SuppressWarnings("unchecked")
	private void notifySuccess(SuccessCallback<? super T> callback) {
		try {
			callback.onSuccess((T) this.result);
		}
		catch (Throwable ex) {
			// Ignore: a failing callback must not affect the other callbacks
		}
	}

	/**
	 * Invoke the given failure callback with the stored {@code Throwable},
	 * swallowing anything the callback throws.
	 * @throws IllegalStateException if the stored result is not a {@code Throwable}
	 */
	private void notifyFailure(FailureCallback callback) {
		Assert.state(this.result instanceof Throwable, "No Throwable result for failure state");
		try {
			callback.onFailure((Throwable) this.result);
		}
		catch (Throwable ex) {
			// Ignore: a failing callback must not affect the other callbacks
		}
	}

	/**
	 * Add the given success callback to this registry.
	 * <p>If the registry has already been completed successfully, the callback
	 * is invoked immediately. If it has been completed with a failure, the
	 * callback is dropped.
	 * @param callback the success callback to add
	 * @since 4.1
	 */
	public void addSuccessCallback(SuccessCallback<? super T> callback) {
		Assert.notNull(callback, "'callback' must not be null");
		synchronized (this.mutex) {
			switch (this.state) {
				case NEW:
					this.successCallbacks.add(callback);
					break;
				case SUCCESS:
					notifySuccess(callback);
					break;
				// FAILURE: nothing to do, the callback is not queued
			}
		}
	}

	/**
	 * Add the given failure callback to this registry.
	 * <p>If the registry has already been completed with a failure, the callback
	 * is invoked immediately. If it has been completed successfully, the
	 * callback is dropped.
	 * @param callback the failure callback to add
	 * @since 4.1
	 */
	public void addFailureCallback(FailureCallback callback) {
		Assert.notNull(callback, "'callback' must not be null");
		synchronized (this.mutex) {
			switch (this.state) {
				case NEW:
					this.failureCallbacks.add(callback);
					break;
				case FAILURE:
					notifyFailure(callback);
					break;
				// SUCCESS: nothing to do, the callback is not queued
			}
		}
	}

	/**
	 * Trigger a {@link ListenableFutureCallback#onSuccess(Object)} call on all
	 * added callbacks with the given result.
	 * <p>Queued failure callbacks are discarded.
	 * @param result the result to trigger the callbacks with
	 */
	public void success(@Nullable T result) {
		synchronized (this.mutex) {
			this.state = State.SUCCESS;
			this.result = result;
			// Release callbacks queued for the opposite outcome so that
			// they are not retained for as long as this registry lives
			this.failureCallbacks.clear();
			SuccessCallback<? super T> callback;
			while ((callback = this.successCallbacks.poll()) != null) {
				notifySuccess(callback);
			}
		}
	}

	/**
	 * Trigger a {@link ListenableFutureCallback#onFailure(Throwable)} call on all
	 * added callbacks with the given {@code Throwable}.
	 * <p>Queued success callbacks are discarded.
	 * @param ex the exception to trigger the callbacks with
	 */
	public void failure(Throwable ex) {
		synchronized (this.mutex) {
			this.state = State.FAILURE;
			this.result = ex;
			// Release callbacks queued for the opposite outcome so that
			// they are not retained for as long as this registry lives
			this.successCallbacks.clear();
			FailureCallback callback;
			while ((callback = this.failureCallbacks.poll()) != null) {
				notifyFailure(callback);
			}
		}
	}


	private enum State {NEW, SUCCESS, FAILURE}

}