001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.transaction.reactive; 018 019import reactor.core.publisher.Mono; 020 021/** 022 * {@link TransactionSynchronization} implementation that manages a 023 * resource object bound through {@link TransactionSynchronizationManager}. 024 * 025 * @author Mark Paluch 026 * @author Juergen Hoeller 027 * @since 5.2 028 * @param <O> the resource holder type 029 * @param <K> the resource key type 030 */ 031public abstract class ReactiveResourceSynchronization<O, K> implements TransactionSynchronization { 032 033 private final O resourceObject; 034 035 private final K resourceKey; 036 037 private final TransactionSynchronizationManager synchronizationManager; 038 039 private volatile boolean holderActive = true; 040 041 042 /** 043 * Create a new ReactiveResourceSynchronization for the given holder. 044 * @param resourceObject the resource object to manage 045 * @param resourceKey the key to bind the resource object for 046 * @param synchronizationManager the synchronization manager bound to the current transaction 047 * @see TransactionSynchronizationManager#bindResource 048 */ 049 public ReactiveResourceSynchronization( 050 O resourceObject, K resourceKey, TransactionSynchronizationManager synchronizationManager) { 051 052 this.resourceObject = resourceObject; 053 this.resourceKey = resourceKey; 054 this.synchronizationManager = synchronizationManager; 055 } 056 057 058 @Override 059 public Mono<Void> suspend() { 060 if (this.holderActive) { 061 this.synchronizationManager.unbindResource(this.resourceKey); 062 } 063 return Mono.empty(); 064 } 065 066 @Override 067 public Mono<Void> resume() { 068 if (this.holderActive) { 069 this.synchronizationManager.bindResource(this.resourceKey, this.resourceObject); 070 } 071 return Mono.empty(); 072 } 073 074 @Override 075 public Mono<Void> beforeCommit(boolean readOnly) { 076 return Mono.empty(); 077 } 078 079 @Override 080 public Mono<Void> beforeCompletion() { 081 if (shouldUnbindAtCompletion()) { 082 this.synchronizationManager.unbindResource(this.resourceKey); 083 this.holderActive = false; 084 if (shouldReleaseBeforeCompletion()) { 085 return releaseResource(this.resourceObject, this.resourceKey); 086 } 087 } 088 return Mono.empty(); 089 } 090 091 @Override 092 public Mono<Void> afterCommit() { 093 if (!shouldReleaseBeforeCompletion()) { 094 return processResourceAfterCommit(this.resourceObject); 095 } 096 return Mono.empty(); 097 } 098 099 @Override 100 public Mono<Void> afterCompletion(int status) { 101 return Mono.defer(() -> { 102 Mono<Void> sync = Mono.empty(); 103 if (shouldUnbindAtCompletion()) { 104 boolean releaseNecessary = false; 105 if (this.holderActive) { 106 // The thread-bound resource holder might not be available anymore, 107 // since afterCompletion might get called from a different thread. 108 this.holderActive = false; 109 this.synchronizationManager.unbindResourceIfPossible(this.resourceKey); 110 releaseNecessary = true; 111 } 112 else { 113 releaseNecessary = shouldReleaseAfterCompletion(this.resourceObject); 114 } 115 if (releaseNecessary) { 116 sync = releaseResource(this.resourceObject, this.resourceKey); 117 } 118 } 119 else { 120 // Probably a pre-bound resource... 121 sync = cleanupResource(this.resourceObject, this.resourceKey, (status == STATUS_COMMITTED)); 122 } 123 return sync; 124 }); 125 } 126 127 128 /** 129 * Return whether this holder should be unbound at completion 130 * (or should rather be left bound to the thread after the transaction). 131 * <p>The default implementation returns {@code true}. 132 */ 133 protected boolean shouldUnbindAtCompletion() { 134 return true; 135 } 136 137 /** 138 * Return whether this holder's resource should be released before 139 * transaction completion ({@code true}) or rather after 140 * transaction completion ({@code false}). 141 * <p>Note that resources will only be released when they are 142 * unbound from the thread ({@link #shouldUnbindAtCompletion()}). 143 * <p>The default implementation returns {@code true}. 144 * @see #releaseResource 145 */ 146 protected boolean shouldReleaseBeforeCompletion() { 147 return true; 148 } 149 150 /** 151 * Return whether this holder's resource should be released after 152 * transaction completion ({@code true}). 153 * <p>The default implementation returns {@code !shouldReleaseBeforeCompletion()}, 154 * releasing after completion if no attempt was made before completion. 155 * @see #releaseResource 156 */ 157 protected boolean shouldReleaseAfterCompletion(O resourceHolder) { 158 return !shouldReleaseBeforeCompletion(); 159 } 160 161 /** 162 * After-commit callback for the given resource holder. 163 * Only called when the resource hasn't been released yet 164 * ({@link #shouldReleaseBeforeCompletion()}). 165 * @param resourceHolder the resource holder to process 166 */ 167 protected Mono<Void> processResourceAfterCommit(O resourceHolder) { 168 return Mono.empty(); 169 } 170 171 /** 172 * Release the given resource (after it has been unbound from the thread). 173 * @param resourceHolder the resource holder to process 174 * @param resourceKey the key that the resource object was bound for 175 */ 176 protected Mono<Void> releaseResource(O resourceHolder, K resourceKey) { 177 return Mono.empty(); 178 } 179 180 /** 181 * Perform a cleanup on the given resource (which is left bound to the thread). 182 * @param resourceHolder the resource holder to process 183 * @param resourceKey the key that the resource object was bound for 184 * @param committed whether the transaction has committed ({@code true}) 185 * or rolled back ({@code false}) 186 */ 187 protected Mono<Void> cleanupResource(O resourceHolder, K resourceKey, boolean committed) { 188 return Mono.empty(); 189 } 190 191}