/*
 * Copyright 2006-2007 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.item.file;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.file.separator.RecordSeparatorPolicy;
import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

/**
 * Restartable {@link ItemReader} that reads lines from input {@link #setResource(Resource)}. Line is defined by the
 * {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} and mapped to item using {@link #setLineMapper(LineMapper)}.
 * If an exception is thrown during line mapping it is rethrown as {@link FlatFileParseException} adding information
 * about the problematic line and its line number.
 *
 * @author Robert Kasanicky
 * @author Mahmoud Ben Hassine
 */
public class FlatFileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
		ResourceAwareItemReaderItemStream<T>, InitializingBean {

	private static final Log logger = LogFactory.getLog(FlatFileItemReader.class);

	// default encoding for input files
	public static final String DEFAULT_CHARSET = Charset.defaultCharset().name();

	public static final String[] DEFAULT_COMMENT_PREFIXES = new String[] { "#" };

	private RecordSeparatorPolicy recordSeparatorPolicy = new SimpleRecordSeparatorPolicy();

	private Resource resource;

	private BufferedReader reader;

	// number of physical lines consumed from the reader since it was opened
	private int lineCount = 0;

	private String[] comments = DEFAULT_COMMENT_PREFIXES;

	// true while there is nothing to read: before a successful open, or after an I/O failure
	private boolean noInput = false;

	private String encoding = DEFAULT_CHARSET;

	private LineMapper<T> lineMapper;

	private int linesToSkip = 0;

	private LineCallbackHandler skippedLinesCallback;

	private boolean strict = true;

	private BufferedReaderFactory bufferedReaderFactory = new DefaultBufferedReaderFactory();

	/**
	 * Creates a reader whose name is the short class name of {@link FlatFileItemReader}.
	 */
	public FlatFileItemReader() {
		setName(ClassUtils.getShortName(FlatFileItemReader.class));
	}

	/**
	 * In strict mode the reader will throw an exception on
	 * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist or is not
	 * readable. In non-strict mode a warning is logged instead and the reader yields no items.
	 * @param strict <code>true</code> by default
	 */
	public void setStrict(boolean strict) {
		this.strict = strict;
	}

	/**
	 * @param skippedLinesCallback will be called for each one of the initial skipped lines before any items are read.
	 */
	public void setSkippedLinesCallback(LineCallbackHandler skippedLinesCallback) {
		this.skippedLinesCallback = skippedLinesCallback;
	}

	/**
	 * Public setter for the number of lines to skip at the start of a file. Can be used if the file contains a header
	 * without useful (column name) information, and without a comment delimiter at the beginning of the lines.
	 *
	 * @param linesToSkip the number of lines to skip
	 */
	public void setLinesToSkip(int linesToSkip) {
		this.linesToSkip = linesToSkip;
	}

	/**
	 * Setter for line mapper. This property is required to be set.
	 * @param lineMapper maps line to item
	 */
	public void setLineMapper(LineMapper<T> lineMapper) {
		this.lineMapper = lineMapper;
	}

	/**
	 * Setter for the encoding for this input source. Default value is {@link #DEFAULT_CHARSET}.
	 *
	 * @param encoding name of the character encoding handed to the {@link BufferedReaderFactory} when the input
	 * resource is opened
	 */
	public void setEncoding(String encoding) {
		this.encoding = encoding;
	}

	/**
	 * Factory for the {@link BufferedReader} that will be used to extract lines from the file. The default is fine for
	 * plain text files, but this is a useful strategy for binary files where the standard BufferedReader from java.io
	 * is limiting.
	 *
	 * @param bufferedReaderFactory the bufferedReaderFactory to set
	 */
	public void setBufferedReaderFactory(BufferedReaderFactory bufferedReaderFactory) {
		this.bufferedReaderFactory = bufferedReaderFactory;
	}

	/**
	 * Setter for comment prefixes. Can be used to ignore header lines as well by using e.g. the first couple of column
	 * names as a prefix. Defaults to {@link #DEFAULT_COMMENT_PREFIXES}. A copy of the array is stored, so later
	 * changes to the caller's array do not affect this reader.
	 *
	 * @param comments an array of comment line prefixes; must not be <code>null</code>
	 */
	public void setComments(String[] comments) {
		this.comments = comments.clone();
	}

	/**
	 * Public setter for the input resource.
	 *
	 * @param resource the resource to read lines from
	 */
	@Override
	public void setResource(Resource resource) {
		this.resource = resource;
	}

	/**
	 * Public setter for the recordSeparatorPolicy. Used to determine where the line endings are and do things like
	 * continue over a line ending if inside a quoted string.
	 *
	 * @param recordSeparatorPolicy the recordSeparatorPolicy to set
	 */
	public void setRecordSeparatorPolicy(RecordSeparatorPolicy recordSeparatorPolicy) {
		this.recordSeparatorPolicy = recordSeparatorPolicy;
	}

	/**
	 * Reads the next logical record (as defined by {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)}, so it
	 * might span multiple lines in the file) and maps it to an item with the configured {@link LineMapper}.
	 *
	 * @return the item mapped from the next record, or <code>null</code> if there is no input or no record is left
	 * @throws FlatFileParseException wrapping any exception thrown by the line mapper, carrying the offending input
	 * and the current line number
	 */
	@Override
	protected T doRead() throws Exception {
		if (noInput) {
			return null;
		}

		String line = readLine();

		if (line == null) {
			return null;
		}
		else {
			try {
				return lineMapper.mapLine(line, lineCount);
			}
			catch (Exception ex) {
				throw new FlatFileParseException("Parsing error at line: " + lineCount + " in resource=["
						+ resource.getDescription() + "], input=[" + line + "]", ex, line, lineCount);
			}
		}
	}

	/**
	 * Reads the next record, skipping leading lines that start with one of the comment prefixes. Every physical line
	 * consumed from the underlying reader increments the line count. Only the first line of a record is tested
	 * against the comment prefixes; continuation lines pulled in by the record separator policy are not.
	 *
	 * @return the next record, or <code>null</code> when the underlying reader has no more lines
	 * @throws ReaderNotOpenException if no reader has been created yet
	 * @throws NonTransientFlatFileException if the underlying reader fails with an {@link IOException}
	 */
	private String readLine() {

		if (reader == null) {
			throw new ReaderNotOpenException("Reader must be open before it can be read.");
		}

		String line = null;

		try {
			line = this.reader.readLine();
			if (line == null) {
				return null;
			}
			lineCount++;
			while (isComment(line)) {
				line = reader.readLine();
				if (line == null) {
					return null;
				}
				lineCount++;
			}

			line = applyRecordSeparatorPolicy(line);
		}
		catch (IOException e) {
			// Prevent IOException from recurring indefinitely
			// if client keeps catching and re-calling
			noInput = true;
			throw new NonTransientFlatFileException("Unable to read from resource: [" + resource + "]", e, line,
					lineCount);
		}
		return line;
	}

	/**
	 * @param line the line to test
	 * @return <code>true</code> if the line starts with any of the configured comment prefixes
	 */
	private boolean isComment(String line) {
		for (String prefix : comments) {
			if (line.startsWith(prefix)) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Resets the line count and closes the underlying reader if one was created. The reader reference itself is kept.
	 */
	@Override
	protected void doClose() throws Exception {
		lineCount = 0;
		if (reader != null) {
			reader.close();
		}
	}

	/**
	 * Opens the input resource and consumes the configured number of initial lines, passing each of them to the
	 * skipped-lines callback if one is set.
	 * <p>
	 * If the resource does not exist or is not readable, strict mode fails with an {@link IllegalStateException};
	 * otherwise a warning is logged and the reader is left in the "no input" state so that reads return
	 * <code>null</code>.
	 *
	 * @throws IllegalArgumentException if the resource or the record separator policy is not set
	 * @throws IllegalStateException in strict mode, if the resource does not exist or is not readable
	 */
	@Override
	protected void doOpen() throws Exception {
		Assert.notNull(resource, "Input resource must be set");
		Assert.notNull(recordSeparatorPolicy, "RecordSeparatorPolicy must be set");

		// stay in the "no input" state until the reader has been created and the header lines consumed
		noInput = true;
		if (!resource.exists()) {
			if (strict) {
				throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
			}
			logger.warn("Input resource does not exist " + resource.getDescription());
			return;
		}

		if (!resource.isReadable()) {
			if (strict) {
				throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode): "
						+ resource);
			}
			logger.warn("Input resource is not readable " + resource.getDescription());
			return;
		}

		reader = bufferedReaderFactory.create(resource, encoding);
		for (int i = 0; i < linesToSkip; i++) {
			String line = readLine();
			// readLine() yields null once the input is exhausted (fewer lines than linesToSkip);
			// there is no skipped line to report in that case, so never hand null to the callback
			if (line != null && skippedLinesCallback != null) {
				skippedLinesCallback.handleLine(line);
			}
		}
		noInput = false;
	}

	/**
	 * Verifies that the mandatory line mapper has been set.
	 *
	 * @throws IllegalArgumentException if no line mapper is configured
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		Assert.notNull(lineMapper, "LineMapper is required");
	}

	/**
	 * Advances the reader by reading and discarding <code>itemIndex</code> records, without mapping them.
	 *
	 * @param itemIndex the number of records to skip over
	 */
	@Override
	protected void jumpToItem(int itemIndex) throws Exception {
		for (int i = 0; i < itemIndex; i++) {
			readLine();
		}
	}

	/**
	 * Keeps appending further lines to the given line until the record separator policy reports the end of the
	 * record, then returns the post-processed record.
	 *
	 * @param line the first line of the record
	 * @return the complete record after {@link RecordSeparatorPolicy#postProcess(String)}
	 * @throws IOException if reading a continuation line fails
	 * @throws FlatFileParseException if the input ends while an unfinished record still contains text
	 */
	private String applyRecordSeparatorPolicy(String line) throws IOException {

		String record = line;
		while (line != null && !recordSeparatorPolicy.isEndOfRecord(record)) {
			line = this.reader.readLine();
			if (line == null) {
				if (StringUtils.hasText(record)) {
					// A record was partially complete since it hasn't ended but
					// the line is null
					throw new FlatFileParseException("Unexpected end of file before record complete", record, lineCount);
				}
				else {
					// Record has no text but it might still be post processed
					// to something (skipping preProcess since that was already
					// done)
					break;
				}
			}
			else {
				lineCount++;
			}
			record = recordSeparatorPolicy.preProcess(record) + line;
		}

		return recordSeparatorPolicy.postProcess(record);

	}

}