001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.transaction.reactive; 018 019import java.io.IOException; 020import java.io.ObjectInputStream; 021import java.io.Serializable; 022import java.util.List; 023import java.util.Optional; 024import java.util.concurrent.atomic.AtomicBoolean; 025import java.util.function.Predicate; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import reactor.core.publisher.Flux; 030import reactor.core.publisher.Mono; 031 032import org.springframework.lang.Nullable; 033import org.springframework.transaction.IllegalTransactionStateException; 034import org.springframework.transaction.InvalidTimeoutException; 035import org.springframework.transaction.ReactiveTransaction; 036import org.springframework.transaction.ReactiveTransactionManager; 037import org.springframework.transaction.TransactionDefinition; 038import org.springframework.transaction.TransactionException; 039import org.springframework.transaction.TransactionSuspensionNotSupportedException; 040import org.springframework.transaction.UnexpectedRollbackException; 041 042/** 043 * Abstract base class that implements Spring's standard reactive transaction workflow, 044 * serving as basis for concrete platform transaction managers. 
 *
 * <p>This base class provides the following workflow handling:
 * <ul>
 * <li>determines if there is an existing transaction;
 * <li>applies the appropriate propagation behavior;
 * <li>suspends and resumes transactions if necessary;
 * <li>checks the rollback-only flag on commit;
 * <li>applies the appropriate modification on rollback
 * (actual rollback or setting rollback-only);
 * <li>triggers registered synchronization callbacks.
 * </ul>
 *
 * <p>Subclasses have to implement specific template methods for specific
 * states of a transaction, e.g.: begin, suspend, resume, commit, rollback.
 * The most important of them are abstract and must be provided by a concrete
 * implementation; for the rest, defaults are provided, so overriding is optional.
 *
 * <p>Transaction synchronization is a generic mechanism for registering callbacks
 * that get invoked at transaction completion time. This is mainly used internally
 * by the data access support classes for R2DBC, MongoDB, etc. The same mechanism can
 * also be leveraged for custom synchronization needs in an application.
 *
 * <p>The state of this class is serializable, to allow for serializing the
 * transaction strategy along with proxies that carry a transaction interceptor.
 * It is up to subclasses whether they make their own state serializable as well.
 * They should implement the {@code java.io.Serializable} marker interface in
 * that case, and potentially a private {@code readObject()} method (according
 * to Java serialization rules) if they need to restore any transient state.
073 * 074 * @author Mark Paluch 075 * @author Juergen Hoeller 076 * @since 5.2 077 * @see TransactionSynchronizationManager 078 */ 079@SuppressWarnings("serial") 080public abstract class AbstractReactiveTransactionManager implements ReactiveTransactionManager, Serializable { 081 082 protected transient Log logger = LogFactory.getLog(getClass()); 083 084 085 //--------------------------------------------------------------------- 086 // Implementation of ReactiveTransactionManager 087 //--------------------------------------------------------------------- 088 089 /** 090 * This implementation handles propagation behavior. Delegates to 091 * {@code doGetTransaction}, {@code isExistingTransaction} 092 * and {@code doBegin}. 093 * @see #doGetTransaction 094 * @see #isExistingTransaction 095 * @see #doBegin 096 */ 097 @Override 098 public final Mono<ReactiveTransaction> getReactiveTransaction(@Nullable TransactionDefinition definition) 099 throws TransactionException { 100 101 // Use defaults if no transaction definition given. 102 TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); 103 104 return TransactionSynchronizationManager.forCurrentTransaction() 105 .flatMap(synchronizationManager -> { 106 107 Object transaction = doGetTransaction(synchronizationManager); 108 109 // Cache debug flag to avoid repeated checks. 110 boolean debugEnabled = logger.isDebugEnabled(); 111 112 if (isExistingTransaction(transaction)) { 113 // Existing transaction found -> check propagation behavior to find out how to behave. 114 return handleExistingTransaction(synchronizationManager, def, transaction, debugEnabled); 115 } 116 117 // Check definition settings for new transaction. 
118 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { 119 return Mono.error(new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout())); 120 } 121 122 // No existing transaction found -> check propagation behavior to find out how to proceed. 123 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { 124 return Mono.error(new IllegalTransactionStateException( 125 "No existing transaction found for transaction marked with propagation 'mandatory'")); 126 } 127 else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || 128 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || 129 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { 130 131 return TransactionContextManager.currentContext() 132 .map(TransactionSynchronizationManager::new) 133 .flatMap(nestedSynchronizationManager -> 134 suspend(nestedSynchronizationManager, null) 135 .map(Optional::of) 136 .defaultIfEmpty(Optional.empty()) 137 .flatMap(suspendedResources -> { 138 if (debugEnabled) { 139 logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); 140 } 141 return Mono.defer(() -> { 142 GenericReactiveTransaction status = newReactiveTransaction( 143 nestedSynchronizationManager, def, transaction, true, 144 debugEnabled, suspendedResources.orElse(null)); 145 return doBegin(nestedSynchronizationManager, transaction, def) 146 .doOnSuccess(ignore -> prepareSynchronization(nestedSynchronizationManager, status, def)) 147 .thenReturn(status); 148 }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, 149 ex -> resume(nestedSynchronizationManager, null, suspendedResources.orElse(null)) 150 .then(Mono.error(ex))); 151 })); 152 } 153 else { 154 // Create "empty" transaction: no actual transaction, but potentially synchronization. 
155 if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { 156 logger.warn("Custom isolation level specified but no actual transaction initiated; " + 157 "isolation level will effectively be ignored: " + def); 158 } 159 return Mono.just(prepareReactiveTransaction(synchronizationManager, def, null, true, debugEnabled, null)); 160 } 161 }); 162 } 163 164 /** 165 * Create a ReactiveTransaction for an existing transaction. 166 */ 167 private Mono<ReactiveTransaction> handleExistingTransaction(TransactionSynchronizationManager synchronizationManager, 168 TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { 169 170 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { 171 return Mono.error(new IllegalTransactionStateException( 172 "Existing transaction found for transaction marked with propagation 'never'")); 173 } 174 175 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { 176 if (debugEnabled) { 177 logger.debug("Suspending current transaction"); 178 } 179 Mono<SuspendedResourcesHolder> suspend = suspend(synchronizationManager, transaction); 180 return suspend.map(suspendedResources -> prepareReactiveTransaction(synchronizationManager, 181 definition, null, false, debugEnabled, suspendedResources)) // 182 .switchIfEmpty(Mono.fromSupplier(() -> prepareReactiveTransaction(synchronizationManager, 183 definition, null, false, debugEnabled, null))) 184 .cast(ReactiveTransaction.class); 185 } 186 187 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { 188 if (debugEnabled) { 189 logger.debug("Suspending current transaction, creating new transaction with name [" + 190 definition.getName() + "]"); 191 } 192 Mono<SuspendedResourcesHolder> suspendedResources = suspend(synchronizationManager, transaction); 193 return suspendedResources.flatMap(suspendedResourcesHolder -> 
{ 194 GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, 195 definition, transaction, true, debugEnabled, suspendedResourcesHolder); 196 return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> 197 prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status) 198 .onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, beginEx -> 199 resumeAfterBeginException(synchronizationManager, transaction, suspendedResourcesHolder, beginEx) 200 .then(Mono.error(beginEx))); 201 }); 202 } 203 204 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { 205 if (debugEnabled) { 206 logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); 207 } 208 // Nested transaction through nested begin and commit/rollback calls. 209 GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, 210 definition, transaction, true, debugEnabled, null); 211 return doBegin(synchronizationManager, transaction, definition).doOnSuccess(ignore -> 212 prepareSynchronization(synchronizationManager, status, definition)).thenReturn(status); 213 } 214 215 // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. 216 if (debugEnabled) { 217 logger.debug("Participating in existing transaction"); 218 } 219 return Mono.just(prepareReactiveTransaction(synchronizationManager, definition, transaction, false, debugEnabled, null)); 220 } 221 222 /** 223 * Create a new ReactiveTransaction for the given arguments, 224 * also initializing transaction synchronization as appropriate. 
225 * @see #newReactiveTransaction 226 * @see #prepareReactiveTransaction 227 */ 228 private GenericReactiveTransaction prepareReactiveTransaction( 229 TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, 230 @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { 231 232 GenericReactiveTransaction status = newReactiveTransaction(synchronizationManager, 233 definition, transaction, newTransaction, debug, suspendedResources); 234 prepareSynchronization(synchronizationManager, status, definition); 235 return status; 236 } 237 238 /** 239 * Create a ReactiveTransaction instance for the given arguments. 240 */ 241 private GenericReactiveTransaction newReactiveTransaction( 242 TransactionSynchronizationManager synchronizationManager, TransactionDefinition definition, 243 @Nullable Object transaction, boolean newTransaction, boolean debug, @Nullable Object suspendedResources) { 244 245 return new GenericReactiveTransaction(transaction, newTransaction, 246 !synchronizationManager.isSynchronizationActive(), 247 definition.isReadOnly(), debug, suspendedResources); 248 } 249 250 /** 251 * Initialize transaction synchronization as appropriate. 252 */ 253 private void prepareSynchronization(TransactionSynchronizationManager synchronizationManager, 254 GenericReactiveTransaction status, TransactionDefinition definition) { 255 256 if (status.isNewSynchronization()) { 257 synchronizationManager.setActualTransactionActive(status.hasTransaction()); 258 synchronizationManager.setCurrentTransactionIsolationLevel( 259 definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? 
260 definition.getIsolationLevel() : null); 261 synchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); 262 synchronizationManager.setCurrentTransactionName(definition.getName()); 263 synchronizationManager.initSynchronization(); 264 } 265 } 266 267 268 /** 269 * Suspend the given transaction. Suspends transaction synchronization first, 270 * then delegates to the {@code doSuspend} template method. 271 * @param synchronizationManager the synchronization manager bound to the current transaction 272 * @param transaction the current transaction object 273 * (or {@code null} to just suspend active synchronizations, if any) 274 * @return an object that holds suspended resources 275 * (or {@code null} if neither transaction nor synchronization active) 276 * @see #doSuspend 277 * @see #resume 278 */ 279 private Mono<SuspendedResourcesHolder> suspend(TransactionSynchronizationManager synchronizationManager, 280 @Nullable Object transaction) throws TransactionException { 281 282 if (synchronizationManager.isSynchronizationActive()) { 283 Mono<List<TransactionSynchronization>> suspendedSynchronizations = doSuspendSynchronization(synchronizationManager); 284 return suspendedSynchronizations.flatMap(synchronizations -> { 285 Mono<Optional<Object>> suspendedResources = (transaction != null ? 
286 doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()) : 287 Mono.just(Optional.empty())); 288 return suspendedResources.map(it -> { 289 String name = synchronizationManager.getCurrentTransactionName(); 290 synchronizationManager.setCurrentTransactionName(null); 291 boolean readOnly = synchronizationManager.isCurrentTransactionReadOnly(); 292 synchronizationManager.setCurrentTransactionReadOnly(false); 293 Integer isolationLevel = synchronizationManager.getCurrentTransactionIsolationLevel(); 294 synchronizationManager.setCurrentTransactionIsolationLevel(null); 295 boolean wasActive = synchronizationManager.isActualTransactionActive(); 296 synchronizationManager.setActualTransactionActive(false); 297 return new SuspendedResourcesHolder( 298 it.orElse(null), synchronizations, name, readOnly, isolationLevel, wasActive); 299 }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, 300 ex -> doResumeSynchronization(synchronizationManager, synchronizations) 301 .cast(SuspendedResourcesHolder.class)); 302 }); 303 } 304 else if (transaction != null) { 305 // Transaction active but no synchronization active. 306 Mono<Optional<Object>> suspendedResources = 307 doSuspend(synchronizationManager, transaction).map(Optional::of).defaultIfEmpty(Optional.empty()); 308 return suspendedResources.map(it -> new SuspendedResourcesHolder(it.orElse(null))); 309 } 310 else { 311 // Neither transaction nor synchronization active. 312 return Mono.empty(); 313 } 314 } 315 316 /** 317 * Resume the given transaction. Delegates to the {@code doResume} 318 * template method first, then resuming transaction synchronization. 
319 * @param synchronizationManager the synchronization manager bound to the current transaction 320 * @param transaction the current transaction object 321 * @param resourcesHolder the object that holds suspended resources, 322 * as returned by {@code suspend} (or {@code null} to just 323 * resume synchronizations, if any) 324 * @see #doResume 325 * @see #suspend 326 */ 327 private Mono<Void> resume(TransactionSynchronizationManager synchronizationManager, 328 @Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) 329 throws TransactionException { 330 331 Mono<Void> resume = Mono.empty(); 332 333 if (resourcesHolder != null) { 334 Object suspendedResources = resourcesHolder.suspendedResources; 335 if (suspendedResources != null) { 336 resume = doResume(synchronizationManager, transaction, suspendedResources); 337 } 338 List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; 339 if (suspendedSynchronizations != null) { 340 synchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); 341 synchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); 342 synchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); 343 synchronizationManager.setCurrentTransactionName(resourcesHolder.name); 344 return resume.then(doResumeSynchronization(synchronizationManager, suspendedSynchronizations)); 345 } 346 } 347 348 return resume; 349 } 350 351 /** 352 * Resume outer transaction after inner transaction begin failed. 
353 */ 354 private Mono<Void> resumeAfterBeginException(TransactionSynchronizationManager synchronizationManager, 355 Object transaction, @Nullable SuspendedResourcesHolder suspendedResources, Throwable beginEx) { 356 357 String exMessage = "Inner transaction begin exception overridden by outer transaction resume exception"; 358 return resume(synchronizationManager, transaction, suspendedResources).doOnError(ErrorPredicates.RUNTIME_OR_ERROR, 359 ex -> logger.error(exMessage, beginEx)); 360 } 361 362 /** 363 * Suspend all current synchronizations and deactivate transaction 364 * synchronization for the current transaction context. 365 * @param synchronizationManager the synchronization manager bound to the current transaction 366 * @return the List of suspended TransactionSynchronization objects 367 */ 368 private Mono<List<TransactionSynchronization>> doSuspendSynchronization( 369 TransactionSynchronizationManager synchronizationManager) { 370 371 List<TransactionSynchronization> suspendedSynchronizations = synchronizationManager.getSynchronizations(); 372 return Flux.fromIterable(suspendedSynchronizations) 373 .concatMap(TransactionSynchronization::suspend) 374 .then(Mono.defer(() -> { 375 synchronizationManager.clearSynchronization(); 376 return Mono.just(suspendedSynchronizations); 377 })); 378 } 379 380 /** 381 * Reactivate transaction synchronization for the current transaction context 382 * and resume all given synchronizations. 
383 * @param synchronizationManager the synchronization manager bound to the current transaction 384 * @param suspendedSynchronizations a List of TransactionSynchronization objects 385 */ 386 private Mono<Void> doResumeSynchronization(TransactionSynchronizationManager synchronizationManager, 387 List<TransactionSynchronization> suspendedSynchronizations) { 388 389 synchronizationManager.initSynchronization(); 390 return Flux.fromIterable(suspendedSynchronizations) 391 .concatMap(synchronization -> synchronization.resume() 392 .doOnSuccess(ignore -> synchronizationManager.registerSynchronization(synchronization))).then(); 393 } 394 395 396 /** 397 * This implementation of commit handles participating in existing 398 * transactions and programmatic rollback requests. 399 * Delegates to {@code isRollbackOnly}, {@code doCommit} 400 * and {@code rollback}. 401 * @see ReactiveTransaction#isRollbackOnly() 402 * @see #doCommit 403 * @see #rollback 404 */ 405 @Override 406 public final Mono<Void> commit(ReactiveTransaction transaction) throws TransactionException { 407 if (transaction.isCompleted()) { 408 return Mono.error(new IllegalTransactionStateException( 409 "Transaction is already completed - do not call commit or rollback more than once per transaction")); 410 } 411 412 return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> { 413 GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction; 414 if (reactiveTx.isRollbackOnly()) { 415 if (reactiveTx.isDebug()) { 416 logger.debug("Transactional code has requested rollback"); 417 } 418 return processRollback(synchronizationManager, reactiveTx); 419 } 420 return processCommit(synchronizationManager, reactiveTx); 421 }); 422 } 423 424 /** 425 * Process an actual commit. 426 * Rollback-only flags have already been checked and applied. 
427 * @param synchronizationManager the synchronization manager bound to the current transaction 428 * @param status object representing the transaction 429 * @throws TransactionException in case of commit failure 430 */ 431 private Mono<Void> processCommit(TransactionSynchronizationManager synchronizationManager, 432 GenericReactiveTransaction status) throws TransactionException { 433 434 AtomicBoolean beforeCompletionInvoked = new AtomicBoolean(false); 435 436 Mono<Object> commit = prepareForCommit(synchronizationManager, status) 437 .then(triggerBeforeCommit(synchronizationManager, status)) 438 .then(triggerBeforeCompletion(synchronizationManager, status)) 439 .then(Mono.defer(() -> { 440 beforeCompletionInvoked.set(true); 441 if (status.isNewTransaction()) { 442 if (status.isDebug()) { 443 logger.debug("Initiating transaction commit"); 444 } 445 return doCommit(synchronizationManager, status); 446 } 447 return Mono.empty(); 448 })).then(Mono.empty().onErrorResume(ex -> { 449 Mono<Object> propagateException = Mono.error(ex); 450 // Store result in a local variable in order to appease the 451 // Eclipse compiler with regard to inferred generics. 
452 Mono<Object> result = propagateException; 453 if (ErrorPredicates.UNEXPECTED_ROLLBACK.test(ex)) { 454 result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK) 455 .then(propagateException); 456 } 457 else if (ErrorPredicates.TRANSACTION_EXCEPTION.test(ex)) { 458 result = triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) 459 .then(propagateException); 460 } 461 else if (ErrorPredicates.RUNTIME_OR_ERROR.test(ex)) { 462 Mono<Void> mono; 463 if (!beforeCompletionInvoked.get()) { 464 mono = triggerBeforeCompletion(synchronizationManager, status); 465 } 466 else { 467 mono = Mono.empty(); 468 } 469 result = mono.then(doRollbackOnCommitException(synchronizationManager, status, ex)) 470 .then(propagateException); 471 } 472 473 return result; 474 })).then(Mono.defer(() -> triggerAfterCommit(synchronizationManager, status).onErrorResume(ex -> 475 triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED).then(Mono.error(ex))) 476 .then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_COMMITTED)))); 477 478 return commit 479 .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status) 480 .then(Mono.error(ex))).then(cleanupAfterCompletion(synchronizationManager, status)); 481 } 482 483 /** 484 * This implementation of rollback handles participating in existing transactions. 485 * Delegates to {@code doRollback} and {@code doSetRollbackOnly}. 
486 * @see #doRollback 487 * @see #doSetRollbackOnly 488 */ 489 @Override 490 public final Mono<Void> rollback(ReactiveTransaction transaction) throws TransactionException { 491 if (transaction.isCompleted()) { 492 return Mono.error(new IllegalTransactionStateException( 493 "Transaction is already completed - do not call commit or rollback more than once per transaction")); 494 } 495 return TransactionSynchronizationManager.forCurrentTransaction().flatMap(synchronizationManager -> { 496 GenericReactiveTransaction reactiveTx = (GenericReactiveTransaction) transaction; 497 return processRollback(synchronizationManager, reactiveTx); 498 }); 499 } 500 501 /** 502 * Process an actual rollback. 503 * The completed flag has already been checked. 504 * @param synchronizationManager the synchronization manager bound to the current transaction 505 * @param status object representing the transaction 506 * @throws TransactionException in case of rollback failure 507 */ 508 private Mono<Void> processRollback(TransactionSynchronizationManager synchronizationManager, 509 GenericReactiveTransaction status) { 510 511 return triggerBeforeCompletion(synchronizationManager, status).then(Mono.defer(() -> { 512 if (status.isNewTransaction()) { 513 if (status.isDebug()) { 514 logger.debug("Initiating transaction rollback"); 515 } 516 return doRollback(synchronizationManager, status); 517 } 518 else { 519 Mono<Void> beforeCompletion = Mono.empty(); 520 // Participating in larger transaction 521 if (status.hasTransaction()) { 522 if (status.isDebug()) { 523 logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); 524 } 525 beforeCompletion = doSetRollbackOnly(synchronizationManager, status); 526 } 527 else { 528 logger.debug("Should roll back transaction but cannot - no transaction available"); 529 } 530 return beforeCompletion; 531 } 532 })).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, ex -> triggerAfterCompletion( 533 synchronizationManager, 
status, TransactionSynchronization.STATUS_UNKNOWN) 534 .then(Mono.error(ex))) 535 .then(Mono.defer(() -> triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK))) 536 .onErrorResume(ex -> cleanupAfterCompletion(synchronizationManager, status).then(Mono.error(ex))) 537 .then(cleanupAfterCompletion(synchronizationManager, status)); 538 } 539 540 /** 541 * Invoke {@code doRollback}, handling rollback exceptions properly. 542 * @param synchronizationManager the synchronization manager bound to the current transaction 543 * @param status object representing the transaction 544 * @param ex the thrown application exception or error 545 * @throws TransactionException in case of rollback failure 546 * @see #doRollback 547 */ 548 private Mono<Void> doRollbackOnCommitException(TransactionSynchronizationManager synchronizationManager, 549 GenericReactiveTransaction status, Throwable ex) throws TransactionException { 550 551 return Mono.defer(() -> { 552 if (status.isNewTransaction()) { 553 if (status.isDebug()) { 554 logger.debug("Initiating transaction rollback after commit exception", ex); 555 } 556 return doRollback(synchronizationManager, status); 557 } 558 else if (status.hasTransaction()) { 559 if (status.isDebug()) { 560 logger.debug("Marking existing transaction as rollback-only after commit exception", ex); 561 } 562 return doSetRollbackOnly(synchronizationManager, status); 563 } 564 return Mono.empty(); 565 }).onErrorResume(ErrorPredicates.RUNTIME_OR_ERROR, rbex -> { 566 logger.error("Commit exception overridden by rollback exception", ex); 567 return triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_UNKNOWN) 568 .then(Mono.error(rbex)); 569 }).then(triggerAfterCompletion(synchronizationManager, status, TransactionSynchronization.STATUS_ROLLED_BACK)); 570 } 571 572 573 /** 574 * Trigger {@code beforeCommit} callbacks. 
575 * @param synchronizationManager the synchronization manager bound to the current transaction 576 * @param status object representing the transaction 577 */ 578 private Mono<Void> triggerBeforeCommit(TransactionSynchronizationManager synchronizationManager, 579 GenericReactiveTransaction status) { 580 581 if (status.isNewSynchronization()) { 582 if (status.isDebug()) { 583 logger.trace("Triggering beforeCommit synchronization"); 584 } 585 return TransactionSynchronizationUtils.triggerBeforeCommit( 586 synchronizationManager.getSynchronizations(), status.isReadOnly()); 587 } 588 return Mono.empty(); 589 } 590 591 /** 592 * Trigger {@code beforeCompletion} callbacks. 593 * @param synchronizationManager the synchronization manager bound to the current transaction 594 * @param status object representing the transaction 595 */ 596 private Mono<Void> triggerBeforeCompletion(TransactionSynchronizationManager synchronizationManager, 597 GenericReactiveTransaction status) { 598 599 if (status.isNewSynchronization()) { 600 if (status.isDebug()) { 601 logger.trace("Triggering beforeCompletion synchronization"); 602 } 603 return TransactionSynchronizationUtils.triggerBeforeCompletion(synchronizationManager.getSynchronizations()); 604 } 605 return Mono.empty(); 606 } 607 608 /** 609 * Trigger {@code afterCommit} callbacks. 610 * @param synchronizationManager the synchronization manager bound to the current transaction 611 * @param status object representing the transaction 612 */ 613 private Mono<Void> triggerAfterCommit(TransactionSynchronizationManager synchronizationManager, 614 GenericReactiveTransaction status) { 615 616 if (status.isNewSynchronization()) { 617 if (status.isDebug()) { 618 logger.trace("Triggering afterCommit synchronization"); 619 } 620 return TransactionSynchronizationUtils.invokeAfterCommit(synchronizationManager.getSynchronizations()); 621 } 622 return Mono.empty(); 623 } 624 625 /** 626 * Trigger {@code afterCompletion} callbacks. 
627 * @param synchronizationManager the synchronization manager bound to the current transaction 628 * @param status object representing the transaction 629 * @param completionStatus completion status according to TransactionSynchronization constants 630 */ 631 private Mono<Void> triggerAfterCompletion(TransactionSynchronizationManager synchronizationManager, 632 GenericReactiveTransaction status, int completionStatus) { 633 634 if (status.isNewSynchronization()) { 635 List<TransactionSynchronization> synchronizations = synchronizationManager.getSynchronizations(); 636 synchronizationManager.clearSynchronization(); 637 if (!status.hasTransaction() || status.isNewTransaction()) { 638 if (status.isDebug()) { 639 logger.trace("Triggering afterCompletion synchronization"); 640 } 641 // No transaction or new transaction for the current scope -> 642 // invoke the afterCompletion callbacks immediately 643 return invokeAfterCompletion(synchronizationManager, synchronizations, completionStatus); 644 } 645 else if (!synchronizations.isEmpty()) { 646 // Existing transaction that we participate in, controlled outside 647 // of the scope of this Spring transaction manager -> try to register 648 // an afterCompletion callback with the existing (JTA) transaction. 649 return registerAfterCompletionWithExistingTransaction( 650 synchronizationManager, status.getTransaction(), synchronizations); 651 } 652 } 653 return Mono.empty(); 654 } 655 656 /** 657 * Actually invoke the {@code afterCompletion} methods of the 658 * given TransactionSynchronization objects. 659 * <p>To be called by this abstract manager itself, or by special implementations 660 * of the {@code registerAfterCompletionWithExistingTransaction} callback. 
661 * @param synchronizationManager the synchronization manager bound to the current transaction 662 * @param synchronizations a List of TransactionSynchronization objects 663 * @param completionStatus the completion status according to the 664 * constants in the TransactionSynchronization interface 665 * @see #registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager, Object, List) 666 * @see TransactionSynchronization#STATUS_COMMITTED 667 * @see TransactionSynchronization#STATUS_ROLLED_BACK 668 * @see TransactionSynchronization#STATUS_UNKNOWN 669 */ 670 private Mono<Void> invokeAfterCompletion(TransactionSynchronizationManager synchronizationManager, 671 List<TransactionSynchronization> synchronizations, int completionStatus) { 672 673 return TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); 674 } 675 676 /** 677 * Clean up after completion, clearing synchronization if necessary, 678 * and invoking doCleanupAfterCompletion. 679 * @param synchronizationManager the synchronization manager bound to the current transaction 680 * @param status object representing the transaction 681 * @see #doCleanupAfterCompletion 682 */ 683 private Mono<Void> cleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager, 684 GenericReactiveTransaction status) { 685 686 return Mono.defer(() -> { 687 status.setCompleted(); 688 if (status.isNewSynchronization()) { 689 synchronizationManager.clear(); 690 } 691 Mono<Void> cleanup = Mono.empty(); 692 if (status.isNewTransaction()) { 693 cleanup = doCleanupAfterCompletion(synchronizationManager, status.getTransaction()); 694 } 695 if (status.getSuspendedResources() != null) { 696 if (status.isDebug()) { 697 logger.debug("Resuming suspended transaction after completion of inner transaction"); 698 } 699 Object transaction = (status.hasTransaction() ? 
status.getTransaction() : null); 700 return cleanup.then(resume(synchronizationManager, transaction, 701 (SuspendedResourcesHolder) status.getSuspendedResources())); 702 } 703 return cleanup; 704 }); 705 } 706 707 708 //--------------------------------------------------------------------- 709 // Template methods to be implemented in subclasses 710 //--------------------------------------------------------------------- 711 712 /** 713 * Return a transaction object for the current transaction state. 714 * <p>The returned object will usually be specific to the concrete transaction 715 * manager implementation, carrying corresponding transaction state in a 716 * modifiable fashion. This object will be passed into the other template 717 * methods (e.g. doBegin and doCommit), either directly or as part of a 718 * DefaultReactiveTransactionStatus instance. 719 * <p>The returned object should contain information about any existing 720 * transaction, that is, a transaction that has already started before the 721 * current {@code getTransaction} call on the transaction manager. 722 * Consequently, a {@code doGetTransaction} implementation will usually 723 * look for an existing transaction and store corresponding state in the 724 * returned transaction object. 
725 * @param synchronizationManager the synchronization manager bound to the current transaction 726 * @return the current transaction object 727 * @throws org.springframework.transaction.CannotCreateTransactionException 728 * if transaction support is not available 729 * @throws TransactionException in case of lookup or system errors 730 * @see #doBegin 731 * @see #doCommit 732 * @see #doRollback 733 * @see GenericReactiveTransaction#getTransaction 734 */ 735 protected abstract Object doGetTransaction(TransactionSynchronizationManager synchronizationManager) 736 throws TransactionException; 737 738 /** 739 * Check if the given transaction object indicates an existing transaction 740 * (that is, a transaction which has already started). 741 * <p>The result will be evaluated according to the specified propagation 742 * behavior for the new transaction. An existing transaction might get 743 * suspended (in case of PROPAGATION_REQUIRES_NEW), or the new transaction 744 * might participate in the existing one (in case of PROPAGATION_REQUIRED). 745 * <p>The default implementation returns {@code false}, assuming that 746 * participating in existing transactions is generally not supported. 747 * Subclasses are of course encouraged to provide such support. 748 * @param transaction the transaction object returned by doGetTransaction 749 * @return if there is an existing transaction 750 * @throws TransactionException in case of system errors 751 * @see #doGetTransaction 752 */ 753 protected boolean isExistingTransaction(Object transaction) throws TransactionException { 754 return false; 755 } 756 757 /** 758 * Begin a new transaction with semantics according to the given transaction 759 * definition. Does not have to care about applying the propagation behavior, 760 * as this has already been handled by this abstract manager. 761 * <p>This method gets called when the transaction manager has decided to actually 762 * start a new transaction. 
Either there wasn't any transaction before, or the 763 * previous transaction has been suspended. 764 * <p>A special scenario is a nested transaction: This method will be called to 765 * start a nested transaction when necessary. In such a context, there will be an 766 * active transaction: The implementation of this method has to detect this and 767 * start an appropriate nested transaction. 768 * @param synchronizationManager the synchronization manager bound to the new transaction 769 * @param transaction the transaction object returned by {@code doGetTransaction} 770 * @param definition a TransactionDefinition instance, describing propagation 771 * behavior, isolation level, read-only flag, timeout, and transaction name 772 * @throws TransactionException in case of creation or system errors 773 * @throws org.springframework.transaction.NestedTransactionNotSupportedException 774 * if the underlying transaction does not support nesting (e.g. through savepoints) 775 */ 776 protected abstract Mono<Void> doBegin(TransactionSynchronizationManager synchronizationManager, 777 Object transaction, TransactionDefinition definition) throws TransactionException; 778 779 /** 780 * Suspend the resources of the current transaction. 781 * Transaction synchronization will already have been suspended. 782 * <p>The default implementation throws a TransactionSuspensionNotSupportedException, 783 * assuming that transaction suspension is generally not supported. 
784 * @param synchronizationManager the synchronization manager bound to the current transaction 785 * @param transaction the transaction object returned by {@code doGetTransaction} 786 * @return an object that holds suspended resources 787 * (will be kept unexamined for passing it into doResume) 788 * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException 789 * if suspending is not supported by the transaction manager implementation 790 * @throws TransactionException in case of system errors 791 * @see #doResume 792 */ 793 protected Mono<Object> doSuspend(TransactionSynchronizationManager synchronizationManager, 794 Object transaction) throws TransactionException { 795 796 throw new TransactionSuspensionNotSupportedException( 797 "Transaction manager [" + getClass().getName() + "] does not support transaction suspension"); 798 } 799 800 /** 801 * Resume the resources of the current transaction. 802 * Transaction synchronization will be resumed afterwards. 803 * <p>The default implementation throws a TransactionSuspensionNotSupportedException, 804 * assuming that transaction suspension is generally not supported. 
805 * @param synchronizationManager the synchronization manager bound to the current transaction 806 * @param transaction the transaction object returned by {@code doGetTransaction} 807 * @param suspendedResources the object that holds suspended resources, 808 * as returned by doSuspend 809 * @throws org.springframework.transaction.TransactionSuspensionNotSupportedException 810 * if suspending is not supported by the transaction manager implementation 811 * @throws TransactionException in case of system errors 812 * @see #doSuspend 813 */ 814 protected Mono<Void> doResume(TransactionSynchronizationManager synchronizationManager, 815 @Nullable Object transaction, Object suspendedResources) throws TransactionException { 816 817 throw new TransactionSuspensionNotSupportedException( 818 "Transaction manager [" + getClass().getName() + "] does not support transaction suspension"); 819 } 820 821 /** 822 * Make preparations for commit, to be performed before the 823 * {@code beforeCommit} synchronization callbacks occur. 824 * <p>Note that exceptions will get propagated to the commit caller 825 * and cause a rollback of the transaction. 826 * @param synchronizationManager the synchronization manager bound to the current transaction 827 * @param status the status representation of the transaction 828 * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b> 829 * (note: do not throw TransactionException subclasses here!) 830 */ 831 protected Mono<Void> prepareForCommit(TransactionSynchronizationManager synchronizationManager, 832 GenericReactiveTransaction status) { 833 834 return Mono.empty(); 835 } 836 837 /** 838 * Perform an actual commit of the given transaction. 839 * <p>An implementation does not need to check the "new transaction" flag 840 * or the rollback-only flag; this will already have been handled before. 841 * Usually, a straight commit will be performed on the transaction object 842 * contained in the passed-in status. 
843 * @param synchronizationManager the synchronization manager bound to the current transaction 844 * @param status the status representation of the transaction 845 * @throws TransactionException in case of commit or system errors 846 * @see GenericReactiveTransaction#getTransaction 847 */ 848 protected abstract Mono<Void> doCommit(TransactionSynchronizationManager synchronizationManager, 849 GenericReactiveTransaction status) throws TransactionException; 850 851 /** 852 * Perform an actual rollback of the given transaction. 853 * <p>An implementation does not need to check the "new transaction" flag; 854 * this will already have been handled before. Usually, a straight rollback 855 * will be performed on the transaction object contained in the passed-in status. 856 * @param synchronizationManager the synchronization manager bound to the current transaction 857 * @param status the status representation of the transaction 858 * @throws TransactionException in case of system errors 859 * @see GenericReactiveTransaction#getTransaction 860 */ 861 protected abstract Mono<Void> doRollback(TransactionSynchronizationManager synchronizationManager, 862 GenericReactiveTransaction status) throws TransactionException; 863 864 /** 865 * Set the given transaction rollback-only. Only called on rollback 866 * if the current transaction participates in an existing one. 867 * <p>The default implementation throws an IllegalTransactionStateException, 868 * assuming that participating in existing transactions is generally not 869 * supported. Subclasses are of course encouraged to provide such support. 
* @param synchronizationManager the synchronization manager bound to the current transaction
 * @param status the status representation of the transaction
 * @throws TransactionException in case of system errors
 */
