/*
 * Copyright 2012-2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.item.data;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.mongodb.util.JSON;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.BasicQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

/**
 * <p>
 * Restartable {@link ItemReader} that reads documents from MongoDB
 * via a paging technique.
 * </p>
 *
 * <p>
 * If you set JSON String query {@link #setQuery(String)} then
 * it executes the JSON to retrieve the requested documents.
 * </p>
 *
 * <p>
 * If you set Query object {@link #setQuery(Query)} then
 * it executes the Query to retrieve the requested documents.
 * </p>
 *
 * <p>
 * If both are set, the JSON String query is the one that is executed.
 * </p>
 *
 * <p>
 * The query is executed using paged requests specified in the
 * {@link #setPageSize(int)}. Additional pages are requested as needed to
 * provide data when the {@link #read()} method is called.
 * </p>
 *
 * <p>
 * The JSON String query provided supports parameter substitution via ?&lt;index&gt;
 * placeholders where the &lt;index&gt; indicates the index of the
 * parameterValue to substitute.
 * </p>
 *
 * <p>
 * The implementation is thread-safe between calls to
 * {@link #open(ExecutionContext)}, but remember to use <code>saveState=false</code>
 * if used in a multi-threaded client (no restart available).
 * </p>
 *
 *
 * @author Michael Minella
 * @author Takaaki Iida
 */
