001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.item.database; 017 018import java.util.List; 019 020import org.apache.commons.logging.Log; 021import org.apache.commons.logging.LogFactory; 022import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; 023import org.springframework.beans.factory.InitializingBean; 024import org.springframework.util.Assert; 025import org.springframework.util.ClassUtils; 026 027/** 028 * Abstract {@link org.springframework.batch.item.ItemStreamReader} for to extend when 029 * reading database records in a paging fashion. 030 * 031 * <p> 032 * Implementations should execute queries using paged requests of a size 033 * specified in {@link #setPageSize(int)}. Additional pages are requested when 034 * needed as {@link #read()} method is called, returning an object corresponding 035 * to current position. 036 * </p> 037 * 038 * @author Thomas Risberg 039 * @author Dave Syer 040 * @since 2.0 041 */ 042public abstract class AbstractPagingItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> 043 implements InitializingBean { 044 045 protected Log logger = LogFactory.getLog(getClass()); 046 047 private volatile boolean initialized = false; 048 049 private int pageSize = 10; 050 051 private volatile int current = 0; 052 053 private volatile int page = 0; 054 055 protected volatile List<T> results; 056 057 private Object lock = new Object(); 058 059 public AbstractPagingItemReader() { 060 setName(ClassUtils.getShortName(AbstractPagingItemReader.class)); 061 } 062 063 /** 064 * The current page number. 065 * @return the current page 066 */ 067 public int getPage() { 068 return page; 069 } 070 071 /** 072 * The page size configured for this reader. 073 * @return the page size 074 */ 075 public int getPageSize() { 076 return pageSize; 077 } 078 079 /** 080 * The number of rows to retrieve at a time. 081 * 082 * @param pageSize the number of rows to fetch per page 083 */ 084 public void setPageSize(int pageSize) { 085 this.pageSize = pageSize; 086 } 087 088 /** 089 * Check mandatory properties. 090 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 091 */ 092 @Override 093 public void afterPropertiesSet() throws Exception { 094 Assert.isTrue(pageSize > 0, "pageSize must be greater than zero"); 095 } 096 097 @Override 098 protected T doRead() throws Exception { 099 100 synchronized (lock) { 101 102 if (results == null || current >= pageSize) { 103 104 if (logger.isDebugEnabled()) { 105 logger.debug("Reading page " + getPage()); 106 } 107 108 doReadPage(); 109 page++; 110 if (current >= pageSize) { 111 current = 0; 112 } 113 114 } 115 116 int next = current++; 117 if (next < results.size()) { 118 return results.get(next); 119 } 120 else { 121 return null; 122 } 123 124 } 125 126 } 127 128 abstract protected void doReadPage(); 129 130 @Override 131 protected void doOpen() throws Exception { 132 133 Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first"); 134 initialized = true; 135 136 } 137 138 @Override 139 protected void doClose() throws Exception { 140 141 synchronized (lock) { 142 initialized = false; 143 current = 0; 144 page = 0; 145 results = null; 146 } 147 148 } 149 150 @Override 151 protected void jumpToItem(int itemIndex) throws Exception { 152 153 synchronized (lock) { 154 page = itemIndex / pageSize; 155 current = itemIndex % pageSize; 156 } 157 158 doJumpToPage(itemIndex); 159 160 if (logger.isDebugEnabled()) { 161 logger.debug("Jumping to page " + getPage() + " and index " + current); 162 } 163 164 } 165 166 abstract protected void doJumpToPage(int itemIndex); 167 168}