001/*
002 * Copyright 2006-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.sample.domain.multiline;
018
019import java.util.ArrayList;
020import java.util.List;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.springframework.batch.item.ItemReader;
025
026/**
027 * An {@link ItemReader} that delivers a list as its item, storing up objects
028 * from the injected {@link ItemReader} until they are ready to be packed out as
029 * a collection. This class must be used as a wrapper for a custom
030 * {@link ItemReader} that can identify the record boundaries. The custom reader
031 * should mark the beginning and end of records by returning an
032 * {@link AggregateItem} which responds true to its query methods
033 * <code>is*()</code>.<br><br>
034 * 
035 * This class is thread-safe (it can be used concurrently by multiple threads)
036 * as long as the {@link ItemReader} is also thread-safe.
037 * 
038 * @see AggregateItem#isHeader()
039 * @see AggregateItem#isFooter()
040 * 
041 * @author Dave Syer
042 * 
043 */
044public class AggregateItemReader<T> implements ItemReader<List<T>> {
045        private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);
046
047        private ItemReader<AggregateItem<T>> itemReader;
048
049        /**
050         * Get the next list of records.
051         *
052         * @see org.springframework.batch.item.ItemReader#read()
053         */
054        @Override
055        public List<T> read() throws Exception {
056                ResultHolder holder = new ResultHolder();
057
058                while (process(itemReader.read(), holder)) {
059                        continue;
060                }
061
062                if (!holder.isExhausted()) {
063                        return holder.getRecords();
064                }
065                else {
066                        return null;
067                }
068        }
069
070        private boolean process(AggregateItem<T> value, ResultHolder holder) {
071                // finish processing if we hit the end of file
072                if (value == null) {
073                        LOG.debug("Exhausted ItemReader");
074                        holder.setExhausted(true);
075                        return false;
076                }
077
078                // start a new collection
079                if (value.isHeader()) {
080                        LOG.debug("Start of new record detected");
081                        return true;
082                }
083
084                // mark we are finished with current collection
085                if (value.isFooter()) {
086                        LOG.debug("End of record detected");
087                        return false;
088                }
089
090                // add a simple record to the current collection
091                LOG.debug("Mapping: " + value);
092                holder.addRecord(value.getItem());
093                return true;
094        }
095
096        public void setItemReader(ItemReader<AggregateItem<T>> itemReader) {
097                this.itemReader = itemReader;
098        }
099
100        /**
101         * Private class for temporary state management while item is being
102         * collected.
103         * 
104         * @author Dave Syer
105         * 
106         */
107        private class ResultHolder {
108                private List<T> records = new ArrayList<T>();
109                private boolean exhausted = false;
110
111                public List<T> getRecords() {
112                        return records;
113                }
114
115                public boolean isExhausted() {
116                        return exhausted;
117                }
118
119                public void addRecord(T record) {
120                        records.add(record);
121                }
122
123                public void setExhausted(boolean exhausted) {
124                        this.exhausted = exhausted;
125                }
126        }
127}