001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.support;
018
019import org.springframework.batch.item.ExecutionContext;
020import org.springframework.batch.item.ItemCountAware;
021import org.springframework.batch.item.ItemReader;
022import org.springframework.batch.item.ItemStreamException;
023import org.springframework.batch.item.ParseException;
024import org.springframework.batch.item.UnexpectedInputException;
025import org.springframework.lang.Nullable;
026import org.springframework.util.Assert;
027
028/**
029 * Abstract superclass for {@link ItemReader}s that supports restart by storing
030 * item count in the {@link ExecutionContext} (therefore requires item ordering
031 * to be preserved between runs).
032 * 
033 * Subclasses are inherently <b>not</b> thread-safe.
034 * 
035 * @author Robert Kasanicky
036 * @author Glenn Renfro
037 * @author Mahmoud Ben Hassine
038 */
039public abstract class AbstractItemCountingItemStreamItemReader<T> extends AbstractItemStreamItemReader<T> {
040
041        private static final String READ_COUNT = "read.count";
042
043        private static final String READ_COUNT_MAX = "read.count.max";
044
045        private int currentItemCount = 0;
046
047        private int maxItemCount = Integer.MAX_VALUE;
048
049        private boolean saveState = true;
050
051        /**
052         * Read next item from input.
053         * 
054         * @return an item or {@code null} if the data source is exhausted
055         * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework
056         */
057        @Nullable
058        protected abstract T doRead() throws Exception;
059
060        /**
061         * Open resources necessary to start reading input.
062         * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework
063         */
064        protected abstract void doOpen() throws Exception;
065
066        /**
067         * Close the resources opened in {@link #doOpen()}.
068         * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework
069         */
070        protected abstract void doClose() throws Exception;
071
072        /**
073         * Move to the given item index. Subclasses should override this method if
074         * there is a more efficient way of moving to given index than re-reading
075         * the input using {@link #doRead()}.
076         *
077         * @param itemIndex index of item (0 based) to jump to.
078         * @throws Exception Allows subclasses to throw checked exceptions for interpretation by the framework
079         */
080        protected void jumpToItem(int itemIndex) throws Exception {
081                for (int i = 0; i < itemIndex; i++) {
082                        read();
083                }
084        }
085
086        @Override
087        public T read() throws Exception, UnexpectedInputException, ParseException {
088                if (currentItemCount >= maxItemCount) {
089                        return null;
090                }
091                currentItemCount++;
092                T item = doRead();
093                if(item instanceof ItemCountAware) {
094                        ((ItemCountAware) item).setItemCount(currentItemCount);
095                }
096                return item;
097        }
098
099        protected int getCurrentItemCount() {
100                return currentItemCount;
101        }
102
103        /**
104         * The index of the item to start reading from. If the
105         * {@link ExecutionContext} contains a key <code>[name].read.count</code>
106         * (where <code>[name]</code> is the name of this component) the value from
107         * the {@link ExecutionContext} will be used in preference.
108         * 
109         * @see #setName(String)
110         * 
111         * @param count the value of the current item count
112         */
113        public void setCurrentItemCount(int count) {
114                this.currentItemCount = count;
115        }
116
117        /**
118         * The maximum index of the items to be read. If the
119         * {@link ExecutionContext} contains a key
120         * <code>[name].read.count.max</code> (where <code>[name]</code> is the name
121         * of this component) the value from the {@link ExecutionContext} will be
122         * used in preference.
123         * 
124         * @see #setName(String)
125         * 
126         * @param count the value of the maximum item count.  count must be greater than zero.
127         */
128        public void setMaxItemCount(int count) {
129                Assert.isTrue(count > 0, "count must be greater than zero");
130                this.maxItemCount = count;
131        }
132
133        @Override
134        public void close() throws ItemStreamException {
135                super.close();
136                currentItemCount = 0;
137                try {
138                        doClose();
139                }
140                catch (Exception e) {
141                        throw new ItemStreamException("Error while closing item reader", e);
142                }
143        }
144
145        @Override
146        public void open(ExecutionContext executionContext) throws ItemStreamException {
147                super.open(executionContext);
148                try {
149                        doOpen();
150                }
151                catch (Exception e) {
152                        throw new ItemStreamException("Failed to initialize the reader", e);
153                }
154                if (!isSaveState()) {
155                        return;
156                }
157
158                if (executionContext.containsKey(getExecutionContextKey(READ_COUNT_MAX))) {
159                        maxItemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT_MAX));
160                }
161
162                int itemCount = 0;
163                if (executionContext.containsKey(getExecutionContextKey(READ_COUNT))) {
164                        itemCount = executionContext.getInt(getExecutionContextKey(READ_COUNT));
165                }
166                else if(currentItemCount > 0) {
167                        itemCount = currentItemCount;
168                }
169
170                if (itemCount > 0 && itemCount < maxItemCount) {
171                        try {
172                                jumpToItem(itemCount);
173                        }
174                        catch (Exception e) {
175                                throw new ItemStreamException("Could not move to stored position on restart", e);
176                        }
177                }
178
179                currentItemCount = itemCount;
180
181        }
182
183        @Override
184        public void update(ExecutionContext executionContext) throws ItemStreamException {
185                super.update(executionContext);
186                if (saveState) {
187                        Assert.notNull(executionContext, "ExecutionContext must not be null");
188                        executionContext.putInt(getExecutionContextKey(READ_COUNT), currentItemCount);
189                        if (maxItemCount < Integer.MAX_VALUE) {
190                                executionContext.putInt(getExecutionContextKey(READ_COUNT_MAX), maxItemCount);
191                        }
192                }
193
194        }
195
196
197        /**
198         * Set the flag that determines whether to save internal data for
199         * {@link ExecutionContext}. Only switch this to false if you don't want to
200         * save any state from this stream, and you don't need it to be restartable.
201         * Always set it to false if the reader is being used in a concurrent
202         * environment.
203         * 
204         * @param saveState flag value (default true).
205         */
206        public void setSaveState(boolean saveState) {
207                this.saveState = saveState;
208        }
209
210        /**
211         * The flag that determines whether to save internal state for restarts.
212         * @return true if the flag was set
213         */
214        public boolean isSaveState() {
215                return saveState;
216        }
217
218}