001/*
002 * Copyright 2012-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.data;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import org.springframework.batch.item.ItemWriter;
023import org.springframework.beans.factory.InitializingBean;
024import org.springframework.data.mongodb.core.MongoOperations;
025import org.springframework.transaction.support.TransactionSynchronizationAdapter;
026import org.springframework.transaction.support.TransactionSynchronizationManager;
027import org.springframework.util.Assert;
028import org.springframework.util.CollectionUtils;
029import org.springframework.util.StringUtils;
030
031/**
032 * <p>
033 * A {@link ItemWriter} implementation that writes to a MongoDB store using an implementation of Spring Data's
034 * {@link MongoOperations}.  Since MongoDB is not a transactional store, a best effort is made to persist
035 * written data at the last moment, yet still honor job status contracts.  No attempt to roll back is made
036 * if an error occurs during writing.
037 * </p>
038 *
039 * <p>
040 * This writer is thread-safe once all properties are set (normal singleton behavior) so it can be used in multiple
041 * concurrent transactions.
042 * </p>
043 *
044 * @author Michael Minella
045 *
046 */
047public class MongoItemWriter<T> implements ItemWriter<T>, InitializingBean {
048
049        private MongoOperations template;
050        private final Object bufferKey;
051        private String collection;
052        private boolean delete = false;
053
054        public MongoItemWriter() {
055                super();
056                this.bufferKey = new Object();
057        }
058
059        /**
060         * Indicates if the items being passed to the writer are to be saved or
061         * removed from the data store.  If set to false (default), the items will
062         * be saved.  If set to true, the items will be removed.
063         *
064         * @param delete removal indicator
065         */
066        public void setDelete(boolean delete) {
067                this.delete = delete;
068        }
069
070        /**
071         * Set the {@link MongoOperations} to be used to save items to be written.
072         *
073         * @param template the template implementation to be used.
074         */
075        public void setTemplate(MongoOperations template) {
076                this.template = template;
077        }
078
079        /**
080         * Get the {@link MongoOperations} to be used to save items to be written.
081         * This can be called by a subclass if necessary.
082         * 
083         * @return template the template implementation to be used.
084         */
085        protected MongoOperations getTemplate() {
086                return template;
087        }
088
089        /**
090         * Set the name of the Mongo collection to be written to.
091         *
092         * @param collection the name of the collection.
093         */
094        public void setCollection(String collection) {
095                this.collection = collection;
096        }
097
098        /**
099         * If a transaction is active, buffer items to be written just before commit.
100         * Otherwise write items using the provided template.
101         *
102         * @see org.springframework.batch.item.ItemWriter#write(List)
103         */
104        @Override
105        public void write(List<? extends T> items) throws Exception {
106                if(!transactionActive()) {
107                        doWrite(items);
108                        return;
109                }
110
111                List<T> bufferedItems = getCurrentBuffer();
112                bufferedItems.addAll(items);
113        }
114
115        /**
116         * Performs the actual write to the store via the template.
117         * This can be overridden by a subclass if necessary.
118         *
119         * @param items the list of items to be persisted.
120         */
121        protected void doWrite(List<? extends T> items) {
122                if(! CollectionUtils.isEmpty(items)) {
123                        if(delete) {
124                                if(StringUtils.hasText(collection)) {
125                                        for (Object object : items) {
126                                                template.remove(object, collection);
127                                        }
128                                }
129                                else {
130                                        for (Object object : items) {
131                                                template.remove(object);
132                                        }
133                                }
134                        }
135                        else {
136                                if(StringUtils.hasText(collection)) {
137                                        for (Object object : items) {
138                                                template.save(object, collection);
139                                        }
140                                }
141                                else {
142                                        for (Object object : items) {
143                                                template.save(object);
144                                        }
145                                }
146                        }
147                }
148        }
149
150        private boolean transactionActive() {
151                return TransactionSynchronizationManager.isActualTransactionActive();
152        }
153
154        @SuppressWarnings("unchecked")
155        private List<T> getCurrentBuffer() {
156                if(!TransactionSynchronizationManager.hasResource(bufferKey)) {
157                        TransactionSynchronizationManager.bindResource(bufferKey, new ArrayList<T>());
158
159                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
160                                @Override
161                                public void beforeCommit(boolean readOnly) {
162                                        List<T> items = (List<T>) TransactionSynchronizationManager.getResource(bufferKey);
163
164                                        if(!CollectionUtils.isEmpty(items)) {
165                                                if(!readOnly) {
166                                                        doWrite(items);
167                                                }
168                                        }
169                                }
170
171                                @Override
172                                public void afterCompletion(int status) {
173                                        if(TransactionSynchronizationManager.hasResource(bufferKey)) {
174                                                TransactionSynchronizationManager.unbindResource(bufferKey);
175                                        }
176                                }
177                        });
178                }
179
180                return (List<T>) TransactionSynchronizationManager.getResource(bufferKey);
181        }
182
183        @Override
184        public void afterPropertiesSet() throws Exception {
185                Assert.state(template != null, "A MongoOperations implementation is required.");
186        }
187}