001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.file; 018 019import java.util.Arrays; 020import java.util.Comparator; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.springframework.batch.item.ExecutionContext; 025import org.springframework.batch.item.ItemStream; 026import org.springframework.batch.item.ItemStreamException; 027import org.springframework.batch.item.ParseException; 028import org.springframework.batch.item.ResourceAware; 029import org.springframework.batch.item.UnexpectedInputException; 030import org.springframework.batch.item.support.AbstractItemStreamItemReader; 031import org.springframework.core.io.Resource; 032import org.springframework.lang.Nullable; 033import org.springframework.util.Assert; 034import org.springframework.util.ClassUtils; 035 036/** 037 * Reads items from multiple resources sequentially - resource list is given by {@link #setResources(Resource[])}, the 038 * actual reading is delegated to {@link #setDelegate(ResourceAwareItemReaderItemStream)}. 039 * 040 * Input resources are ordered using {@link #setComparator(Comparator)} to make sure resource ordering is preserved 041 * between job runs in restart scenario. 042 * 043 * 044 * @author Robert Kasanicky 045 * @author Lucas Ward 046 * @author Mahmoud Ben Hassine 047 */ 048public class MultiResourceItemReader<T> extends AbstractItemStreamItemReader<T> { 049 050 private static final Log logger = LogFactory.getLog(MultiResourceItemReader.class); 051 052 private static final String RESOURCE_KEY = "resourceIndex"; 053 054 private ResourceAwareItemReaderItemStream<? extends T> delegate; 055 056 private Resource[] resources; 057 058 private boolean saveState = true; 059 060 private int currentResource = -1; 061 062 // signals there are no resources to read -> just return null on first read 063 private boolean noInput; 064 065 private boolean strict = false; 066 067 /** 068 * In strict mode the reader will throw an exception on 069 * {@link #open(org.springframework.batch.item.ExecutionContext)} if there are no resources to read. 070 * @param strict false by default 071 */ 072 public void setStrict(boolean strict) { 073 this.strict = strict; 074 } 075 076 private Comparator<Resource> comparator = new Comparator<Resource>() { 077 078 /** 079 * Compares resource filenames. 080 */ 081 @Override 082 public int compare(Resource r1, Resource r2) { 083 return r1.getFilename().compareTo(r2.getFilename()); 084 } 085 086 }; 087 088 public MultiResourceItemReader() { 089 this.setExecutionContextName(ClassUtils.getShortName(MultiResourceItemReader.class)); 090 } 091 092 /** 093 * Reads the next item, jumping to next resource if necessary. 094 */ 095 @Override 096 public T read() throws Exception, UnexpectedInputException, ParseException { 097 098 if (noInput) { 099 return null; 100 } 101 102 // If there is no resource, then this is the first item, set the current 103 // resource to 0 and open the first delegate. 104 if (currentResource == -1) { 105 currentResource = 0; 106 delegate.setResource(resources[currentResource]); 107 delegate.open(new ExecutionContext()); 108 } 109 110 return readNextItem(); 111 } 112 113 /** 114 * Use the delegate to read the next item, jump to next resource if current one is exhausted. Items are appended to 115 * the buffer. 116 * 117 * @return next item from input 118 */ 119 private T readNextItem() throws Exception { 120 121 T item = readFromDelegate(); 122 123 while (item == null) { 124 125 currentResource++; 126 127 if (currentResource >= resources.length) { 128 return null; 129 } 130 131 delegate.close(); 132 delegate.setResource(resources[currentResource]); 133 delegate.open(new ExecutionContext()); 134 135 item = readFromDelegate(); 136 } 137 138 return item; 139 } 140 141 private T readFromDelegate() throws Exception { 142 T item = delegate.read(); 143 if(item instanceof ResourceAware){ 144 ((ResourceAware) item).setResource(getCurrentResource()); 145 } 146 return item; 147 } 148 149 /** 150 * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader and reset instance variable values. 151 */ 152 @Override 153 public void close() throws ItemStreamException { 154 super.close(); 155 156 if(!this.noInput) { 157 delegate.close(); 158 } 159 160 noInput = false; 161 } 162 163 /** 164 * Figure out which resource to start with in case of restart, open the delegate and restore delegate's position in 165 * the resource. 166 */ 167 @Override 168 public void open(ExecutionContext executionContext) throws ItemStreamException { 169 super.open(executionContext); 170 Assert.notNull(resources, "Resources must be set"); 171 172 noInput = false; 173 if (resources.length == 0) { 174 if (strict) { 175 throw new IllegalStateException( 176 "No resources to read. Set strict=false if this is not an error condition."); 177 } 178 else { 179 logger.warn("No resources to read. Set strict=true if this should be an error condition."); 180 noInput = true; 181 return; 182 } 183 } 184 185 Arrays.sort(resources, comparator); 186 187 if (executionContext.containsKey(getExecutionContextKey(RESOURCE_KEY))) { 188 currentResource = executionContext.getInt(getExecutionContextKey(RESOURCE_KEY)); 189 190 // context could have been saved before reading anything 191 if (currentResource == -1) { 192 currentResource = 0; 193 } 194 195 delegate.setResource(resources[currentResource]); 196 delegate.open(executionContext); 197 } 198 else { 199 currentResource = -1; 200 } 201 } 202 203 /** 204 * Store the current resource index and position in the resource. 205 */ 206 @Override 207 public void update(ExecutionContext executionContext) throws ItemStreamException { 208 super.update(executionContext); 209 if (saveState) { 210 executionContext.putInt(getExecutionContextKey(RESOURCE_KEY), currentResource); 211 delegate.update(executionContext); 212 } 213 } 214 215 /** 216 * @param delegate reads items from single {@link Resource}. 217 */ 218 public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) { 219 this.delegate = delegate; 220 } 221 222 /** 223 * Set the boolean indicating whether or not state should be saved in the provided {@link ExecutionContext} during 224 * the {@link ItemStream} call to update. 225 * 226 * @param saveState true to update ExecutionContext. False do not update 227 * ExecutionContext. 228 */ 229 public void setSaveState(boolean saveState) { 230 this.saveState = saveState; 231 } 232 233 /** 234 * @param comparator used to order the injected resources, by default compares {@link Resource#getFilename()} 235 * values. 236 */ 237 public void setComparator(Comparator<Resource> comparator) { 238 this.comparator = comparator; 239 } 240 241 /** 242 * @param resources input resources 243 */ 244 public void setResources(Resource[] resources) { 245 Assert.notNull(resources, "The resources must not be null"); 246 this.resources = Arrays.asList(resources).toArray(new Resource[resources.length]); 247 } 248 249 /** 250 * Getter for the current resource. 251 * @return the current resource or {@code null} if all resources have been 252 * processed or the first resource has not been assigned yet. 253 */ 254 @Nullable 255 public Resource getCurrentResource() { 256 if (currentResource >= resources.length || currentResource < 0) { 257 return null; 258 } 259 return resources[currentResource]; 260 } 261 262}