001/*
002 * Copyright 2006-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.support.transaction;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentMap;
028import java.util.concurrent.CopyOnWriteArrayList;
029import java.util.concurrent.CopyOnWriteArraySet;
030
031import org.aopalliance.intercept.MethodInterceptor;
032import org.aopalliance.intercept.MethodInvocation;
033import org.springframework.aop.framework.ProxyFactory;
034import org.springframework.transaction.support.TransactionSynchronization;
035import org.springframework.transaction.support.TransactionSynchronizationAdapter;
036import org.springframework.transaction.support.TransactionSynchronizationManager;
037
038/**
039 * <p>
040 * Factory for transaction aware objects (like lists, sets, maps). If a
041 * transaction is active when a method is called on an instance created by the
042 * factory, it makes a copy of the target object and carries out all operations
043 * on the copy. Only when the transaction commits is the target re-initialised
044 * with the copy.
045 * </p>
046 * 
047 * <p>
048 * Works well with collections and maps for testing transactional behaviour
049 * without needing a database. The base implementation handles lists, sets and
050 * maps. Subclasses can implement {@link #begin(Object)} and
051 * {@link #commit(Object, Object)} to provide support for other resources.
052 * </p>
053 * 
054 * <p>
055 * Generally not intended for multi-threaded use, but the
056 * {@link #createAppendOnlyTransactionalMap() append only version} of
057 * collections gives isolation between threads operating on different keys in a
058 * map, provided they only append to the map. (Threads are limited to removing
059 * entries that were created in the same transaction.)
060 * </p>
061 * 
062 * @author Dave Syer
063 * 
064 */
065public class TransactionAwareProxyFactory<T> {
066
067        private final T target;
068
069        private final boolean appendOnly;
070
071        private TransactionAwareProxyFactory(T target) {
072                this(target, false);
073
074        }
075
076        private TransactionAwareProxyFactory(T target, boolean appendOnly) {
077                super();
078                this.target = target;
079                this.appendOnly = appendOnly;
080        }
081
082        /**
083         * Make a copy of the target that can be used inside a transaction to
084         * isolate changes from the original. Also called from the factory
085         * constructor to isolate the target from the original value passed in.
086         * 
087         * @param target the target object (List, Set or Map)
088         * @return an independent copy
089         */
090        @SuppressWarnings({ "unchecked", "rawtypes" })
091        protected final T begin(T target) {
092                // Unfortunately in Java 5 this method has to synchronized
093                // (works OK without in Java 6).
094                synchronized (target) {
095                        if (target instanceof List) {
096                                if (appendOnly) {
097                                        return (T) new ArrayList();
098                                }
099                                return (T) new ArrayList((List) target);
100                        }
101                        else if (target instanceof Set) {
102                                if (appendOnly) {
103                                        return (T) new HashSet();
104                                }
105                                return (T) new HashSet((Set) target);
106                        }
107                        else if (target instanceof Map) {
108                                if (appendOnly) {
109                                        return (T) new HashMap();
110                                }
111                                return (T) new HashMap((Map) target);
112                        }
113                        else {
114                                throw new UnsupportedOperationException("Cannot copy target for this type: " + target.getClass());
115                        }
116                }
117        }
118
119        /**
120         * Take the working copy state and commit it back to the original target.
121         * The target then reflects all the changes applied to the copy during a
122         * transaction.
123         * 
124         * @param copy the working copy.
125         * @param target the original target of the factory.
126         */
127        @SuppressWarnings({ "unchecked", "rawtypes" })
128        protected void commit(T copy, T target) {
129                // Unfortunately in Java 5 this method has to be synchronized
130                // (works OK without in Java 6).
131                synchronized (target) {
132                        if (target instanceof Collection) {
133                                if (!appendOnly) {
134                                        ((Collection) target).clear();
135                                }
136                                ((Collection) target).addAll((Collection) copy);
137                        }
138                        else {
139                                if (!appendOnly) {
140                                        ((Map) target).clear();
141                                }
142                                ((Map) target).putAll((Map) copy);
143                        }
144                }
145        }
146
147        private T createInstance() {
148
149                synchronized (target) {
150
151                        ProxyFactory factory = new ProxyFactory(target);
152                        factory.addAdvice(new TransactionAwareInterceptor());
153                        @SuppressWarnings("unchecked")
154                        T instance = (T) factory.getProxy();
155                        return instance;
156
157                }
158
159        }
160
161        public static <K, V> Map<K, V> createTransactionalMap() {
162                return new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>()).createInstance();
163        }
164
165        public static <K, V> Map<K, V> createTransactionalMap(Map<K, V> map) {
166                return new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>(map)).createInstance();
167        }
168
169        public static <K, V> ConcurrentMap<K, V> createAppendOnlyTransactionalMap() {
170                return new TransactionAwareProxyFactory<ConcurrentHashMap<K, V>>(new ConcurrentHashMap<K, V>(), true).createInstance();
171        }
172
173        public static <T> Set<T> createAppendOnlyTransactionalSet() {
174                return new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>(), true).createInstance();
175        }
176
177        public static <T> Set<T> createTransactionalSet() {
178                return new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>()).createInstance();
179        }
180
181        public static <T> Set<T> createTransactionalSet(Set<T> set) {
182                return new TransactionAwareProxyFactory<CopyOnWriteArraySet<T>>(new CopyOnWriteArraySet<T>(set)).createInstance();
183        }
184
185        public static <T> List<T> createAppendOnlyTransactionalList() {
186                return new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>(), true).createInstance();
187        }
188
189        public static <T> List<T> createTransactionalList() {
190                return new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>()).createInstance();
191        }
192
193        public static <T> List<T> createTransactionalList(List<T> list) {
194                return new TransactionAwareProxyFactory<CopyOnWriteArrayList<T>>(new CopyOnWriteArrayList<T>(list)).createInstance();
195        }
196
197        private class TargetSynchronization extends TransactionSynchronizationAdapter {
198
199                private final T cache;
200
201                private final Object key;
202
203                public TargetSynchronization(Object key, T cache) {
204                        super();
205                        this.cache = cache;
206                        this.key = key;
207                }
208
209        @Override
210                public void afterCompletion(int status) {
211                        super.afterCompletion(status);
212                        if (status == TransactionSynchronization.STATUS_COMMITTED) {
213                                synchronized (target) {
214                                        commit(cache, target);
215                                }
216                        }
217                        TransactionSynchronizationManager.unbindResource(key);
218                }
219        }
220
221        private class TransactionAwareInterceptor implements MethodInterceptor {
222
223        @Override
224                public Object invoke(MethodInvocation invocation) throws Throwable {
225
226                        if (!TransactionSynchronizationManager.isActualTransactionActive()) {
227                                return invocation.proceed();
228                        }
229
230                        T cache;
231
232                        if (!TransactionSynchronizationManager.hasResource(this)) {
233                                cache = begin(target);
234                                TransactionSynchronizationManager.bindResource(this, cache);
235                                TransactionSynchronizationManager.registerSynchronization(new TargetSynchronization(this, cache));
236                        }
237                        else {
238                                @SuppressWarnings("unchecked")
239                                T retrievedCache = (T) TransactionSynchronizationManager.getResource(this);
240                                cache = retrievedCache;
241                        }
242
243                        Object result = invocation.getMethod().invoke(cache, invocation.getArguments());
244
245                        if (appendOnly) {
246                                String methodName = invocation.getMethod().getName();
247                                if ((result == null && methodName.equals("get"))
248                                                || (Boolean.FALSE.equals(result) && (methodName.startsWith("contains")) || (Boolean.TRUE
249                                                                .equals(result) && methodName.startsWith("isEmpty")))) {
250                                        // In appendOnly mode the result of a get might not be
251                                        // in the cache...
252                                        return invocation.proceed();
253                                }
254                                if (result instanceof Collection<?>) {
255                                        HashSet<Object> set = new HashSet<Object>((Collection<?>) result);
256                                        set.addAll((Collection<?>) invocation.proceed());
257                                        result = set;
258                                }
259                        }
260
261                        return result;
262
263                }
264        }
265
266}