/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.transaction.reactive;
018
019import java.util.ArrayDeque;
020import java.util.function.Function;
021
022import reactor.core.publisher.Flux;
023import reactor.core.publisher.Mono;
024import reactor.util.context.Context;
025
026import org.springframework.transaction.NoTransactionException;
027
028/**
029 * Delegate to register and obtain transactional contexts.
030 *
031 * <p>Typically used by components that intercept or orchestrate transactional flows
032 * such as AOP interceptors or transactional operators.
033 *
034 * @author Mark Paluch
035 * @since 5.2
036 * @see TransactionSynchronization
037 */
038public abstract class TransactionContextManager {
039
040        private TransactionContextManager() {
041        }
042
043
044        /**
045         * Obtain the current {@link TransactionContext} from the subscriber context or the
046         * transactional context holder. Context retrieval fails with NoTransactionException
047         * if no context or context holder is registered.
048         * @return the current {@link TransactionContext}
049         * @throws NoTransactionException if no TransactionContext was found in the subscriber context
050         * or no context found in a holder
051         */
052        public static Mono<TransactionContext> currentContext() throws NoTransactionException {
053                return Mono.subscriberContext().handle((ctx, sink) -> {
054                        if (ctx.hasKey(TransactionContext.class)) {
055                                sink.next(ctx.get(TransactionContext.class));
056                                return;
057                        }
058                        if (ctx.hasKey(TransactionContextHolder.class)) {
059                                TransactionContextHolder holder = ctx.get(TransactionContextHolder.class);
060                                if (holder.hasContext()) {
061                                        sink.next(holder.currentContext());
062                                        return;
063                                }
064                        }
065                        sink.error(new NoTransactionInContextException());
066                });
067        }
068
069        /**
070         * Create a {@link TransactionContext} and register it in the subscriber {@link Context}.
071         * @return functional context registration.
072         * @throws IllegalStateException if a transaction context is already associated.
073         * @see Mono#subscriberContext(Function)
074         * @see Flux#subscriberContext(Function)
075         */
076        public static Function<Context, Context> createTransactionContext() {
077                return context -> context.put(TransactionContext.class, new TransactionContext());
078        }
079
080        /**
081         * Return a {@link Function} to create or associate a new {@link TransactionContext}.
082         * Interaction with transactional resources through
083         * {@link TransactionSynchronizationManager} requires a TransactionContext
084         * to be registered in the subscriber context.
085         * @return functional context registration.
086         */
087        public static Function<Context, Context> getOrCreateContext() {
088                return context -> {
089                        TransactionContextHolder holder = context.get(TransactionContextHolder.class);
090                        if (holder.hasContext()) {
091                                return context.put(TransactionContext.class, holder.currentContext());
092                        }
093                        return context.put(TransactionContext.class, holder.createContext());
094                };
095        }
096
097        /**
098         * Return a {@link Function} to create or associate a new
099         * {@link TransactionContextHolder}. Creation and release of transactions
100         * within a reactive flow requires a mutable holder that follows a top to
101         * down execution scheme. Reactor's subscriber context follows a down to top
102         * approach regarding mutation visibility.
103         * @return functional context registration.
104         */
105        public static Function<Context, Context> getOrCreateContextHolder() {
106                return context -> {
107                        if (!context.hasKey(TransactionContextHolder.class)) {
108                                return context.put(TransactionContextHolder.class, new TransactionContextHolder(new ArrayDeque<>()));
109                        }
110                        return context;
111                };
112        }
113
114
115        /**
116         * Stackless variant of {@link NoTransactionException} for reactive flows.
117         */
118        @SuppressWarnings("serial")
119        private static class NoTransactionInContextException extends NoTransactionException {
120
121                public NoTransactionInContextException() {
122                        super("No transaction in context");
123                }
124
125                @Override
126                public synchronized Throwable fillInStackTrace() {
127                        // stackless exception
128                        return this;
129                }
130        }
131
132}