/*
 * Copyright 2006-2014 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.sample.domain.multiline;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemReader;

/**
 * An {@link ItemReader} that delivers a list as its item, storing up objects
 * from the injected {@link ItemReader} until they are ready to be packed out as
 * a collection. This class must be used as a wrapper for a custom
 * {@link ItemReader} that can identify the record boundaries. The custom reader
 * should mark the beginning and end of records by returning an
 * {@link AggregateItem} which responds true to its query methods
 * <code>is*()</code>.<br><br>
 *
 * This class is thread-safe (it can be used concurrently by multiple threads)
 * as long as the {@link ItemReader} is also thread-safe.
 *
 * @see AggregateItem#isHeader()
 * @see AggregateItem#isFooter()
 *
 * @author Dave Syer
 *
 */
public class AggregateItemReader<T> implements ItemReader<List<T>> {

    private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);

    private ItemReader<AggregateItem<T>> itemReader;

    /**
     * Get the next list of records.
     *
     * Items are pulled from the delegate until it returns a footer item or
     * <code>null</code>. Header and footer items themselves are not included
     * in the returned list.
     *
     * @return the items collected up to the next footer (possibly an empty
     * list), or <code>null</code> if the delegate returned <code>null</code>
     * before a footer was seen; in that case any items collected during this
     * call are discarded and a warning is logged.
     * @throws Exception if the delegate fails while reading; a
     * {@link NullPointerException} is raised if no delegate has been set.
     *
     * @see org.springframework.batch.item.ItemReader#read()
     */
    @Override
    public List<T> read() throws Exception {
        // State is local to this call, so nothing is carried over between reads.
        ResultHolder<T> holder = new ResultHolder<T>();

        while (process(itemReader.read(), holder)) {
            continue;
        }

        if (holder.isExhausted()) {
            // The contract stays the same (null signals end of input), but
            // dropping already collected items should not go unnoticed.
            int discarded = holder.getRecords().size();
            if (discarded > 0) {
                LOG.warn("ItemReader exhausted before end of record; discarding " + discarded
                        + " collected item(s)");
            }
            return null;
        }

        return holder.getRecords();
    }

    /**
     * Handle a single item obtained from the delegate.
     *
     * @param value the item just read, or <code>null</code> if the delegate
     * has no more input
     * @param holder accumulator for the aggregate currently being built
     * @return <code>true</code> if reading should continue, <code>false</code>
     * if the current aggregate is finished (footer) or the input is exhausted
     */
    private boolean process(AggregateItem<T> value, ResultHolder<T> holder) {
        // finish processing if we hit the end of file
        if (value == null) {
            LOG.debug("Exhausted ItemReader");
            holder.setExhausted(true);
            return false;
        }

        // start a new collection; the header itself carries no payload here
        if (value.isHeader()) {
            LOG.debug("Start of new record detected");
            return true;
        }

        // mark we are finished with current collection
        if (value.isFooter()) {
            LOG.debug("End of record detected");
            return false;
        }

        // add a simple record to the current collection
        if (LOG.isDebugEnabled()) {
            // Guarded so that value.toString() is only paid for when debug is on.
            LOG.debug("Mapping: " + value);
        }
        holder.addRecord(value.getItem());
        return true;
    }

    /**
     * Set the delegate that supplies the individual items and the
     * header/footer markers. Must be called before {@link #read()}.
     *
     * @param itemReader the wrapped reader
     */
    public void setItemReader(ItemReader<AggregateItem<T>> itemReader) {
        this.itemReader = itemReader;
    }

    /**
     * Private class for temporary state management while item is being
     * collected. Declared static because it needs no access to the enclosing
     * reader instance.
     *
     * @param <E> type of the collected records
     *
     * @author Dave Syer
     *
     */
    private static final class ResultHolder<E> {

        private final List<E> records = new ArrayList<E>();

        private boolean exhausted = false;

        /**
         * @return the live list of records collected so far (not a copy)
         */
        public List<E> getRecords() {
            return records;
        }

        /**
         * @return <code>true</code> if the delegate signalled end of input
         */
        public boolean isExhausted() {
            return exhausted;
        }

        /**
         * Append a record to the aggregate being built.
         *
         * @param record the record to add
         */
        public void addRecord(E record) {
            records.add(record);
        }

        /**
         * @param exhausted whether the delegate has run out of input
         */
        public void setExhausted(boolean exhausted) {
            this.exhausted = exhausted;
        }
    }
}