001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.step.item; 018 019import java.util.List; 020 021import org.springframework.batch.core.StepContribution; 022import org.springframework.batch.core.StepListener; 023import org.springframework.batch.core.listener.MulticasterBatchListener; 024import org.springframework.batch.item.ItemProcessor; 025import org.springframework.batch.item.ItemWriter; 026import org.springframework.beans.factory.InitializingBean; 027import org.springframework.lang.Nullable; 028import org.springframework.util.Assert; 029 030/** 031 * Simple implementation of the {@link ChunkProcessor} interface that handles 032 * basic item writing and processing. Any exceptions encountered will be 033 * rethrown. 034 * 035 * @see ChunkOrientedTasklet 036 */ 037public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean { 038 039 private ItemProcessor<? super I, ? extends O> itemProcessor; 040 041 private ItemWriter<? super O> itemWriter; 042 043 private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>(); 044 045 /** 046 * Default constructor for ease of configuration. 047 */ 048 @SuppressWarnings("unused") 049 private SimpleChunkProcessor() { 050 this(null, null); 051 } 052 053 public SimpleChunkProcessor(@Nullable ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) { 054 this.itemProcessor = itemProcessor; 055 this.itemWriter = itemWriter; 056 } 057 058 public SimpleChunkProcessor(ItemWriter<? super O> itemWriter) { 059 this(null, itemWriter); 060 } 061 062 /** 063 * @param itemProcessor the {@link ItemProcessor} to set 064 */ 065 public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) { 066 this.itemProcessor = itemProcessor; 067 } 068 069 /** 070 * @param itemWriter the {@link ItemWriter} to set 071 */ 072 public void setItemWriter(ItemWriter<? super O> itemWriter) { 073 this.itemWriter = itemWriter; 074 } 075 076 /** 077 * Check mandatory properties. 078 * 079 * @see InitializingBean#afterPropertiesSet() 080 */ 081 @Override 082 public void afterPropertiesSet() throws Exception { 083 Assert.notNull(itemWriter, "ItemWriter must be set"); 084 } 085 086 /** 087 * Register some {@link StepListener}s with the handler. Each will get the 088 * callbacks in the order specified at the correct stage. 089 * 090 * @param listeners list of {@link StepListener} instances. 091 */ 092 public void setListeners(List<? extends StepListener> listeners) { 093 for (StepListener listener : listeners) { 094 registerListener(listener); 095 } 096 } 097 098 /** 099 * Register a listener for callbacks at the appropriate stages in a process. 100 * 101 * @param listener a {@link StepListener} 102 */ 103 public void registerListener(StepListener listener) { 104 this.listener.register(listener); 105 } 106 107 /** 108 * @return the listener 109 */ 110 protected MulticasterBatchListener<I, O> getListener() { 111 return listener; 112 } 113 114 /** 115 * @param item the input item 116 * @return the result of the processing 117 * @throws Exception thrown if error occurs. 118 */ 119 protected final O doProcess(I item) throws Exception { 120 121 if (itemProcessor == null) { 122 @SuppressWarnings("unchecked") 123 O result = (O) item; 124 return result; 125 } 126 127 try { 128 listener.beforeProcess(item); 129 O result = itemProcessor.process(item); 130 listener.afterProcess(item, result); 131 return result; 132 } 133 catch (Exception e) { 134 listener.onProcessError(item, e); 135 throw e; 136 } 137 138 } 139 140 /** 141 * Surrounds the actual write call with listener callbacks. 142 * 143 * @param items list of items to be written. 144 * @throws Exception thrown if error occurs. 145 */ 146 protected final void doWrite(List<O> items) throws Exception { 147 148 if (itemWriter == null) { 149 return; 150 } 151 152 try { 153 listener.beforeWrite(items); 154 writeItems(items); 155 doAfterWrite(items); 156 } 157 catch (Exception e) { 158 doOnWriteError(e, items); 159 throw e; 160 } 161 162 } 163 164 /** 165 * Call the listener's after write method. 166 * 167 * @param items list of items that were just written. 168 */ 169 protected final void doAfterWrite(List<O> items) { 170 listener.afterWrite(items); 171 } 172 173 /** 174 * Call listener's writerError method. 175 * @param e exception that occurred. 176 * @param items list of items that failed to be written. 177 */ 178 protected final void doOnWriteError(Exception e, List<O> items) { 179 listener.onWriteError(e, items); 180 } 181 182 /** 183 * @param items list of items to be written. 184 * @throws Exception thrown if error occurs. 185 */ 186 protected void writeItems(List<O> items) throws Exception { 187 if (itemWriter != null) { 188 itemWriter.write(items); 189 } 190 } 191 192 @Override 193 public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception { 194 195 // Allow temporary state to be stored in the user data field 196 initializeUserData(inputs); 197 198 // If there is no input we don't have to do anything more 199 if (isComplete(inputs)) { 200 return; 201 } 202 203 // Make the transformation, calling remove() on the inputs iterator if 204 // any items are filtered. Might throw exception and cause rollback. 205 Chunk<O> outputs = transform(contribution, inputs); 206 207 // Adjust the filter count based on available data 208 contribution.incrementFilterCount(getFilterCount(inputs, outputs)); 209 210 // Adjust the outputs if necessary for housekeeping purposes, and then 211 // write them out... 212 write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); 213 214 } 215 216 /** 217 * Extension point for subclasses to allow them to memorise the contents of 218 * the inputs, in case they are needed for accounting purposes later. The 219 * default implementation sets up some user data to remember the original 220 * size of the inputs. If this method is overridden then some or all of 221 * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and 222 * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to 223 * ensure that the user data is handled consistently. 224 * 225 * @param inputs the inputs for the process 226 */ 227 protected void initializeUserData(Chunk<I> inputs) { 228 inputs.setUserData(inputs.size()); 229 } 230 231 /** 232 * Extension point for subclasses to calculate the filter count. Defaults to 233 * the difference between input size and output size. 234 * 235 * @param inputs the inputs after transformation 236 * @param outputs the outputs after transformation 237 * 238 * @return the difference in sizes 239 * 240 * @see #initializeUserData(Chunk) 241 */ 242 protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { 243 return (Integer) inputs.getUserData() - outputs.size(); 244 } 245 246 /** 247 * Extension point for subclasses that want to store additional data in the 248 * inputs. Default just checks if inputs are empty. 249 * 250 * @param inputs the input chunk 251 * @return true if it is empty 252 * 253 * @see #initializeUserData(Chunk) 254 */ 255 protected boolean isComplete(Chunk<I> inputs) { 256 return inputs.isEmpty(); 257 } 258 259 /** 260 * Extension point for subclasses that want to adjust the outputs based on 261 * additional saved data in the inputs. Default implementation just returns 262 * the outputs unchanged. 263 * 264 * @param inputs the inputs for the transformation 265 * @param outputs the result of the transformation 266 * @return the outputs unchanged 267 * 268 * @see #initializeUserData(Chunk) 269 */ 270 protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { 271 return outputs; 272 } 273 274 /** 275 * Simple implementation delegates to the {@link #doWrite(List)} method and 276 * increments the write count in the contribution. Subclasses can handle 277 * more complicated scenarios, e.g.with fault tolerance. If output items are 278 * skipped they should be removed from the inputs as well. 279 * 280 * @param contribution the current step contribution 281 * @param inputs the inputs that gave rise to the outputs 282 * @param outputs the outputs to write 283 * @throws Exception if there is a problem 284 */ 285 protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception { 286 try { 287 doWrite(outputs.getItems()); 288 } 289 catch (Exception e) { 290 /* 291 * For a simple chunk processor (no fault tolerance) we are done 292 * here, so prevent any more processing of these inputs. 293 */ 294 inputs.clear(); 295 throw e; 296 } 297 contribution.incrementWriteCount(outputs.size()); 298 } 299 300 protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception { 301 Chunk<O> outputs = new Chunk<O>(); 302 for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { 303 final I item = iterator.next(); 304 O output; 305 try { 306 output = doProcess(item); 307 } 308 catch (Exception e) { 309 /* 310 * For a simple chunk processor (no fault tolerance) we are done 311 * here, so prevent any more processing of these inputs. 312 */ 313 inputs.clear(); 314 throw e; 315 } 316 if (output != null) { 317 outputs.add(output); 318 } 319 else { 320 iterator.remove(); 321 } 322 } 323 return outputs; 324 } 325 326}