public class MongoItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

	private static final Logger log = LoggerFactory.getLogger(MongoItemReader.class);

	/** Matches a {@code ?<index>} placeholder; group 1 captures the numeric index. */
	private static final Pattern PLACEHOLDER = Pattern.compile("\\?(\\d+)");
	private MongoOperations template;
	private Query query;
	private String queryString;
	private Class<? extends T> type;
	private Sort sort;
	private String hint;
	private String fields;
	private String collection;
	private List<Object> parameterValues;

	public MongoItemReader() {
		super();
		setName(ClassUtils.getShortName(MongoItemReader.class));
	}

	/**
	 * A Mongo Query to be used. Only consulted when no JSON String query
	 * has been set via {@link #setQuery(String)}.
	 *
	 * @param query Mongo Query to be used.
	 */
	public void setQuery(Query query) {
		this.query = query;
	}

	/**
	 * Used to perform operations against the MongoDB instance.  Also
	 * handles the mapping of documents to objects.
	 *
	 * @param template the MongoOperations instance to use
	 * @see MongoOperations
	 */
	public void setTemplate(MongoOperations template) {
		this.template = template;
	}

	/**
	 * A JSON formatted MongoDB query.  Parameterization of the provided query is allowed
	 * via ?&lt;index&gt; placeholders where the &lt;index&gt; indicates the index of the
	 * parameterValue to substitute.
	 *
	 * @param queryString JSON formatted Mongo query
	 */
	public void setQuery(String queryString) {
		this.queryString = queryString;
	}

	/**
	 * The type of object to be returned for each {@link #read()} call.
	 *
	 * @param type the type of object to return
	 */
	public void setTargetType(Class<? extends T> type) {
		this.type = type;
	}

	/**
	 * {@link List} of values to be substituted in for each of the
	 * parameters in the query. Each value is serialized to JSON before
	 * being inserted in place of its ?&lt;index&gt; placeholder.
	 *
	 * @param parameterValues values
	 */
	public void setParameterValues(List<Object> parameterValues) {
		this.parameterValues = parameterValues;
	}

	/**
	 * JSON defining the fields to be returned from the matching documents
	 * by MongoDB. Only applied to a JSON String query.
	 *
	 * @param fields JSON string that identifies the fields to return.
	 */
	public void setFields(String fields) {
		this.fields = fields;
	}

	/**
	 * {@link Map} of property names/{@link org.springframework.data.domain.Sort.Direction} values to
	 * sort the input by. Only applied to a JSON String query, for which it
	 * is mandatory.
	 *
	 * @param sorts map of properties and direction to sort each.
	 */
	public void setSort(Map<String, Sort.Direction> sorts) {
		this.sort = convertToSort(sorts);
	}

	/**
	 * @param collection Mongo collection to be queried. When not set, the
	 * query is issued through the template without an explicit collection name.
	 */
	public void setCollection(String collection) {
		this.collection = collection;
	}

	/**
	 * JSON String telling MongoDB what index to use. Only applied to a
	 * JSON String query.
	 *
	 * @param hint string indicating what index to use.
	 */
	public void setHint(String hint) {
		this.hint = hint;
	}

	/**
	 * Reads the current page of documents. The JSON String query is used
	 * when present; otherwise the configured {@link Query} object is paged
	 * and executed.
	 *
	 * @return an iterator over the items of the current page
	 */
	@Override
	@SuppressWarnings("unchecked")
	protected Iterator<T> doPageRead() {
		Query pagedQuery = (queryString != null) ? createPagedStringQuery() : createPagedObjectQuery();

		if (StringUtils.hasText(collection)) {
			return (Iterator<T>) template.find(pagedQuery, type, collection).iterator();
		}

		return (Iterator<T>) template.find(pagedQuery, type).iterator();
	}

	/**
	 * Checks mandatory properties
	 *
	 * @see InitializingBean#afterPropertiesSet()
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		Assert.state(template != null, "An implementation of MongoOperations is required.");
		Assert.state(type != null, "A type to convert the input into is required.");
		Assert.state(queryString != null || query != null, "A query is required.");

		if (queryString != null) {
			Assert.state(sort != null, "A sort is required.");
		}

		// The reader overwrites paging on the Query for every page, so a
		// user-supplied limit has no effect.
		if (query != null && query.getLimit() != 0) {
			log.warn("PageSize in Query object was ignored. Please set it by MongoItemReader.setPageSize().");
		}
	}

	// Builds a fresh query for the current page from the JSON string, applying
	// placeholder substitution, the optional field projection, sort and hint.
	private Query createPagedStringQuery() {
		Pageable pageRequest = PageRequest.of(page, pageSize, sort);

		String populatedQuery = replacePlaceholders(queryString, parameterValues);

		Query mongoQuery;

		if (StringUtils.hasText(fields)) {
			mongoQuery = new BasicQuery(populatedQuery, fields);
		}
		else {
			mongoQuery = new BasicQuery(populatedQuery);
		}

		mongoQuery.with(pageRequest);

		if (StringUtils.hasText(hint)) {
			mongoQuery.withHint(hint);
		}

		return mongoQuery;
	}

	// Applies the current page to the user-supplied Query object (which is
	// modified in place) and returns it.
	private Query createPagedObjectQuery() {
		Pageable pageRequest = PageRequest.of(page, pageSize);
		query.with(pageRequest);

		return query;
	}

	// Copied from StringBasedMongoQuery...is there a place where this type of logic is already exposed?
	//
	// Substitutes every ?<index> placeholder with the JSON form of the matching
	// parameter value. The input is rewritten in a single left-to-right pass so
	// that each placeholder is replaced exactly where it was matched: "?1" can
	// never clobber the prefix of "?10", and placeholder-like text inside an
	// already substituted value is never substituted again.
	private String replacePlaceholders(String input, List<Object> values) {
		Matcher matcher = PLACEHOLDER.matcher(input);
		StringBuffer result = new StringBuffer(input.length());

		while (matcher.find()) {
			int index = Integer.parseInt(matcher.group(1));
			String replacement = getParameterWithIndex(values, index);
			// quoteReplacement keeps '$' and '\' in the serialized value literal.
			matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
		}

		matcher.appendTail(result);

		return result.toString();
	}

	// Copied from StringBasedMongoQuery...is there a place where this type of logic is already exposed?
	private String getParameterWithIndex(List<Object> values, int index) {
		return JSON.serialize(values.get(index));
	}

	// Converts the property/direction map into a Sort, one order per entry in
	// the map's iteration order.
	private Sort convertToSort(Map<String, Sort.Direction> sorts) {
		List<Sort.Order> sortValues = new ArrayList<>();

		for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
			sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
		}

		return Sort.by(sortValues);
	}
}