001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.core; 018 019import java.lang.reflect.Method; 020import java.util.ArrayList; 021import java.util.List; 022import java.util.Optional; 023import java.util.concurrent.CompletableFuture; 024import java.util.concurrent.CompletionStage; 025import java.util.function.Function; 026 027import io.reactivex.BackpressureStrategy; 028import io.reactivex.Flowable; 029import kotlinx.coroutines.CompletableDeferredKt; 030import kotlinx.coroutines.Deferred; 031import org.reactivestreams.Publisher; 032import reactor.blockhound.BlockHound; 033import reactor.blockhound.integration.BlockHoundIntegration; 034import reactor.core.publisher.Flux; 035import reactor.core.publisher.Mono; 036import rx.RxReactiveStreams; 037 038import org.springframework.lang.Nullable; 039import org.springframework.util.ClassUtils; 040import org.springframework.util.ConcurrentReferenceHashMap; 041import org.springframework.util.ReflectionUtils; 042 043/** 044 * A registry of adapters to adapt Reactive Streams {@link Publisher} to/from 045 * various async/reactive types such as {@code CompletableFuture}, RxJava 046 * {@code Observable}, and others. 047 * 048 * <p>By default, depending on classpath availability, adapters are registered 049 * for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, Java 9+ 050 * {@code Flow.Publisher} and Kotlin Coroutines {@code Deferred} and {@code Flow}. 051 * 052 * @author Rossen Stoyanchev 053 * @author Sebastien Deleuze 054 * @since 5.0 055 */ 056public class ReactiveAdapterRegistry { 057 058 @Nullable 059 private static volatile ReactiveAdapterRegistry sharedInstance; 060 061 private final boolean reactorPresent; 062 063 private final List<ReactiveAdapter> adapters = new ArrayList<>(); 064 065 066 /** 067 * Create a registry and auto-register default adapters. 068 * @see #getSharedInstance() 069 */ 070 public ReactiveAdapterRegistry() { 071 ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader(); 072 073 // Reactor 074 boolean reactorRegistered = false; 075 if (ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader)) { 076 new ReactorRegistrar().registerAdapters(this); 077 reactorRegistered = true; 078 } 079 this.reactorPresent = reactorRegistered; 080 081 // RxJava1 082 if (ClassUtils.isPresent("rx.Observable", classLoader) && 083 ClassUtils.isPresent("rx.RxReactiveStreams", classLoader)) { 084 new RxJava1Registrar().registerAdapters(this); 085 } 086 087 // RxJava2 088 if (ClassUtils.isPresent("io.reactivex.Flowable", classLoader)) { 089 new RxJava2Registrar().registerAdapters(this); 090 } 091 092 // Java 9+ Flow.Publisher 093 if (ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader)) { 094 new ReactorJdkFlowAdapterRegistrar().registerAdapter(this); 095 } 096 // If not present, do nothing for the time being... 097 // We can fall back on "reactive-streams-flow-bridge" (once released) 098 099 // Coroutines 100 if (this.reactorPresent && ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader)) { 101 new CoroutinesRegistrar().registerAdapters(this); 102 } 103 } 104 105 106 /** 107 * Whether the registry has any adapters which would be the case if any of 108 * Reactor, RxJava 2, or RxJava 1 (+ RxJava Reactive Streams bridge) are 109 * present on the classpath. 110 */ 111 public boolean hasAdapters() { 112 return !this.adapters.isEmpty(); 113 } 114 115 /** 116 * Register a reactive type along with functions to adapt to and from a 117 * Reactive Streams {@link Publisher}. The function arguments assume that 118 * their input is neither {@code null} nor {@link Optional}. 119 */ 120 public void registerReactiveType(ReactiveTypeDescriptor descriptor, 121 Function<Object, Publisher<?>> toAdapter, Function<Publisher<?>, Object> fromAdapter) { 122 123 if (this.reactorPresent) { 124 this.adapters.add(new ReactorAdapter(descriptor, toAdapter, fromAdapter)); 125 } 126 else { 127 this.adapters.add(new ReactiveAdapter(descriptor, toAdapter, fromAdapter)); 128 } 129 } 130 131 /** 132 * Get the adapter for the given reactive type. 133 * @return the corresponding adapter, or {@code null} if none available 134 */ 135 @Nullable 136 public ReactiveAdapter getAdapter(Class<?> reactiveType) { 137 return getAdapter(reactiveType, null); 138 } 139 140 /** 141 * Get the adapter for the given reactive type. Or if a "source" object is 142 * provided, its actual type is used instead. 143 * @param reactiveType the reactive type 144 * (may be {@code null} if a concrete source object is given) 145 * @param source an instance of the reactive type 146 * (i.e. to adapt from; may be {@code null} if the reactive type is specified) 147 * @return the corresponding adapter, or {@code null} if none available 148 */ 149 @Nullable 150 public ReactiveAdapter getAdapter(@Nullable Class<?> reactiveType, @Nullable Object source) { 151 if (this.adapters.isEmpty()) { 152 return null; 153 } 154 155 Object sourceToUse = (source instanceof Optional ? ((Optional<?>) source).orElse(null) : source); 156 Class<?> clazz = (sourceToUse != null ? sourceToUse.getClass() : reactiveType); 157 if (clazz == null) { 158 return null; 159 } 160 for (ReactiveAdapter adapter : this.adapters) { 161 if (adapter.getReactiveType() == clazz) { 162 return adapter; 163 } 164 } 165 for (ReactiveAdapter adapter : this.adapters) { 166 if (adapter.getReactiveType().isAssignableFrom(clazz)) { 167 return adapter; 168 } 169 } 170 return null; 171 } 172 173 174 /** 175 * Return a shared default {@code ReactiveAdapterRegistry} instance, 176 * lazily building it once needed. 177 * <p><b>NOTE:</b> We highly recommend passing a long-lived, pre-configured 178 * {@code ReactiveAdapterRegistry} instance for customization purposes. 179 * This accessor is only meant as a fallback for code paths that want to 180 * fall back on a default instance if one isn't provided. 181 * @return the shared {@code ReactiveAdapterRegistry} instance 182 * @since 5.0.2 183 */ 184 public static ReactiveAdapterRegistry getSharedInstance() { 185 ReactiveAdapterRegistry registry = sharedInstance; 186 if (registry == null) { 187 synchronized (ReactiveAdapterRegistry.class) { 188 registry = sharedInstance; 189 if (registry == null) { 190 registry = new ReactiveAdapterRegistry(); 191 sharedInstance = registry; 192 } 193 } 194 } 195 return registry; 196 } 197 198 199 private static class ReactorRegistrar { 200 201 void registerAdapters(ReactiveAdapterRegistry registry) { 202 // Register Flux and Mono before Publisher... 203 204 registry.registerReactiveType( 205 ReactiveTypeDescriptor.singleOptionalValue(Mono.class, Mono::empty), 206 source -> (Mono<?>) source, 207 Mono::from 208 ); 209 210 registry.registerReactiveType( 211 ReactiveTypeDescriptor.multiValue(Flux.class, Flux::empty), 212 source -> (Flux<?>) source, 213 Flux::from); 214 215 registry.registerReactiveType( 216 ReactiveTypeDescriptor.multiValue(Publisher.class, Flux::empty), 217 source -> (Publisher<?>) source, 218 source -> source); 219 220 registry.registerReactiveType( 221 ReactiveTypeDescriptor.nonDeferredAsyncValue(CompletionStage.class, EmptyCompletableFuture::new), 222 source -> Mono.fromCompletionStage((CompletionStage<?>) source), 223 source -> Mono.from(source).toFuture() 224 ); 225 } 226 } 227 228 229 private static class RxJava1Registrar { 230 231 void registerAdapters(ReactiveAdapterRegistry registry) { 232 registry.registerReactiveType( 233 ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty), 234 source -> RxReactiveStreams.toPublisher((rx.Observable<?>) source), 235 RxReactiveStreams::toObservable 236 ); 237 registry.registerReactiveType( 238 ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class), 239 source -> RxReactiveStreams.toPublisher((rx.Single<?>) source), 240 RxReactiveStreams::toSingle 241 ); 242 registry.registerReactiveType( 243 ReactiveTypeDescriptor.noValue(rx.Completable.class, rx.Completable::complete), 244 source -> RxReactiveStreams.toPublisher((rx.Completable) source), 245 RxReactiveStreams::toCompletable 246 ); 247 } 248 } 249 250 251 private static class RxJava2Registrar { 252 253 void registerAdapters(ReactiveAdapterRegistry registry) { 254 registry.registerReactiveType( 255 ReactiveTypeDescriptor.multiValue(io.reactivex.Flowable.class, io.reactivex.Flowable::empty), 256 source -> (io.reactivex.Flowable<?>) source, 257 Flowable::fromPublisher 258 ); 259 registry.registerReactiveType( 260 ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty), 261 source -> ((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER), 262 source -> io.reactivex.Flowable.fromPublisher(source).toObservable() 263 ); 264 registry.registerReactiveType( 265 ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class), 266 source -> ((io.reactivex.Single<?>) source).toFlowable(), 267 source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement().toSingle() 268 ); 269 registry.registerReactiveType( 270 ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty), 271 source -> ((io.reactivex.Maybe<?>) source).toFlowable(), 272 source -> io.reactivex.Flowable.fromPublisher(source).toObservable().singleElement() 273 ); 274 registry.registerReactiveType( 275 ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete), 276 source -> ((io.reactivex.Completable) source).toFlowable(), 277 source -> io.reactivex.Flowable.fromPublisher(source).toObservable().ignoreElements() 278 ); 279 } 280 } 281 282 283 private static class ReactorJdkFlowAdapterRegistrar { 284 285 void registerAdapter(ReactiveAdapterRegistry registry) { 286 // TODO: remove reflection when build requires JDK 9+ 287 288 try { 289 String publisherName = "java.util.concurrent.Flow.Publisher"; 290 Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader()); 291 292 String adapterName = "reactor.adapter.JdkFlowAdapter"; 293 Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader()); 294 295 Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass); 296 Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class); 297 Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); 298 299 registry.registerReactiveType( 300 ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow), 301 source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source), 302 publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) 303 ); 304 } 305 catch (Throwable ex) { 306 // Ignore 307 } 308 } 309 } 310 311 312 /** 313 * ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or 314 * {@link Mono} depending on {@link ReactiveTypeDescriptor#isMultiValue()}. 315 * This is important in places where only the stream and stream element type 316 * information is available like encoders and decoders. 317 */ 318 private static class ReactorAdapter extends ReactiveAdapter { 319 320 ReactorAdapter(ReactiveTypeDescriptor descriptor, 321 Function<Object, Publisher<?>> toPublisherFunction, 322 Function<Publisher<?>, Object> fromPublisherFunction) { 323 324 super(descriptor, toPublisherFunction, fromPublisherFunction); 325 } 326 327 @Override 328 public <T> Publisher<T> toPublisher(@Nullable Object source) { 329 Publisher<T> publisher = super.toPublisher(source); 330 return (isMultiValue() ? Flux.from(publisher) : Mono.from(publisher)); 331 } 332 } 333 334 335 private static class EmptyCompletableFuture<T> extends CompletableFuture<T> { 336 337 EmptyCompletableFuture() { 338 complete(null); 339 } 340 } 341 342 343 private static class CoroutinesRegistrar { 344 345 @SuppressWarnings("KotlinInternalInJava") 346 void registerAdapters(ReactiveAdapterRegistry registry) { 347 registry.registerReactiveType( 348 ReactiveTypeDescriptor.singleOptionalValue(Deferred.class, 349 () -> CompletableDeferredKt.CompletableDeferred(null)), 350 source -> CoroutinesUtils.deferredToMono((Deferred<?>) source), 351 source -> CoroutinesUtils.monoToDeferred(Mono.from(source))); 352 353 registry.registerReactiveType( 354 ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow), 355 source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source), 356 kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow 357 ); 358 } 359 } 360 361 362 /** 363 * {@code BlockHoundIntegration} for spring-core classes. 364 * <p>Explicitly allow the following: 365 * <ul> 366 * <li>Reading class info via {@link LocalVariableTableParameterNameDiscoverer}. 367 * <li>Locking within {@link ConcurrentReferenceHashMap}. 368 * </ul> 369 * @since 5.2.4 370 */ 371 public static class SpringCoreBlockHoundIntegration implements BlockHoundIntegration { 372 373 @Override 374 public void applyTo(BlockHound.Builder builder) { 375 376 // Avoid hard references potentially anywhere in spring-core (no need for structural dependency) 377 378 builder.allowBlockingCallsInside( 379 "org.springframework.core.LocalVariableTableParameterNameDiscoverer", "inspectClass"); 380 381 String className = "org.springframework.util.ConcurrentReferenceHashMap$Segment"; 382 builder.allowBlockingCallsInside(className, "doTask"); 383 builder.allowBlockingCallsInside(className, "clear"); 384 builder.allowBlockingCallsInside(className, "restructure"); 385 } 386 } 387 388}