001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.transaction.reactive;
018
019import reactor.core.publisher.Mono;
020
021/**
022 * {@link TransactionSynchronization} implementation that manages a
023 * resource object bound through {@link TransactionSynchronizationManager}.
024 *
025 * @author Mark Paluch
026 * @author Juergen Hoeller
027 * @since 5.2
028 * @param <O> the resource holder type
029 * @param <K> the resource key type
030 */
031public abstract class ReactiveResourceSynchronization<O, K> implements TransactionSynchronization {
032
033        private final O resourceObject;
034
035        private final K resourceKey;
036
037        private final TransactionSynchronizationManager synchronizationManager;
038
039        private volatile boolean holderActive = true;
040
041
042        /**
043         * Create a new ReactiveResourceSynchronization for the given holder.
044         * @param resourceObject the resource object to manage
045         * @param resourceKey the key to bind the resource object for
046         * @param synchronizationManager the synchronization manager bound to the current transaction
047         * @see TransactionSynchronizationManager#bindResource
048         */
049        public ReactiveResourceSynchronization(
050                        O resourceObject, K resourceKey, TransactionSynchronizationManager synchronizationManager) {
051
052                this.resourceObject = resourceObject;
053                this.resourceKey = resourceKey;
054                this.synchronizationManager = synchronizationManager;
055        }
056
057
058        @Override
059        public Mono<Void> suspend() {
060                if (this.holderActive) {
061                        this.synchronizationManager.unbindResource(this.resourceKey);
062                }
063                return Mono.empty();
064        }
065
066        @Override
067        public Mono<Void> resume() {
068                if (this.holderActive) {
069                        this.synchronizationManager.bindResource(this.resourceKey, this.resourceObject);
070                }
071                return Mono.empty();
072        }
073
074        @Override
075        public Mono<Void> beforeCommit(boolean readOnly) {
076                return Mono.empty();
077        }
078
079        @Override
080        public Mono<Void> beforeCompletion() {
081                if (shouldUnbindAtCompletion()) {
082                        this.synchronizationManager.unbindResource(this.resourceKey);
083                        this.holderActive = false;
084                        if (shouldReleaseBeforeCompletion()) {
085                                return releaseResource(this.resourceObject, this.resourceKey);
086                        }
087                }
088                return Mono.empty();
089        }
090
091        @Override
092        public Mono<Void> afterCommit() {
093                if (!shouldReleaseBeforeCompletion()) {
094                        return processResourceAfterCommit(this.resourceObject);
095                }
096                return Mono.empty();
097        }
098
099        @Override
100        public Mono<Void> afterCompletion(int status) {
101                return Mono.defer(() -> {
102                        Mono<Void> sync = Mono.empty();
103                        if (shouldUnbindAtCompletion()) {
104                                boolean releaseNecessary = false;
105                                if (this.holderActive) {
106                                        // The thread-bound resource holder might not be available anymore,
107                                        // since afterCompletion might get called from a different thread.
108                                        this.holderActive = false;
109                                        this.synchronizationManager.unbindResourceIfPossible(this.resourceKey);
110                                        releaseNecessary = true;
111                                }
112                                else {
113                                        releaseNecessary = shouldReleaseAfterCompletion(this.resourceObject);
114                                }
115                                if (releaseNecessary) {
116                                        sync = releaseResource(this.resourceObject, this.resourceKey);
117                                }
118                        }
119                        else {
120                                // Probably a pre-bound resource...
121                                sync = cleanupResource(this.resourceObject, this.resourceKey, (status == STATUS_COMMITTED));
122                        }
123                        return sync;
124                });
125        }
126
127
128        /**
129         * Return whether this holder should be unbound at completion
130         * (or should rather be left bound to the thread after the transaction).
131         * <p>The default implementation returns {@code true}.
132         */
133        protected boolean shouldUnbindAtCompletion() {
134                return true;
135        }
136
137        /**
138         * Return whether this holder's resource should be released before
139         * transaction completion ({@code true}) or rather after
140         * transaction completion ({@code false}).
141         * <p>Note that resources will only be released when they are
142         * unbound from the thread ({@link #shouldUnbindAtCompletion()}).
143         * <p>The default implementation returns {@code true}.
144         * @see #releaseResource
145         */
146        protected boolean shouldReleaseBeforeCompletion() {
147                return true;
148        }
149
150        /**
151         * Return whether this holder's resource should be released after
152         * transaction completion ({@code true}).
153         * <p>The default implementation returns {@code !shouldReleaseBeforeCompletion()},
154         * releasing after completion if no attempt was made before completion.
155         * @see #releaseResource
156         */
157        protected boolean shouldReleaseAfterCompletion(O resourceHolder) {
158                return !shouldReleaseBeforeCompletion();
159        }
160
161        /**
162         * After-commit callback for the given resource holder.
163         * Only called when the resource hasn't been released yet
164         * ({@link #shouldReleaseBeforeCompletion()}).
165         * @param resourceHolder the resource holder to process
166         */
167        protected Mono<Void> processResourceAfterCommit(O resourceHolder) {
168                return Mono.empty();
169        }
170
171        /**
172         * Release the given resource (after it has been unbound from the thread).
173         * @param resourceHolder the resource holder to process
174         * @param resourceKey the key that the resource object was bound for
175         */
176        protected Mono<Void> releaseResource(O resourceHolder, K resourceKey) {
177                return Mono.empty();
178        }
179
180        /**
181         * Perform a cleanup on the given resource (which is left bound to the thread).
182         * @param resourceHolder the resource holder to process
183         * @param resourceKey the key that the resource object was bound for
184         * @param committed whether the transaction has committed ({@code true})
185         * or rolled back ({@code false})
186         */
187        protected Mono<Void> cleanupResource(O resourceHolder, K resourceKey, boolean committed) {
188                return Mono.empty();
189        }
190
191}