protected Mono<Void> doSetRollbackOnly(TransactionSynchronizationManager synchronizationManager,
		GenericReactiveTransaction status) throws TransactionException {

	// Thrown synchronously: the default implementation offers no participation support.
	String message = "Participating in existing transactions is not supported - when 'isExistingTransaction' " +
			"returns true, appropriate 'doSetRollbackOnly' behavior must be provided";
	throw new IllegalTransactionStateException(message);
}

/**
 * Register the given list of transaction synchronizations with the existing transaction.
 * <p>Invoked when the control of the Spring transaction manager and thus all Spring
 * transaction synchronizations end, without the transaction being completed yet. This
 * is for example the case when participating in an existing JTA or EJB CMT transaction.
 * <p>The default implementation simply invokes the {@code afterCompletion} methods
 * immediately, passing in "STATUS_UNKNOWN". This is the best we can do if there's no
 * chance to determine the actual outcome of the outer transaction.
890 * @param synchronizationManager the synchronization manager bound to the current transaction 891 * @param transaction the transaction object returned by {@code doGetTransaction} 892 * @param synchronizations a List of TransactionSynchronization objects 893 * @throws TransactionException in case of system errors 894 * @see #invokeAfterCompletion(TransactionSynchronizationManager, List, int) 895 * @see TransactionSynchronization#afterCompletion(int) 896 * @see TransactionSynchronization#STATUS_UNKNOWN 897 */ 898 protected Mono<Void> registerAfterCompletionWithExistingTransaction(TransactionSynchronizationManager synchronizationManager, 899 Object transaction, List<TransactionSynchronization> synchronizations) throws TransactionException { 900 901 logger.debug("Cannot register Spring after-completion synchronization with existing transaction - " + 902 "processing Spring after-completion callbacks immediately, with outcome status 'unknown'"); 903 return invokeAfterCompletion(synchronizationManager, synchronizations, TransactionSynchronization.STATUS_UNKNOWN); 904 } 905 906 /** 907 * Cleanup resources after transaction completion. 908 * <p>Called after {@code doCommit} and {@code doRollback} execution, 909 * on any outcome. The default implementation does nothing. 910 * <p>Should not throw any exceptions but just issue warnings on errors. 
* @param synchronizationManager the synchronization manager bound to the current transaction
 * @param transaction the transaction object returned by {@code doGetTransaction}
 */
