/*
 * Copyright 2013-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.scope.context;

import java.util.Map;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
import org.springframework.lang.Nullable;


/**
 * Central convenience class for framework use in managing the scope
 * context.
 *
 * <p>Each thread keeps its own stack of registered executions; the contexts
 * themselves and their reference counts are shared between threads and are
 * only modified while holding the monitor of the context map.
 *
 * @param <E> the type of the execution a context is registered for
 * @param <C> the type of the context associated with an execution
 *
 * @author Dave Syer
 * @author Jimmy Praet
 * @author Mahmoud Ben Hassine
 * @since 3.0
 */
public abstract class SynchronizationManagerSupport<E, C> {

	/*
	 * We have to deal with single and multi-threaded execution, with a single
	 * and with multiple step execution instances. That's 2x2 = 4 scenarios.
	 */

	/**
	 * Storage for the current execution; has to be ThreadLocal because it
	 * is needed to locate a context in components that are not part of a
	 * step/job (like when re-hydrating a scoped proxy). Doesn't use
	 * InheritableThreadLocal because there are side effects if a step is trying
	 * to run multiple child steps (e.g. with partitioning). The Stack is used
	 * to cover the single threaded case, so that the API is the same as
	 * multi-threaded.
	 */
	private final ThreadLocal<Stack<E>> executionHolder = new ThreadLocal<Stack<E>>();

	/**
	 * Reference counter for each execution: how many threads are using the
	 * same one? Only modified while holding the monitor of {@link #contexts},
	 * so that a count and its context are always created and removed together.
	 */
	private final Map<E, AtomicInteger> counts = new ConcurrentHashMap<E, AtomicInteger>();

	/**
	 * Simple map from a running execution to the associated context. Its
	 * monitor is the single lock guarding both this map and {@link #counts}.
	 */
	private final Map<E, C> contexts = new ConcurrentHashMap<E, C>();

	/**
	 * Getter for the current context if there is one, otherwise returns {@code null}.
	 *
	 * @return the current context or {@code null} if there is none (if one
	 * has not been registered for this thread).
	 */
	@Nullable
	public C getContext() {
		Stack<E> executions = getCurrent();
		if (executions.isEmpty()) {
			return null;
		}
		synchronized (contexts) {
			return contexts.get(executions.peek());
		}
	}

	/**
	 * Register a context with the current thread - always put a matching
	 * {@link #close()} call in a finally block to ensure that the correct
	 * context is available in the enclosing block.
	 *
	 * @param execution the execution to register
	 * @return a new context or the current one if it has the same
	 * execution; {@code null} if {@code execution} is {@code null}
	 */
	@Nullable
	public C register(@Nullable E execution) {
		return doRegister(execution, null);
	}

	/**
	 * Register a context with the current thread - always put a matching
	 * {@link #close()} call in a finally block to ensure that the correct
	 * context is available in the enclosing block.
	 *
	 * @param execution the execution to register
	 * @param propertyContext instance of {@link BatchPropertyContext} to be
	 * registered with this thread.
	 * @return a new context or the current one if it has the same
	 * execution; {@code null} if {@code execution} is {@code null}
	 */
	@Nullable
	public C register(@Nullable E execution, @Nullable BatchPropertyContext propertyContext) {
		return doRegister(execution, propertyContext);
	}

	/**
	 * Shared implementation of both {@code register} overloads: pushes the
	 * execution on this thread's stack, then looks up (or creates) its context
	 * and bumps its reference count.
	 *
	 * @param execution the execution to register, may be {@code null}
	 * @param propertyContext passed to
	 * {@link #createNewContext(Object, BatchPropertyContext)} when a new
	 * context has to be created, may be {@code null}
	 * @return the context for the execution, or {@code null} if
	 * {@code execution} is {@code null}
	 */
	@Nullable
	private C doRegister(@Nullable E execution, @Nullable BatchPropertyContext propertyContext) {
		if (execution == null) {
			return null;
		}
		getCurrent().push(execution);
		synchronized (contexts) {
			C context = contexts.get(execution);
			if (context == null) {
				context = createNewContext(execution, propertyContext);
				contexts.put(execution, context);
			}
			// Count the registration inside the same critical section as the
			// lookup: otherwise a concurrent close() could drop the count to
			// zero and remove the context we are about to hand out.
			increment();
			return context;
		}
	}

	/**
	 * Method for unregistering the current context - should always and only be
	 * used in conjunction with a matching {@link #register(Object)} to ensure
	 * that {@link #getContext()} always returns the correct value.
	 * Does not call close on the context - that is left up to the caller
	 * because the caller has a reference to the context (having registered it)
	 * and only the caller knows when the execution actually ended.
	 * Does nothing if there is no current context.
	 */
	public void close() {
		if (getContext() == null) {
			return;
		}
		decrement();
	}

	/**
	 * Pops the current execution off this thread's stack and decrements its
	 * reference count, removing the context and the counter once no
	 * registration is left.
	 */
	private void decrement() {
		E current = getCurrent().pop();
		if (current == null) {
			return;
		}
		// Same monitor as registration, so the decrement and the removal are
		// atomic with respect to a concurrent register() of the same execution.
		synchronized (contexts) {
			AtomicInteger count = counts.get(current);
			// A missing counter is treated like an exhausted one instead of
			// failing with a NullPointerException.
			if (count == null || count.decrementAndGet() <= 0) {
				contexts.remove(current);
				counts.remove(current);
			}
		}
	}

	/**
	 * Increments the reference count of the execution that is currently on top
	 * of this thread's stack, creating the counter if necessary. Does nothing
	 * if no execution is registered with the current thread.
	 */
	public void increment() {
		Stack<E> executions = getCurrent();
		if (executions.isEmpty()) {
			// peek() on an empty Stack throws EmptyStackException
			return;
		}
		E current = executions.peek();
		if (current == null) {
			return;
		}
		synchronized (contexts) {
			AtomicInteger count = counts.get(current);
			if (count == null) {
				count = new AtomicInteger();
				counts.put(current, count);
			}
			count.incrementAndGet();
		}
	}

	/**
	 * Returns the stack of executions registered with the current thread,
	 * creating an empty one on first access.
	 *
	 * @return the current thread's stack of executions, never {@code null}
	 */
	public Stack<E> getCurrent() {
		Stack<E> executions = executionHolder.get();
		if (executions == null) {
			executions = new Stack<E>();
			executionHolder.set(executions);
		}
		return executions;
	}

	/**
	 * A convenient "deep" close operation. Call this instead of {@link #close()}
	 * if the execution for the current context is ending.
	 * Delegates to {@link #close(Object)} and then ensures that {@link #close()}
	 * is also called in a finally block.
	 */
	public void release() {
		C context = getContext();
		try {
			if (context != null) {
				close(context);
			}
		}
		finally {
			close();
		}
	}

	/**
	 * Closes the given context; invoked by {@link #release()} for the current
	 * context before it is unregistered.
	 *
	 * @param context the context to close
	 */
	protected abstract void close(C context);

	/**
	 * Creates the context for an execution that has none registered yet.
	 * Invoked while the monitor of the context map is held.
	 *
	 * @param execution the execution to create a context for
	 * @param propertyContext the {@link BatchPropertyContext} supplied at
	 * registration, may be {@code null}
	 * @return the new context
	 */
	protected abstract C createNewContext(E execution, @Nullable BatchPropertyContext propertyContext);

}