/*
 * Copyright 2012-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.item.data;

import java.util.ArrayList;
import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * <p>
 * An {@link ItemWriter} implementation that writes to a MongoDB store using an implementation of Spring Data's
 * {@link MongoOperations}. Since MongoDB is not a transactional store, a best effort is made to persist
 * written data at the last moment, yet still honor job status contracts. No attempt to roll back is made
 * if an error occurs during writing.
 * </p>
 *
 * <p>
 * This writer is thread-safe once all properties are set (normal singleton behavior) so it can be used in multiple
 * concurrent transactions.
 * </p>
 *
 * @author Michael Minella
 *
 */
public class MongoItemWriter<T> implements ItemWriter<T>, InitializingBean {

    private MongoOperations template;

    /**
     * Per-instance key under which the pending-items buffer is bound to the
     * current thread through {@link TransactionSynchronizationManager}.
     */
    private final Object bufferKey;

    private String collection;

    private boolean delete = false;

    /**
     * Create a writer with its own private buffer key, so that buffers of
     * different writer instances never collide on the same thread.
     */
    public MongoItemWriter() {
        super();
        this.bufferKey = new Object();
    }

    /**
     * Indicates if the items being passed to the writer are to be saved or
     * removed from the data store. If set to false (default), the items will
     * be saved. If set to true, the items will be removed.
     *
     * @param delete removal indicator
     */
    public void setDelete(boolean delete) {
        this.delete = delete;
    }

    /**
     * Set the {@link MongoOperations} to be used to save items to be written.
     *
     * @param template the template implementation to be used.
     */
    public void setTemplate(MongoOperations template) {
        this.template = template;
    }

    /**
     * Get the {@link MongoOperations} to be used to save items to be written.
     * This can be called by a subclass if necessary.
     *
     * @return template the template implementation to be used.
     */
    protected MongoOperations getTemplate() {
        return template;
    }

    /**
     * Set the name of the Mongo collection to be written to.
     *
     * @param collection the name of the collection.
     */
    public void setCollection(String collection) {
        this.collection = collection;
    }

    /**
     * If a transaction is active, buffer items to be written just before commit.
     * Otherwise write items using the provided template.
     *
     * @param items the items to write or buffer.
     * @see org.springframework.batch.item.ItemWriter#write(List)
     */
    @Override
    public void write(List<? extends T> items) throws Exception {
        if (!transactionActive()) {
            doWrite(items);
            return;
        }

        List<T> bufferedItems = getCurrentBuffer();
        bufferedItems.addAll(items);
    }

    /**
     * Performs the actual write to the store via the template.
     * This can be overridden by a subclass if necessary.
     *
     * <p>
     * Each item is saved, or removed when the delete flag is set, one at a
     * time. The configured collection name is passed to the template only
     * when it contains text. A {@code null} or empty list is a no-op.
     * </p>
     *
     * @param items the list of items to be persisted.
     */
    protected void doWrite(List<? extends T> items) {
        if (CollectionUtils.isEmpty(items)) {
            return;
        }

        boolean useCollection = StringUtils.hasText(collection);

        if (delete) {
            if (useCollection) {
                for (Object object : items) {
                    template.remove(object, collection);
                }
            }
            else {
                for (Object object : items) {
                    template.remove(object);
                }
            }
        }
        else {
            if (useCollection) {
                for (Object object : items) {
                    template.save(object, collection);
                }
            }
            else {
                for (Object object : items) {
                    template.save(object);
                }
            }
        }
    }

    /**
     * @return {@code true} if an actual transaction is active for the current thread.
     */
    private boolean transactionActive() {
        return TransactionSynchronizationManager.isActualTransactionActive();
    }

    /**
     * Returns the buffer of pending items bound to the current thread, creating
     * it on first use together with a synchronization that flushes the buffer
     * before commit and unbinds it after completion.
     *
     * @return the buffer bound under this writer's key.
     */
    @SuppressWarnings("unchecked")
    private List<T> getCurrentBuffer() {
        if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
            // Register the synchronization BEFORE binding the buffer. If the
            // registration is rejected (for example because synchronization is
            // not active for this thread), nothing has been bound yet, so no
            // orphaned buffer is left behind that would silently swallow the
            // items of later write calls on the same thread.
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void beforeCommit(boolean readOnly) {
                    List<T> items = (List<T>) TransactionSynchronizationManager.getResource(bufferKey);

                    // Buffered items are only flushed for non read-only transactions.
                    if (!readOnly && !CollectionUtils.isEmpty(items)) {
                        doWrite(items);
                    }
                }

                @Override
                public void afterCompletion(int status) {
                    // Always release the buffer, whatever the transaction outcome.
                    if (TransactionSynchronizationManager.hasResource(bufferKey)) {
                        TransactionSynchronizationManager.unbindResource(bufferKey);
                    }
                }
            });

            TransactionSynchronizationManager.bindResource(bufferKey, new ArrayList<T>());
        }

        return (List<T>) TransactionSynchronizationManager.getResource(bufferKey);
    }

    /**
     * Verifies that the mandatory {@link MongoOperations} template has been set.
     *
     * @throws IllegalStateException if no template has been configured.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(template != null, "A MongoOperations implementation is required.");
    }
}