protected Mono<Void> doCleanupAfterCompletion(TransactionSynchronizationManager synchronizationManager,
		Object transaction) {

	// Nothing to clean up by default.
	return Mono.empty();
}


//---------------------------------------------------------------------
// Serialization support
//---------------------------------------------------------------------

/**
 * Restore this manager from a serialization stream: read the default
 * serialized state, then re-create the logger for the concrete class.
 * @param ois the stream to read the serialized state from
 * @throws IOException if reading from the stream fails
 * @throws ClassNotFoundException if a serialized class cannot be resolved
 */
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
	// Default serialization covers the regular state.
	ois.defaultReadObject();

	// The logger is assigned explicitly after the default state has been read.
	this.logger = LogFactory.getLog(getClass());
}


/**
 * Holder for suspended resources.
 * Used internally by {@code suspend} and {@code resume}.
 * <p>Instances are created either with the suspended resources only, or with
 * the suspended resources plus the synchronization-related state that the
 * six-argument constructor accepts.
 */
protected static final class SuspendedResourcesHolder {

	// The suspended resources object; may be null.
	@Nullable
	private final Object suspendedResources;

	// Only assigned by the six-argument constructor; null otherwise.
	@Nullable
	private List<TransactionSynchronization> suspendedSynchronizations;

	// Only assigned by the six-argument constructor; null otherwise.
	@Nullable
	private String name;

