001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.step.factory; 017 018import org.apache.commons.logging.Log; 019import org.apache.commons.logging.LogFactory; 020import org.springframework.batch.core.ChunkListener; 021import org.springframework.batch.core.ItemProcessListener; 022import org.springframework.batch.core.ItemReadListener; 023import org.springframework.batch.core.ItemWriteListener; 024import org.springframework.batch.core.Step; 025import org.springframework.batch.core.StepExecutionListener; 026import org.springframework.batch.core.StepListener; 027import org.springframework.batch.core.repository.JobRepository; 028import org.springframework.batch.core.step.builder.SimpleStepBuilder; 029import org.springframework.batch.core.step.builder.StepBuilder; 030import org.springframework.batch.core.step.tasklet.TaskletStep; 031import org.springframework.batch.item.ItemProcessor; 032import org.springframework.batch.item.ItemReader; 033import org.springframework.batch.item.ItemStream; 034import org.springframework.batch.item.ItemWriter; 035import org.springframework.batch.repeat.CompletionPolicy; 036import org.springframework.batch.repeat.RepeatOperations; 037import org.springframework.batch.repeat.exception.DefaultExceptionHandler; 038import org.springframework.batch.repeat.exception.ExceptionHandler; 039import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; 040import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; 041import org.springframework.beans.factory.BeanNameAware; 042import org.springframework.beans.factory.FactoryBean; 043import org.springframework.core.task.TaskExecutor; 044import org.springframework.transaction.PlatformTransactionManager; 045import org.springframework.transaction.annotation.Isolation; 046import org.springframework.transaction.annotation.Propagation; 047import org.springframework.transaction.interceptor.DefaultTransactionAttribute; 048import org.springframework.transaction.interceptor.TransactionAttribute; 049 050/** 051 * Most common configuration options for simple steps should be found here. Use this factory bean instead of creating a 052 * {@link Step} implementation manually. 053 * 054 * This factory does not support configuration of fault-tolerant behavior, use appropriate subclass of this factory bean 055 * to configure skip or retry. 056 * 057 * @see FaultTolerantStepFactoryBean 058 * 059 * @author Dave Syer 060 * @author Robert Kasanicky 061 * 062 */ 063public class SimpleStepFactoryBean<T, S> implements FactoryBean<Step>, BeanNameAware { 064 065 private String name; 066 067 private int startLimit = Integer.MAX_VALUE; 068 069 private boolean allowStartIfComplete; 070 071 private ItemReader<? extends T> itemReader; 072 073 private ItemProcessor<? super T, ? extends S> itemProcessor; 074 075 private ItemWriter<? super S> itemWriter; 076 077 private PlatformTransactionManager transactionManager; 078 079 private Propagation propagation = Propagation.REQUIRED; 080 081 private Isolation isolation = Isolation.DEFAULT; 082 083 private int transactionTimeout = DefaultTransactionAttribute.TIMEOUT_DEFAULT; 084 085 private JobRepository jobRepository; 086 087 private boolean singleton = true; 088 089 private ItemStream[] streams = new ItemStream[0]; 090 091 private StepListener[] listeners = new StepListener[0]; 092 093 protected final Log logger = LogFactory.getLog(getClass()); 094 095 private int commitInterval = 0; 096 097 private TaskExecutor taskExecutor; 098 099 private RepeatOperations stepOperations; 100 101 private RepeatOperations chunkOperations; 102 103 private ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); 104 105 private CompletionPolicy chunkCompletionPolicy; 106 107 private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT; 108 109 private boolean isReaderTransactionalQueue = false; 110 111 /** 112 * Default constructor for {@link SimpleStepFactoryBean}. 113 */ 114 public SimpleStepFactoryBean() { 115 super(); 116 } 117 118 /** 119 * Flag to signal that the reader is transactional (usually a JMS consumer) so that items are re-presented after a 120 * rollback. The default is false and readers are assumed to be forward-only. 121 * 122 * @param isReaderTransactionalQueue the value of the flag 123 */ 124 public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) { 125 this.isReaderTransactionalQueue = isReaderTransactionalQueue; 126 } 127 128 /** 129 * Convenience method for subclasses. 130 * @return true if the flag is set (default false) 131 */ 132 protected boolean isReaderTransactionalQueue() { 133 return isReaderTransactionalQueue; 134 } 135 136 /** 137 * Set the bean name property, which will become the name of the {@link Step} when it is created. 138 * 139 * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) 140 */ 141 @Override 142 public void setBeanName(String name) { 143 this.name = name; 144 } 145 146 /** 147 * Public getter for the name of the step. 148 * @return the name 149 */ 150 public String getName() { 151 return name; 152 } 153 154 /** 155 * The timeout for an individual transaction in the step. 156 * 157 * @param transactionTimeout the transaction timeout to set, defaults to infinite 158 */ 159 public void setTransactionTimeout(int transactionTimeout) { 160 this.transactionTimeout = transactionTimeout; 161 } 162 163 /** 164 * @param propagation the propagation to set for business transactions 165 */ 166 public void setPropagation(Propagation propagation) { 167 this.propagation = propagation; 168 } 169 170 /** 171 * @param isolation the isolation to set for business transactions 172 */ 173 public void setIsolation(Isolation isolation) { 174 this.isolation = isolation; 175 } 176 177 /** 178 * Public setter for the start limit for the step. 179 * 180 * @param startLimit the startLimit to set 181 */ 182 public void setStartLimit(int startLimit) { 183 this.startLimit = startLimit; 184 } 185 186 /** 187 * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the 188 * first time. 189 * 190 * @param allowStartIfComplete the shouldAllowStartIfComplete to set 191 */ 192 public void setAllowStartIfComplete(boolean allowStartIfComplete) { 193 this.allowStartIfComplete = allowStartIfComplete; 194 } 195 196 /** 197 * @param itemReader the {@link ItemReader} to set 198 */ 199 public void setItemReader(ItemReader<? extends T> itemReader) { 200 this.itemReader = itemReader; 201 } 202 203 /** 204 * @param itemWriter the {@link ItemWriter} to set 205 */ 206 public void setItemWriter(ItemWriter<? super S> itemWriter) { 207 this.itemWriter = itemWriter; 208 } 209 210 /** 211 * @param itemProcessor the {@link ItemProcessor} to set 212 */ 213 public void setItemProcessor(ItemProcessor<? super T, ? extends S> itemProcessor) { 214 this.itemProcessor = itemProcessor; 215 } 216 217 /** 218 * The streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be used, and will then 219 * receive callbacks at the appropriate stage in the step. 220 * 221 * @param streams an array of listeners 222 */ 223 public void setStreams(ItemStream[] streams) { 224 this.streams = streams; 225 } 226 227 /** 228 * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then 229 * receive callbacks at the appropriate stage in the step. 230 * 231 * @param listeners an array of listeners 232 */ 233 public void setListeners(StepListener[] listeners) { 234 this.listeners = listeners; 235 } 236 237 /** 238 * Protected getter for the {@link StepListener}s. 239 * @return the listeners 240 */ 241 protected StepListener[] getListeners() { 242 return listeners; 243 } 244 245 /** 246 * Protected getter for the {@link ItemReader} for subclasses to use. 247 * @return the itemReader 248 */ 249 protected ItemReader<? extends T> getItemReader() { 250 return itemReader; 251 } 252 253 /** 254 * Protected getter for the {@link ItemWriter} for subclasses to use 255 * @return the itemWriter 256 */ 257 protected ItemWriter<? super S> getItemWriter() { 258 return itemWriter; 259 } 260 261 /** 262 * Protected getter for the {@link ItemProcessor} for subclasses to use 263 * @return the itemProcessor 264 */ 265 protected ItemProcessor<? super T, ? extends S> getItemProcessor() { 266 return itemProcessor; 267 } 268 269 /** 270 * Public setter for {@link JobRepository}. 271 * 272 * @param jobRepository is a mandatory dependence (no default). 273 */ 274 public void setJobRepository(JobRepository jobRepository) { 275 this.jobRepository = jobRepository; 276 } 277 278 /** 279 * Public setter for the {@link PlatformTransactionManager}. 280 * 281 * @param transactionManager the transaction manager to set 282 */ 283 public void setTransactionManager(PlatformTransactionManager transactionManager) { 284 this.transactionManager = transactionManager; 285 } 286 287 /** 288 * Getter for the {@link TransactionAttribute} for subclasses only. 289 * @return the transactionAttribute 290 */ 291 @SuppressWarnings("serial") 292 protected TransactionAttribute getTransactionAttribute() { 293 294 DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); 295 attribute.setPropagationBehavior(propagation.value()); 296 attribute.setIsolationLevel(isolation.value()); 297 attribute.setTimeout(transactionTimeout); 298 return new DefaultTransactionAttribute(attribute) { 299 300 /** 301 * Ignore the default behaviour and rollback on all exceptions that bubble up to the tasklet level. The 302 * tasklet has to deal with the rollback rules internally. 303 */ 304 @Override 305 public boolean rollbackOn(Throwable ex) { 306 return true; 307 } 308 309 }; 310 311 } 312 313 /** 314 * Create a {@link Step} from the configuration provided. 315 * 316 * @see FactoryBean#getObject() 317 */ 318 @Override 319 public final Step getObject() throws Exception { 320 SimpleStepBuilder<T, S> builder = createBuilder(getName()); 321 applyConfiguration(builder); 322 TaskletStep step = builder.build(); 323 return step; 324 } 325 326 protected SimpleStepBuilder<T, S> createBuilder(String name) { 327 return new SimpleStepBuilder<>(new StepBuilder(name)); 328 } 329 330 @Override 331 public Class<TaskletStep> getObjectType() { 332 return TaskletStep.class; 333 } 334 335 /** 336 * Returns true by default, but in most cases a {@link Step} should <b>not</b> be treated as thread-safe. Clients are 337 * recommended to create a new step for each job execution. 338 * 339 * @see org.springframework.beans.factory.FactoryBean#isSingleton() 340 */ 341 @Override 342 public boolean isSingleton() { 343 return this.singleton; 344 } 345 346 /** 347 * Public setter for the singleton flag. 348 * @param singleton the value to set. Defaults to true. 349 */ 350 public void setSingleton(boolean singleton) { 351 this.singleton = singleton; 352 } 353 354 /** 355 * Set the commit interval. Either set this or the chunkCompletionPolicy but not both. 356 * 357 * @param commitInterval 1 by default 358 */ 359 public void setCommitInterval(int commitInterval) { 360 this.commitInterval = commitInterval; 361 } 362 363 /** 364 * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed when 365 * this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to the 366 * commitInterval property. 367 * 368 * @param chunkCompletionPolicy the chunkCompletionPolicy to set 369 */ 370 public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) { 371 this.chunkCompletionPolicy = chunkCompletionPolicy; 372 } 373 374 /** 375 * Protected getter for the step operations to make them available in subclasses. 376 * @return the step operations 377 */ 378 protected RepeatOperations getStepOperations() { 379 return stepOperations; 380 } 381 382 /** 383 * Public setter for the stepOperations. 384 * @param stepOperations the stepOperations to set 385 */ 386 public void setStepOperations(RepeatOperations stepOperations) { 387 this.stepOperations = stepOperations; 388 } 389 390 /** 391 * Public setter for the chunkOperations. 392 * @param chunkOperations the chunkOperations to set 393 */ 394 public void setChunkOperations(RepeatOperations chunkOperations) { 395 this.chunkOperations = chunkOperations; 396 } 397 398 /** 399 * Protected getter for the chunk operations to make them available in subclasses. 400 * @return the step operations 401 */ 402 protected RepeatOperations getChunkOperations() { 403 return chunkOperations; 404 } 405 406 /** 407 * Public setter for the {@link ExceptionHandler}. 408 * @param exceptionHandler the exceptionHandler to set 409 */ 410 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 411 this.exceptionHandler = exceptionHandler; 412 } 413 414 /** 415 * Protected getter for the {@link ExceptionHandler}. 416 * @return the {@link ExceptionHandler} 417 */ 418 protected ExceptionHandler getExceptionHandler() { 419 return exceptionHandler; 420 } 421 422 /** 423 * Public setter for the {@link TaskExecutor}. If this is set, then it will be used to execute the chunk processing 424 * inside the {@link Step}. 425 * 426 * @param taskExecutor the taskExecutor to set 427 */ 428 public void setTaskExecutor(TaskExecutor taskExecutor) { 429 this.taskExecutor = taskExecutor; 430 } 431 432 /** 433 * Make the {@link TaskExecutor} available to subclasses 434 * @return the taskExecutor to be used to execute chunks 435 */ 436 protected TaskExecutor getTaskExecutor() { 437 return taskExecutor; 438 } 439 440 /** 441 * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing to prevent 442 * thread pools from being overwhelmed. Defaults to {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}. 443 * @param throttleLimit the throttle limit to set. 444 */ 445 public void setThrottleLimit(int throttleLimit) { 446 this.throttleLimit = throttleLimit; 447 } 448 449 protected void applyConfiguration(SimpleStepBuilder<T, S> builder) { 450 451 builder.reader(itemReader); 452 builder.processor(itemProcessor); 453 builder.writer(itemWriter); 454 for (StepExecutionListener listener : BatchListenerFactoryHelper.<StepExecutionListener> getListeners( 455 listeners, StepExecutionListener.class)) { 456 builder.listener(listener); 457 } 458 for (ChunkListener listener : BatchListenerFactoryHelper.<ChunkListener> getListeners(listeners, 459 ChunkListener.class)) { 460 builder.listener(listener); 461 } 462 for (ItemReadListener<T> listener : BatchListenerFactoryHelper.<ItemReadListener<T>> getListeners(listeners, 463 ItemReadListener.class)) { 464 builder.listener(listener); 465 } 466 for (ItemWriteListener<S> listener : BatchListenerFactoryHelper.<ItemWriteListener<S>> getListeners(listeners, 467 ItemWriteListener.class)) { 468 builder.listener(listener); 469 } 470 for (ItemProcessListener<T, S> listener : BatchListenerFactoryHelper.<ItemProcessListener<T, S>> getListeners( 471 listeners, ItemProcessListener.class)) { 472 builder.listener(listener); 473 } 474 builder.transactionManager(transactionManager); 475 builder.transactionAttribute(getTransactionAttribute()); 476 builder.repository(jobRepository); 477 builder.startLimit(startLimit); 478 builder.allowStartIfComplete(allowStartIfComplete); 479 builder.chunk(commitInterval); 480 builder.chunk(chunkCompletionPolicy); 481 builder.chunkOperations(chunkOperations); 482 builder.stepOperations(stepOperations); 483 builder.taskExecutor(taskExecutor); 484 builder.throttleLimit(throttleLimit); 485 builder.exceptionHandler(exceptionHandler); 486 if (isReaderTransactionalQueue) { 487 builder.readerIsTransactionalQueue(); 488 } 489 for (ItemStream stream : streams) { 490 builder.stream(stream); 491 } 492 493 } 494}