/*
 * Copyright 2012 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.item.data;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator.InvocationTargetThrowableWrapper;
import org.springframework.batch.item.adapter.DynamicMethodInvocationException;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MethodInvoker;

/**
 * <p>
 * A {@link org.springframework.batch.item.ItemReader} that reads records utilizing
 * a {@link org.springframework.data.repository.PagingAndSortingRepository}.
 * </p>
 *
 * <p>
 * Performance of the reader is dependent on the repository implementation, however
 * setting a reasonably large page size and matching that to the commit interval should
 * yield better performance.
 * </p>
 *
 * <p>
 * The reader must be configured with a {@link org.springframework.data.repository.PagingAndSortingRepository},
 * a {@link org.springframework.data.domain.Sort}, and a pageSize greater than 0.
 * </p>
 *
 * <p>
 * This implementation is thread-safe between calls to {@link #open(ExecutionContext)}, but remember to use
 * <code>saveState=false</code> if used in a multi-threaded client (no restart available).
 * </p>
 *
 * <p>It is important to note that this is a paging item reader and exceptions that are
 * thrown while reading the page itself (mapping results to objects, etc. in
 * {@link RepositoryItemReader#doPageRead()}) will not be skippable since this reader has
 * no way of knowing if an exception should be skipped and therefore will continue to read
 * the same page until the skip limit is exceeded.</p>
 *
 * <p>
 * NOTE: The {@code RepositoryItemReader} only reads Java Objects i.e. non primitives.
 * </p>
 *
 * @author Michael Minella
 * @since 2.2
 */
public class RepositoryItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements InitializingBean {

    protected Log logger = LogFactory.getLog(getClass());

    private PagingAndSortingRepository<?, ?> repository;

    private Sort sort;

    /** Zero-based index of the next page to request from the repository. */
    private volatile int page = 0;

    private int pageSize = 10;

    /** Zero-based index, within {@link #results}, of the next item to return. */
    private volatile int current = 0;

    private List<?> arguments;

    /** Content of the most recently fetched page, or {@code null} before the first fetch. */
    private volatile List<T> results;

    /** Guards {@link #page}, {@link #current} and {@link #results}. */
    private final Object lock = new Object();

    private String methodName;

    public RepositoryItemReader() {
        setName(ClassUtils.getShortName(RepositoryItemReader.class));
    }

    /**
     * Arguments to be passed to the data providing method.
     *
     * @param arguments list of method arguments to be passed to the repository
     */
    public void setArguments(List<?> arguments) {
        this.arguments = arguments;
    }

    /**
     * Provides ordering of the results so that order is maintained between paged queries.
     * The sort orders are created in the iteration order of the supplied map, so use a
     * map with a predictable iteration order (for example a {@link java.util.LinkedHashMap})
     * when sorting by more than one property.
     *
     * @param sorts the fields to sort by and the directions
     */
    public void setSort(Map<String, Sort.Direction> sorts) {
        this.sort = convertToSort(sorts);
    }

