001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.configuration.xml; 018 019import java.io.Serializable; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.LinkedHashSet; 024import java.util.Map; 025import java.util.Queue; 026import java.util.Set; 027import java.util.concurrent.locks.ReentrantLock; 028import javax.batch.api.chunk.listener.RetryProcessListener; 029import javax.batch.api.chunk.listener.RetryReadListener; 030import javax.batch.api.chunk.listener.RetryWriteListener; 031import javax.batch.api.chunk.listener.SkipProcessListener; 032import javax.batch.api.chunk.listener.SkipReadListener; 033import javax.batch.api.chunk.listener.SkipWriteListener; 034import javax.batch.api.partition.PartitionCollector; 035 036import org.springframework.batch.core.ChunkListener; 037import org.springframework.batch.core.ItemProcessListener; 038import org.springframework.batch.core.ItemReadListener; 039import org.springframework.batch.core.ItemWriteListener; 040import org.springframework.batch.core.Job; 041import org.springframework.batch.core.SkipListener; 042import org.springframework.batch.core.Step; 043import org.springframework.batch.core.StepExecutionListener; 044import org.springframework.batch.core.StepListener; 045import 
org.springframework.batch.core.job.flow.Flow; 046import org.springframework.batch.core.jsr.ChunkListenerAdapter; 047import org.springframework.batch.core.jsr.ItemProcessListenerAdapter; 048import org.springframework.batch.core.jsr.ItemReadListenerAdapter; 049import org.springframework.batch.core.jsr.ItemWriteListenerAdapter; 050import org.springframework.batch.core.jsr.RetryProcessListenerAdapter; 051import org.springframework.batch.core.jsr.RetryReadListenerAdapter; 052import org.springframework.batch.core.jsr.RetryWriteListenerAdapter; 053import org.springframework.batch.core.jsr.SkipListenerAdapter; 054import org.springframework.batch.core.jsr.StepListenerAdapter; 055import org.springframework.batch.core.jsr.partition.PartitionCollectorAdapter; 056import org.springframework.batch.core.launch.JobLauncher; 057import org.springframework.batch.core.partition.PartitionHandler; 058import org.springframework.batch.core.partition.support.Partitioner; 059import org.springframework.batch.core.partition.support.StepExecutionAggregator; 060import org.springframework.batch.core.repository.JobRepository; 061import org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder; 062import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; 063import org.springframework.batch.core.step.builder.FlowStepBuilder; 064import org.springframework.batch.core.step.builder.JobStepBuilder; 065import org.springframework.batch.core.step.builder.PartitionStepBuilder; 066import org.springframework.batch.core.step.builder.SimpleStepBuilder; 067import org.springframework.batch.core.step.builder.StepBuilder; 068import org.springframework.batch.core.step.builder.StepBuilderHelper; 069import org.springframework.batch.core.step.builder.TaskletStepBuilder; 070import org.springframework.batch.core.step.factory.FaultTolerantStepFactoryBean; 071import org.springframework.batch.core.step.factory.SimpleStepFactoryBean; 072import 
org.springframework.batch.core.step.item.KeyGenerator; 073import org.springframework.batch.core.step.job.JobParametersExtractor; 074import org.springframework.batch.core.step.skip.SkipPolicy; 075import org.springframework.batch.core.step.tasklet.Tasklet; 076import org.springframework.batch.core.step.tasklet.TaskletStep; 077import org.springframework.batch.item.ItemProcessor; 078import org.springframework.batch.item.ItemReader; 079import org.springframework.batch.item.ItemStream; 080import org.springframework.batch.item.ItemWriter; 081import org.springframework.batch.repeat.CompletionPolicy; 082import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; 083import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; 084import org.springframework.beans.factory.BeanNameAware; 085import org.springframework.beans.factory.FactoryBean; 086import org.springframework.classify.BinaryExceptionClassifier; 087import org.springframework.core.task.TaskExecutor; 088import org.springframework.retry.RetryListener; 089import org.springframework.retry.RetryPolicy; 090import org.springframework.retry.backoff.BackOffPolicy; 091import org.springframework.retry.policy.MapRetryContextCache; 092import org.springframework.retry.policy.RetryContextCache; 093import org.springframework.transaction.PlatformTransactionManager; 094import org.springframework.transaction.annotation.Isolation; 095import org.springframework.transaction.annotation.Propagation; 096import org.springframework.transaction.interceptor.DefaultTransactionAttribute; 097import org.springframework.util.Assert; 098 099/** 100 * This {@link FactoryBean} is used by the batch namespace parser to create {@link Step} objects. Stores all of the 101 * properties that are configurable on the <step/> (and its inner <tasklet/>). Based on which properties are 102 * configured, the {@link #getObject()} method will delegate to the appropriate class for generating the {@link Step}. 
103 * 104 * @author Dan Garrette 105 * @author Josh Long 106 * @author Michael Minella 107 * @author Chris Schaefer 108 * @see SimpleStepFactoryBean 109 * @see FaultTolerantStepFactoryBean 110 * @see TaskletStep 111 * @since 2.0 112 */ 113public class StepParserStepFactoryBean<I, O> implements FactoryBean<Step>, BeanNameAware { 114 115 // 116 // Step Attributes 117 // 118 private String name; 119 120 // 121 // Tasklet Attributes 122 // 123 private Boolean allowStartIfComplete; 124 125 private JobRepository jobRepository; 126 127 private Integer startLimit; 128 129 private Tasklet tasklet; 130 131 private PlatformTransactionManager transactionManager; 132 133 private Set<Object> stepExecutionListeners = new LinkedHashSet<Object>(); 134 135 // 136 // Flow Elements 137 // 138 private Flow flow; 139 140 // 141 // Job Elements 142 // 143 private Job job; 144 145 private JobLauncher jobLauncher; 146 147 private JobParametersExtractor jobParametersExtractor; 148 149 // 150 // Partition Elements 151 // 152 private Partitioner partitioner; 153 154 private static final int DEFAULT_GRID_SIZE = 6; 155 156 private Step step; 157 158 private PartitionHandler partitionHandler; 159 160 private int gridSize = DEFAULT_GRID_SIZE; 161 162 private Queue<Serializable> partitionQueue; 163 164 private ReentrantLock partitionLock; 165 166 // 167 // Tasklet Elements 168 // 169 private Collection<Class<? 
extends Throwable>> noRollbackExceptionClasses; 170 171 private Integer transactionTimeout; 172 173 private Propagation propagation; 174 175 private Isolation isolation; 176 177 private Set<ChunkListener> chunkListeners = new LinkedHashSet<ChunkListener>(); 178 179 // 180 // Chunk Attributes 181 // 182 private int cacheCapacity = 0; 183 184 private CompletionPolicy chunkCompletionPolicy; 185 186 private Integer commitInterval; 187 188 private Boolean readerTransactionalQueue; 189 190 private Boolean processorTransactional; 191 192 private int retryLimit = 0; 193 194 private BackOffPolicy backOffPolicy; 195 196 private RetryPolicy retryPolicy; 197 198 private RetryContextCache retryContextCache; 199 200 private KeyGenerator keyGenerator; 201 202 private Integer skipLimit; 203 204 private SkipPolicy skipPolicy; 205 206 private TaskExecutor taskExecutor; 207 208 private Integer throttleLimit; 209 210 private ItemReader<? extends I> itemReader; 211 212 private ItemProcessor<? super I, ? extends O> itemProcessor; 213 214 private ItemWriter<? super O> itemWriter; 215 216 // 217 // Chunk Elements 218 // 219 private RetryListener[] retryListeners; 220 221 private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>(); 222 223 private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? 
extends Throwable>, Boolean>(); 224 225 private ItemStream[] streams; 226 227 private Set<ItemReadListener<I>> readListeners = new LinkedHashSet<ItemReadListener<I>>(); 228 229 private Set<ItemWriteListener<O>> writeListeners = new LinkedHashSet<ItemWriteListener<O>>(); 230 231 private Set<ItemProcessListener<I, O>> processListeners = new LinkedHashSet<ItemProcessListener<I, O>>(); 232 233 private Set<SkipListener<I, O>> skipListeners = new LinkedHashSet<SkipListener<I, O>>(); 234 235 private Set<org.springframework.batch.core.jsr.RetryListener> jsrRetryListeners = new LinkedHashSet<org.springframework.batch.core.jsr.RetryListener>(); 236 237 // 238 // Additional 239 // 240 private boolean hasChunkElement = false; 241 242 private StepExecutionAggregator stepExecutionAggregator; 243 244 /** 245 * @param queue The {@link Queue} that is used for communication between {@link javax.batch.api.partition.PartitionCollector} and {@link javax.batch.api.partition.PartitionAnalyzer} 246 */ 247 public void setPartitionQueue(Queue<Serializable> queue) { 248 this.partitionQueue = queue; 249 } 250 251 /** 252 * Used to coordinate access to the partition queue between the {@link javax.batch.api.partition.PartitionCollector} and {@link javax.batch.api.partition.AbstractPartitionAnalyzer} 253 * 254 * @param lock a lock that will be locked around accessing the partition queue 255 */ 256 public void setPartitionLock(ReentrantLock lock) { 257 this.partitionLock = lock; 258 } 259 260 /** 261 * Create a {@link Step} from the configuration provided. 
262 * 263 * @see FactoryBean#getObject() 264 */ 265 @Override 266 public Step getObject() throws Exception { 267 if (hasChunkElement) { 268 Assert.isNull(tasklet, "Step [" + name 269 + "] has both a <chunk/> element and a 'ref' attribute referencing a Tasklet."); 270 271 validateFaultTolerantSettings(); 272 273 if (isFaultTolerant()) { 274 return createFaultTolerantStep(); 275 } 276 else { 277 return createSimpleStep(); 278 } 279 } 280 else if (tasklet != null) { 281 return createTaskletStep(); 282 } 283 else if (flow != null) { 284 return createFlowStep(); 285 } 286 else if (job != null) { 287 return createJobStep(); 288 } 289 else { 290 return createPartitionStep(); 291 } 292 } 293 294 public boolean requiresTransactionManager() { 295 // Currently all step implementations other than TaskletStep are 296 // AbstractStep and do not require a transaction manager 297 return hasChunkElement || tasklet != null; 298 } 299 300 /** 301 * @param builder {@link StepBuilderHelper} representing the step to be enhanced 302 */ 303 protected void enhanceCommonStep(StepBuilderHelper<?> builder) { 304 if (allowStartIfComplete != null) { 305 builder.allowStartIfComplete(allowStartIfComplete); 306 } 307 if (startLimit != null) { 308 builder.startLimit(startLimit); 309 } 310 builder.repository(jobRepository); 311 builder.transactionManager(transactionManager); 312 for (Object listener : stepExecutionListeners) { 313 if(listener instanceof StepExecutionListener) { 314 builder.listener((StepExecutionListener) listener); 315 } else if(listener instanceof javax.batch.api.listener.StepListener) { 316 builder.listener(new StepListenerAdapter((javax.batch.api.listener.StepListener) listener)); 317 } 318 } 319 } 320 321 protected Step createPartitionStep() { 322 323 PartitionStepBuilder builder; 324 if (partitioner != null) { 325 builder = new StepBuilder(name).partitioner(step != null ? 
step.getName() : name, partitioner).step(step); 326 } 327 else { 328 builder = new StepBuilder(name).partitioner(step); 329 } 330 enhanceCommonStep(builder); 331 332 if (partitionHandler != null) { 333 builder.partitionHandler(partitionHandler); 334 } 335 else { 336 builder.gridSize(gridSize); 337 builder.taskExecutor(taskExecutor); 338 } 339 340 builder.aggregator(stepExecutionAggregator); 341 342 return builder.build(); 343 344 } 345 346 protected Step createFaultTolerantStep() { 347 348 FaultTolerantStepBuilder<I, O> builder = getFaultTolerantStepBuilder(this.name); 349 350 if (commitInterval != null) { 351 builder.chunk(commitInterval); 352 } 353 builder.chunk(chunkCompletionPolicy); 354 enhanceTaskletStepBuilder(builder); 355 356 builder.reader(itemReader); 357 builder.writer(itemWriter); 358 builder.processor(itemProcessor); 359 360 if (processorTransactional != null && !processorTransactional) { 361 builder.processorNonTransactional(); 362 } 363 364 if (readerTransactionalQueue!=null && readerTransactionalQueue==true) { 365 builder.readerIsTransactionalQueue(); 366 } 367 368 for (SkipListener<I, O> listener : skipListeners) { 369 builder.listener(listener); 370 } 371 372 for (org.springframework.batch.core.jsr.RetryListener listener : jsrRetryListeners) { 373 builder.listener(listener); 374 } 375 376 registerItemListeners(builder); 377 378 if (skipPolicy != null) { 379 builder.skipPolicy(skipPolicy); 380 } 381 else if (skipLimit!=null) { 382 builder.skipLimit(skipLimit); 383 for (Class<? 
extends Throwable> type : skippableExceptionClasses.keySet()) { 384 if (skippableExceptionClasses.get(type)) { 385 builder.skip(type); 386 } 387 else { 388 builder.noSkip(type); 389 } 390 } 391 } 392 393 if (retryListeners != null) { 394 for (RetryListener listener : retryListeners) { 395 builder.listener(listener); 396 } 397 } 398 399 if (retryContextCache == null && cacheCapacity > 0) { 400 retryContextCache = new MapRetryContextCache(cacheCapacity); 401 } 402 builder.retryContextCache(retryContextCache); 403 builder.keyGenerator(keyGenerator); 404 if (retryPolicy != null) { 405 builder.retryPolicy(retryPolicy); 406 } 407 else { 408 builder.retryLimit(retryLimit); 409 builder.backOffPolicy(backOffPolicy); 410 for (Class<? extends Throwable> type : retryableExceptionClasses.keySet()) { 411 if (retryableExceptionClasses.get(type)) { 412 builder.retry(type); 413 } 414 else { 415 builder.noRetry(type); 416 } 417 } 418 } 419 420 if (noRollbackExceptionClasses != null) { 421 for (Class<? extends Throwable> type : noRollbackExceptionClasses) { 422 builder.noRollback(type); 423 } 424 } 425 426 return builder.build(); 427 428 } 429 430 protected FaultTolerantStepBuilder<I, O> getFaultTolerantStepBuilder(String stepName) { 431 return new FaultTolerantStepBuilder<I, O>(new StepBuilder(stepName)); 432 } 433 434 protected void registerItemListeners(SimpleStepBuilder<I, O> builder) { 435 for (ItemReadListener<I> listener : readListeners) { 436 builder.listener(listener); 437 } 438 for (ItemWriteListener<O> listener : writeListeners) { 439 builder.listener(listener); 440 } 441 for (ItemProcessListener<I, O> listener : processListeners) { 442 builder.listener(listener); 443 } 444 } 445 446 protected Step createSimpleStep() { 447 SimpleStepBuilder<I, O> builder = getSimpleStepBuilder(name); 448 449 setChunk(builder); 450 451 enhanceTaskletStepBuilder(builder); 452 registerItemListeners(builder); 453 builder.reader(itemReader); 454 builder.writer(itemWriter); 455 
builder.processor(itemProcessor); 456 return builder.build(); 457 } 458 459 protected void setChunk(SimpleStepBuilder<I, O> builder) { 460 if (commitInterval != null) { 461 builder.chunk(commitInterval); 462 } 463 builder.chunk(chunkCompletionPolicy); 464 } 465 466 protected CompletionPolicy getCompletionPolicy() { 467 return this.chunkCompletionPolicy; 468 } 469 470 protected SimpleStepBuilder<I, O> getSimpleStepBuilder(String stepName) { 471 return new SimpleStepBuilder<I, O>(new StepBuilder(stepName)); 472 } 473 474 /** 475 * @return a new {@link TaskletStep} 476 */ 477 protected TaskletStep createTaskletStep() { 478 TaskletStepBuilder builder = new StepBuilder(name).tasklet(tasklet); 479 enhanceTaskletStepBuilder(builder); 480 return builder.build(); 481 } 482 483 @SuppressWarnings("serial") 484 protected void enhanceTaskletStepBuilder(AbstractTaskletStepBuilder<?> builder) { 485 486 enhanceCommonStep(builder); 487 for (ChunkListener listener : chunkListeners) { 488 if(listener instanceof PartitionCollectorAdapter) { 489 ((PartitionCollectorAdapter) listener).setPartitionLock(partitionLock); 490 } 491 492 builder.listener(listener); 493 494 } 495 builder.taskExecutor(taskExecutor); 496 if (throttleLimit != null) { 497 builder.throttleLimit(throttleLimit); 498 } 499 builder.transactionManager(transactionManager); 500 if (transactionTimeout != null || propagation != null || isolation != null 501 || noRollbackExceptionClasses != null) { 502 DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); 503 if (propagation != null) { 504 attribute.setPropagationBehavior(propagation.value()); 505 } 506 if (isolation != null) { 507 attribute.setIsolationLevel(isolation.value()); 508 } 509 if (transactionTimeout != null) { 510 attribute.setTimeout(transactionTimeout); 511 } 512 Collection<Class<? extends Throwable>> exceptions = noRollbackExceptionClasses == null ? new HashSet<Class<? 
extends Throwable>>() 513 : noRollbackExceptionClasses; 514 final BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(exceptions, false); 515 builder.transactionAttribute(new DefaultTransactionAttribute(attribute) { 516 @Override 517 public boolean rollbackOn(Throwable ex) { 518 return classifier.classify(ex); 519 } 520 }); 521 } 522 if (streams != null) { 523 for (ItemStream stream : streams) { 524 builder.stream(stream); 525 } 526 } 527 528 } 529 530 protected Step createFlowStep() { 531 FlowStepBuilder builder = new StepBuilder(name).flow(flow); 532 enhanceCommonStep(builder); 533 return builder.build(); 534 } 535 536 private Step createJobStep() throws Exception { 537 538 JobStepBuilder builder = new StepBuilder(name).job(job); 539 enhanceCommonStep(builder); 540 builder.parametersExtractor(jobParametersExtractor); 541 builder.launcher(jobLauncher); 542 return builder.build(); 543 544 } 545 546 /** 547 * Validates that all components required to build a fault tolerant step are set 548 */ 549 protected void validateFaultTolerantSettings() { 550 validateDependency("skippable-exception-classes", skippableExceptionClasses, "skip-limit", skipLimit, true); 551 validateDependency("retryable-exception-classes", retryableExceptionClasses, "retry-limit", retryLimit, true); 552 validateDependency("retry-listeners", retryListeners, "retry-limit", retryLimit, false); 553 if (isPresent(processorTransactional) && !processorTransactional && isPresent(readerTransactionalQueue) 554 && readerTransactionalQueue) { 555 throw new IllegalArgumentException( 556 "The field 'processor-transactional' cannot be false if 'reader-transactional-queue' is true"); 557 } 558 } 559 560 /** 561 * Check if a field is present then a second is also. If the twoWayDependency flag is set then the opposite must 562 * also be true: if the second value is present, the first must also be. 
563 * 564 * @param dependentName the name of the first field 565 * @param dependentValue the value of the first field 566 * @param name the name of the other field (which should be absent if the first is present) 567 * @param value the value of the other field 568 * @param twoWayDependency true if both depend on each other 569 * @throws IllegalArgumentException if either condition is violated 570 */ 571 private void validateDependency(String dependentName, Object dependentValue, String name, Object value, 572 boolean twoWayDependency) { 573 if (isPresent(dependentValue) && !isPresent(value)) { 574 throw new IllegalArgumentException("The field '" + dependentName + "' is not permitted on the step [" 575 + this.name + "] because there is no '" + name + "'."); 576 } 577 if (twoWayDependency && isPresent(value) && !isPresent(dependentValue)) { 578 throw new IllegalArgumentException("The field '" + name + "' is not permitted on the step [" + this.name 579 + "] because there is no '" + dependentName + "'."); 580 } 581 } 582 583 /** 584 * Is the object non-null (or if an Integer, non-zero)? 
585 * 586 * @param o an object 587 * @return true if the object has a value 588 */ 589 private boolean isPresent(Object o) { 590 if (o instanceof Integer) { 591 return isPositive((Integer) o); 592 } 593 if (o instanceof Collection) { 594 return !((Collection<?>) o).isEmpty(); 595 } 596 if (o instanceof Map) { 597 return !((Map<?, ?>) o).isEmpty(); 598 } 599 return o != null; 600 } 601 602 /** 603 * @return true if the step is configured with any components that require fault tolerance 604 */ 605 protected boolean isFaultTolerant() { 606 return backOffPolicy != null || skipPolicy != null || retryPolicy != null || isPositive(skipLimit) 607 || isPositive(retryLimit) || isPositive(cacheCapacity) || isTrue(readerTransactionalQueue); 608 } 609 610 private boolean isTrue(Boolean b) { 611 return b != null && b.booleanValue(); 612 } 613 614 private boolean isPositive(Integer n) { 615 return n != null && n > 0; 616 } 617 618 @Override 619 public Class<TaskletStep> getObjectType() { 620 return TaskletStep.class; 621 } 622 623 @Override 624 public boolean isSingleton() { 625 return true; 626 } 627 628 // ========================================================= 629 // Step Attributes 630 // ========================================================= 631 632 /** 633 * Set the bean name property, which will become the name of the {@link Step} when it is created. 
634 * 635 * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) 636 */ 637 @Override 638 public void setBeanName(String name) { 639 if (this.name == null) { 640 this.name = name; 641 } 642 } 643 644 /** 645 * @param name the name to set 646 */ 647 public void setName(String name) { 648 this.name = name; 649 } 650 651 public String getName() { 652 return this.name; 653 } 654 655 // ========================================================= 656 // Flow Attributes 657 // ========================================================= 658 659 /** 660 * @param flow the flow to set 661 */ 662 public void setFlow(Flow flow) { 663 this.flow = flow; 664 } 665 666 // ========================================================= 667 // Job Attributes 668 // ========================================================= 669 670 public void setJob(Job job) { 671 this.job = job; 672 } 673 674 public void setJobParametersExtractor(JobParametersExtractor jobParametersExtractor) { 675 this.jobParametersExtractor = jobParametersExtractor; 676 } 677 678 public void setJobLauncher(JobLauncher jobLauncher) { 679 this.jobLauncher = jobLauncher; 680 } 681 682 // ========================================================= 683 // Partition Attributes 684 // ========================================================= 685 686 /** 687 * @param partitioner the partitioner to set 688 */ 689 public void setPartitioner(Partitioner partitioner) { 690 this.partitioner = partitioner; 691 } 692 693 /** 694 * @param stepExecutionAggregator the stepExecutionAggregator to set 695 */ 696 public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) { 697 this.stepExecutionAggregator = stepExecutionAggregator; 698 } 699 700 /** 701 * @return stepExecutionAggregator the current step's {@link StepExecutionAggregator} 702 */ 703 protected StepExecutionAggregator getStepExecutionAggergator() { 704 return this.stepExecutionAggregator; 705 } 706 707 /** 708 * @param 
partitionHandler the partitionHandler to set 709 */ 710 public void setPartitionHandler(PartitionHandler partitionHandler) { 711 this.partitionHandler = partitionHandler; 712 } 713 714 /** 715 * @return partitionHandler the current step's {@link PartitionHandler} 716 */ 717 protected PartitionHandler getPartitionHandler() { 718 return this.partitionHandler; 719 } 720 721 /** 722 * @param gridSize the gridSize to set 723 */ 724 public void setGridSize(int gridSize) { 725 this.gridSize = gridSize; 726 } 727 728 /** 729 * @param step the step to set 730 */ 731 public void setStep(Step step) { 732 this.step = step; 733 } 734 735 // ========================================================= 736 // Tasklet Attributes 737 // ========================================================= 738 739 /** 740 * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the 741 * first time. 742 * 743 * @param allowStartIfComplete the shouldAllowStartIfComplete to set 744 */ 745 public void setAllowStartIfComplete(boolean allowStartIfComplete) { 746 this.allowStartIfComplete = allowStartIfComplete; 747 748 } 749 750 /** 751 * @return jobRepository 752 */ 753 public JobRepository getJobRepository() { 754 return jobRepository; 755 } 756 757 /** 758 * Public setter for {@link JobRepository}. 759 * 760 * @param jobRepository {@link JobRepository} instance to be used by the step. 761 */ 762 public void setJobRepository(JobRepository jobRepository) { 763 this.jobRepository = jobRepository; 764 } 765 766 /** 767 * The number of times that the step should be allowed to start 768 * 769 * @param startLimit int containing the number of times a step should be allowed to start. 770 */ 771 public void setStartLimit(int startLimit) { 772 this.startLimit = startLimit; 773 } 774 775 /** 776 * A preconfigured {@link Tasklet} to use. 777 * 778 * @param tasklet {@link Tasklet} instance to be used by step. 
779 */ 780 public void setTasklet(Tasklet tasklet) { 781 this.tasklet = tasklet; 782 } 783 784 protected Tasklet getTasklet() { 785 return this.tasklet; 786 } 787 788 /** 789 * @return transactionManager instance of {@link PlatformTransactionManager} 790 * used by the step. 791 */ 792 public PlatformTransactionManager getTransactionManager() { 793 return transactionManager; 794 } 795 796 /** 797 * @param transactionManager the transaction manager to set 798 */ 799 public void setTransactionManager(PlatformTransactionManager transactionManager) { 800 this.transactionManager = transactionManager; 801 } 802 803 // ========================================================= 804 // Tasklet Elements 805 // ========================================================= 806 807 /** 808 * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then 809 * receive callbacks at the appropriate stage in the step. 810 * 811 * @param listeners an array of listeners 812 */ 813 @SuppressWarnings("unchecked") 814 public void setListeners(Object[] listeners) { 815 for (Object listener : listeners) { 816 if (listener instanceof SkipListener) { 817 SkipListener<I, O> skipListener = (SkipListener<I, O>) listener; 818 skipListeners.add(skipListener); 819 } 820 if(listener instanceof SkipReadListener) { 821 SkipListener<I, O> skipListener = new SkipListenerAdapter<I, O>((SkipReadListener) listener, null, null); 822 skipListeners.add(skipListener); 823 } 824 if(listener instanceof SkipProcessListener) { 825 SkipListener<I, O> skipListener = new SkipListenerAdapter<I, O>(null,(SkipProcessListener) listener, null); 826 skipListeners.add(skipListener); 827 } 828 if(listener instanceof SkipWriteListener) { 829 SkipListener<I, O> skipListener = new SkipListenerAdapter<I, O>(null, null, (SkipWriteListener) listener); 830 skipListeners.add(skipListener); 831 } 832 if (listener instanceof StepExecutionListener) { 833 StepExecutionListener 
stepExecutionListener = (StepExecutionListener) listener; 834 stepExecutionListeners.add(stepExecutionListener); 835 } 836 if(listener instanceof javax.batch.api.listener.StepListener) { 837 StepExecutionListener stepExecutionListener = new StepListenerAdapter((javax.batch.api.listener.StepListener) listener); 838 stepExecutionListeners.add(stepExecutionListener); 839 } 840 if (listener instanceof ChunkListener) { 841 ChunkListener chunkListener = (ChunkListener) listener; 842 chunkListeners.add(chunkListener); 843 } 844 if(listener instanceof javax.batch.api.chunk.listener.ChunkListener) { 845 ChunkListener chunkListener = new ChunkListenerAdapter((javax.batch.api.chunk.listener.ChunkListener) listener); 846 chunkListeners.add(chunkListener); 847 } 848 if (listener instanceof ItemReadListener) { 849 ItemReadListener<I> readListener = (ItemReadListener<I>) listener; 850 readListeners.add(readListener); 851 } 852 if(listener instanceof javax.batch.api.chunk.listener.ItemReadListener) { 853 ItemReadListener<I> itemListener = new ItemReadListenerAdapter<I>((javax.batch.api.chunk.listener.ItemReadListener) listener); 854 readListeners.add(itemListener); 855 } 856 if (listener instanceof ItemWriteListener) { 857 ItemWriteListener<O> writeListener = (ItemWriteListener<O>) listener; 858 writeListeners.add(writeListener); 859 } 860 if(listener instanceof javax.batch.api.chunk.listener.ItemWriteListener) { 861 ItemWriteListener<O> itemListener = new ItemWriteListenerAdapter<O>((javax.batch.api.chunk.listener.ItemWriteListener) listener); 862 writeListeners.add(itemListener); 863 } 864 if (listener instanceof ItemProcessListener) { 865 ItemProcessListener<I, O> processListener = (ItemProcessListener<I, O>) listener; 866 processListeners.add(processListener); 867 } 868 if(listener instanceof javax.batch.api.chunk.listener.ItemProcessListener) { 869 ItemProcessListener<I,O> itemListener = new ItemProcessListenerAdapter<I, O>((javax.batch.api.chunk.listener.ItemProcessListener) 
listener); 870 processListeners.add(itemListener); 871 } 872 if(listener instanceof RetryReadListener) { 873 jsrRetryListeners.add(new RetryReadListenerAdapter((RetryReadListener) listener)); 874 } 875 if(listener instanceof RetryProcessListener) { 876 jsrRetryListeners.add(new RetryProcessListenerAdapter((RetryProcessListener) listener)); 877 } 878 if(listener instanceof RetryWriteListener) { 879 jsrRetryListeners.add(new RetryWriteListenerAdapter((RetryWriteListener) listener)); 880 } 881 if(listener instanceof PartitionCollector) { 882 PartitionCollectorAdapter adapter = new PartitionCollectorAdapter(partitionQueue, (PartitionCollector) listener); 883 chunkListeners.add(adapter); 884 } 885 } 886 } 887 888 /** 889 * Exception classes that may not cause a rollback if encountered in the right place. 890 * 891 * @param noRollbackExceptionClasses the noRollbackExceptionClasses to set 892 */ 893 public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) { 894 this.noRollbackExceptionClasses = noRollbackExceptionClasses; 895 } 896 897 /** 898 * @param transactionTimeout the transactionTimeout to set 899 */ 900 public void setTransactionTimeout(int transactionTimeout) { 901 this.transactionTimeout = transactionTimeout; 902 } 903 904 /** 905 * @param isolation the isolation to set 906 */ 907 public void setIsolation(Isolation isolation) { 908 this.isolation = isolation; 909 } 910 911 /** 912 * @param propagation the propagation to set 913 */ 914 public void setPropagation(Propagation propagation) { 915 this.propagation = propagation; 916 } 917 918 // ========================================================= 919 // Parent Attributes - can be provided in parent bean but not namespace 920 // ========================================================= 921 922 /** 923 * A backoff policy to be applied to retry process. 
924 * 925 * @param backOffPolicy the {@link BackOffPolicy} to set 926 */ 927 public void setBackOffPolicy(BackOffPolicy backOffPolicy) { 928 this.backOffPolicy = backOffPolicy; 929 } 930 931 /** 932 * A retry policy to apply when exceptions occur. If this is specified then the retry limit and retryable exceptions 933 * will be ignored. 934 * 935 * @param retryPolicy the {@link RetryPolicy} to set 936 */ 937 public void setRetryPolicy(RetryPolicy retryPolicy) { 938 this.retryPolicy = retryPolicy; 939 } 940 941 /** 942 * @param retryContextCache the {@link RetryContextCache} to set 943 */ 944 public void setRetryContextCache(RetryContextCache retryContextCache) { 945 this.retryContextCache = retryContextCache; 946 } 947 948 /** 949 * A key generator that can be used to compare items with previously recorded items in a retry. Only used if the 950 * reader is a transactional queue. 951 * 952 * @param keyGenerator the {@link KeyGenerator} to set 953 */ 954 public void setKeyGenerator(KeyGenerator keyGenerator) { 955 this.keyGenerator = keyGenerator; 956 } 957 958 // ========================================================= 959 // Chunk Attributes 960 // ========================================================= 961 962 /** 963 * Public setter for the capacity of the cache in the retry policy. If more items than this fail without being 964 * skipped or recovered an exception will be thrown. This is to guard against inadvertent infinite loops generated 965 * by item identity problems.<br> 966 * <br> 967 * The default value should be high enough and more for most purposes. To breach the limit in a single-threaded step 968 * typically you have to have this many failures in a single transaction. 
Defaults to the value in the 969 * {@link MapRetryContextCache}.<br> 970 * 971 * @param cacheCapacity the cache capacity to set (greater than 0 else ignored) 972 */ 973 public void setCacheCapacity(int cacheCapacity) { 974 this.cacheCapacity = cacheCapacity; 975 } 976 977 /** 978 * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed when 979 * this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to the 980 * commitInterval property. 981 * 982 * @param chunkCompletionPolicy the chunkCompletionPolicy to set 983 */ 984 public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) { 985 this.chunkCompletionPolicy = chunkCompletionPolicy; 986 } 987 988 /** 989 * Set the commit interval. Either set this or the chunkCompletionPolicy but not both. 990 * 991 * @param commitInterval 1 by default 992 */ 993 public void setCommitInterval(int commitInterval) { 994 this.commitInterval = commitInterval; 995 } 996 997 protected Integer getCommitInterval() { 998 return this.commitInterval; 999 } 1000 1001 /** 1002 * Flag to signal that the reader is transactional (usually a JMS consumer) so that items are re-presented after a 1003 * rollback. The default is false and readers are assumed to be forward-only. 1004 * 1005 * @param isReaderTransactionalQueue the value of the flag 1006 */ 1007 public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) { 1008 this.readerTransactionalQueue = isReaderTransactionalQueue; 1009 } 1010 1011 /** 1012 * Flag to signal that the processor is transactional, in which case it should be called for every item in every 1013 * transaction. If false then we can cache the processor results between transactions in the case of a rollback. 
1014 * 1015 * @param processorTransactional the value to set 1016 */ 1017 public void setProcessorTransactional(Boolean processorTransactional) { 1018 this.processorTransactional = processorTransactional; 1019 } 1020 1021 /** 1022 * Public setter for the retry limit. Each item can be retried up to this limit. Note this limit includes the 1023 * initial attempt to process the item, therefore <code>retryLimit == 1</code> by default. 1024 * 1025 * @param retryLimit the retry limit to set, must be greater or equal to 1. 1026 */ 1027 public void setRetryLimit(int retryLimit) { 1028 this.retryLimit = retryLimit; 1029 } 1030 1031 /** 1032 * Public setter for a limit that determines skip policy. If this value is positive then an exception in chunk 1033 * processing will cause the item to be skipped and no exception propagated until the limit is reached. If it is 1034 * zero then all exceptions will be propagated from the chunk and cause the step to abort. 1035 * 1036 * @param skipLimit the value to set. Default is 0 (never skip). 1037 */ 1038 public void setSkipLimit(int skipLimit) { 1039 this.skipLimit = skipLimit; 1040 } 1041 1042 /** 1043 * Public setter for a skip policy. If this value is set then the skip limit and skippable exceptions are ignored. 1044 * 1045 * @param skipPolicy the {@link SkipPolicy} to set 1046 */ 1047 public void setSkipPolicy(SkipPolicy skipPolicy) { 1048 this.skipPolicy = skipPolicy; 1049 } 1050 1051 /** 1052 * Public setter for the {@link TaskExecutor}. If this is set, then it will be used to execute the chunk processing 1053 * inside the {@link Step}. 1054 * 1055 * @param taskExecutor the taskExecutor to set 1056 */ 1057 public void setTaskExecutor(TaskExecutor taskExecutor) { 1058 this.taskExecutor = taskExecutor; 1059 } 1060 1061 /** 1062 * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing to prevent 1063 * thread pools from being overwhelmed. 
Defaults to {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}. 1064 * 1065 * @param throttleLimit the throttle limit to set. 1066 */ 1067 public void setThrottleLimit(Integer throttleLimit) { 1068 this.throttleLimit = throttleLimit; 1069 } 1070 1071 /** 1072 * @param itemReader the {@link ItemReader} to set 1073 */ 1074 public void setItemReader(ItemReader<? extends I> itemReader) { 1075 this.itemReader = itemReader; 1076 } 1077 1078 /** 1079 * @param itemProcessor the {@link ItemProcessor} to set 1080 */ 1081 public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) { 1082 this.itemProcessor = itemProcessor; 1083 } 1084 1085 /** 1086 * @param itemWriter the {@link ItemWriter} to set 1087 */ 1088 public void setItemWriter(ItemWriter<? super O> itemWriter) { 1089 this.itemWriter = itemWriter; 1090 } 1091 1092 // ========================================================= 1093 // Chunk Elements 1094 // ========================================================= 1095 1096 /** 1097 * Public setter for the {@link RetryListener}s. 1098 * 1099 * @param retryListeners the {@link RetryListener}s to set 1100 */ 1101 public void setRetryListeners(RetryListener... retryListeners) { 1102 this.retryListeners = retryListeners; 1103 } 1104 1105 /** 1106 * Public setter for exception classes that when raised won't crash the job but will result in transaction rollback 1107 * and the item which handling caused the exception will be skipped. 1108 * 1109 * @param exceptionClasses {@link Map} containing the {@link Throwable}s as 1110 * the keys and the values are {@link Boolean}s, that if true the item is skipped. 1111 */ 1112 public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) { 1113 this.skippableExceptionClasses = exceptionClasses; 1114 } 1115 1116 /** 1117 * Public setter for exception classes that will retry the item when raised. 
*
	 * @param retryableExceptionClasses the retryableExceptionClasses to set
	 */
	public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) {
		this.retryableExceptionClasses = retryableExceptionClasses;
	}

	/**
	 * The streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be used, and will then
	 * receive callbacks at the appropriate stage in the step.
	 *
	 * @param streams an array of {@link ItemStream}s
	 */
	public void setStreams(ItemStream[] streams) {
		this.streams = streams;
	}

	// =========================================================
	// Additional
	// =========================================================

	/**
	 * Record whether the step definition contains a {@code <chunk/>} element.
	 *
	 * @param hasChunkElement true if step has {@code <chunk/>} element.
	 */
	public void setHasChunkElement(boolean hasChunkElement) {
		this.hasChunkElement = hasChunkElement;
	}

	/**
	 * Report the flag stored by {@link #setHasChunkElement(boolean)}.
	 *
	 * @return true if the defined step has a {@code <chunk/>} element
	 */
	protected boolean hasChunkElement() {
		return this.hasChunkElement;
	}

	/**
	 * Report whether a tasklet is present, i.e. the {@code tasklet} field is non-null.
	 *
	 * @return true if the defined step has a {@code <tasklet/>} element
	 */
	protected boolean hasTasklet() {
		return this.tasklet != null;
	}

	/**
	 * Report whether a partition handler is present, i.e. the {@code partitionHandler} field is non-null.
	 *
	 * @return true if the defined step has a {@code <partition/>} element
	 */
	protected boolean hasPartitionElement() {
		return this.partitionHandler != null;
	}
}