001/*
002 * Copyright 2012 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.item.data;
017
018import java.lang.reflect.InvocationTargetException;
019import java.util.ArrayList;
020import java.util.List;
021import java.util.Map;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025
026import org.springframework.batch.item.ExecutionContext;
027import org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.InvocationTargetThrowableWrapper;
028import org.springframework.batch.item.adapter.DynamicMethodInvocationException;
029import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
030import org.springframework.beans.factory.InitializingBean;
031import org.springframework.data.domain.Page;
032import org.springframework.data.domain.PageRequest;
033import org.springframework.data.domain.Pageable;
034import org.springframework.data.domain.Sort;
035import org.springframework.data.repository.PagingAndSortingRepository;
036import org.springframework.util.Assert;
037import org.springframework.util.ClassUtils;
038import org.springframework.util.MethodInvoker;
039
040/**
041 * <p>
042 * A {@link org.springframework.batch.item.ItemReader} that reads records utilizing
043 * a {@link org.springframework.data.repository.PagingAndSortingRepository}.
044 * </p>
045 *
046 * <p>
047 * Performance of the reader is dependent on the repository implementation, however
048 * setting a reasonably large page size and matching that to the commit interval should
049 * yield better performance.
050 * </p>
051 *
052 * <p>
053 * The reader must be configured with a {@link org.springframework.data.repository.PagingAndSortingRepository},
054 * a {@link org.springframework.data.domain.Sort}, and a pageSize greater than 0.
055 * </p>
056 *
057 * <p>
058 * This implementation is thread-safe between calls to {@link #open(ExecutionContext)}, but remember to use
059 * <code>saveState=false</code> if used in a multi-threaded client (no restart available).
060 * </p>
061 *
062 * <p>It is important to note that this is a paging item reader and exceptions that are
063 * thrown while reading the page itself (mapping results to objects, etc in the
064 * {@link RepositoryItemReader#doPageRead()}) will not be skippable since this reader has
065 * no way of knowing if an exception should be skipped and therefore will continue to read
066 * the same page until the skip limit is exceeded.</p>
067 *
068 * <p>
069 * NOTE: The {@code RepositoryItemReader} only reads Java Objects i.e. non primitives.
070 * </p>
071 *
072 * @author Michael Minella
073 * @since 2.2
074 */
075public class RepositoryItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {
076
077        protected Log logger = LogFactory.getLog(getClass());
078
079        private PagingAndSortingRepository<?, ?> repository;
080
081        private Sort sort;
082
083        private volatile int page = 0;
084
085        private int pageSize = 10;
086
087        private volatile int current = 0;
088
089        private List<?> arguments;
090
091        private volatile List<T> results;
092
093        private Object lock = new Object();
094
095        private String methodName;
096
097        public RepositoryItemReader() {
098                setName(ClassUtils.getShortName(RepositoryItemReader.class));
099        }
100
101        /**
102         * Arguments to be passed to the data providing method.
103         *
104         * @param arguments list of method arguments to be passed to the repository
105         */
106        public void setArguments(List<?> arguments) {
107                this.arguments = arguments;
108        }
109
110        /**
111         * Provides ordering of the results so that order is maintained between paged queries
112         *
113         * @param sorts the fields to sort by and the directions
114         */
115        public void setSort(Map<String, Sort.Direction> sorts) {
116                this.sort = convertToSort(sorts);
117        }
118
119        /**
120         * @param pageSize The number of items to retrieve per page.
121         */
122        public void setPageSize(int pageSize) {
123                this.pageSize = pageSize;
124        }
125
126        /**
127         * The {@link org.springframework.data.repository.PagingAndSortingRepository}
128         * implementation used to read input from.
129         *
130         * @param repository underlying repository for input to be read from.
131         */
132        public void setRepository(PagingAndSortingRepository<?, ?> repository) {
133                this.repository = repository;
134        }
135
136        /**
137         * Specifies what method on the repository to call.  This method must take
138         * {@link org.springframework.data.domain.Pageable} as the <em>last</em> argument.
139         *
140         * @param methodName name of the method to invoke
141         */
142        public void setMethodName(String methodName) {
143                this.methodName = methodName;
144        }
145
146        @Override
147        public void afterPropertiesSet() throws Exception {
148                Assert.state(repository != null, "A PagingAndSortingRepository is required");
149                Assert.state(pageSize > 0, "Page size must be greater than 0");
150                Assert.state(sort != null, "A sort is required");
151        }
152
153        @Override
154        protected T doRead() throws Exception {
155
156                synchronized (lock) {
157                        if(results == null || current >= results.size()) {
158
159                                if (logger.isDebugEnabled()) {
160                                        logger.debug("Reading page " + page);
161                                }
162
163                                results = doPageRead();
164
165                                current = 0;
166                                page ++;
167
168                                if(results.size() <= 0) {
169                                        return null;
170                                }
171                        }
172
173                        if(current < results.size()) {
174                                T curLine = results.get(current);
175                                current++;
176                                return curLine;
177                        }
178                        else {
179                                return null;
180                        }
181                }
182        }
183
184        @Override
185        protected void jumpToItem(int itemLastIndex) throws Exception {
186                synchronized (lock) {
187                        page = (itemLastIndex - 1) / pageSize;
188                        current = (itemLastIndex - 1) % pageSize;
189
190                        results = doPageRead();
191                        page++;
192                }
193        }
194
195        /**
196         * Performs the actual reading of a page via the repository.
197         * Available for overriding as needed.
198         *
199         * @return the list of items that make up the page
200         * @throws Exception Based on what the underlying method throws or related to the
201         *                      calling of the method
202         */
203        @SuppressWarnings("unchecked")
204        protected List<T> doPageRead() throws Exception {
205                Pageable pageRequest = PageRequest.of(page, pageSize, sort);
206
207                MethodInvoker invoker = createMethodInvoker(repository, methodName);
208
209                List<Object> parameters = new ArrayList<>();
210
211                if(arguments != null && arguments.size() > 0) {
212                        parameters.addAll(arguments);
213                }
214
215                parameters.add(pageRequest);
216
217                invoker.setArguments(parameters.toArray());
218
219                Page<T> curPage = (Page<T>) doInvoke(invoker);
220
221                return curPage.getContent();
222        }
223
224        @Override
225        protected void doOpen() throws Exception {
226        }
227
228        @Override
229        protected void doClose() throws Exception {
230                synchronized (lock) {
231                        current = 0;
232                        page = 0;
233                        results = null;
234                }
235        }
236
237        private Sort convertToSort(Map<String, Sort.Direction> sorts) {
238                List<Sort.Order> sortValues = new ArrayList<>();
239
240                for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
241                        sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
242                }
243
244                return Sort.by(sortValues);
245        }
246
247        private Object doInvoke(MethodInvoker invoker) throws Exception{
248                try {
249                        invoker.prepare();
250                }
251                catch (ClassNotFoundException | NoSuchMethodException e) {
252                        throw new DynamicMethodInvocationException(e);
253                }
254
255                try {
256                        return invoker.invoke();
257                }
258                catch (InvocationTargetException e) {
259                        if (e.getCause() instanceof Exception) {
260                                throw (Exception) e.getCause();
261                        }
262                        else {
263                                throw new InvocationTargetThrowableWrapper(e.getCause());
264                        }
265                }
266                catch (IllegalAccessException e) {
267                        throw new DynamicMethodInvocationException(e);
268                }
269        }
270
271        private MethodInvoker createMethodInvoker(Object targetObject, String targetMethod) {
272                MethodInvoker invoker = new MethodInvoker();
273                invoker.setTargetObject(targetObject);
274                invoker.setTargetMethod(targetMethod);
275                return invoker;
276        }
277}