001/*
002 * Copyright 2012-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.data;
018
019import java.util.ArrayList;
020import java.util.Iterator;
021import java.util.List;
022import java.util.Map;
023import java.util.regex.Matcher;
024import java.util.regex.Pattern;
025
026import com.mongodb.util.JSON;
027
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import org.springframework.batch.item.ExecutionContext;
031import org.springframework.batch.item.ItemReader;
032import org.springframework.beans.factory.InitializingBean;
033import org.springframework.data.domain.PageRequest;
034import org.springframework.data.domain.Pageable;
035import org.springframework.data.domain.Sort;
036import org.springframework.data.mongodb.core.MongoOperations;
037import org.springframework.data.mongodb.core.query.BasicQuery;
038import org.springframework.data.mongodb.core.query.Query;
039import org.springframework.util.Assert;
040import org.springframework.util.ClassUtils;
041import org.springframework.util.StringUtils;
042
043/**
044 * <p>
045 * Restartable {@link ItemReader} that reads documents from MongoDB
046 * via a paging technique.
047 * </p>
048 *
049 * <p>
050 * If you set JSON String query {@link #setQuery(String)} then
051 * it executes the JSON to retrieve the requested documents.
052 * </p>
053 * 
054 * <p>
055 * If you set Query object {@link #setQuery(Query)} then
056 * it executes the Query to retrieve the requested documents.
057 * </p>
058 * 
059 * <p>
060 * The query is executed using paged requests specified in the
061 * {@link #setPageSize(int)}.  Additional pages are requested as needed to
062 * provide data when the {@link #read()} method is called.
063 * </p>
064 *
065 * <p>
066 * The JSON String query provided supports parameter substitution via ?&lt;index&gt;
067 * placeholders where the &lt;index&gt; indicates the index of the
068 * parameterValue to substitute.
069 * </p>
070 *
071 * <p>
072 * The implementation is thread-safe between calls to
073 * {@link #open(ExecutionContext)}, but remember to use <code>saveState=false</code>
074 * if used in a multi-threaded client (no restart available).
075 * </p>
076 *
077 *
078 * @author Michael Minella
079 * @author Takaaki Iida
080 */
081public class MongoItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {
082        
083        private static final Logger log = LoggerFactory.getLogger(MongoItemReader.class);
084        
085        private static final Pattern PLACEHOLDER = Pattern.compile("\\?(\\d+)");
086        private MongoOperations template;
087        private Query query;
088        private String queryString;
089        private Class<? extends T> type;
090        private Sort sort;
091        private String hint;
092        private String fields;
093        private String collection;
094        private List<Object> parameterValues;
095
096        public MongoItemReader() {
097                super();
098                setName(ClassUtils.getShortName(MongoItemReader.class));
099        }
100        
101        /**
102         * A Mongo Query to be used.
103         *
104         * @param query Mongo Query to be used.
105         */
106        public void setQuery(Query query) {
107                this.query = query;
108        }
109
110        /**
111         * Used to perform operations against the MongoDB instance.  Also
112         * handles the mapping of documents to objects.
113         *
114         * @param template the MongoOperations instance to use
115         * @see MongoOperations
116         */
117        public void setTemplate(MongoOperations template) {
118                this.template = template;
119        }
120
121        /**
122         * A JSON formatted MongoDB query.  Parameterization of the provided query is allowed
123         * via ?&lt;index&gt; placeholders where the &lt;index&gt; indicates the index of the
124         * parameterValue to substitute.
125         *
126         * @param queryString JSON formatted Mongo query
127         */
128        public void setQuery(String queryString) {
129                this.queryString = queryString;
130        }
131
132        /**
133         * The type of object to be returned for each {@link #read()} call.
134         *
135         * @param type the type of object to return
136         */
137        public void setTargetType(Class<? extends T> type) {
138                this.type = type;
139        }
140
141        /**
142         * {@link List} of values to be substituted in for each of the
143         * parameters in the query.
144         *
145         * @param parameterValues values
146         */
147        public void setParameterValues(List<Object> parameterValues) {
148                this.parameterValues = parameterValues;
149        }
150
151        /**
152         * JSON defining the fields to be returned from the matching documents
153         * by MongoDB.
154         *
155         * @param fields JSON string that identifies the fields to sort by.
156         */
157        public void setFields(String fields) {
158                this.fields = fields;
159        }
160
161        /**
162         * {@link Map} of property names/{@link org.springframework.data.domain.Sort.Direction} values to
163         * sort the input by.
164         *
165         * @param sorts map of properties and direction to sort each.
166         */
167        public void setSort(Map<String, Sort.Direction> sorts) {
168                this.sort = convertToSort(sorts);
169        }
170
171        /**
172         * @param collection Mongo collection to be queried.
173         */
174        public void setCollection(String collection) {
175                this.collection = collection;
176        }
177
178        /**
179         * JSON String telling MongoDB what index to use.
180         *
181         * @param hint string indicating what index to use.
182         */
183        public void setHint(String hint) {
184                this.hint = hint;
185        }
186
187        @Override
188        @SuppressWarnings("unchecked")
189        protected Iterator<T> doPageRead() {
190                if (queryString != null) {
191                        Pageable pageRequest = PageRequest.of(page, pageSize, sort);
192        
193                        String populatedQuery = replacePlaceholders(queryString, parameterValues);
194        
195                        Query mongoQuery;
196        
197                        if(StringUtils.hasText(fields)) {
198                                mongoQuery = new BasicQuery(populatedQuery, fields);
199                        }
200                        else {
201                                mongoQuery = new BasicQuery(populatedQuery);
202                        }
203        
204                        mongoQuery.with(pageRequest);
205        
206                        if(StringUtils.hasText(hint)) {
207                                mongoQuery.withHint(hint);
208                        }
209        
210                        if(StringUtils.hasText(collection)) {
211                                return (Iterator<T>) template.find(mongoQuery, type, collection).iterator();
212                        } else {
213                                return (Iterator<T>) template.find(mongoQuery, type).iterator();
214                        }
215                        
216                } else {
217                        Pageable pageRequest = PageRequest.of(page, pageSize);
218                        query.with(pageRequest);
219                        
220                        if(StringUtils.hasText(collection)) {
221                                return (Iterator<T>) template.find(query, type, collection).iterator();
222                        } else {
223                                return (Iterator<T>) template.find(query, type).iterator();
224                        }
225                }
226        }
227
228        /**
229         * Checks mandatory properties
230         *
231         * @see InitializingBean#afterPropertiesSet()
232         */
233        @Override
234        public void afterPropertiesSet() throws Exception {
235                Assert.state(template != null, "An implementation of MongoOperations is required.");
236                Assert.state(type != null, "A type to convert the input into is required.");
237                Assert.state(queryString != null || query != null, "A query is required.");
238                
239                if (queryString != null) {
240                        Assert.state(sort != null, "A sort is required.");
241                }
242
243                if (query != null && query.getLimit() != 0) {
244                        log.warn("PageSize in Query object was ignored. Please set it by MongoItemReader.setPageSize().");
245                }
246        }
247
248        // Copied from StringBasedMongoQuery...is there a place where this type of logic is already exposed?
249        private String replacePlaceholders(String input, List<Object> values) {
250                Matcher matcher = PLACEHOLDER.matcher(input);
251                String result = input;
252
253                while (matcher.find()) {
254                        String group = matcher.group();
255                        int index = Integer.parseInt(matcher.group(1));
256                        result = result.replace(group, getParameterWithIndex(values, index));
257                }
258
259                return result;
260        }
261
262        // Copied from StringBasedMongoQuery...is there a place where this type of logic is already exposed?
263        private String getParameterWithIndex(List<Object> values, int index) {
264                return JSON.serialize(values.get(index));
265        }
266
267        private Sort convertToSort(Map<String, Sort.Direction> sorts) {
268                List<Sort.Order> sortValues = new ArrayList<>();
269
270                for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
271                        sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
272                }
273
274                return Sort.by(sortValues);
275        }
276}