/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.transaction.reactive;

import java.util.ArrayDeque;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import org.springframework.transaction.NoTransactionException;

/**
 * Delegate to register and obtain transactional contexts.
 *
 * <p>Typically used by components that intercept or orchestrate transactional flows
 * such as AOP interceptors or transactional operators.
 *
 * <p>All methods are static; the private constructor prevents instantiation
 * and subclassing from outside this class.
 *
 * @author Mark Paluch
 * @since 5.2
 * @see TransactionSynchronization
 */
public abstract class TransactionContextManager {

	private TransactionContextManager() {
	}


	/**
	 * Obtain the current {@link TransactionContext} from the subscriber context or the
	 * transactional context holder.
	 * <p>Lookup order: a {@link TransactionContext} registered directly in the subscriber
	 * context wins; otherwise a registered {@link TransactionContextHolder} is consulted
	 * and its current context is used if it has one.
	 * <p>If neither lookup succeeds, the returned {@link Mono} terminates with a
	 * {@link NoTransactionException} <em>error signal</em>; the exception is delivered
	 * through the {@code Mono} and carries no stack trace.
	 * @return a {@code Mono} emitting the current {@link TransactionContext}
	 * @throws NoTransactionException signalled via the returned {@code Mono} if no
	 * TransactionContext was found in the subscriber context or no context found in a holder
	 */
	public static Mono<TransactionContext> currentContext() throws NoTransactionException {
		return Mono.subscriberContext().handle((ctx, sink) -> {
			// 1) Context registered directly under its own type.
			if (ctx.hasKey(TransactionContext.class)) {
				sink.next(ctx.get(TransactionContext.class));
				return;
			}
			// 2) Fall back to the mutable holder, if present and populated.
			if (ctx.hasKey(TransactionContextHolder.class)) {
				TransactionContextHolder holder = ctx.get(TransactionContextHolder.class);
				if (holder.hasContext()) {
					sink.next(holder.currentContext());
					return;
				}
			}
			// 3) Nothing found: fail the Mono rather than completing empty.
			sink.error(new NoTransactionInContextException());
		});
	}

	/**
	 * Create a {@link TransactionContext} and register it in the subscriber {@link Context}.
	 * <p>The returned function always creates a fresh {@link TransactionContext} and
	 * puts it under the {@code TransactionContext.class} key. This method itself performs
	 * no check for a previously associated context.
	 * @return functional context registration.
	 * @see Mono#subscriberContext(Function)
	 * @see Flux#subscriberContext(Function)
	 */
	public static Function<Context, Context> createTransactionContext() {
		return context -> context.put(TransactionContext.class, new TransactionContext());
	}

	/**
	 * Return a {@link Function} to create or associate a new {@link TransactionContext}.
	 * Interaction with transactional resources through
	 * {@link TransactionSynchronizationManager} requires a TransactionContext
	 * to be registered in the subscriber context.
	 * <p>The function reads the {@link TransactionContextHolder} from the given context
	 * without checking for its presence first, so a holder is expected to be registered
	 * beforehand (see {@link #getOrCreateContextHolder()}). If the holder already has a
	 * context, that one is registered; otherwise the holder is asked to create one.
	 * @return functional context registration.
	 */
	public static Function<Context, Context> getOrCreateContext() {
		return context -> {
			// Unguarded lookup: relies on the holder having been registered already.
			TransactionContextHolder holder = context.get(TransactionContextHolder.class);
			if (holder.hasContext()) {
				return context.put(TransactionContext.class, holder.currentContext());
			}
			return context.put(TransactionContext.class, holder.createContext());
		};
	}

	/**
	 * Return a {@link Function} to create or associate a new
	 * {@link TransactionContextHolder}. Creation and release of transactions
	 * within a reactive flow requires a mutable holder that follows a top to
	 * down execution scheme. Reactor's subscriber context follows a down to top
	 * approach regarding mutation visibility.
	 * <p>If the given context already contains a holder, the context is returned
	 * unchanged; otherwise a new holder backed by an empty {@link ArrayDeque}
	 * is registered.
	 * @return functional context registration.
	 */
	public static Function<Context, Context> getOrCreateContextHolder() {
		return context -> {
			if (!context.hasKey(TransactionContextHolder.class)) {
				return context.put(TransactionContextHolder.class, new TransactionContextHolder(new ArrayDeque<>()));
			}
			return context;
		};
	}


	/**
	 * Stackless variant of {@link NoTransactionException} for reactive flows.
	 * <p>{@link #fillInStackTrace()} is overridden to return {@code this} without
	 * capturing a stack trace.
	 */
	@SuppressWarnings("serial")
	private static class NoTransactionInContextException extends NoTransactionException {

		public NoTransactionInContextException() {
			super("No transaction in context");
		}

		@Override
		public synchronized Throwable fillInStackTrace() {
			// stackless exception
			return this;
		}
	}

}