001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.file;
018
019import java.util.Arrays;
020import java.util.Comparator;
021
022import org.apache.commons.logging.Log;
023import org.apache.commons.logging.LogFactory;
024import org.springframework.batch.item.ExecutionContext;
025import org.springframework.batch.item.ItemStream;
026import org.springframework.batch.item.ItemStreamException;
027import org.springframework.batch.item.ParseException;
028import org.springframework.batch.item.ResourceAware;
029import org.springframework.batch.item.UnexpectedInputException;
030import org.springframework.batch.item.support.AbstractItemStreamItemReader;
031import org.springframework.core.io.Resource;
032import org.springframework.lang.Nullable;
033import org.springframework.util.Assert;
034import org.springframework.util.ClassUtils;
035
036/**
037 * Reads items from multiple resources sequentially - resource list is given by {@link #setResources(Resource[])}, the
038 * actual reading is delegated to {@link #setDelegate(ResourceAwareItemReaderItemStream)}.
039 * 
040 * Input resources are ordered using {@link #setComparator(Comparator)} to make sure resource ordering is preserved
041 * between job runs in restart scenario.
042 * 
043 * 
044 * @author Robert Kasanicky
045 * @author Lucas Ward
046 * @author Mahmoud Ben Hassine
047 */
048public class MultiResourceItemReader<T> extends AbstractItemStreamItemReader<T> {
049
050        private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class);
051
052        private static final String RESOURCE_KEY = "resourceIndex";
053
054        private ResourceAwareItemReaderItemStream<? extends T> delegate;
055
056        private Resource[] resources;
057
058        private boolean saveState = true;
059
060        private int currentResource = -1;
061
062        // signals there are no resources to read -> just return null on first read
063        private boolean noInput;
064
065        private boolean strict = false;
066
067        /**
068         * In strict mode the reader will throw an exception on
069         * {@link #open(org.springframework.batch.item.ExecutionContext)} if there are no resources to read.
070         * @param strict false by default
071         */
072        public void setStrict(boolean strict) {
073                this.strict = strict;
074        }
075
076        private Comparator<Resource> comparator = new Comparator<Resource>() {
077
078                /**
079                 * Compares resource filenames.
080                 */
081                @Override
082                public int compare(Resource r1, Resource r2) {
083                        return r1.getFilename().compareTo(r2.getFilename());
084                }
085
086        };
087
088        public MultiResourceItemReader() {
089                this.setExecutionContextName(ClassUtils.getShortName(MultiResourceItemReader.class));
090        }
091
092        /**
093         * Reads the next item, jumping to next resource if necessary.
094         */
095        @Override
096        public T read() throws Exception, UnexpectedInputException, ParseException {
097
098                if (noInput) {
099                        return null;
100                }
101
102                // If there is no resource, then this is the first item, set the current
103                // resource to 0 and open the first delegate.
104                if (currentResource == -1) {
105                        currentResource = 0;
106                        delegate.setResource(resources[currentResource]);
107                        delegate.open(new ExecutionContext());
108                }
109
110                return readNextItem();
111        }
112
113        /**
114         * Use the delegate to read the next item, jump to next resource if current one is exhausted. Items are appended to
115         * the buffer.
116         * 
117         * @return next item from input
118         */
119        private T readNextItem() throws Exception {
120
121                T item = readFromDelegate();
122
123                while (item == null) {
124
125                        currentResource++;
126
127                        if (currentResource >= resources.length) {
128                                return null;
129                        }
130
131                        delegate.close();
132                        delegate.setResource(resources[currentResource]);
133                        delegate.open(new ExecutionContext());
134
135                        item = readFromDelegate();
136                }
137
138                return item;
139        }
140
141        private T readFromDelegate() throws Exception {
142                T item = delegate.read();
143                if(item instanceof ResourceAware){
144                        ((ResourceAware) item).setResource(getCurrentResource());
145                }
146                return item;
147        }
148
149        /**
150         * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader and reset instance variable values.
151         */
152        @Override
153        public void close() throws ItemStreamException {
154                super.close();
155
156                if(!this.noInput) {
157                        delegate.close();
158                }
159
160                noInput = false;
161        }
162
163        /**
164         * Figure out which resource to start with in case of restart, open the delegate and restore delegate's position in
165         * the resource.
166         */
167        @Override
168        public void open(ExecutionContext executionContext) throws ItemStreamException {
169                super.open(executionContext);
170                Assert.notNull(resources, "Resources must be set");
171
172                noInput = false;
173                if (resources.length == 0) {
174                        if (strict) {
175                                throw new IllegalStateException(
176                                                "No resources to read. Set strict=false if this is not an error condition.");
177                        }
178                        else {
179                                logger.warn("No resources to read. Set strict=true if this should be an error condition.");
180                                noInput = true;
181                                return;
182                        }
183                }
184
185                Arrays.sort(resources, comparator);
186
187                if (executionContext.containsKey(getExecutionContextKey(RESOURCE_KEY))) {
188                        currentResource = executionContext.getInt(getExecutionContextKey(RESOURCE_KEY));
189
190                        // context could have been saved before reading anything
191                        if (currentResource == -1) {
192                                currentResource = 0;
193                        }
194
195                        delegate.setResource(resources[currentResource]);
196                        delegate.open(executionContext);
197                }
198                else {
199                        currentResource = -1;
200                }
201        }
202
203        /**
204         * Store the current resource index and position in the resource.
205         */
206        @Override
207        public void update(ExecutionContext executionContext) throws ItemStreamException {
208                super.update(executionContext);
209                if (saveState) {
210                        executionContext.putInt(getExecutionContextKey(RESOURCE_KEY), currentResource);
211                        delegate.update(executionContext);
212                }
213        }
214
215        /**
216         * @param delegate reads items from single {@link Resource}.
217         */
218        public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) {
219                this.delegate = delegate;
220        }
221
222        /**
223         * Set the boolean indicating whether or not state should be saved in the provided {@link ExecutionContext} during
224         * the {@link ItemStream} call to update.
225         * 
226         * @param saveState true to update ExecutionContext. False do not update
227         * ExecutionContext.
228         */
229        public void setSaveState(boolean saveState) {
230                this.saveState = saveState;
231        }
232
233        /**
234         * @param comparator used to order the injected resources, by default compares {@link Resource#getFilename()}
235         * values.
236         */
237        public void setComparator(Comparator<Resource> comparator) {
238                this.comparator = comparator;
239        }
240
241        /**
242         * @param resources input resources
243         */
244        public void setResources(Resource[] resources) {
245                Assert.notNull(resources, "The resources must not be null");
246                this.resources = Arrays.asList(resources).toArray(new Resource[resources.length]);
247        }
248
249        /**
250         * Getter for the current resource.
251         * @return the current resource or {@code null} if all resources have been
252         * processed or the first resource has not been assigned yet.
253         */
254        @Nullable
255        public Resource getCurrentResource() {
256                if (currentResource >= resources.length || currentResource < 0) {
257                        return null;
258                }
259                return resources[currentResource];
260        }
261
262}