/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.core;
018
019import java.util.function.Function;
020
021import org.reactivestreams.Publisher;
022
023import org.springframework.lang.Nullable;
024import org.springframework.util.Assert;
025
026/**
027 * Adapter for a Reactive Streams {@link Publisher} to and from an async/reactive
028 * type such as {@code CompletableFuture}, RxJava {@code Observable}, and others.
029 *
030 * <p>An adapter is typically obtained via {@link ReactiveAdapterRegistry}.
031 *
032 * @author Rossen Stoyanchev
033 * @since 5.0
034 */
035public class ReactiveAdapter {
036
037        private final ReactiveTypeDescriptor descriptor;
038
039        private final Function<Object, Publisher<?>> toPublisherFunction;
040
041        private final Function<Publisher<?>, Object> fromPublisherFunction;
042
043
044        /**
045         * Constructor for an adapter with functions to convert the target reactive
046         * or async type to and from a Reactive Streams Publisher.
047         * @param descriptor the reactive type descriptor
048         * @param toPublisherFunction adapter to a Publisher
049         * @param fromPublisherFunction adapter from a Publisher
050         */
051        public ReactiveAdapter(ReactiveTypeDescriptor descriptor,
052                        Function<Object, Publisher<?>> toPublisherFunction,
053                        Function<Publisher<?>, Object> fromPublisherFunction) {
054
055                Assert.notNull(descriptor, "'descriptor' is required");
056                Assert.notNull(toPublisherFunction, "'toPublisherFunction' is required");
057                Assert.notNull(fromPublisherFunction, "'fromPublisherFunction' is required");
058
059                this.descriptor = descriptor;
060                this.toPublisherFunction = toPublisherFunction;
061                this.fromPublisherFunction = fromPublisherFunction;
062        }
063
064
065        /**
066         * Return the descriptor of the reactive type for the adapter.
067         */
068        public ReactiveTypeDescriptor getDescriptor() {
069                return this.descriptor;
070        }
071
072        /**
073         * Shortcut for {@code getDescriptor().getReactiveType()}.
074         */
075        public Class<?> getReactiveType() {
076                return getDescriptor().getReactiveType();
077        }
078
079        /**
080         * Shortcut for {@code getDescriptor().isMultiValue()}.
081         */
082        public boolean isMultiValue() {
083                return getDescriptor().isMultiValue();
084        }
085
086        /**
087         * Shortcut for {@code getDescriptor().isNoValue()}.
088         */
089        public boolean isNoValue() {
090                return getDescriptor().isNoValue();
091        }
092
093        /**
094         * Shortcut for {@code getDescriptor().supportsEmpty()}.
095         */
096        public boolean supportsEmpty() {
097                return getDescriptor().supportsEmpty();
098        }
099
100
101        /**
102         * Adapt the given instance to a Reactive Streams {@code Publisher}.
103         * @param source the source object to adapt from; if the given object is
104         * {@code null}, {@link ReactiveTypeDescriptor#getEmptyValue()} is used.
105         * @return the Publisher representing the adaptation
106         */
107        @SuppressWarnings("unchecked")
108        public <T> Publisher<T> toPublisher(@Nullable Object source) {
109                if (source == null) {
110                        source = getDescriptor().getEmptyValue();
111                }
112                return (Publisher<T>) this.toPublisherFunction.apply(source);
113        }
114
115        /**
116         * Adapt from the given Reactive Streams Publisher.
117         * @param publisher the publisher to adapt from
118         * @return the reactive type instance representing the adapted publisher
119         */
120        public Object fromPublisher(Publisher<?> publisher) {
121                return this.fromPublisherFunction.apply(publisher);
122        }
123
124}