001/* 002 * Copyright 2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.item.data; 017 018import org.springframework.batch.item.ItemReader; 019import org.springframework.batch.item.ItemStreamReader; 020import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; 021import org.springframework.util.Assert; 022 023import java.util.Iterator; 024 025/** 026 * A base class that handles basic reading logic based on the paginated 027 * semantics of Spring Data's paginated facilities. It also handles the 028 * semantics required for restartability based on those facilities. 029 * 030 * @author Michael Minella 031 * @author Glenn Renfro 032 * @since 2.2 033 * @param <T> Type of item to be read 034 */ 035public abstract class AbstractPaginatedDataItemReader<T> extends 036AbstractItemCountingItemStreamItemReader<T> { 037 038 protected volatile int page = 0; 039 040 protected int pageSize = 10; 041 042 protected Iterator<T> results; 043 044 private Object lock = new Object(); 045 046 /** 047 * The number of items to be read with each page. 048 * 049 * @param pageSize the number of items. pageSize must be greater than zero. 050 */ 051 public void setPageSize(int pageSize) { 052 Assert.isTrue(pageSize > 0, "pageSize must be greater than zero"); 053 this.pageSize = pageSize; 054 } 055 056 @Override 057 protected T doRead() throws Exception { 058 059 synchronized (lock) { 060 if(results == null || !results.hasNext()) { 061 062 results = doPageRead(); 063 064 page ++; 065 066 if(results == null || !results.hasNext()) { 067 return null; 068 } 069 } 070 071 072 if(results.hasNext()) { 073 return results.next(); 074 } 075 else { 076 return null; 077 } 078 } 079 } 080 081 /** 082 * Method this {@link ItemStreamReader} delegates to 083 * for the actual work of reading a page. Each time 084 * this method is called, the resulting {@link Iterator} 085 * should contain the items read within the next page. 086 * <br><br> 087 * If the {@link Iterator} is empty or null when it is 088 * returned, this {@link ItemReader} will assume that the 089 * input has been exhausted. 090 * 091 * @return an {@link Iterator} containing the items within a page. 092 */ 093 protected abstract Iterator<T> doPageRead(); 094 095 @Override 096 protected void doOpen() throws Exception { 097 } 098 099 @Override 100 protected void doClose() throws Exception { 101 } 102 103 @Override 104 protected void jumpToItem(int itemLastIndex) throws Exception { 105 synchronized (lock) { 106 page = itemLastIndex / pageSize; 107 int current = itemLastIndex % pageSize; 108 109 Iterator<T> initialPage = doPageRead(); 110 111 for(; current >= 0; current--) { 112 initialPage.next(); 113 } 114 } 115 } 116}