001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.util.concurrent; 018 019import java.time.Duration; 020import java.util.concurrent.TimeUnit; 021 022import reactor.core.publisher.Mono; 023import reactor.core.publisher.MonoProcessor; 024 025import org.springframework.lang.Nullable; 026import org.springframework.util.Assert; 027 028/** 029 * Adapts a {@link Mono} into a {@link ListenableFuture}. 030 * 031 * @author Rossen Stoyanchev 032 * @author Stephane Maldini 033 * @since 5.1 034 * @param <T> the object type 035 */ 036public class MonoToListenableFutureAdapter<T> implements ListenableFuture<T> { 037 038 private final MonoProcessor<T> processor; 039 040 private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<>(); 041 042 043 public MonoToListenableFutureAdapter(Mono<T> mono) { 044 Assert.notNull(mono, "Mono must not be null"); 045 this.processor = mono 046 .doOnSuccess(this.registry::success) 047 .doOnError(this.registry::failure) 048 .toProcessor(); 049 } 050 051 052 @Override 053 @Nullable 054 public T get() { 055 return this.processor.block(); 056 } 057 058 @Override 059 @Nullable 060 public T get(long timeout, TimeUnit unit) { 061 Assert.notNull(unit, "TimeUnit must not be null"); 062 Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit)); 063 return this.processor.block(duration); 064 } 065 066 @Override 067 public boolean cancel(boolean mayInterruptIfRunning) { 068 if (isCancelled()) { 069 return false; 070 } 071 this.processor.cancel(); 072 // isCancelled may still return false, if mono completed before the cancel 073 return this.processor.isCancelled(); 074 } 075 076 @Override 077 public boolean isCancelled() { 078 return this.processor.isCancelled(); 079 } 080 081 @Override 082 public boolean isDone() { 083 return this.processor.isTerminated(); 084 } 085 086 @Override 087 public void addCallback(ListenableFutureCallback<? super T> callback) { 088 this.registry.addCallback(callback); 089 } 090 091 @Override 092 public void addCallback(SuccessCallback<? super T> success, FailureCallback failure) { 093 this.registry.addSuccessCallback(success); 094 this.registry.addFailureCallback(failure); 095 } 096 097}