001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.support; 018 019import java.io.BufferedWriter; 020import java.io.File; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.Writer; 024import java.nio.channels.Channels; 025import java.nio.channels.FileChannel; 026import java.nio.charset.UnsupportedCharsetException; 027import java.util.List; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import org.springframework.batch.item.ExecutionContext; 033import org.springframework.batch.item.ItemStream; 034import org.springframework.batch.item.ItemStreamException; 035import org.springframework.batch.item.WriteFailedException; 036import org.springframework.batch.item.WriterNotOpenException; 037import org.springframework.batch.item.file.FlatFileFooterCallback; 038import org.springframework.batch.item.file.FlatFileHeaderCallback; 039import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream; 040import org.springframework.batch.item.util.FileUtils; 041import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter; 042import org.springframework.beans.factory.InitializingBean; 043import org.springframework.core.io.Resource; 044import org.springframework.util.Assert; 045 046/** 047 * Base class for item writers that write data to a file or stream. 048 * This class provides common features like restart, force sync, append etc. 049 * The location of the output file is defined by a {@link Resource} which must 050 * represent a writable file.<br> 051 * 052 * Uses buffered writer to improve performance.<br> 053 * 054 * The implementation is <b>not</b> thread-safe. 055 * 056 * @author Waseem Malik 057 * @author Tomas Slanina 058 * @author Robert Kasanicky 059 * @author Dave Syer 060 * @author Michael Minella 061 * @author Mahmoud Ben Hassine 062 * 063 * @since 4.1 064 */ 065public abstract class AbstractFileItemWriter<T> extends AbstractItemStreamItemWriter<T> 066 implements ResourceAwareItemWriterItemStream<T>, InitializingBean { 067 068 public static final boolean DEFAULT_TRANSACTIONAL = true; 069 070 protected static final Log logger = LogFactory.getLog(AbstractFileItemWriter.class); 071 072 public static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator"); 073 074 // default encoding for writing to output files - set to UTF-8. 075 public static final String DEFAULT_CHARSET = "UTF-8"; 076 077 private static final String WRITTEN_STATISTICS_NAME = "written"; 078 079 private static final String RESTART_DATA_NAME = "current.count"; 080 081 private Resource resource; 082 083 protected OutputState state = null; 084 085 private boolean saveState = true; 086 087 private boolean forceSync = false; 088 089 protected boolean shouldDeleteIfExists = true; 090 091 private boolean shouldDeleteIfEmpty = false; 092 093 private String encoding = DEFAULT_CHARSET; 094 095 private FlatFileHeaderCallback headerCallback; 096 097 private FlatFileFooterCallback footerCallback; 098 099 protected String lineSeparator = DEFAULT_LINE_SEPARATOR; 100 101 private boolean transactional = DEFAULT_TRANSACTIONAL; 102 103 protected boolean append = false; 104 105 /** 106 * Flag to indicate that changes should be force-synced to disk on flush. 107 * Defaults to false, which means that even with a local disk changes could 108 * be lost if the OS crashes in between a write and a cache flush. Setting 109 * to true may result in slower performance for usage patterns involving many 110 * frequent writes. 111 * 112 * @param forceSync the flag value to set 113 */ 114 public void setForceSync(boolean forceSync) { 115 this.forceSync = forceSync; 116 } 117 118 /** 119 * Public setter for the line separator. Defaults to the System property 120 * line.separator. 121 * @param lineSeparator the line separator to set 122 */ 123 public void setLineSeparator(String lineSeparator) { 124 this.lineSeparator = lineSeparator; 125 } 126 127 /** 128 * Setter for resource. Represents a file that can be written. 129 * 130 * @param resource the resource to be written to 131 */ 132 @Override 133 public void setResource(Resource resource) { 134 this.resource = resource; 135 } 136 137 /** 138 * Sets encoding for output template. 139 * 140 * @param newEncoding {@link String} containing the encoding to be used for 141 * the writer. 142 */ 143 public void setEncoding(String newEncoding) { 144 this.encoding = newEncoding; 145 } 146 147 /** 148 * Flag to indicate that the target file should be deleted if it already 149 * exists, otherwise it will be created. Defaults to true, so no appending 150 * except on restart. If set to false and {@link #setAppendAllowed(boolean) 151 * appendAllowed} is also false then there will be an exception when the 152 * stream is opened to prevent existing data being potentially corrupted. 153 * 154 * @param shouldDeleteIfExists the flag value to set 155 */ 156 public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) { 157 this.shouldDeleteIfExists = shouldDeleteIfExists; 158 } 159 160 /** 161 * Flag to indicate that the target file should be appended if it already 162 * exists. If this flag is set then the flag 163 * {@link #setShouldDeleteIfExists(boolean) shouldDeleteIfExists} is 164 * automatically set to false, so that flag should not be set explicitly. 165 * Defaults value is false. 166 * 167 * @param append the flag value to set 168 */ 169 public void setAppendAllowed(boolean append) { 170 this.append = append; 171 } 172 173 /** 174 * Flag to indicate that the target file should be deleted if no lines have 175 * been written (other than header and footer) on close. Defaults to false. 176 * 177 * @param shouldDeleteIfEmpty the flag value to set 178 */ 179 public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) { 180 this.shouldDeleteIfEmpty = shouldDeleteIfEmpty; 181 } 182 183 /** 184 * Set the flag indicating whether or not state should be saved in the 185 * provided {@link ExecutionContext} during the {@link ItemStream} call to 186 * update. Setting this to false means that it will always start at the 187 * beginning on a restart. 188 * 189 * @param saveState if true, state will be persisted 190 */ 191 public void setSaveState(boolean saveState) { 192 this.saveState = saveState; 193 } 194 195 /** 196 * headerCallback will be called before writing the first item to file. 197 * Newline will be automatically appended after the header is written. 198 * 199 * @param headerCallback {@link FlatFileHeaderCallback} to generate the header 200 * 201 */ 202 public void setHeaderCallback(FlatFileHeaderCallback headerCallback) { 203 this.headerCallback = headerCallback; 204 } 205 206 /** 207 * footerCallback will be called after writing the last item to file, but 208 * before the file is closed. 209 * 210 * @param footerCallback {@link FlatFileFooterCallback} to generate the footer 211 * 212 */ 213 public void setFooterCallback(FlatFileFooterCallback footerCallback) { 214 this.footerCallback = footerCallback; 215 } 216 217 /** 218 * Flag to indicate that writing to the buffer should be delayed if a 219 * transaction is active. Defaults to true. 220 * 221 * @param transactional true if writing to buffer should be delayed. 222 * 223 */ 224 public void setTransactional(boolean transactional) { 225 this.transactional = transactional; 226 } 227 228 /** 229 * Writes out a string followed by a "new line", where the format of the new 230 * line separator is determined by the underlying operating system. 231 * 232 * @param items list of items to be written to output stream 233 * @throws Exception if an error occurs while writing items to the output stream 234 */ 235 @Override 236 public void write(List<? extends T> items) throws Exception { 237 if (!getOutputState().isInitialized()) { 238 throw new WriterNotOpenException("Writer must be open before it can be written to"); 239 } 240 241 if (logger.isDebugEnabled()) { 242 logger.debug("Writing to file with " + items.size() + " items."); 243 } 244 245 OutputState state = getOutputState(); 246 247 String lines = doWrite(items); 248 try { 249 state.write(lines); 250 } 251 catch (IOException e) { 252 throw new WriteFailedException("Could not write data. The file may be corrupt.", e); 253 } 254 state.setLinesWritten(state.getLinesWritten() + items.size()); 255 } 256 257 /** 258 * Write out a string of items followed by a "new line", where the format of the new 259 * line separator is determined by the underlying operating system. 260 * @param items to be written 261 * @return written lines 262 */ 263 protected abstract String doWrite(List<? extends T> items); 264 265 /** 266 * @see ItemStream#close() 267 */ 268 @Override 269 public void close() { 270 super.close(); 271 if (state != null) { 272 try { 273 if (footerCallback != null && state.outputBufferedWriter != null) { 274 footerCallback.writeFooter(state.outputBufferedWriter); 275 state.outputBufferedWriter.flush(); 276 } 277 } 278 catch (IOException e) { 279 throw new ItemStreamException("Failed to write footer before closing", e); 280 } 281 finally { 282 state.close(); 283 if (state.linesWritten == 0 && shouldDeleteIfEmpty) { 284 try { 285 resource.getFile().delete(); 286 } 287 catch (IOException e) { 288 throw new ItemStreamException("Failed to delete empty file on close", e); 289 } 290 } 291 state = null; 292 } 293 } 294 } 295 296 /** 297 * Initialize the reader. This method may be called multiple times before 298 * close is called. 299 * 300 * @see ItemStream#open(ExecutionContext) 301 */ 302 @Override 303 public void open(ExecutionContext executionContext) throws ItemStreamException { 304 super.open(executionContext); 305 306 Assert.notNull(resource, "The resource must be set"); 307 308 if (!getOutputState().isInitialized()) { 309 doOpen(executionContext); 310 } 311 } 312 313 private void doOpen(ExecutionContext executionContext) throws ItemStreamException { 314 OutputState outputState = getOutputState(); 315 if (executionContext.containsKey(getExecutionContextKey(RESTART_DATA_NAME))) { 316 outputState.restoreFrom(executionContext); 317 } 318 try { 319 outputState.initializeBufferedWriter(); 320 } 321 catch (IOException ioe) { 322 throw new ItemStreamException("Failed to initialize writer", ioe); 323 } 324 if (outputState.lastMarkedByteOffsetPosition == 0 && !outputState.appending) { 325 ack != null) { 326 try { 327 headerCallback.writeHeader(outputState.outputBufferedWriter); 328 outputState.write(lineSeparator); 329 } 330 catch (IOException e) { 331 throw new ItemStreamException("Could not write headers. The file may be corrupt.", e); 332 } 333 } 334 } 335 } 336 337 /** 338 * @see ItemStream#update(ExecutionContext) 339 */ 340 @Override 341 public void update(ExecutionContext executionContext) { 342 super.update(executionContext); 343 if (state == null) { 344 throw new ItemStreamException("ItemStream not open or already closed."); 345 } 346 347 Assert.notNull(executionContext, "ExecutionContext must not be null"); 348 349 if (saveState) { 350 351 try { 352 executionContext.putLong(getExecutionContextKey(RESTART_DATA_NAME), state.position()); 353 } 354 catch (IOException e) { 355 throw new ItemStreamException("ItemStream does not return current position properly", e); 356 } 357 358 executionContext.putLong(getExecutionContextKey(WRITTEN_STATISTICS_NAME), state.linesWritten); 359 } 360 } 361 362 // Returns object representing state. 363 protected OutputState getOutputState() { 364 if (state == null) { 365 File file; 366 try { 367 file = resource.getFile(); 368 } 369 catch (IOException e) { 370 throw new ItemStreamException("Could not convert resource to file: [" + resource + "]", e); 371 } 372 Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]"); 373 state = new OutputState(); 374 state.setDeleteIfExists(shouldDeleteIfExists); 375 state.setAppendAllowed(append); 376 state.setEncoding(encoding); 377 } 378 return state; 379 } 380 381 /** 382 * Encapsulates the runtime state of the writer. All state changing 383 * operations on the writer go through this class. 384 */ 385 protected class OutputState { 386 387 private FileOutputStream os; 388 389 // The bufferedWriter over the file channel that is actually written 390 Writer outputBufferedWriter; 391 392 FileChannel fileChannel; 393 394 // this represents the charset encoding (if any is needed) for the 395 // output file 396 String encoding = DEFAULT_CHARSET; 397 398 boolean restarted = false; 399 400 long lastMarkedByteOffsetPosition = 0; 401 402 long linesWritten = 0; 403 404 boolean shouldDeleteIfExists = true; 405 406 boolean initialized = false; 407 408 private boolean append = false; 409 410 private boolean appending = false; 411 412 /** 413 * Return the byte offset position of the cursor in the output file as a 414 * long integer. 415 */ 416 public long position() throws IOException { 417 long pos = 0; 418 419 if (fileChannel == null) { 420 return 0; 421 } 422 423 outputBufferedWriter.flush(); 424 pos = fileChannel.position(); 425 if (transactional) { 426 pos += ((TransactionAwareBufferedWriter) outputBufferedWriter).getBufferSize(); 427 } 428 429 return pos; 430 431 } 432 433 /** 434 * @param append if true, append to previously created file 435 */ 436 public void setAppendAllowed(boolean append) { 437 this.append = append; 438 } 439 440 /** 441 * @param executionContext state from which to restore writing from 442 */ 443 public void restoreFrom(ExecutionContext executionContext) { 444 lastMarkedByteOffsetPosition = executionContext.getLong(getExecutionContextKey(RESTART_DATA_NAME)); 445 linesWritten = executionContext.getLong(getExecutionContextKey(WRITTEN_STATISTICS_NAME)); 446 if (shouldDeleteIfEmpty && linesWritten == 0) { 447 // previous execution deleted the output file because no items were written 448 restarted = false; 449 lastMarkedByteOffsetPosition = 0; 450 } else { 451 restarted = true; 452 } 453 } 454 455 /** 456 * @param shouldDeleteIfExists indicator 457 */ 458 public void setDeleteIfExists(boolean shouldDeleteIfExists) { 459 this.shouldDeleteIfExists = shouldDeleteIfExists; 460 } 461 462 /** 463 * @param encoding file encoding 464 */ 465 public void setEncoding(String encoding) { 466 this.encoding = encoding; 467 } 468 469 public long getLinesWritten() { 470 return linesWritten; 471 } 472 473 public void setLinesWritten(long linesWritten) { 474 this.linesWritten = linesWritten; 475 } 476 477 /** 478 * Close the open resource and reset counters. 479 */ 480 public void close() { 481 482 initialized = false; 483 restarted = false; 484 try { 485 if (outputBufferedWriter != null) { 486 outputBufferedWriter.close(); 487 } 488 } 489 catch (IOException ioe) { 490 throw new ItemStreamException("Unable to close the the ItemWriter", ioe); 491 } 492 finally { 493 if (!transactional) { 494 closeStream(); 495 } 496 } 497 } 498 499 private void closeStream() { 500 try { 501 if (fileChannel != null) { 502 fileChannel.close(); 503 } 504 } 505 catch (IOException ioe) { 506 throw new ItemStreamException("Unable to close the the ItemWriter", ioe); 507 } 508 finally { 509 try { 510 if (os != null) { 511 os.close(); 512 } 513 } 514 catch (IOException ioe) { 515 throw new ItemStreamException("Unable to close the the ItemWriter", ioe); 516 } 517 } 518 } 519 520 /** 521 * @param line String to be written to the file 522 * @throws IOException 523 */ 524 public void write(String line) throws IOException { 525 if (!initialized) { 526 initializeBufferedWriter(); 527 } 528 529 outputBufferedWriter.write(line); 530 outputBufferedWriter.flush(); 531 } 532 533 /** 534 * Truncate the output at the last known good point. 535 * 536 * @throws IOException if unable to work with file 537 */ 538 public void truncate() throws IOException { 539 fileChannel.truncate(lastMarkedByteOffsetPosition); 540 fileChannel.position(lastMarkedByteOffsetPosition); 541 } 542 543 /** 544 * Creates the buffered writer for the output file channel based on 545 * configuration information. 546 * @throws IOException if unable to initialize buffer 547 */ 548 private void initializeBufferedWriter() throws IOException { 549 550 File file = resource.getFile(); 551 FileUtils.setUpOutputFile(file, restarted, append, shouldDeleteIfExists); 552 553 os = new FileOutputStream(file.getAbsolutePath(), true); 554 fileChannel = os.getChannel(); 555 556 outputBufferedWriter = getBufferedWriter(fileChannel, encoding); 557 outputBufferedWriter.flush(); 558 559 if (append) { 560 // Bug in IO library? This doesn't work... 561 // lastMarkedByteOffsetPosition = fileChannel.position(); 562 if (file.length() > 0) { 563 appending = true; 564 // Don't write the headers again 565 } 566 } 567 568 Assert.state(outputBufferedWriter != null, 569 "Unable to initialize buffered writer"); 570 // in case of restarting reset position to last committed point 571 if (restarted) { 572 checkFileSize(); 573 truncate(); 574 } 575 576 initialized = true; 577 } 578 579 public boolean isInitialized() { 580 return initialized; 581 } 582 583 /** 584 * Returns the buffered writer opened to the beginning of the file 585 * specified by the absolute path name contained in absoluteFileName. 586 */ 587 private Writer getBufferedWriter(FileChannel fileChannel, String encoding) { 588 try { 589 final FileChannel channel = fileChannel; 590 if (transactional) { 591 TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, () -> closeStream()); 592 593 writer.setEncoding(encoding); 594 writer.setForceSync(forceSync); 595 return writer; 596 } 597 else { 598 Writer writer = new BufferedWriter(Channels.newWriter(fileChannel, encoding)) { 599 @Override 600 public void flush() throws IOException { 601 super.flush(); 602 if (forceSync) { 603 channel.force(false); 604 } 605 } 606 }; 607 608 return writer; 609 } 610 } 611 catch (UnsupportedCharsetException ucse) { 612 throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse); 613 } 614 } 615 616 /** 617 * Checks (on setState) to make sure that the current output file's size 618 * is not smaller than the last saved commit point. If it is, then the 619 * file has been damaged in some way and whole task must be started over 620 * again from the beginning. 621 * @throws IOException if there is an IO problem 622 */ 623 private void checkFileSize() throws IOException { 624 long size = -1; 625 626 outputBufferedWriter.flush(); 627 size = fileChannel.size(); 628 629 if (size < lastMarkedByteOffsetPosition) { 630 throw new ItemStreamException("Current file size is smaller than size at last commit"); 631 } 632 } 633 634 } 635 636}