001/*
002 * Copyright 2013-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.scope.context;
017
018import java.util.Map;
019import java.util.Stack;
020import java.util.concurrent.ConcurrentHashMap;
021import java.util.concurrent.atomic.AtomicInteger;
022
023import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
024import org.springframework.lang.Nullable;
025
026
027/**
028 * Central convenience class for framework use in managing the scope
029 * context.
030 *
031 * @author Dave Syer
032 * @author Jimmy Praet
033 * @author Mahmoud Ben Hassine
034 * @since 3.0
035 */
036public abstract class SynchronizationManagerSupport<E, C> {
037
038        /*
039         * We have to deal with single and multi-threaded execution, with a single
040         * and with multiple step execution instances. That's 2x2 = 4 scenarios.
041         */
042
043        /**
044         * Storage for the current execution; has to be ThreadLocal because it
045         * is needed to locate a context in components that are not part of a
046         * step/job (like when re-hydrating a scoped proxy). Doesn't use
047         * InheritableThreadLocal because there are side effects if a step is trying
048         * to run multiple child steps (e.g. with partitioning). The Stack is used
049         * to cover the single threaded case, so that the API is the same as
050         * multi-threaded.
051         */
052        private final ThreadLocal<Stack<E>> executionHolder = new ThreadLocal<Stack<E>>();
053
054        /**
055         * Reference counter for each execution: how many threads are using the
056         * same one?
057         */
058        private final Map<E, AtomicInteger> counts = new ConcurrentHashMap<E, AtomicInteger>();
059
060        /**
061         * Simple map from a running execution to the associated context.
062         */
063        private final Map<E, C> contexts = new ConcurrentHashMap<E, C>();
064
065        /**
066         * Getter for the current context if there is one, otherwise returns {@code null}.
067         *
068         * @return the current context or {@code null} if there is none (if one
069         *         has not been registered for this thread).
070         */
071        @Nullable
072        public C getContext() {
073                if (getCurrent().isEmpty()) {
074                        return null;
075                }
076                synchronized (contexts) {
077                        return contexts.get(getCurrent().peek());
078                }
079        }
080
081        /**
082         * Register a context with the current thread - always put a matching {@link #close()} call in a finally block to
083         * ensure that the correct
084         * context is available in the enclosing block.
085         *
086         * @param execution the execution to register
087         * @return a new context or the current one if it has the same
088         *         execution
089         */
090        @Nullable
091        public C register(@Nullable E execution) {
092                if (execution == null) {
093                        return null;
094                }
095                getCurrent().push(execution);
096                C context;
097                synchronized (contexts) {
098                        context = contexts.get(execution);
099                        if (context == null) {
100                                context = createNewContext(execution, null);
101                                contexts.put(execution, context);
102                        }
103                }
104                increment();
105                return context;
106        }
107
108        /**
109         * Register a context with the current thread - always put a matching {@link #close()} call in a finally block to
110         * ensure that the correct
111         * context is available in the enclosing block.
112         *
113         * @param execution the execution to register
114         * @param propertyContext instance of {@link BatchPropertyContext} to be registered with this thread.
115         * @return a new context or the current one if it has the same
116         *         execution
117         */
118        @Nullable
119        public C register(@Nullable E execution, @Nullable BatchPropertyContext propertyContext) {
120                if (execution == null) {
121                        return null;
122                }
123                getCurrent().push(execution);
124                C context;
125                synchronized (contexts) {
126                        context = contexts.get(execution);
127                        if (context == null) {
128                                context = createNewContext(execution, propertyContext);
129                                contexts.put(execution, context);
130                        }
131                }
132                increment();
133                return context;
134        }
135
136        /**
137         * Method for unregistering the current context - should always and only be
138         * used by in conjunction with a matching {@link #register(Object)} to ensure that {@link #getContext()} always returns
139         * the correct value.
140         * Does not call close on the context - that is left up to the caller
141         * because he has a reference to the context (having registered it) and only
142         * he has knowledge of when the execution actually ended.
143         */
144        public void close() {
145                C oldSession = getContext();
146                if (oldSession == null) {
147                        return;
148                }
149                decrement();
150        }
151
152        private void decrement() {
153                E current = getCurrent().pop();
154                if (current != null) {
155                        int remaining = counts.get(current).decrementAndGet();
156                        if (remaining <= 0) {
157                                synchronized (contexts) {
158                                        contexts.remove(current);
159                                        counts.remove(current);
160                                }
161                        }
162                }
163        }
164
165        public void increment() {
166                E current = getCurrent().peek();
167                if (current != null) {
168                        AtomicInteger count;
169                        synchronized (counts) {
170                                count = counts.get(current);
171                                if (count == null) {
172                                        count = new AtomicInteger();
173                                        counts.put(current, count);
174                                }
175                        }
176                        count.incrementAndGet();
177                }
178        }
179
180        public Stack<E> getCurrent() {
181                if (executionHolder.get() == null) {
182                        executionHolder.set(new Stack<E>());
183                }
184                return executionHolder.get();
185        }
186
187        /**
188         * A convenient "deep" close operation. Call this instead of {@link #close()} if the execution for the current
189         * context is ending.
190         * Delegates to {@link #close(Object)} and then ensures that {@link #close()} is also called in a finally block.
191         */
192        public void release() {
193                C context = getContext();
194                try {
195                        if (context != null) {
196                                close(context);
197                        }
198                } finally {
199                        close();
200                }
201        }
202
203        protected abstract void close(C context);
204
205        protected abstract C createNewContext(E execution, @Nullable BatchPropertyContext propertyContext);
206
207}