/*
 * Copyright 2006-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.batch.item.file;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.nio.charset.Charset;
022
023import org.apache.commons.logging.Log;
024import org.apache.commons.logging.LogFactory;
025import org.springframework.batch.item.ItemReader;
026import org.springframework.batch.item.ReaderNotOpenException;
027import org.springframework.batch.item.file.separator.RecordSeparatorPolicy;
028import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;
029import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
030import org.springframework.beans.factory.InitializingBean;
031import org.springframework.core.io.Resource;
032import org.springframework.util.Assert;
033import org.springframework.util.ClassUtils;
034import org.springframework.util.StringUtils;
035
036/**
037 * Restartable {@link ItemReader} that reads lines from input {@link #setResource(Resource)}. Line is defined by the
038 * {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} and mapped to item using {@link #setLineMapper(LineMapper)}.
039 * If an exception is thrown during line mapping it is rethrown as {@link FlatFileParseException} adding information
040 * about the problematic line and its line number.
041 * 
042 * @author Robert Kasanicky
043 * @author Mahmoud Ben Hassine
044 */
045public class FlatFileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
046                ResourceAwareItemReaderItemStream<T>, InitializingBean {
047
048        private static final Log logger = LogFactory.getLog(FlatFileItemReader.class);
049
050        // default encoding for input files
051        public static final String DEFAULT_CHARSET = Charset.defaultCharset().name();
052
053        public static final String[] DEFAULT_COMMENT_PREFIXES = new String[] { "#" };
054
055        private RecordSeparatorPolicy recordSeparatorPolicy = new SimpleRecordSeparatorPolicy();
056
057        private Resource resource;
058
059        private BufferedReader reader;
060
061        private int lineCount = 0;
062
063        private String[] comments = DEFAULT_COMMENT_PREFIXES;
064
065        private boolean noInput = false;
066
067        private String encoding = DEFAULT_CHARSET;
068
069        private LineMapper<T> lineMapper;
070
071        private int linesToSkip = 0;
072
073        private LineCallbackHandler skippedLinesCallback;
074
075        private boolean strict = true;
076
077        private BufferedReaderFactory bufferedReaderFactory = new DefaultBufferedReaderFactory();
078
079        public FlatFileItemReader() {
080                setName(ClassUtils.getShortName(FlatFileItemReader.class));
081        }
082
083        /**
084         * In strict mode the reader will throw an exception on
085         * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist.
086         * @param strict <code>true</code> by default
087         */
088        public void setStrict(boolean strict) {
089                this.strict = strict;
090        }
091
092        /**
093         * @param skippedLinesCallback will be called for each one of the initial skipped lines before any items are read.
094         */
095        public void setSkippedLinesCallback(LineCallbackHandler skippedLinesCallback) {
096                this.skippedLinesCallback = skippedLinesCallback;
097        }
098
099        /**
100         * Public setter for the number of lines to skip at the start of a file. Can be used if the file contains a header
101         * without useful (column name) information, and without a comment delimiter at the beginning of the lines.
102         * 
103         * @param linesToSkip the number of lines to skip
104         */
105        public void setLinesToSkip(int linesToSkip) {
106                this.linesToSkip = linesToSkip;
107        }
108
109        /**
110         * Setter for line mapper. This property is required to be set.
111         * @param lineMapper maps line to item
112         */
113        public void setLineMapper(LineMapper<T> lineMapper) {
114                this.lineMapper = lineMapper;
115        }
116
117        /**
118         * Setter for the encoding for this input source. Default value is {@link #DEFAULT_CHARSET}.
119         * 
120         * @param encoding a properties object which possibly contains the encoding for this input file;
121         */
122        public void setEncoding(String encoding) {
123                this.encoding = encoding;
124        }
125
126        /**
127         * Factory for the {@link BufferedReader} that will be used to extract lines from the file. The default is fine for
128         * plain text files, but this is a useful strategy for binary files where the standard BufferedReader from java.io
129         * is limiting.
130         * 
131         * @param bufferedReaderFactory the bufferedReaderFactory to set
132         */
133        public void setBufferedReaderFactory(BufferedReaderFactory bufferedReaderFactory) {
134                this.bufferedReaderFactory = bufferedReaderFactory;
135        }
136
137        /**
138         * Setter for comment prefixes. Can be used to ignore header lines as well by using e.g. the first couple of column
139         * names as a prefix. Defaults to {@link #DEFAULT_COMMENT_PREFIXES}.
140         * 
141         * @param comments an array of comment line prefixes.
142         */
143        public void setComments(String[] comments) {
144                this.comments = new String[comments.length];
145                System.arraycopy(comments, 0, this.comments, 0, comments.length);
146        }
147
148        /**
149         * Public setter for the input resource.
150         */
151    @Override
152        public void setResource(Resource resource) {
153                this.resource = resource;
154        }
155
156        /**
157         * Public setter for the recordSeparatorPolicy. Used to determine where the line endings are and do things like
158         * continue over a line ending if inside a quoted string.
159         * 
160         * @param recordSeparatorPolicy the recordSeparatorPolicy to set
161         */
162        public void setRecordSeparatorPolicy(RecordSeparatorPolicy recordSeparatorPolicy) {
163                this.recordSeparatorPolicy = recordSeparatorPolicy;
164        }
165
166        /**
167         * @return string corresponding to logical record according to
168         * {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} (might span multiple lines in file).
169         */
170        @Override
171        protected T doRead() throws Exception {
172                if (noInput) {
173                        return null;
174                }
175
176                String line = readLine();
177
178                if (line == null) {
179                        return null;
180                }
181                else {
182                        try {
183                                return lineMapper.mapLine(line, lineCount);
184                        }
185                        catch (Exception ex) {
186                                throw new FlatFileParseException("Parsing error at line: " + lineCount + " in resource=["
187                                                + resource.getDescription() + "], input=[" + line + "]", ex, line, lineCount);
188                        }
189                }
190        }
191
192        /**
193         * @return next line (skip comments).getCurrentResource
194         */
195        private String readLine() {
196
197                if (reader == null) {
198                        throw new ReaderNotOpenException("Reader must be open before it can be read.");
199                }
200
201                String line = null;
202
203                try {
204                        line = this.reader.readLine();
205                        if (line == null) {
206                                return null;
207                        }
208                        lineCount++;
209                        while (isComment(line)) {
210                                line = reader.readLine();
211                                if (line == null) {
212                                        return null;
213                                }
214                                lineCount++;
215                        }
216
217                        line = applyRecordSeparatorPolicy(line);
218                }
219                catch (IOException e) {
220                        // Prevent IOException from recurring indefinitely
221                        // if client keeps catching and re-calling
222                        noInput = true;
223                        throw new NonTransientFlatFileException("Unable to read from resource: [" + resource + "]", e, line,
224                                        lineCount);
225                }
226                return line;
227        }
228
229        private boolean isComment(String line) {
230                for (String prefix : comments) {
231                        if (line.startsWith(prefix)) {
232                                return true;
233                        }
234                }
235                return false;
236        }
237
238        @Override
239        protected void doClose() throws Exception {
240                lineCount = 0;
241                if (reader != null) {
242                        reader.close();
243                }
244        }
245
246        @Override
247        protected void doOpen() throws Exception {
248                Assert.notNull(resource, "Input resource must be set");
249                Assert.notNull(recordSeparatorPolicy, "RecordSeparatorPolicy must be set");
250
251                noInput = true;
252                if (!resource.exists()) {
253                        if (strict) {
254                                throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
255                        }
256                        logger.warn("Input resource does not exist " + resource.getDescription());
257                        return;
258                }
259
260                if (!resource.isReadable()) {
261                        if (strict) {
262                                throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode): "
263                                                + resource);
264                        }
265                        logger.warn("Input resource is not readable " + resource.getDescription());
266                        return;
267                }
268
269                reader = bufferedReaderFactory.create(resource, encoding);
270                for (int i = 0; i < linesToSkip; i++) {
271                        String line = readLine();
272                        if (skippedLinesCallback != null) {
273                                skippedLinesCallback.handleLine(line);
274                        }
275                }
276                noInput = false;
277        }
278
279    @Override
280        public void afterPropertiesSet() throws Exception {
281                Assert.notNull(lineMapper, "LineMapper is required");
282        }
283
284        @Override
285        protected void jumpToItem(int itemIndex) throws Exception {
286                for (int i = 0; i < itemIndex; i++) {
287                        readLine();
288                }
289        }
290
291        private String applyRecordSeparatorPolicy(String line) throws IOException {
292
293                String record = line;
294                while (line != null && !recordSeparatorPolicy.isEndOfRecord(record)) {
295                        line = this.reader.readLine();
296                        if (line == null) {
297                                if (StringUtils.hasText(record)) {
298                                        // A record was partially complete since it hasn't ended but
299                                        // the line is null
300                                        throw new FlatFileParseException("Unexpected end of file before record complete", record, lineCount);
301                                }
302                                else {
303                                        // Record has no text but it might still be post processed
304                                        // to something (skipping preProcess since that was already
305                                        // done)
306                                        break;
307                                }
308                        }
309                        else {
310                                lineCount++;
311                        }
312                        record = recordSeparatorPolicy.preProcess(record) + line;
313                }
314
315                return recordSeparatorPolicy.postProcess(record);
316
317        }
318
319}