001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.step.builder; 017 018import java.lang.reflect.Method; 019import java.util.ArrayList; 020import java.util.HashSet; 021import java.util.LinkedHashSet; 022import java.util.Set; 023import java.util.function.Function; 024 025import org.springframework.batch.core.ChunkListener; 026import org.springframework.batch.core.ItemProcessListener; 027import org.springframework.batch.core.ItemReadListener; 028import org.springframework.batch.core.ItemWriteListener; 029import org.springframework.batch.core.StepExecutionListener; 030import org.springframework.batch.core.StepListener; 031import org.springframework.batch.core.annotation.AfterProcess; 032import org.springframework.batch.core.annotation.AfterRead; 033import org.springframework.batch.core.annotation.AfterWrite; 034import org.springframework.batch.core.annotation.BeforeProcess; 035import org.springframework.batch.core.annotation.BeforeRead; 036import org.springframework.batch.core.annotation.BeforeWrite; 037import org.springframework.batch.core.annotation.OnProcessError; 038import org.springframework.batch.core.annotation.OnReadError; 039import org.springframework.batch.core.annotation.OnWriteError; 040import org.springframework.batch.core.listener.StepListenerFactoryBean; 041import org.springframework.batch.core.step.item.ChunkOrientedTasklet; 042import org.springframework.batch.core.step.item.SimpleChunkProcessor; 043import org.springframework.batch.core.step.item.SimpleChunkProvider; 044import org.springframework.batch.core.step.tasklet.Tasklet; 045import org.springframework.batch.core.step.tasklet.TaskletStep; 046import org.springframework.batch.item.ItemProcessor; 047import org.springframework.batch.item.ItemReader; 048import org.springframework.batch.item.ItemStream; 049import org.springframework.batch.item.ItemWriter; 050import org.springframework.batch.item.function.FunctionItemProcessor; 051import org.springframework.batch.repeat.CompletionPolicy; 052import org.springframework.batch.repeat.RepeatOperations; 053import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; 054import org.springframework.batch.repeat.support.RepeatTemplate; 055import org.springframework.batch.support.ReflectionUtils; 056import org.springframework.util.Assert; 057 058/** 059 * Step builder for simple item processing (chunk oriented) steps. Items are read and cached in chunks, and then 060 * processed (transformed) and written (optionally either the processor or the writer can be omitted) all in the same 061 * transaction. 062 * 063 * @see FaultTolerantStepBuilder for a step that handles retry and skip of failed items 064 * 065 * @author Dave Syer 066 * @author Mahmoud Ben Hassine 067 * 068 * @since 2.2 069 */ 070public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> { 071 072 private static final int DEFAULT_COMMIT_INTERVAL = 1; 073 074 private ItemReader<? extends I> reader; 075 076 private ItemWriter<? super O> writer; 077 078 private ItemProcessor<? super I, ? extends O> processor; 079 080 private Function<? super I, ? extends O> itemProcessorFunction; 081 082 private int chunkSize = 0; 083 084 private RepeatOperations chunkOperations; 085 086 private CompletionPolicy completionPolicy; 087 088 private Set<StepListener> itemListeners = new LinkedHashSet<>(); 089 090 private boolean readerTransactionalQueue = false; 091 092 /** 093 * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. 094 * 095 * @param parent a parent helper containing common step properties 096 */ 097 public SimpleStepBuilder(StepBuilderHelper<?> parent) { 098 super(parent); 099 } 100 101 /** 102 * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. 103 * 104 * @param parent a parent helper containing common step properties 105 */ 106 protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) { 107 super(parent); 108 this.chunkSize = parent.chunkSize; 109 this.completionPolicy = parent.completionPolicy; 110 this.chunkOperations = parent.chunkOperations; 111 this.reader = parent.reader; 112 this.writer = parent.writer; 113 this.processor = parent.processor; 114 this.itemListeners = parent.itemListeners; 115 this.readerTransactionalQueue = parent.readerTransactionalQueue; 116 } 117 118 public FaultTolerantStepBuilder<I, O> faultTolerant() { 119 return new FaultTolerantStepBuilder<>(this); 120 } 121 122 /** 123 * Build a step with the reader, writer, processor as provided. 124 * 125 * @see org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder#build() 126 */ 127 @Override 128 public TaskletStep build() { 129 130 registerStepListenerAsItemListener(); 131 registerAsStreamsAndListeners(reader, processor, writer); 132 return super.build(); 133 } 134 135 protected void registerStepListenerAsItemListener() { 136 for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){ 137 checkAndAddItemListener(stepExecutionListener); 138 } 139 for (ChunkListener chunkListener: this.chunkListeners){ 140 checkAndAddItemListener(chunkListener); 141 } 142 } 143 144 @SuppressWarnings("unchecked") 145 private void checkAndAddItemListener(StepListener stepListener) { 146 if (stepListener instanceof ItemReadListener){ 147 listener((ItemReadListener<I>)stepListener); 148 } 149 if (stepListener instanceof ItemProcessListener){ 150 listener((ItemProcessListener<I,O>)stepListener); 151 } 152 if (stepListener instanceof ItemWriteListener){ 153 listener((ItemWriteListener<O>)stepListener); 154 } 155 } 156 157 @Override 158 protected Tasklet createTasklet() { 159 Assert.state(reader != null, "ItemReader must be provided"); 160 Assert.state(writer != null, "ItemWriter must be provided"); 161 RepeatOperations repeatOperations = createChunkOperations(); 162 SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations); 163 SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter()); 164 chunkProvider.setListeners(new ArrayList<>(itemListeners)); 165 chunkProcessor.setListeners(new ArrayList<>(itemListeners)); 166 ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor); 167 tasklet.setBuffering(!readerTransactionalQueue); 168 return tasklet; 169 } 170 171 /** 172 * Sets the chunk size or commit interval for this step. This is the maximum number of items that will be read 173 * before processing starts in a single transaction. Not compatible with {@link #completionPolicy} 174 * . 175 * 176 * @param chunkSize the chunk size (a.k.a commit interval) 177 * @return this for fluent chaining 178 */ 179 public SimpleStepBuilder<I, O> chunk(int chunkSize) { 180 Assert.state(completionPolicy == null || chunkSize == 0, 181 "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); 182 this.chunkSize = chunkSize; 183 return this; 184 } 185 186 /** 187 * Sets a completion policy for the chunk processing. Items are read until this policy determines that a chunk is 188 * complete, giving more control than with just the {@link #chunk(int) chunk size} (or commit interval). 189 * 190 * @param completionPolicy a completion policy for the chunk 191 * @return this for fluent chaining 192 */ 193 public SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) { 194 Assert.state(chunkSize == 0 || completionPolicy == null, 195 "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); 196 this.completionPolicy = completionPolicy; 197 return this; 198 } 199 200 /** 201 * An item reader that provides a stream of items. Will be automatically registered as a {@link #stream(ItemStream)} 202 * or listener if it implements the corresponding interface. By default assumed to be non-transactional. 203 * 204 * @see #readerTransactionalQueue 205 * @param reader an item reader 206 * @return this for fluent chaining 207 */ 208 public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) { 209 this.reader = reader; 210 return this; 211 } 212 213 /** 214 * An item writer that writes a chunk of items. Will be automatically registered as a {@link #stream(ItemStream)} or 215 * listener if it implements the corresponding interface. 216 * 217 * @param writer an item writer 218 * @return this for fluent chaining 219 */ 220 public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) { 221 this.writer = writer; 222 return this; 223 } 224 225 /** 226 * An item processor that processes or transforms a stream of items. Will be automatically registered as a 227 * {@link #stream(ItemStream)} or listener if it implements the corresponding interface. 228 * 229 * @param processor an item processor 230 * @return this for fluent chaining 231 */ 232 public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) { 233 this.processor = processor; 234 return this; 235 } 236 237 /** 238 * A {@link Function} to be delegated to as an {@link ItemProcessor}. If this is set, 239 * it will take precedence over any {@code ItemProcessor} configured via 240 * {@link #processor(ItemProcessor)}. 241 * 242 * @param function the function to delegate item processing to 243 * @return this for fluent chaining 244 */ 245 public SimpleStepBuilder<I, O> processor(Function<? super I, ? extends O> function) { 246 this.itemProcessorFunction = function; 247 return this; 248 } 249 250 /** 251 * Sets a flag to say that the reader is transactional (usually a queue), which is to say that failed items might be 252 * rolled back and re-presented in a subsequent transaction. Default is false, meaning that the items are read 253 * outside a transaction and possibly cached. 254 * 255 * @return this for fluent chaining 256 */ 257 public SimpleStepBuilder<I, O> readerIsTransactionalQueue() { 258 this.readerTransactionalQueue = true; 259 return this; 260 } 261 262 /** 263 * Registers objects using the annotation based listener configuration. 264 * 265 * @param listener the object that has a method configured with listener annotation 266 * @return this for fluent chaining 267 */ 268 @SuppressWarnings("unchecked") 269 @Override 270 public SimpleStepBuilder<I, O> listener(Object listener) { 271 super.listener(listener); 272 273 Set<Method> itemListenerMethods = new HashSet<>(); 274 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeRead.class)); 275 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterRead.class)); 276 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeProcess.class)); 277 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterProcess.class)); 278 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeWrite.class)); 279 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterWrite.class)); 280 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnReadError.class)); 281 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnProcessError.class)); 282 itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnWriteError.class)); 283 284 if(itemListenerMethods.size() > 0) { 285 StepListenerFactoryBean factory = new StepListenerFactoryBean(); 286 factory.setDelegate(listener); 287 itemListeners.add((StepListener) factory.getObject()); 288 } 289 290 @SuppressWarnings("unchecked") 291 SimpleStepBuilder<I, O> result = this; 292 return result; 293 } 294 295 296 /** 297 * Register an item reader listener. 298 * 299 * @param listener the listener to register 300 * @return this for fluent chaining 301 */ 302 public SimpleStepBuilder<I, O> listener(ItemReadListener<? super I> listener) { 303 itemListeners.add(listener); 304 return this; 305 } 306 307 /** 308 * Register an item writer listener. 309 * 310 * @param listener the listener to register 311 * @return this for fluent chaining 312 */ 313 public SimpleStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) { 314 itemListeners.add(listener); 315 return this; 316 } 317 318 /** 319 * Register an item processor listener. 320 * 321 * @param listener the listener to register 322 * @return this for fluent chaining 323 */ 324 public SimpleStepBuilder<I, O> listener(ItemProcessListener<? super I, ? super O> listener) { 325 itemListeners.add(listener); 326 return this; 327 } 328 329 /** 330 * Instead of a {@link #chunk(int) chunk size} or {@link #chunk(CompletionPolicy) completion policy} you can provide 331 * a complete repeat operations instance that handles the iteration over the item reader. 332 * 333 * @param repeatTemplate a complete repeat template for the chunk 334 * @return this for fluent chaining 335 */ 336 public SimpleStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) { 337 this.chunkOperations = repeatTemplate; 338 return this; 339 } 340 341 protected RepeatOperations createChunkOperations() { 342 RepeatOperations repeatOperations = chunkOperations; 343 if (repeatOperations == null) { 344 RepeatTemplate repeatTemplate = new RepeatTemplate(); 345 repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy()); 346 repeatOperations = repeatTemplate; 347 } 348 return repeatOperations; 349 } 350 351 protected ItemReader<? extends I> getReader() { 352 return reader; 353 } 354 355 protected ItemWriter<? super O> getWriter() { 356 return writer; 357 } 358 359 protected ItemProcessor<? super I, ? extends O> getProcessor() { 360 if(this.itemProcessorFunction != null) { 361 this.processor = new FunctionItemProcessor<>(this.itemProcessorFunction); 362 } 363 364 return processor; 365 } 366 367 protected int getChunkSize() { 368 return chunkSize; 369 } 370 371 protected boolean isReaderTransactionalQueue() { 372 return readerTransactionalQueue; 373 } 374 375 protected Set<StepListener> getItemListeners() { 376 return itemListeners; 377 } 378 379 /** 380 * @return a {@link CompletionPolicy} consistent with the chunk size and injected policy (if present). 381 */ 382 protected CompletionPolicy getChunkCompletionPolicy() { 383 Assert.state(!(completionPolicy != null && chunkSize > 0), 384 "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); 385 Assert.state(chunkSize >= 0, "The commitInterval must be positive or zero (for default value)."); 386 387 if (completionPolicy != null) { 388 return completionPolicy; 389 } 390 if (chunkSize == 0) { 391 logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")"); 392 chunkSize = DEFAULT_COMMIT_INTERVAL; 393 } 394 return new SimpleCompletionPolicy(chunkSize); 395 } 396 397 protected void registerAsStreamsAndListeners(ItemReader<? extends I> itemReader, 398 ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) { 399 for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) { 400 if (itemHandler instanceof ItemStream) { 401 stream((ItemStream) itemHandler); 402 } 403 if (StepListenerFactoryBean.isListener(itemHandler)) { 404 StepListener listener = StepListenerFactoryBean.getListener(itemHandler); 405 if (listener instanceof StepExecutionListener) { 406 listener((StepExecutionListener) listener); 407 } 408 if (listener instanceof ChunkListener) { 409 listener((ChunkListener) listener); 410 } 411 if (listener instanceof ItemReadListener<?> || listener instanceof ItemProcessListener<?, ?> 412 || listener instanceof ItemWriteListener<?>) { 413 itemListeners.add(listener); 414 } 415 } 416 } 417 } 418 419}