    /**
     * @param pageSize The number of items to retrieve per page.
     */
    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }

    /**
     * The {@link org.springframework.data.repository.PagingAndSortingRepository}
     * implementation used to read input from.
     *
     * @param repository underlying repository for input to be read from.
     */
    public void setRepository(PagingAndSortingRepository<?, ?> repository) {
        this.repository = repository;
    }

    /**
     * Specifies what method on the repository to call. This method must take
     * {@link org.springframework.data.domain.Pageable} as the <em>last</em> argument.
     *
     * @param methodName name of the method to invoke
     */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * Verifies that a repository, a positive page size and a sort have been configured.
     *
     * @throws IllegalStateException if any of the required properties is missing or invalid
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(repository != null, "A PagingAndSortingRepository is required");
        Assert.state(pageSize > 0, "Page size must be greater than 0");
        Assert.state(sort != null, "A sort is required");
    }

    /**
     * Returns the next item, fetching the next page from the repository when the
     * current page is exhausted (or no page has been fetched yet).
     *
     * @return the next item, or {@code null} once the repository returns an empty page
     * @throws Exception if reading the page fails
     */
    @Override
    protected T doRead() throws Exception {

        synchronized (lock) {
            if (results == null || current >= results.size()) {

                if (logger.isDebugEnabled()) {
                    logger.debug("Reading page " + page);
                }

                results = doPageRead();

                current = 0;
                page++;

                if (results.isEmpty()) {
                    return null;
                }
            }

            // Either the cached page still had items left, or a non-empty page was
            // just loaded and 'current' reset to 0, so the index is always in range.
            T item = results.get(current);
            current++;
            return item;
        }
    }

    /**
     * Positions the reader for a restart and eagerly loads the page that contains the
     * next item to be returned.
     *
     * @param itemLastIndex the number of items that have already been read; the next
     * call to {@link #doRead()} returns the item at this zero-based index
     * @throws Exception if reading the page fails
     */
    @Override
    protected void jumpToItem(int itemLastIndex) throws Exception {
        synchronized (lock) {
            // itemLastIndex counts the items already consumed, so it is also the
            // zero-based index of the next item. Subtracting one here would replay
            // the last processed item and, on an exact page boundary, reload the
            // previous page instead of the next one.
            page = itemLastIndex / pageSize;
            current = itemLastIndex % pageSize;

            results = doPageRead();
            page++;
        }
    }

    /**
     * Performs the actual reading of a page via the repository.
     * Available for overriding as needed.
     *
     * @return the list of items that make up the page
     * @throws Exception Based on what the underlying method throws or related to the
     * calling of the method
     */
    @SuppressWarnings("unchecked")
    protected List<T> doPageRead() throws Exception {
        Pageable pageRequest = PageRequest.of(page, pageSize, sort);

        MethodInvoker invoker = createMethodInvoker(repository, methodName);

        List<Object> parameters = new ArrayList<>();

        if (arguments != null && !arguments.isEmpty()) {
            parameters.addAll(arguments);
        }

        // The repository method is required to take the Pageable as its last argument.
        parameters.add(pageRequest);

        invoker.setArguments(parameters.toArray());

        Page<T> curPage = (Page<T>) doInvoke(invoker);

        return curPage.getContent();
    }

    @Override
    protected void doOpen() throws Exception {
    }

    /**
     * Resets the paging state so that the reader starts from the first page again.
     */
    @Override
    protected void doClose() throws Exception {
        synchronized (lock) {
            current = 0;
            page = 0;
            results = null;
        }
    }

    /**
     * Converts the property/direction pairs into a {@link Sort}, preserving the
     * iteration order of the map.
     */
    private Sort convertToSort(Map<String, Sort.Direction> sorts) {
        List<Sort.Order> sortValues = new ArrayList<>();

        for (Map.Entry<String, Sort.Direction> curSort : sorts.entrySet()) {
            sortValues.add(new Sort.Order(curSort.getValue(), curSort.getKey()));
        }

        return Sort.by(sortValues);
    }

    /**
     * Prepares and invokes the given invoker, unwrapping the target's own exception
     * and translating reflection failures into {@link DynamicMethodInvocationException}.
     */
    private Object doInvoke(MethodInvoker invoker) throws Exception {
        try {
            invoker.prepare();
        }
        catch (ClassNotFoundException | NoSuchMethodException e) {
            throw new DynamicMethodInvocationException(e);
        }

        try {
            return invoker.invoke();
        }
        catch (InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            else {
                throw new InvocationTargetThrowableWrapper(cause);
            }
        }
        catch (IllegalAccessException e) {
            throw new DynamicMethodInvocationException(e);
        }
    }

    /**
     * Creates an invoker bound to the given target object and method name.
     */
    private MethodInvoker createMethodInvoker(Object targetObject, String targetMethod) {
        MethodInvoker invoker = new MethodInvoker();
        invoker.setTargetObject(targetObject);
        invoker.setTargetMethod(targetMethod);
        return invoker;
    }
}