001/* 002 * Copyright 2006-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.support.transaction; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028import java.util.concurrent.CopyOnWriteArrayList; 029import java.util.concurrent.CopyOnWriteArraySet; 030 031import org.aopalliance.intercept.MethodInterceptor; 032import org.aopalliance.intercept.MethodInvocation; 033import org.springframework.aop.framework.ProxyFactory; 034import org.springframework.transaction.support.TransactionSynchronization; 035import org.springframework.transaction.support.TransactionSynchronizationAdapter; 036import org.springframework.transaction.support.TransactionSynchronizationManager; 037 038/** 039 * <p> 040 * Factory for transaction aware objects (like lists, sets, maps). If a 041 * transaction is active when a method is called on an instance created by the 042 * factory, it makes a copy of the target object and carries out all operations 043 * on the copy. Only when the transaction commits is the target re-initialised 044 * with the copy. 045 * </p> 046 * 047 * <p> 048 * Works well with collections and maps for testing transactional behaviour 049 * without needing a database. The base implementation handles lists, sets and 050 * maps. Subclasses can implement {@link #begin(Object)} and 051 * {@link #commit(Object, Object)} to provide support for other resources. 052 * </p> 053 * 054 * <p> 055 * Generally not intended for multi-threaded use, but the 056 * {@link #createAppendOnlyTransactionalMap() append only version} of 057 * collections gives isolation between threads operating on different keys in a 058 * map, provided they only append to the map. (Threads are limited to removing 059 * entries that were created in the same transaction.) 060 * </p> 061 * 062 * @author Dave Syer 063 * 064 */ 065public class TransactionAwareProxyFactory<T> { 066 067 private final T target; 068 069 private final boolean appendOnly; 070 071 private TransactionAwareProxyFactory(T target) { 072 this(target, false); 073 074 } 075 076 private TransactionAwareProxyFactory(T target, boolean appendOnly) { 077 super(); 078 this.target = target; 079 this.appendOnly = appendOnly; 080 } 081 082 /** 083 * Make a copy of the target that can be used inside a transaction to 084 * isolate changes from the original. Also called from the factory 085 * constructor to isolate the target from the original value passed in. 086 * 087 * @param target the target object (List, Set or Map) 088 * @return an independent copy 089 */ 090 @SuppressWarnings({ "unchecked", "rawtypes" }) 091 protected final T begin(T target) { 092 // Unfortunately in Java 5 this method has to synchronized 093 // (works OK without in Java 6). 094 synchronized (target) { 095 if (target instanceof List) { 096 if (appendOnly) { 097 return (T) new ArrayList(); 098 } 099 return (T) new ArrayList((List) target); 100 } 101 else if (target instanceof Set) { 102 if (appendOnly) { 103 return (T) new HashSet(); 104 } 105 return (T) new HashSet((Set) target); 106 } 107 else if (target instanceof Map) { 108 if (appendOnly) { 109 return (T) new HashMap(); 110 } 111 return (T) new HashMap((Map) target); 112 } 113 else { 114 throw new UnsupportedOperationException("Cannot copy target for this type: " + target.getClass()); 115 } 116 } 117 } 118 119 /** 120 * Take the working copy state and commit it back to the original target. 121 * The target then reflects all the changes applied to the copy during a 122 * transaction. 123 * 124 * @param copy the working copy. 125 * @param target the original target of the factory. 126 */ 127 @SuppressWarnings({ "unchecked", "rawtypes" }) 128 protected void commit(T copy, T target) { 129 // Unfortunately in Java 5 this method has to be synchronized 130 // (works OK without in Java 6). 131 synchronized (target) { 132 if (target instanceof Collection) { 133 if (!appendOnly) { 134 ((Collection) target).clear(); 135 } 136 ((Collection) target).addAll((Collection) copy); 137 } 138 else { 139 if (!appendOnly) { 140 ((Map) target).clear(); 141 } 142 ((Map) target).putAll((Map) copy); 143 } 144 } 145 } 146 147 private T createInstance() { 148 149 synchronized (target) { 150 151 ProxyFactory factory = new ProxyFactory(target); 152 factory.addAdvice(new TransactionAwareInterceptor()); 153 @SuppressWarnings("unchecked") 154 T instance = (T) factory.getProxy(); 155 return instance; 156 157 } 158 159 } 160 161 public static <K, V> Map<K, V> createTransactionalMap() { 162 return new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>()).createInstance(); 163 } 164 165 public static <K, V> Map<K, V> createTransactionalMap(Map<K, V> map) { 166 return new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>(map)).createInstance(); 167 } 168 169 public static <K, V> ConcurrentMap<K, V> createAppendOnlyTransactionalMap() { 170 return new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>(), true).createInstance(); 171 } 172 173 public static <T> Set<T> createAppendOnlyTransactionalSet() { 174 return new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>(), true).createInstance(); 175 } 176 177 public static <T> Set<T> createTransactionalSet() { 178 return new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>()).createInstance(); 179 } 180 181 public static <T> Set<T> createTransactionalSet(Set<T> set) { 182 return new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>(set)).createInstance(); 183 } 184 185 public static <T> List<T> createAppendOnlyTransactionalList() { 186 return new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>(), true).createInstance(); 187 } 188 189 public static <T> List<T> createTransactionalList() { 190 return new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>()).createInstance(); 191 } 192 193 public static <T> List<T> createTransactionalList(List<T> list) { 194 return new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>(list)).createInstance(); 195 } 196 197 private class TargetSynchronization extends TransactionSynchronizationAdapter { 198 199 private final T cache; 200 201 private final Object key; 202 203 public TargetSynchronization(Object key, T cache) { 204 super(); 205 this.cache = cache; 206 this.key = key; 207 } 208 209 @Override 210 public void afterCompletion(int status) { 211 super.afterCompletion(status); 212 if (status == TransactionSynchronization.STATUS_COMMITTED) { 213 synchronized (target) { 214 commit(cache, target); 215 } 216 } 217 TransactionSynchronizationManager.unbindResource(key); 218 } 219 } 220 221 private class TransactionAwareInterceptor implements MethodInterceptor { 222 223 @Override 224 public Object invoke(MethodInvocation invocation) throws Throwable { 225 226 if (!TransactionSynchronizationManager.isActualTransactionActive()) { 227 return invocation.proceed(); 228 } 229 230 T cache; 231 232 if (!TransactionSynchronizationManager.hasResource(this)) { 233 cache = begin(target); 234 TransactionSynchronizationManager.bindResource(this, cache); 235 TransactionSynchronizationManager.registerSynchronization(new TargetSynchronization(this, cache)); 236 } 237 else { 238 @SuppressWarnings("unchecked") 239 T retrievedCache = (T) TransactionSynchronizationManager.getResource(this); 240 cache = retrievedCache; 241 } 242 243 Object result = invocation.getMethod().invoke(cache, invocation.getArguments()); 244 245 if (appendOnly) { 246 String methodName = invocation.getMethod().getName(); 247 if ((result == null && methodName.equals("get")) 248 || (Boolean.FALSE.equals(result) && (methodName.startsWith("contains")) || (Boolean.TRUE 249 .equals(result) && methodName.startsWith("isEmpty")))) { 250 // In appendOnly mode the result of a get might not be 251 // in the cache... 252 return invocation.proceed(); 253 } 254 if (result instanceof Collection<?>) { 255 HashSet<Object> set = new HashSet<Object>((Collection<?>) result); 256 set.addAll((Collection<?>) invocation.proceed()); 257 result = set; 258 } 259 } 260 261 return result; 262 263 } 264 } 265 266}