001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.support; 018 019import org.springframework.batch.item.ExecutionContext; 020import org.springframework.batch.item.ItemCountAware; 021import org.springframework.batch.item.ItemReader; 022import org.springframework.batch.item.ItemStreamException; 023import org.springframework.batch.item.ParseException; 024import org.springframework.batch.item.UnexpectedInputException; 025import org.springframework.lang.Nullable; 026import org.springframework.util.Assert; 027 028/** 029 * Abstract superclass for {@link ItemReader}s that supports restart by storing 030 * item count in the {@link ExecutionContext} (therefore requires item ordering 031 * to be preserved between runs). 032 * 033 * Subclasses are inherently <b>not</b> thread-safe. 034 * 035 * @author Robert Kasanicky 036 * @author Glenn Renfro 037 * @author Mahmoud Ben Hassine 038 */ 039public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> { 040 041 private static final String READ_COUNT = "read.count"; 042 043 private static final String READ_COUNT_MAX = "read.count.max"; 044 045 private int currentItemCount = 0; 046 047 private int maxItemCount = Integer.MAX_VALUE; 048 049 private boolean saveState = true; 050 051 /** 052 * Read next item from input. 053 * 054 * @return an item or {@code null} if the data source is exhausted 055 * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework 056 */ 057 @Nullable 058 protected abstract T doRead() throws Exception; 059 060 /** 061 * Open resources necessary to start reading input. 062 * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework 063 */ 064 protected abstract void doOpen() throws Exception; 065 066 /** 067 * Close the resources opened in {@link #doOpen()}. 068 * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework 069 */ 070 protected abstract void doClose() throws Exception; 071 072 /** 073 * Move to the given item index. Subclasses should override this method if 074 * there is a more efficient way of moving to given index than re-reading 075 * the input using {@link #doRead()}. 076 * 077 * @param itemIndex index of item (0 based) to jump to. 078 * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework 079 */ 080 protected void jumpToItem(int itemIndex) throws Exception { 081 for (int i = 0; i < itemIndex; i++) { 082 read(); 083 } 084 } 085 086 @Override 087 public T read() throws Exception, UnexpectedInputException, ParseException { 088 if (currentItemCount >= maxItemCount) { 089 return null; 090 } 091 currentItemCount++; 092 T item = doRead(); 093 if(item instanceof ItemCountAware) { 094 ((ItemCountAware) item).setItemCount(currentItemCount); 095 } 096 return item; 097 } 098 099 protected int getCurrentItemCount() { 100 return currentItemCount; 101 } 102 103 /** 104 * The index of the item to start reading from. If the 105 * {@link ExecutionContext} contains a key <code>[name].read.count</code> 106 * (where <code>[name]</code> is the name of this component) the value from 107 * the {@link ExecutionContext} will be used in preference. 108 * 109 * @see #setName(String) 110 * 111 * @param count the value of the current item count 112 */ 113 public void setCurrentItemCount(int count) { 114 this.currentItemCount = count; 115 } 116 117 /** 118 * The maximum index of the items to be read. If the 119 * {@link ExecutionContext} contains a key 120 * <code>[name].read.count.max</code> (where <code>[name]</code> is the name 121 * of this component) the value from the {@link ExecutionContext} will be 122 * used in preference. 123 * 124 * @see #setName(String) 125 * 126 * @param count the value of the maximum item count. count must be greater than zero. 127 */ 128 public void setMaxItemCount(int count) { 129 Assert.isTrue(count > 0, "count must be greater than zero"); 130 this.maxItemCount = count; 131 } 132 133 @Override 134 public void close() throws ItemStreamException { 135 super.close(); 136 currentItemCount = 0; 137 try { 138 doClose(); 139 } 140 catch (Exception e) { 141 throw new ItemStreamException("Error while closing item reader", e); 142 } 143 } 144 145 @Override 146 public void open(ExecutionContext executionContext) throws ItemStreamException { 147 super.open(executionContext); 148 try { 149 doOpen(); 150 } 151 catch (Exception e) { 152 throw new ItemStreamException("Failed to initialize the reader", e); 153 } 154 if (!isSaveState()) { 155 return; 156 } 157 158 if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) { 159 maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX)); 160 } 161 162 int itemCount = 0; 163 if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) { 164 itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT)); 165 } 166 else if(currentItemCount > 0) { 167 itemCount = currentItemCount; 168 } 169 170 if (itemCount > 0 && itemCount < maxItemCount) { 171 try { 172 jumpToItem(itemCount); 173 } 174 catch (Exception e) { 175 throw new ItemStreamException("Could not move to stored position on restart", e); 176 } 177 } 178 179 currentItemCount = itemCount; 180 181 } 182 183 @Override 184 public void update(ExecutionContext executionContext) throws ItemStreamException { 185 super.update(executionContext); 186 if (saveState) { 187 Assert.notNull(executionContext, "ExecutionContext must not be null"); 188 executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount); 189 if (maxItemCount < Integer.MAX_VALUE) { 190 executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount); 191 } 192 } 193 194 } 195 196 197 /** 198 * Set the flag that determines whether to save internal data for 199 * {@link ExecutionContext}. Only switch this to false if you don't want to 200 * save any state from this stream, and you don't need it to be restartable. 201 * Always set it to false if the reader is being used in a concurrent 202 * environment. 203 * 204 * @param saveState flag value (default true). 205 */ 206 public void setSaveState(boolean saveState) { 207 this.saveState = saveState; 208 } 209 210 /** 211 * The flag that determines whether to save internal state for restarts. 212 * @return true if the flag was set 213 */ 214 public boolean isSaveState() { 215 return saveState; 216 } 217 218}