001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.core;
018
019import java.lang.reflect.Method;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Optional;
023import java.util.concurrent.CompletableFuture;
024import java.util.concurrent.CompletionStage;
025import java.util.function.Function;
026
027import io.reactivex.BackpressureStrategy;
028import io.reactivex.Flowable;
029import kotlinx.coroutines.CompletableDeferredKt;
030import kotlinx.coroutines.Deferred;
031import org.reactivestreams.Publisher;
032import reactor.blockhound.BlockHound;
033import reactor.blockhound.integration.BlockHoundIntegration;
034import reactor.core.publisher.Flux;
035import reactor.core.publisher.Mono;
036import rx.RxReactiveStreams;
037
038import org.springframework.lang.Nullable;
039import org.springframework.util.ClassUtils;
040import org.springframework.util.ConcurrentReferenceHashMap;
041import org.springframework.util.ReflectionUtils;
042
043/**
044 * A registry of adapters to adapt Reactive Streams {@link Publisher} to/from
045 * various async/reactive types such as {@code CompletableFuture}, RxJava
046 * {@code Observable}, and others.
047 *
048 * <p>By default, depending on classpath availability, adapters are registered
049 * for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, Java 9+
050 * {@code Flow.Publisher} and Kotlin Coroutines {@code Deferred} and {@code Flow}.
051 *
052 * @author Rossen Stoyanchev
053 * @author Sebastien Deleuze
054 * @since 5.0
055 */
056public class ReactiveAdapterRegistry {
057
        // Default instance handed out by getSharedInstance(); stays null until
        // that method creates it. Declared volatile because getSharedInstance()
        // reads it once without holding the class lock.
        @Nullable
        private static volatile ReactiveAdapterRegistry sharedInstance;

        // True when the constructor found Reactor and registered its adapters.
        // registerReactiveType consults this flag to decide whether a newly
        // registered adapter is wrapped as a ReactorAdapter.
        private final boolean reactorPresent;

        // Registered adapters in registration order; getAdapter scans this list
        // front to back, so earlier registrations win among equal matches.
        private final List<ReactiveAdapter> adapters = new ArrayList<>();
064
065
066        /**
067         * Create a registry and auto-register default adapters.
068         * @see #getSharedInstance()
069         */
070        public ReactiveAdapterRegistry() {
071                ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
072
073                // Reactor
074                boolean reactorRegistered = false;
075                if (ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader)) {
076                        new ReactorRegistrar().registerAdapters(this);
077                        reactorRegistered = true;
078                }
079                this.reactorPresent = reactorRegistered;
080
081                // RxJava1
082                if (ClassUtils.isPresent("rx.Observable", classLoader) &&
083                                ClassUtils.isPresent("rx.RxReactiveStreams", classLoader)) {
084                        new RxJava1Registrar().registerAdapters(this);
085                }
086
087                // RxJava2
088                if (ClassUtils.isPresent("io.reactivex.Flowable", classLoader)) {
089                        new RxJava2Registrar().registerAdapters(this);
090                }
091
092                // Java 9+ Flow.Publisher
093                if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) {
094                        new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
095                }
096                // If not present, do nothing for the time being...
097                // We can fall back on "reactive-streams-flow-bridge" (once released)
098
099                // Coroutines
100                if (this.reactorPresent && ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader)) {
101                        new CoroutinesRegistrar().registerAdapters(this);
102                }
103        }
104
105
106        /**
107         * Whether the registry has any adapters which would be the case if any of
108         * Reactor, RxJava 2, or RxJava 1 (+ RxJava Reactive Streams bridge) are
109         * present on the classpath.
110         */
111        public boolean hasAdapters() {
112                return !this.adapters.isEmpty();
113        }
114
115        /**
116         * Register a reactive type along with functions to adapt to and from a
117         * Reactive Streams {@link Publisher}. The function arguments assume that
118         * their input is neither {@code null} nor {@link Optional}.
119         */
120        public void registerReactiveType(ReactiveTypeDescriptor descriptor,
121                        Function<Object, Publisher<?>> toAdapter, Function<Publisher<?>, Object> fromAdapter) {
122
123                if (this.reactorPresent) {
124                        this.adapters.add(new ReactorAdapter(descriptor, toAdapter, fromAdapter));
125                }
126                else {
127                        this.adapters.add(new ReactiveAdapter(descriptor, toAdapter, fromAdapter));
128                }
129        }
130
131        /**
132         * Get the adapter for the given reactive type.
133         * @return the corresponding adapter, or {@code null} if none available
134         */
135        @Nullable
136        public ReactiveAdapter getAdapter(Class<?> reactiveType) {
137                return getAdapter(reactiveType, null);
138        }
139
140        /**
141         * Get the adapter for the given reactive type. Or if a "source" object is
142         * provided, its actual type is used instead.
143         * @param reactiveType the reactive type
144         * (may be {@code null} if a concrete source object is given)
145         * @param source an instance of the reactive type
146         * (i.e. to adapt from; may be {@code null} if the reactive type is specified)
147         * @return the corresponding adapter, or {@code null} if none available
148         */
149        @Nullable
150        public ReactiveAdapter getAdapter(@Nullable Class<?> reactiveType, @Nullable Object source) {
151                if (this.adapters.isEmpty()) {
152                        return null;
153                }
154
155                Object sourceToUse = (source instanceof Optional ? ((Optional<?>) source).orElse(null) : source);
156                Class<?> clazz = (sourceToUse != null ? sourceToUse.getClass() : reactiveType);
157                if (clazz == null) {
158                        return null;
159                }
160                for (ReactiveAdapter adapter : this.adapters) {
161                        if (adapter.getReactiveType() == clazz) {
162                                return adapter;
163                        }
164                }
165                for (ReactiveAdapter adapter : this.adapters) {
166                        if (adapter.getReactiveType().isAssignableFrom(clazz)) {
167                                return adapter;
168                        }
169                }
170                return null;
171        }
172
173
174        /**
175         * Return a shared default {@code ReactiveAdapterRegistry} instance,
176         * lazily building it once needed.
177         * <p><b>NOTE:</b> We highly recommend passing a long-lived, pre-configured
178         * {@code ReactiveAdapterRegistry} instance for customization purposes.
179         * This accessor is only meant as a fallback for code paths that want to
180         * fall back on a default instance if one isn't provided.
181         * @return the shared {@code ReactiveAdapterRegistry} instance
182         * @since 5.0.2
183         */
184        public static ReactiveAdapterRegistry getSharedInstance() {
185                ReactiveAdapterRegistry registry = sharedInstance;
186                if (registry == null) {
187                        synchronized (ReactiveAdapterRegistry.class) {
188                                registry = sharedInstance;
189                                if (registry == null) {
190                                        registry = new ReactiveAdapterRegistry();
191                                        sharedInstance = registry;
192                                }
193                        }
194                }
195                return registry;
196        }
197
198
199        private static class ReactorRegistrar {
200
201                void registerAdapters(ReactiveAdapterRegistry registry) {
202                        // Register Flux and Mono before Publisher...
203
204                        registry.registerReactiveType(
205                                        ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty),
206                                        source -> (Mono<?>) source,
207                                        Mono::from
208                        );
209
210                        registry.registerReactiveType(
211                                        ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty),
212                                        source -> (Flux<?>) source,
213                                        Flux::from);
214
215                        registry.registerReactiveType(
216                                        ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty),
217                                        source -> (Publisher<?>) source,
218                                        source -> source);
219
220                        registry.registerReactiveType(
221                                        ReactiveTypeDescriptor.nonDeferredAsyncValue(CompletionStage.class, EmptyCompletableFuture::new),
222                                        source -> Mono.fromCompletionStage((CompletionStage<?>) source),
223                                        source -> Mono.from(source).toFuture()
224                        );
225                }
226        }
227
228
229        private static class RxJava1Registrar {
230
231                void registerAdapters(ReactiveAdapterRegistry registry) {
232                        registry.registerReactiveType(
233                                        ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty),
234                                        source -> RxReactiveStreams.toPublisher((rx.Observable<?>) source),
235                                        RxReactiveStreams::toObservable
236                        );
237                        registry.registerReactiveType(
238                                        ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class),
239                                        source -> RxReactiveStreams.toPublisher((rx.Single<?>) source),
240                                        RxReactiveStreams::toSingle
241                        );
242                        registry.registerReactiveType(
243                                        ReactiveTypeDescriptor.noValue(rx.Completable.class, rx.Completable::complete),
244                                        source -> RxReactiveStreams.toPublisher((rx.Completable) source),
245                                        RxReactiveStreams::toCompletable
246                        );
247                }
248        }
249
250
251        private static class RxJava2Registrar {
252
253                void registerAdapters(ReactiveAdapterRegistry registry) {
254                        registry.registerReactiveType(
255                                        ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty),
256                                        source -> (io.reactivex.Flowable<?>) source,
257                                        Flowable::fromPublisher
258                        );
259                        registry.registerReactiveType(
260                                        ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
261                                        source -> ((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER),
262                                        source -> io.reactivex.Flowable.fromPublisher(source).toObservable()
263                        );
264                        registry.registerReactiveType(
265                                        ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
266                                        source -> ((io.reactivex.Single<?>) source).toFlowable(),
267                                        source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
268                        );
269                        registry.registerReactiveType(
270                                        ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
271                                        source -> ((io.reactivex.Maybe<?>) source).toFlowable(),
272                                        source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement()
273                        );
274                        registry.registerReactiveType(
275                                        ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
276                                        source -> ((io.reactivex.Completable) source).toFlowable(),
277                                        source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements()
278                        );
279                }
280        }
281
282
283        private static class ReactorJdkFlowAdapterRegistrar {
284
285                void registerAdapter(ReactiveAdapterRegistry registry) {
286                        // TODO: remove reflection when build requires JDK 9+
287
288                        try {
289                                String publisherName = "java.util.concurrent.Flow.Publisher";
290                                Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader());
291
292                                String adapterName = "reactor.adapter.JdkFlowAdapter";
293                                Class<?> flowAdapterClass = ClassUtils.forName(adapterName,  getClass().getClassLoader());
294
295                                Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass);
296                                Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class);
297                                Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty());
298
299                                registry.registerReactiveType(
300                                                ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow),
301                                                source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source),
302                                                publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher)
303                                );
304                        }
305                        catch (Throwable ex) {
306                                // Ignore
307                        }
308                }
309        }
310
311
312        /**
313         * ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or
314         * {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}.
315         * This is important in places where only the stream and stream element type
316         * information is available like encoders and decoders.
317         */
318        private static class ReactorAdapter extends ReactiveAdapter {
319
320                ReactorAdapter(ReactiveTypeDescriptor descriptor,
321                                Function<Object, Publisher<?>> toPublisherFunction,
322                                Function<Publisher<?>, Object> fromPublisherFunction) {
323
324                        super(descriptor, toPublisherFunction, fromPublisherFunction);
325                }
326
327                @Override
328                public <T> Publisher<T> toPublisher(@Nullable Object source) {
329                        Publisher<T> publisher = super.toPublisher(source);
330                        return (isMultiValue() ? Flux.from(publisher) : Mono.from(publisher));
331                }
332        }
333
334
335        private static class EmptyCompletableFuture<T> extends CompletableFuture<T> {
336
337                EmptyCompletableFuture() {
338                        complete(null);
339                }
340        }
341
342
343        private static class CoroutinesRegistrar {
344
345                @SuppressWarnings("KotlinInternalInJava")
346                void registerAdapters(ReactiveAdapterRegistry registry) {
347                        registry.registerReactiveType(
348                                        ReactiveTypeDescriptor.singleOptionalValue(Deferred.class,
349                                                        () -> CompletableDeferredKt.CompletableDeferred(null)),
350                                        source -> CoroutinesUtils.deferredToMono((Deferred<?>) source),
351                                        source -> CoroutinesUtils.monoToDeferred(Mono.from(source)));
352
353                        registry.registerReactiveType(
354                                        ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow),
355                                        source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
356                                        kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow
357                        );
358                }
359        }
360
361
362        /**
363         * {@code BlockHoundIntegration} for spring-core classes.
364         * <p>Explicitly allow the following:
365         * <ul>
366         * <li>Reading class info via {@link LocalVariableTableParameterNameDiscoverer}.
367         * <li>Locking within {@link ConcurrentReferenceHashMap}.
368         * </ul>
369         * @since 5.2.4
370         */
371        public static class SpringCoreBlockHoundIntegration implements BlockHoundIntegration {
372
373                @Override
374                public void applyTo(BlockHound.Builder builder) {
375
376                        // Avoid hard references potentially anywhere in spring-core (no need for structural dependency)
377
378                        builder.allowBlockingCallsInside(
379                                        "org.springframework.core.LocalVariableTableParameterNameDiscoverer", "inspectClass");
380
381                        String className = "org.springframework.util.ConcurrentReferenceHashMap$Segment";
382                        builder.allowBlockingCallsInside(className, "doTask");
383                        builder.allowBlockingCallsInside(className, "clear");
384                        builder.allowBlockingCallsInside(className, "restructure");
385                }
386        }
387
388}