	// Only assigned by the six-argument constructor; false otherwise.
	private boolean readOnly;

	// Only assigned by the six-argument constructor; null otherwise.
	@Nullable
	private Integer isolationLevel;

	// Only assigned by the six-argument constructor; false otherwise.
	private boolean wasActive;

	/**
	 * Create a holder that carries the suspended resources only;
	 * all other fields keep their default values.
	 */
	private SuspendedResourcesHolder(@Nullable Object suspendedResources) {
		this.suspendedResources = suspendedResources;
	}

	/**
	 * Create a holder that carries the suspended resources together with
	 * the given synchronizations, name, read-only flag, isolation level
	 * and active flag.
	 */
	private SuspendedResourcesHolder(
			@Nullable Object suspendedResources, List<TransactionSynchronization> suspendedSynchronizations,
			@Nullable String name, boolean readOnly, @Nullable Integer isolationLevel, boolean wasActive) {

		this.suspendedResources = suspendedResources;
		this.suspendedSynchronizations = suspendedSynchronizations;
		this.name = name;
		this.readOnly = readOnly;
		this.isolationLevel = isolationLevel;
		this.wasActive = wasActive;
	}
}


974 /** 975 * Predicates for exception types that transactional error handling applies to. 976 */ 977 private enum ErrorPredicates implements Predicate<Throwable> { 978 979 /** 980 * Predicate matching {@link RuntimeException} or {@link Error}. 981 */ 982 RUNTIME_OR_ERROR { 983 @Override 984 public boolean test(Throwable throwable) { 985 return throwable instanceof RuntimeException || throwable instanceof Error; 986 } 987 }, 988 989 /** 990 * Predicate matching {@link TransactionException}. 991 */ 992 TRANSACTION_EXCEPTION { 993 @Override 994 public boolean test(Throwable throwable) { 995 return throwable instanceof TransactionException; 996 } 997 }, 998 999 /** 1000 * Predicate matching {@link UnexpectedRollbackException}. 1001 */ 1002 UNEXPECTED_ROLLBACK { 1003 @Override 1004 public boolean test(Throwable throwable) { 1005 return throwable instanceof UnexpectedRollbackException; 1006 } 1007 }; 1008 1009 @Override 1010 public abstract boolean test(Throwable throwable); 1011 } 1012 1013}