001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.transaction.reactive;
018
019import java.io.IOException;
020import java.io.ObjectInputStream;
021import java.io.Serializable;
022import java.util.List;
023import java.util.Optional;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.function.Predicate;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import reactor.core.publisher.Flux;
030import reactor.core.publisher.Mono;
031
032import org.springframework.lang.Nullable;
033import org.springframework.transaction.IllegalTransactionStateException;
034import org.springframework.transaction.InvalidTimeoutException;
035import org.springframework.transaction.ReactiveTransaction;
036import org.springframework.transaction.ReactiveTransactionManager;
037import org.springframework.transaction.TransactionDefinition;
038import org.springframework.transaction.TransactionException;
039import org.springframework.transaction.TransactionSuspensionNotSupportedException;
040import org.springframework.transaction.UnexpectedRollbackException;
041
042/**
043 * Abstract base class that implements Spring's standard reactive transaction workflow,
044 * serving as basis for concrete platform transaction managers.
045 *
046 * <p>This base class provides the following workflow handling:
047 * <ul>
048 * <li>determines if there is an existing transaction;
049 * <li>applies the appropriate propagation behavior;
050 * <li>suspends and resumes transactions if necessary;
051 * <li>checks the rollback-only flag on commit;
052 * <li>applies the appropriate modification on rollback
053 * (actual rollback or setting rollback-only);
054 * <li>triggers registered synchronization callbacks.
055 * </ul>
056 *
057 * <p>Subclasses have to implement specific template methods for specific
058 * states of a transaction, e.g.: begin, suspend, resume, commit, rollback.
059 * The most important of them are abstract and must be provided by a concrete
060 * implementation; for the rest, defaults are provided, so overriding is optional.
061 *
062 * <p>Transaction synchronization is a generic mechanism for registering callbacks
063 * that get invoked at transaction completion time. This is mainly used internally
064 * by the data access support classes for R2DBC, MongoDB, etc. The same mechanism can
065 * also be leveraged for custom synchronization needs in an application.
066 *
067 * <p>The state of this class is serializable, to allow for serializing the
068 * transaction strategy along with proxies that carry a transaction interceptor.
069 * It is up to subclasses if they wish to make their state to be serializable too.
070 * They should implement the {@code java.io.Serializable} marker interface in
071 * that case, and potentially a private {@code readObject()} method (according
072 * to Java serialization rules) if they need to restore any transient state.
073 *
074 * @author Mark Paluch
075 * @author Juergen Hoeller
076 * @since 5.2
077 * @see TransactionSynchronizationManager
078 */
079@SuppressWarnings("serial")
080public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable {
081
082        protected transient Log logger = LogFactory.getLog(getClass());
083
084
085        //---------------------------------------------------------------------
086        // Implementation of ReactiveTransactionManager
087        //---------------------------------------------------------------------
088
089        /**
090         * This implementation handles propagation behavior. Delegates to
091         * {@code doGetTransaction}, {@code isExistingTransaction}
092         * and {@code doBegin}.
093         * @see #doGetTransaction
094         * @see #isExistingTransaction
095         * @see #doBegin
096         */
097        @Override
098        public final Mono<ReactiveTransaction> getReactiveTransaction(@Nullable TransactionDefinition definition)
099                        throws TransactionException {
100
101                // Use defaults if no transaction definition given.
102                TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
103
104                return TransactionSynchronizationManager.forCurrentTransaction()
105                                .flatMap(synchronizationManager -> {
106
107                        Object transaction = doGetTransaction(synchronizationManager);
108
109                        // Cache debug flag to avoid repeated checks.
110                        boolean debugEnabled = logger.isDebugEnabled();
111
112                        if (isExistingTransaction(transaction)) {
113                                // Existing transaction found -> check propagation behavior to find out how to behave.
114                                return handleExistingTransaction(synchronizationManager, def, transaction, debugEnabled);
115                        }
116
117                        // Check definition settings for new transaction.
118                        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
119                                return Mono.error(new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()));
120                        }
121
122                        // No existing transaction found -> check propagation behavior to find out how to proceed.
123                        if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
124                                return Mono.error(new IllegalTransactionStateException(
125                                                "No existing transaction found for transaction marked with propagation 'mandatory'"));
126                        }
127                        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
128                                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
129                                        def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
130
131                                return TransactionContextManager.currentContext()
132                                                .map(TransactionSynchronizationManager::new)
133                                                .flatMap(nestedSynchronizationManager ->
134                                                                suspend(nestedSynchronizationManager, null)
135                                                                .map(Optional::of)
136                                                                .defaultIfEmpty(Optional.empty())
137                                                                .flatMap(suspendedResources -> {
138                                                        if (debugEnabled) {
139                                                                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
140                                                        }
141                                                        return Mono.defer(() -> {
142                                                                GenericReactiveTransaction status = newReactiveTransaction(
143                                                                                nestedSynchronizationManager, def, transaction, true,
144                                                                                debugEnabled, suspendedResources.orElse(null));
145                                                                return doBegin(nestedSynchronizationManager, transaction, def)
146                                                                                .doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, def))
147                                                                                .thenReturn(status);
148                                                        }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR,
149                                                                        ex -> resume(nestedSynchronizationManager, null, suspendedResources.orElse(null))
150                                                                        .then(Mono.error(ex)));
151                                                }));
152                        }
153                        else {
154                                // Create "empty" transaction: no actual transaction, but potentially synchronization.
155                                if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
156                                        logger.warn("Custom isolation level specified but no actual transaction initiated; " +
157                                                        "isolation level will effectively be ignored: " + def);
158                                }
159                                return Mono.just(prepareReactiveTransaction(synchronizationManager, def, null, true, debugEnabled, null));
160                        }
161                });
162        }
163
164        /**
165         * Create a ReactiveTransaction for an existing transaction.
166         */
167        private Mono<ReactiveTransaction> handleExistingTransaction(TransactionSynchronizationManager synchronizationManager,
168                        TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
169
170                if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
171                        return Mono.error(new IllegalTransactionStateException(
172                                        "Existing transaction found for transaction marked with propagation 'never'"));
173                }
174
175                if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
176                        if (debugEnabled) {
177                                logger.debug("Suspending current transaction");
178                        }
179                        Mono<SuspendedResourcesHolder> suspend = suspend(synchronizationManager, transaction);
180                        return suspend.map(suspendedResources -> prepareReactiveTransaction(synchronizationManager,
181                                        definition, null, false, debugEnabled, suspendedResources)) //
182                                        .switchIfEmpty(Mono.fromSupplier(() -> prepareReactiveTransaction(synchronizationManager,
183                                                        definition, null, false, debugEnabled, null)))
184                                        .cast(ReactiveTransaction.class);
185                }
186
187                if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
188                        if (debugEnabled) {
189                                logger.debug("Suspending current transaction, creating new transaction with name [" +
190                                                definition.getName() + "]");
191                        }
192                        Mono<SuspendedResourcesHolder> suspendedResources = suspend(synchronizationManager, transaction);
193                        return suspendedResources.flatMap(suspendedResourcesHolder -> {
194                                GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
195                                                definition, transaction, true, debugEnabled, suspendedResourcesHolder);
196                                return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
197                                                prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status)
198                                                .onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx ->
199                                                                resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx)
200                                                                                .then(Mono.error(beginEx)));
201                        });
202                }
203
204                if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
205                        if (debugEnabled) {
206                                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
207                        }
208                        // Nested transaction through nested begin and commit/rollback calls.
209                        GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
210                                        definition, transaction, true, debugEnabled, null);
211                        return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore ->
212                                        prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status);
213                }
214
215                // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
216                if (debugEnabled) {
217                        logger.debug("Participating in existing transaction");
218                }
219                return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, transaction, false, debugEnabled, null));
220        }
221
222        /**
223         * Create a new ReactiveTransaction for the given arguments,
224         * also initializing transaction synchronization as appropriate.
225         * @see #newReactiveTransaction
226         * @see #prepareReactiveTransaction
227         */
228        private GenericReactiveTransaction prepareReactiveTransaction(
229                        TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
230                        @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
231
232                GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager,
233                                definition, transaction, newTransaction, debug, suspendedResources);
234                prepareSynchronization(synchronizationManager, status, definition);
235                return status;
236        }
237
238        /**
239         * Create a ReactiveTransaction instance for the given arguments.
240         */
241        private GenericReactiveTransaction newReactiveTransaction(
242                        TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition,
243                        @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) {
244
245                return new GenericReactiveTransaction(transaction, newTransaction,
246                                !synchronizationManager.isSynchronizationActive(),
247                                definition.isReadOnly(), debug, suspendedResources);
248        }
249
250        /**
251         * Initialize transaction synchronization as appropriate.
252         */
253        private void prepareSynchronization(TransactionSynchronizationManager synchronizationManager,
254                        GenericReactiveTransaction status, TransactionDefinition definition) {
255
256                if (status.isNewSynchronization()) {
257                        synchronizationManager.setActualTransactionActive(status.hasTransaction());
258                        synchronizationManager.setCurrentTransactionIsolationLevel(
259                                        definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
260                                                        definition.getIsolationLevel() : null);
261                        synchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
262                        synchronizationManager.setCurrentTransactionName(definition.getName());
263                        synchronizationManager.initSynchronization();
264                }
265        }
266
267
268        /**
269         * Suspend the given transaction. Suspends transaction synchronization first,
270         * then delegates to the {@code doSuspend} template method.
271         * @param synchronizationManager the synchronization manager bound to the current transaction
272         * @param transaction the current transaction object
273         * (or {@code null} to just suspend active synchronizations, if any)
274         * @return an object that holds suspended resources
275         * (or {@code null} if neither transaction nor synchronization active)
276         * @see #doSuspend
277         * @see #resume
278         */
279        private Mono<SuspendedResourcesHolder> suspend(TransactionSynchronizationManager synchronizationManager,
280                        @Nullable Object transaction) throws TransactionException {
281
282                if (synchronizationManager.isSynchronizationActive()) {
283                        Mono<List<TransactionSynchronization>> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager);
284                        return suspendedSynchronizations.flatMap(synchronizations -> {
285                                Mono<Optional<Object>> suspendedResources = (transaction != null ?
286                                                doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) :
287                                                Mono.just(Optional.empty()));
288                                return suspendedResources.map(it -> {
289                                        String name = synchronizationManager.getCurrentTransactionName();
290                                        synchronizationManager.setCurrentTransactionName(null);
291                                        boolean readOnly = synchronizationManager.isCurrentTransactionReadOnly();
292                                        synchronizationManager.setCurrentTransactionReadOnly(false);
293                                        Integer isolationLevel = synchronizationManager.getCurrentTransactionIsolationLevel();
294                                        synchronizationManager.setCurrentTransactionIsolationLevel(null);
295                                        boolean wasActive = synchronizationManager.isActualTransactionActive();
296                                        synchronizationManager.setActualTransactionActive(false);
297                                        return new SuspendedResourcesHolder(
298                                                        it.orElse(null), synchronizations, name, readOnly, isolationLevel, wasActive);
299                                }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR,
300                                                ex -> doResumeSynchronization(synchronizationManager, synchronizations)
301                                                                .cast(SuspendedResourcesHolder.class));
302                        });
303                }
304                else if (transaction != null) {
305                        // Transaction active but no synchronization active.
306                        Mono<Optional<Object>> suspendedResources =
307                                        doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty());
308                        return suspendedResources.map(it -> new SuspendedResourcesHolder(it.orElse(null)));
309                }
310                else {
311                        // Neither transaction nor synchronization active.
312                        return Mono.empty();
313                }
314        }
315
316        /**
317         * Resume the given transaction. Delegates to the {@code doResume}
318         * template method first, then resuming transaction synchronization.
319         * @param synchronizationManager the synchronization manager bound to the current transaction
320         * @param transaction the current transaction object
321         * @param resourcesHolder the object that holds suspended resources,
322         * as returned by {@code suspend} (or {@code null} to just
323         * resume synchronizations, if any)
324         * @see #doResume
325         * @see #suspend
326         */
327        private Mono<Void> resume(TransactionSynchronizationManager synchronizationManager,
328                        @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
329                        throws TransactionException {
330
331                Mono<Void> resume = Mono.empty();
332
333                if (resourcesHolder != null) {
334                        Object suspendedResources = resourcesHolder.suspendedResources;
335                        if (suspendedResources != null) {
336                                resume =  doResume(synchronizationManager, transaction, suspendedResources);
337                        }
338                        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
339                        if (suspendedSynchronizations != null) {
340                                synchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
341                                synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
342                                synchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
343                                synchronizationManager.setCurrentTransactionName(resourcesHolder.name);
344                                return resume.then(doResumeSynchronization(synchronizationManager, suspendedSynchronizations));
345                        }
346                }
347
348                return resume;
349        }
350
351        /**
352         * Resume outer transaction after inner transaction begin failed.
353         */
354        private Mono<Void> resumeAfterBeginException(TransactionSynchronizationManager synchronizationManager,
355                        Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) {
356
357                String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception";
358                return resume(synchronizationManager, transaction, suspendedResources).doOnError(ErrorPredicates.RUNTIME_OR_ERROR,
359                                ex -> logger.error(exMessage, beginEx));
360        }
361
362        /**
363         * Suspend all current synchronizations and deactivate transaction
364         * synchronization for the current transaction context.
365         * @param synchronizationManager the synchronization manager bound to the current transaction
366         * @return the List of suspended TransactionSynchronization objects
367         */
368        private Mono<List<TransactionSynchronization>> doSuspendSynchronization(
369                        TransactionSynchronizationManager synchronizationManager) {
370
371                List<TransactionSynchronization> suspendedSynchronizations = synchronizationManager.getSynchronizations();
372                return Flux.fromIterable(suspendedSynchronizations)
373                                .concatMap(TransactionSynchronization::suspend)
374                                .then(Mono.defer(() -> {
375                                        synchronizationManager.clearSynchronization();
376                                        return Mono.just(suspendedSynchronizations);
377                                }));
378        }
379
380        /**
381         * Reactivate transaction synchronization for the current transaction context
382         * and resume all given synchronizations.
383         * @param synchronizationManager the synchronization manager bound to the current transaction
384         * @param suspendedSynchronizations a List of TransactionSynchronization objects
385         */
386        private Mono<Void> doResumeSynchronization(TransactionSynchronizationManager synchronizationManager,
387                        List<TransactionSynchronization> suspendedSynchronizations) {
388
389                synchronizationManager.initSynchronization();
390                return Flux.fromIterable(suspendedSynchronizations)
391                                .concatMap(synchronization -> synchronization.resume()
392                                                .doOnSuccess(ignore -> synchronizationManager.registerSynchronization(synchronization))).then();
393        }
394
395
396        /**
397         * This implementation of commit handles participating in existing
398         * transactions and programmatic rollback requests.
399         * Delegates to {@code isRollbackOnly}, {@code doCommit}
400         * and {@code rollback}.
401         * @see ReactiveTransaction#isRollbackOnly()
402         * @see #doCommit
403         * @see #rollback
404         */
405        @Override
406        public final Mono<Void> commit(ReactiveTransaction transaction) throws TransactionException {
407                if (transaction.isCompleted()) {
408                        return Mono.error(new IllegalTransactionStateException(
409                                        "Transaction is already completed - do not call commit or rollback more than once per transaction"));
410                }
411
412                return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
413                        GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction;
414                        if (reactiveTx.isRollbackOnly()) {
415                                if (reactiveTx.isDebug()) {
416                                        logger.debug("Transactional code has requested rollback");
417                                }
418                                return processRollback(synchronizationManager, reactiveTx);
419                        }
420                        return processCommit(synchronizationManager, reactiveTx);
421                });
422        }
423
424        /**
425         * Process an actual commit.
426         * Rollback-only flags have already been checked and applied.
427         * @param synchronizationManager the synchronization manager bound to the current transaction
428         * @param status object representing the transaction
429         * @throws TransactionException in case of commit failure
430         */
431        private Mono<Void> processCommit(TransactionSynchronizationManager synchronizationManager,
432                        GenericReactiveTransaction status) throws TransactionException {
433
434                AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false);
435
436                Mono<Object> commit = prepareForCommit(synchronizationManager, status)
437                                .then(triggerBeforeCommit(synchronizationManager, status))
438                                .then(triggerBeforeCompletion(synchronizationManager, status))
439                                .then(Mono.defer(() -> {
440                                        beforeCompletionInvoked.set(true);
441                                        if (status.isNewTransaction()) {
442                                                if (status.isDebug()) {
443                                                        logger.debug("Initiating transaction commit");
444                                                }
445                                                return doCommit(synchronizationManager, status);
446                                        }
447                                        return Mono.empty();
448                                })).then(Mono.empty().onErrorResume(ex -> {
449                                        Mono<Object> propagateException = Mono.error(ex);
450                                        // Store result in a local variable in order to appease the
451                                        // Eclipse compiler with regard to inferred generics.
452                                        Mono<Object> result = propagateException;
453                                        if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) {
454                                                result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)
455                                                                .then(propagateException);
456                                        }
457                                        else if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) {
458                                                result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
459                                                                .then(propagateException);
460                                        }
461                                        else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) {
462                                                Mono<Void> mono;
463                                                if (!beforeCompletionInvoked.get()) {
464                                                        mono = triggerBeforeCompletion(synchronizationManager, status);
465                                                }
466                                                else {
467                                                        mono = Mono.empty();
468                                                }
469                                                result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex))
470                                                                .then(propagateException);
471                                        }
472
473                                        return result;
474                                })).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex ->
475                                                triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex)))
476                                                .then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED))));
477
478                return commit
479                                .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status)
480                                                .then(Mono.error(ex))).then(cleanupAfterCompletion(synchronizationManager, status));
481        }
482
483        /**
484         * This implementation of rollback handles participating in existing transactions.
485         * Delegates to {@code doRollback} and {@code doSetRollbackOnly}.
486         * @see #doRollback
487         * @see #doSetRollbackOnly
488         */
489        @Override
490        public final Mono<Void> rollback(ReactiveTransaction transaction) throws TransactionException {
491                if (transaction.isCompleted()) {
492                        return Mono.error(new IllegalTransactionStateException(
493                                        "Transaction is already completed - do not call commit or rollback more than once per transaction"));
494                }
495                return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> {
496                        GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction;
497                        return processRollback(synchronizationManager, reactiveTx);
498                });
499        }
500
501        /**
502         * Process an actual rollback.
503         * The completed flag has already been checked.
504         * @param synchronizationManager the synchronization manager bound to the current transaction
505         * @param status object representing the transaction
506         * @throws TransactionException in case of rollback failure
507         */
508        private Mono<Void> processRollback(TransactionSynchronizationManager synchronizationManager,
509                        GenericReactiveTransaction status) {
510
511                return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> {
512                        if (status.isNewTransaction()) {
513                                if (status.isDebug()) {
514                                        logger.debug("Initiating transaction rollback");
515                                }
516                                return doRollback(synchronizationManager, status);
517                        }
518                        else {
519                                Mono<Void> beforeCompletion = Mono.empty();
520                                // Participating in larger transaction
521                                if (status.hasTransaction()) {
522                                        if (status.isDebug()) {
523                                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
524                                        }
525                                        beforeCompletion = doSetRollbackOnly(synchronizationManager, status);
526                                }
527                                else {
528                                        logger.debug("Should roll back transaction but cannot - no transaction available");
529                                }
530                                return beforeCompletion;
531                        }
532                })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion(
533                                synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
534                                .then(Mono.error(ex)))
535                                .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)))
536                                .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex)))
537                        .then(cleanupAfterCompletion(synchronizationManager, status));
538        }
539
540        /**
541         * Invoke {@code doRollback}, handling rollback exceptions properly.
542         * @param synchronizationManager the synchronization manager bound to the current transaction
543         * @param status object representing the transaction
544         * @param ex the thrown application exception or error
545         * @throws TransactionException in case of rollback failure
546         * @see #doRollback
547         */
548        private Mono<Void> doRollbackOnCommitException(TransactionSynchronizationManager synchronizationManager,
549                        GenericReactiveTransaction status, Throwable ex) throws TransactionException {
550
551                return Mono.defer(() -> {
552                        if (status.isNewTransaction()) {
553                                if (status.isDebug()) {
554                                        logger.debug("Initiating transaction rollback after commit exception", ex);
555                                }
556                                return doRollback(synchronizationManager, status);
557                        }
558                        else if (status.hasTransaction()) {
559                                if (status.isDebug()) {
560                                        logger.debug("Marking existing transaction as rollback-only after commit exception", ex);
561                                }
562                                return doSetRollbackOnly(synchronizationManager, status);
563                        }
564                        return Mono.empty();
565                }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, rbex -> {
566                        logger.error("Commit exception overridden by rollback exception", ex);
567                        return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN)
568                                .then(Mono.error(rbex));
569                }).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK));
570        }
571
572
573        /**
574         * Trigger {@code beforeCommit} callbacks.
575         * @param synchronizationManager the synchronization manager bound to the current transaction
576         * @param status object representing the transaction
577         */
578        private Mono<Void> triggerBeforeCommit(TransactionSynchronizationManager synchronizationManager,
579                        GenericReactiveTransaction status) {
580
581                if (status.isNewSynchronization()) {
582                        if (status.isDebug()) {
583                                logger.trace("Triggering beforeCommit synchronization");
584                        }
585                        return TransactionSynchronizationUtils.triggerBeforeCommit(
586                                        synchronizationManager.getSynchronizations(), status.isReadOnly());
587                }
588                return Mono.empty();
589        }
590
591        /**
592         * Trigger {@code beforeCompletion} callbacks.
593         * @param synchronizationManager the synchronization manager bound to the current transaction
594         * @param status object representing the transaction
595         */
596        private Mono<Void> triggerBeforeCompletion(TransactionSynchronizationManager synchronizationManager,
597                        GenericReactiveTransaction status) {
598
599                if (status.isNewSynchronization()) {
600                        if (status.isDebug()) {
601                                logger.trace("Triggering beforeCompletion synchronization");
602                        }
603                        return TransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations());
604                }
605                return Mono.empty();
606        }
607
608        /**
609         * Trigger {@code afterCommit} callbacks.
610         * @param synchronizationManager the synchronization manager bound to the current transaction
611         * @param status object representing the transaction
612         */
613        private Mono<Void> triggerAfterCommit(TransactionSynchronizationManager synchronizationManager,
614                        GenericReactiveTransaction status) {
615
616                if (status.isNewSynchronization()) {
617                        if (status.isDebug()) {
618                                logger.trace("Triggering afterCommit synchronization");
619                        }
620                        return TransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations());
621                }
622                return Mono.empty();
623        }
624
625        /**
626         * Trigger {@code afterCompletion} callbacks.
627         * @param synchronizationManager the synchronization manager bound to the current transaction
628         * @param status object representing the transaction
629         * @param completionStatus completion status according to TransactionSynchronization constants
630         */
631        private Mono<Void> triggerAfterCompletion(TransactionSynchronizationManager synchronizationManager,
632                        GenericReactiveTransaction status, int completionStatus) {
633
634                if (status.isNewSynchronization()) {
635                        List<TransactionSynchronization> synchronizations = synchronizationManager.getSynchronizations();
636                        synchronizationManager.clearSynchronization();
637                        if (!status.hasTransaction() || status.isNewTransaction()) {
638                                if (status.isDebug()) {
639                                        logger.trace("Triggering afterCompletion synchronization");
640                                }
641                                // No transaction or new transaction for the current scope ->
642                                // invoke the afterCompletion callbacks immediately
643                                return invokeAfterCompletion(synchronizationManager, synchronizations, completionStatus);
644                        }
645                        else if (!synchronizations.isEmpty()) {
646                                // Existing transaction that we participate in, controlled outside
647                                // of the scope of this Spring transaction manager -> try to register
648                                // an afterCompletion callback with the existing (JTA) transaction.
649                                return registerAfterCompletionWithExistingTransaction(
650                                                synchronizationManager, status.getTransaction(), synchronizations);
651                        }
652                }
653                return Mono.empty();
654        }
655
656        /**
657         * Actually invoke the {@code afterCompletion} methods of the
658         * given TransactionSynchronization objects.
659         * <p>To be called by this abstract manager itself, or by special implementations
660         * of the {@code registerAfterCompletionWithExistingTransaction} callback.
661         * @param synchronizationManager the synchronization manager bound to the current transaction
662         * @param synchronizations a List of TransactionSynchronization objects
663         * @param completionStatus the completion status according to the
664         * constants in the TransactionSynchronization interface
665         * @see #registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager, Object, List)
666         * @see TransactionSynchronization#STATUS_COMMITTED
667         * @see TransactionSynchronization#STATUS_ROLLED_BACK
668         * @see TransactionSynchronization#STATUS_UNKNOWN
669         */
670        private Mono<Void> invokeAfterCompletion(TransactionSynchronizationManager synchronizationManager,
671                        List<TransactionSynchronization> synchronizations, int completionStatus) {
672
673                return TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
674        }
675
676        /**
677         * Clean up after completion, clearing synchronization if necessary,
678         * and invoking doCleanupAfterCompletion.
679         * @param synchronizationManager the synchronization manager bound to the current transaction
680         * @param status object representing the transaction
681         * @see #doCleanupAfterCompletion
682         */
683        private Mono<Void> cleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager,
684                        GenericReactiveTransaction status) {
685
686                return Mono.defer(() -> {
687                        status.setCompleted();
688                        if (status.isNewSynchronization()) {
689                                synchronizationManager.clear();
690                        }
691                        Mono<Void> cleanup = Mono.empty();
692                        if (status.isNewTransaction()) {
693                                cleanup = doCleanupAfterCompletion(synchronizationManager, status.getTransaction());
694                        }
695                        if (status.getSuspendedResources() != null) {
696                                if (status.isDebug()) {
697                                        logger.debug("Resuming suspended transaction after completion of inner transaction");
698                                }
699                                Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
700                                return cleanup.then(resume(synchronizationManager, transaction,
701                                                (SuspendedResourcesHolder) status.getSuspendedResources()));
702                        }
703                        return cleanup;
704                });
705        }
706
707
708        //---------------------------------------------------------------------
709        // Template methods to be implemented in subclasses
710        //---------------------------------------------------------------------
711
712        /**
713         * Return a transaction object for the current transaction state.
714         * <p>The returned object will usually be specific to the concrete transaction
715         * manager implementation, carrying corresponding transaction state in a
716         * modifiable fashion. This object will be passed into the other template
717         * methods (e.g. doBegin and doCommit), either directly or as part of a
718         * DefaultReactiveTransactionStatus instance.
719         * <p>The returned object should contain information about any existing
720         * transaction, that is, a transaction that has already started before the
721         * current {@code getTransaction} call on the transaction manager.
722         * Consequently, a {@code doGetTransaction} implementation will usually
723         * look for an existing transaction and store corresponding state in the
724         * returned transaction object.
725         * @param synchronizationManager the synchronization manager bound to the current transaction
726         * @return the current transaction object
727         * @throws org.springframework.transaction.CannotCreateTransactionException
728         * if transaction support is not available
729         * @throws TransactionException in case of lookup or system errors
730         * @see #doBegin
731         * @see #doCommit
732         * @see #doRollback
733         * @see GenericReactiveTransaction#getTransaction
734         */
735        protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager)
736                        throws TransactionException;
737
738        /**
739         * Check if the given transaction object indicates an existing transaction
740         * (that is, a transaction which has already started).
741         * <p>The result will be evaluated according to the specified propagation
742         * behavior for the new transaction. An existing transaction might get
743         * suspended (in case of PROPAGATION_REQUIRES_NEW), or the new transaction
744         * might participate in the existing one (in case of PROPAGATION_REQUIRED).
745         * <p>The default implementation returns {@code false}, assuming that
746         * participating in existing transactions is generally not supported.
747         * Subclasses are of course encouraged to provide such support.
748         * @param transaction the transaction object returned by doGetTransaction
749         * @return if there is an existing transaction
750         * @throws TransactionException in case of system errors
751         * @see #doGetTransaction
752         */
753        protected boolean isExistingTransaction(Object transaction) throws TransactionException {
754                return false;
755        }
756
757        /**
758         * Begin a new transaction with semantics according to the given transaction
759         * definition. Does not have to care about applying the propagation behavior,
760         * as this has already been handled by this abstract manager.
761         * <p>This method gets called when the transaction manager has decided to actually
762         * start a new transaction. Either there wasn't any transaction before, or the
763         * previous transaction has been suspended.
764         * <p>A special scenario is a nested transaction: This method will be called to
765         * start a nested transaction when necessary. In such a context, there will be an
766         * active transaction: The implementation of this method has to detect this and
767         * start an appropriate nested transaction.
768         * @param synchronizationManager the synchronization manager bound to the new transaction
769         * @param transaction the transaction object returned by {@code doGetTransaction}
770         * @param definition a TransactionDefinition instance, describing propagation
771         * behavior, isolation level, read-only flag, timeout, and transaction name
772         * @throws TransactionException in case of creation or system errors
773         * @throws org.springframework.transaction.NestedTransactionNotSupportedException
774         * if the underlying transaction does not support nesting (e.g. through savepoints)
775         */
776        protected abstract Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager,
777                        Object transaction, TransactionDefinition definition) throws TransactionException;
778
779        /**
780         * Suspend the resources of the current transaction.
781         * Transaction synchronization will already have been suspended.
782         * <p>The default implementation throws a TransactionSuspensionNotSupportedException,
783         * assuming that transaction suspension is generally not supported.
784         * @param synchronizationManager the synchronization manager bound to the current transaction
785         * @param transaction the transaction object returned by {@code doGetTransaction}
786         * @return an object that holds suspended resources
787         * (will be kept unexamined for passing it into doResume)
788         * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
789         * if suspending is not supported by the transaction manager implementation
790         * @throws TransactionException in case of system errors
791         * @see #doResume
792         */
793        protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager,
794                        Object transaction) throws TransactionException {
795
796                throw new TransactionSuspensionNotSupportedException(
797                                "Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
798        }
799
800        /**
801         * Resume the resources of the current transaction.
802         * Transaction synchronization will be resumed afterwards.
803         * <p>The default implementation throws a TransactionSuspensionNotSupportedException,
804         * assuming that transaction suspension is generally not supported.
805         * @param synchronizationManager the synchronization manager bound to the current transaction
806         * @param transaction the transaction object returned by {@code doGetTransaction}
807         * @param suspendedResources the object that holds suspended resources,
808         * as returned by doSuspend
809         * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException
810         * if suspending is not supported by the transaction manager implementation
811         * @throws TransactionException in case of system errors
812         * @see #doSuspend
813         */
814        protected Mono<Void> doResume(TransactionSynchronizationManager synchronizationManager,
815                        @Nullable Object transaction, Object suspendedResources) throws TransactionException {
816
817                throw new TransactionSuspensionNotSupportedException(
818                                "Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
819        }
820
821        /**
822         * Make preparations for commit, to be performed before the
823         * {@code beforeCommit} synchronization callbacks occur.
824         * <p>Note that exceptions will get propagated to the commit caller
825         * and cause a rollback of the transaction.
826         * @param synchronizationManager the synchronization manager bound to the current transaction
827         * @param status the status representation of the transaction
828         * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
829         * (note: do not throw TransactionException subclasses here!)
830         */
831        protected Mono<Void> prepareForCommit(TransactionSynchronizationManager synchronizationManager,
832                        GenericReactiveTransaction status) {
833
834                return Mono.empty();
835        }
836
837        /**
838         * Perform an actual commit of the given transaction.
839         * <p>An implementation does not need to check the "new transaction" flag
840         * or the rollback-only flag; this will already have been handled before.
841         * Usually, a straight commit will be performed on the transaction object
842         * contained in the passed-in status.
843         * @param synchronizationManager the synchronization manager bound to the current transaction
844         * @param status the status representation of the transaction
845         * @throws TransactionException in case of commit or system errors
846         * @see GenericReactiveTransaction#getTransaction
847         */
848        protected abstract Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager,
849                        GenericReactiveTransaction status) throws TransactionException;
850
851        /**
852         * Perform an actual rollback of the given transaction.
853         * <p>An implementation does not need to check the "new transaction" flag;
854         * this will already have been handled before. Usually, a straight rollback
855         * will be performed on the transaction object contained in the passed-in status.
856         * @param synchronizationManager the synchronization manager bound to the current transaction
857         * @param status the status representation of the transaction
858         * @throws TransactionException in case of system errors
859         * @see GenericReactiveTransaction#getTransaction
860         */
861        protected abstract Mono<Void> doRollback(TransactionSynchronizationManager synchronizationManager,
862                        GenericReactiveTransaction status) throws TransactionException;
863
864        /**
865         * Set the given transaction rollback-only. Only called on rollback
866         * if the current transaction participates in an existing one.
867         * <p>The default implementation throws an IllegalTransactionStateException,
868         * assuming that participating in existing transactions is generally not
869         * supported. Subclasses are of course encouraged to provide such support.
870         * @param synchronizationManager the synchronization manager bound to the current transaction
871         * @param status the status representation of the transaction
872         * @throws TransactionException in case of system errors
873         */
874        protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager,
875                        GenericReactiveTransaction status) throws TransactionException {
876
877                throw new IllegalTransactionStateException(
878                                "Participating in existing transactions is not supported - when 'isExistingTransaction' " +
879                                "returns true, appropriate 'doSetRollbackOnly' behavior must be provided");
880        }
881
882        /**
883         * Register the given list of transaction synchronizations with the existing transaction.
884         * <p>Invoked when the control of the Spring transaction manager and thus all Spring
885         * transaction synchronizations end, without the transaction being completed yet. This
886         * is for example the case when participating in an existing JTA or EJB CMT transaction.
887         * <p>The default implementation simply invokes the {@code afterCompletion} methods
888         * immediately, passing in "STATUS_UNKNOWN". This is the best we can do if there's no
889         * chance to determine the actual outcome of the outer transaction.
890         * @param synchronizationManager the synchronization manager bound to the current transaction
891         * @param transaction the transaction object returned by {@code doGetTransaction}
892         * @param synchronizations a List of TransactionSynchronization objects
893         * @throws TransactionException in case of system errors
894         * @see #invokeAfterCompletion(TransactionSynchronizationManager, List, int)
895         * @see TransactionSynchronization#afterCompletion(int)
896         * @see TransactionSynchronization#STATUS_UNKNOWN
897         */
898        protected Mono<Void> registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager synchronizationManager,
899                        Object transaction, List<TransactionSynchronization> synchronizations) throws TransactionException {
900
901                logger.debug("Cannot register Spring after-completion synchronization with existing transaction - " +
902                                "processing Spring after-completion callbacks immediately, with outcome status 'unknown'");
903                return invokeAfterCompletion(synchronizationManager, synchronizations, TransactionSynchronization.STATUS_UNKNOWN);
904        }
905
906        /**
907         * Cleanup resources after transaction completion.
908         * <p>Called after {@code doCommit} and {@code doRollback} execution,
909         * on any outcome. The default implementation does nothing.
910         * <p>Should not throw any exceptions but just issue warnings on errors.
911         * @param synchronizationManager the synchronization manager bound to the current transaction
912         * @param transaction the transaction object returned by {@code doGetTransaction}
913         */
914        protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager,
915                        Object transaction) {
916
917                return Mono.empty();
918        }
919
920
921        //---------------------------------------------------------------------
922        // Serialization support
923        //---------------------------------------------------------------------
924
925        private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
926                // Rely on default serialization; just initialize state after deserialization.
927                ois.defaultReadObject();
928
929                // Initialize transient fields.
930                this.logger = LogFactory.getLog(getClass());
931        }
932
933
934        /**
935         * Holder for suspended resources.
936         * Used internally by {@code suspend} and {@code resume}.
937         */
938        protected static final class SuspendedResourcesHolder {
939
940                @Nullable
941                private final Object suspendedResources;
942
943                @Nullable
944                private List<TransactionSynchronization> suspendedSynchronizations;
945
946                @Nullable
947                private String name;
948
949                private boolean readOnly;
950
951                @Nullable
952                private Integer isolationLevel;
953
954                private boolean wasActive;
955
956                private SuspendedResourcesHolder(@Nullable Object suspendedResources) {
957                        this.suspendedResources = suspendedResources;
958                }
959
960                private SuspendedResourcesHolder(
961                                @Nullable Object suspendedResources, List<TransactionSynchronization> suspendedSynchronizations,
962                                @Nullable String name, boolean readOnly, @Nullable Integer isolationLevel, boolean wasActive) {
963
964                        this.suspendedResources = suspendedResources;
965                        this.suspendedSynchronizations = suspendedSynchronizations;
966                        this.name = name;
967                        this.readOnly = readOnly;
968                        this.isolationLevel = isolationLevel;
969                        this.wasActive = wasActive;
970                }
971        }
972
973
974        /**
975         * Predicates for exception types that transactional error handling applies to.
976         */
977        private enum ErrorPredicates implements Predicate<Throwable> {
978
979                /**
980                 * Predicate matching {@link RuntimeException} or {@link Error}.
981                 */
982                RUNTIME_OR_ERROR {
983                        @Override
984                        public boolean test(Throwable throwable) {
985                                return throwable instanceof RuntimeException || throwable instanceof Error;
986                        }
987                },
988
989                /**
990                 * Predicate matching {@link TransactionException}.
991                 */
992                TRANSACTION_EXCEPTION {
993                        @Override
994                        public boolean test(Throwable throwable) {
995                                return throwable instanceof TransactionException;
996                        }
997                },
998
999                /**
1000                 * Predicate matching {@link UnexpectedRollbackException}.
1001                 */
1002                UNEXPECTED_ROLLBACK {
1003                        @Override
1004                        public boolean test(Throwable throwable) {
1005                                return throwable instanceof UnexpectedRollbackException;
1006                        }
1007                };
1008
1009                @Override
1010                public abstract boolean test(Throwable throwable);
1011        }
1012
1013}