001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.configuration.xml;
018
019import java.io.Serializable;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.LinkedHashSet;
024import java.util.Map;
025import java.util.Queue;
026import java.util.Set;
027import java.util.concurrent.locks.ReentrantLock;
028import javax.batch.api.chunk.listener.RetryProcessListener;
029import javax.batch.api.chunk.listener.RetryReadListener;
030import javax.batch.api.chunk.listener.RetryWriteListener;
031import javax.batch.api.chunk.listener.SkipProcessListener;
032import javax.batch.api.chunk.listener.SkipReadListener;
033import javax.batch.api.chunk.listener.SkipWriteListener;
034import javax.batch.api.partition.PartitionCollector;
035
036import org.springframework.batch.core.ChunkListener;
037import org.springframework.batch.core.ItemProcessListener;
038import org.springframework.batch.core.ItemReadListener;
039import org.springframework.batch.core.ItemWriteListener;
040import org.springframework.batch.core.Job;
041import org.springframework.batch.core.SkipListener;
042import org.springframework.batch.core.Step;
043import org.springframework.batch.core.StepExecutionListener;
044import org.springframework.batch.core.StepListener;
045import org.springframework.batch.core.job.flow.Flow;
046import org.springframework.batch.core.jsr.ChunkListenerAdapter;
047import org.springframework.batch.core.jsr.ItemProcessListenerAdapter;
048import org.springframework.batch.core.jsr.ItemReadListenerAdapter;
049import org.springframework.batch.core.jsr.ItemWriteListenerAdapter;
050import org.springframework.batch.core.jsr.RetryProcessListenerAdapter;
051import org.springframework.batch.core.jsr.RetryReadListenerAdapter;
052import org.springframework.batch.core.jsr.RetryWriteListenerAdapter;
053import org.springframework.batch.core.jsr.SkipListenerAdapter;
054import org.springframework.batch.core.jsr.StepListenerAdapter;
055import org.springframework.batch.core.jsr.partition.PartitionCollectorAdapter;
056import org.springframework.batch.core.launch.JobLauncher;
057import org.springframework.batch.core.partition.PartitionHandler;
058import org.springframework.batch.core.partition.support.Partitioner;
059import org.springframework.batch.core.partition.support.StepExecutionAggregator;
060import org.springframework.batch.core.repository.JobRepository;
061import org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder;
062import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
063import org.springframework.batch.core.step.builder.FlowStepBuilder;
064import org.springframework.batch.core.step.builder.JobStepBuilder;
065import org.springframework.batch.core.step.builder.PartitionStepBuilder;
066import org.springframework.batch.core.step.builder.SimpleStepBuilder;
067import org.springframework.batch.core.step.builder.StepBuilder;
068import org.springframework.batch.core.step.builder.StepBuilderHelper;
069import org.springframework.batch.core.step.builder.TaskletStepBuilder;
070import org.springframework.batch.core.step.factory.FaultTolerantStepFactoryBean;
071import org.springframework.batch.core.step.factory.SimpleStepFactoryBean;
072import org.springframework.batch.core.step.item.KeyGenerator;
073import org.springframework.batch.core.step.job.JobParametersExtractor;
074import org.springframework.batch.core.step.skip.SkipPolicy;
075import org.springframework.batch.core.step.tasklet.Tasklet;
076import org.springframework.batch.core.step.tasklet.TaskletStep;
077import org.springframework.batch.item.ItemProcessor;
078import org.springframework.batch.item.ItemReader;
079import org.springframework.batch.item.ItemStream;
080import org.springframework.batch.item.ItemWriter;
081import org.springframework.batch.repeat.CompletionPolicy;
082import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
083import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
084import org.springframework.beans.factory.BeanNameAware;
085import org.springframework.beans.factory.FactoryBean;
086import org.springframework.classify.BinaryExceptionClassifier;
087import org.springframework.core.task.TaskExecutor;
088import org.springframework.retry.RetryListener;
089import org.springframework.retry.RetryPolicy;
090import org.springframework.retry.backoff.BackOffPolicy;
091import org.springframework.retry.policy.MapRetryContextCache;
092import org.springframework.retry.policy.RetryContextCache;
093import org.springframework.transaction.PlatformTransactionManager;
094import org.springframework.transaction.annotation.Isolation;
095import org.springframework.transaction.annotation.Propagation;
096import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
097import org.springframework.util.Assert;
098
099/**
100 * This {@link FactoryBean} is used by the batch namespace parser to create {@link Step} objects. Stores all of the
101 * properties that are configurable on the {@code <step/>} (and its inner {@code <tasklet/>}). Based on which properties are
102 * configured, the {@link #getObject()} method will delegate to the appropriate class for generating the {@link Step}.
103 *
104 * @author Dan Garrette
105 * @author Josh Long
106 * @author Michael Minella
107 * @author Chris Schaefer
108 * @see SimpleStepFactoryBean
109 * @see FaultTolerantStepFactoryBean
110 * @see TaskletStep
111 * @since 2.0
112 */
113public class StepParserStepFactoryBean<I, O> implements FactoryBean<Step>, BeanNameAware {
114
115        //
116        // Step Attributes
117        //
118        private String name;
119
120        //
121        // Tasklet Attributes
122        //
123        private Boolean allowStartIfComplete;
124
125        private JobRepository jobRepository;
126
127        private Integer startLimit;
128
129        private Tasklet tasklet;
130
131        private PlatformTransactionManager transactionManager;
132
133        private Set<Object> stepExecutionListeners = new LinkedHashSet<Object>();
134
135        //
136        // Flow Elements
137        //
138        private Flow flow;
139
140        //
141        // Job Elements
142        //
143        private Job job;
144
145        private JobLauncher jobLauncher;
146
147        private JobParametersExtractor jobParametersExtractor;
148
149        //
150        // Partition Elements
151        //
152        private Partitioner partitioner;
153
154        private static final int DEFAULT_GRID_SIZE = 6;
155
156        private Step step;
157
158        private PartitionHandler partitionHandler;
159
160        private int gridSize = DEFAULT_GRID_SIZE;
161
162        private Queue<Serializable> partitionQueue;
163
164        private ReentrantLock partitionLock;
165
166        //
167        // Tasklet Elements
168        //
169        private Collection<Class<? extends Throwable>> noRollbackExceptionClasses;
170
171        private Integer transactionTimeout;
172
173        private Propagation propagation;
174
175        private Isolation isolation;
176
177        private Set<ChunkListener> chunkListeners = new LinkedHashSet<ChunkListener>();
178
179        //
180        // Chunk Attributes
181        //
182        private int cacheCapacity = 0;
183
184        private CompletionPolicy chunkCompletionPolicy;
185
186        private Integer commitInterval;
187
188        private Boolean readerTransactionalQueue;
189
190        private Boolean processorTransactional;
191
192        private int retryLimit = 0;
193
194        private BackOffPolicy backOffPolicy;
195
196        private RetryPolicy retryPolicy;
197
198        private RetryContextCache retryContextCache;
199
200        private KeyGenerator keyGenerator;
201
202        private Integer skipLimit;
203
204        private SkipPolicy skipPolicy;
205
206        private TaskExecutor taskExecutor;
207
208        private Integer throttleLimit;
209
210        private ItemReader<? extends I> itemReader;
211
212        private ItemProcessor<? super I, ? extends O> itemProcessor;
213
214        private ItemWriter<? super O> itemWriter;
215
216        //
217        // Chunk Elements
218        //
219        private RetryListener[] retryListeners;
220
221        private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
222
223        private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
224
225        private ItemStream[] streams;
226
227        private Set<ItemReadListener<I>> readListeners = new LinkedHashSet<ItemReadListener<I>>();
228
229        private Set<ItemWriteListener<O>> writeListeners = new LinkedHashSet<ItemWriteListener<O>>();
230
231        private Set<ItemProcessListener<I, O>> processListeners = new LinkedHashSet<ItemProcessListener<I, O>>();
232
233        private Set<SkipListener<I, O>> skipListeners = new LinkedHashSet<SkipListener<I, O>>();
234
235        private Set<org.springframework.batch.core.jsr.RetryListener> jsrRetryListeners = new LinkedHashSet<org.springframework.batch.core.jsr.RetryListener>();
236
237        //
238        // Additional
239        //
240        private boolean hasChunkElement = false;
241
242        private StepExecutionAggregator stepExecutionAggregator;
243
244        /**
245         * @param queue The {@link Queue} that is used for communication between {@link javax.batch.api.partition.PartitionCollector} and {@link javax.batch.api.partition.PartitionAnalyzer}
246         */
247        public void setPartitionQueue(Queue<Serializable> queue) {
248                this.partitionQueue = queue;
249        }
250
251        /**
252         * Used to coordinate access to the partition queue between the {@link javax.batch.api.partition.PartitionCollector} and {@link javax.batch.api.partition.AbstractPartitionAnalyzer}
253         *
254         * @param lock a lock that will be locked around accessing the partition queue
255         */
256        public void setPartitionLock(ReentrantLock lock) {
257                this.partitionLock = lock;
258        }
259
260        /**
261         * Create a {@link Step} from the configuration provided.
262         *
263         * @see FactoryBean#getObject()
264         */
265        @Override
266        public Step getObject() throws Exception {
267                if (hasChunkElement) {
268                        Assert.isNull(tasklet, "Step [" + name
269                                        + "] has both a <chunk/> element and a 'ref' attribute  referencing a Tasklet.");
270
271                        validateFaultTolerantSettings();
272
273                        if (isFaultTolerant()) {
274                                return createFaultTolerantStep();
275                        }
276                        else {
277                                return createSimpleStep();
278                        }
279                }
280                else if (tasklet != null) {
281                        return createTaskletStep();
282                }
283                else if (flow != null) {
284                        return createFlowStep();
285                }
286                else if (job != null) {
287                        return createJobStep();
288                }
289                else {
290                        return createPartitionStep();
291                }
292        }
293
294        public boolean requiresTransactionManager() {
295                // Currently all step implementations other than TaskletStep are
296                // AbstractStep and do not require a transaction manager
297                return hasChunkElement || tasklet != null;
298        }
299
300        /**
301         * @param builder {@link StepBuilderHelper} representing the step to be enhanced
302         */
303        protected void enhanceCommonStep(StepBuilderHelper<?> builder) {
304                if (allowStartIfComplete != null) {
305                        builder.allowStartIfComplete(allowStartIfComplete);
306                }
307                if (startLimit != null) {
308                        builder.startLimit(startLimit);
309                }
310                builder.repository(jobRepository);
311                builder.transactionManager(transactionManager);
312                for (Object listener : stepExecutionListeners) {
313                        if(listener instanceof StepExecutionListener) {
314                                builder.listener((StepExecutionListener) listener);
315                        } else if(listener instanceof javax.batch.api.listener.StepListener) {
316                                builder.listener(new StepListenerAdapter((javax.batch.api.listener.StepListener) listener));
317                        }
318                }
319        }
320
321        protected Step createPartitionStep() {
322
323                PartitionStepBuilder builder;
324                if (partitioner != null) {
325                        builder = new StepBuilder(name).partitioner(step != null ? step.getName() : name, partitioner).step(step);
326                }
327                else {
328                        builder = new StepBuilder(name).partitioner(step);
329                }
330                enhanceCommonStep(builder);
331
332                if (partitionHandler != null) {
333                        builder.partitionHandler(partitionHandler);
334                }
335                else {
336                        builder.gridSize(gridSize);
337                        builder.taskExecutor(taskExecutor);
338                }
339
340                builder.aggregator(stepExecutionAggregator);
341
342                return builder.build();
343
344        }
345
346        protected Step createFaultTolerantStep() {
347
348                FaultTolerantStepBuilder<I, O> builder = getFaultTolerantStepBuilder(this.name);
349
350                if (commitInterval != null) {
351                        builder.chunk(commitInterval);
352                }
353                builder.chunk(chunkCompletionPolicy);
354                enhanceTaskletStepBuilder(builder);
355
356                builder.reader(itemReader);
357                builder.writer(itemWriter);
358                builder.processor(itemProcessor);
359
360                if (processorTransactional != null && !processorTransactional) {
361                        builder.processorNonTransactional();
362                }
363
364                if (readerTransactionalQueue!=null && readerTransactionalQueue==true) {
365                        builder.readerIsTransactionalQueue();
366                }
367
368                for (SkipListener<I, O> listener : skipListeners) {
369                        builder.listener(listener);
370                }
371
372                for (org.springframework.batch.core.jsr.RetryListener listener : jsrRetryListeners) {
373                        builder.listener(listener);
374                }
375
376                registerItemListeners(builder);
377
378                if (skipPolicy != null) {
379                        builder.skipPolicy(skipPolicy);
380                }
381                else if (skipLimit!=null) {
382                        builder.skipLimit(skipLimit);
383                        for (Class<? extends Throwable> type : skippableExceptionClasses.keySet()) {
384                                if (skippableExceptionClasses.get(type)) {
385                                        builder.skip(type);
386                                }
387                                else {
388                                        builder.noSkip(type);
389                                }
390                        }
391                }
392
393                if (retryListeners != null) {
394                        for (RetryListener listener : retryListeners) {
395                                builder.listener(listener);
396                        }
397                }
398
399                if (retryContextCache == null && cacheCapacity > 0) {
400                        retryContextCache = new MapRetryContextCache(cacheCapacity);
401                }
402                builder.retryContextCache(retryContextCache);
403                builder.keyGenerator(keyGenerator);
404                if (retryPolicy != null) {
405                        builder.retryPolicy(retryPolicy);
406                }
407                else {
408                        builder.retryLimit(retryLimit);
409                        builder.backOffPolicy(backOffPolicy);
410                        for (Class<? extends Throwable> type : retryableExceptionClasses.keySet()) {
411                                if (retryableExceptionClasses.get(type)) {
412                                        builder.retry(type);
413                                }
414                                else {
415                                        builder.noRetry(type);
416                                }
417                        }
418                }
419
420                if (noRollbackExceptionClasses != null) {
421                        for (Class<? extends Throwable> type : noRollbackExceptionClasses) {
422                                builder.noRollback(type);
423                        }
424                }
425
426                return builder.build();
427
428        }
429
430        protected FaultTolerantStepBuilder<I, O> getFaultTolerantStepBuilder(String stepName) {
431                return new FaultTolerantStepBuilder<I, O>(new StepBuilder(stepName));
432        }
433
434        protected void registerItemListeners(SimpleStepBuilder<I, O> builder) {
435                for (ItemReadListener<I> listener : readListeners) {
436                        builder.listener(listener);
437                }
438                for (ItemWriteListener<O> listener : writeListeners) {
439                        builder.listener(listener);
440                }
441                for (ItemProcessListener<I, O> listener : processListeners) {
442                        builder.listener(listener);
443                }
444        }
445
446        protected Step createSimpleStep() {
447                SimpleStepBuilder<I, O> builder = getSimpleStepBuilder(name);
448
449                setChunk(builder);
450
451                enhanceTaskletStepBuilder(builder);
452                registerItemListeners(builder);
453                builder.reader(itemReader);
454                builder.writer(itemWriter);
455                builder.processor(itemProcessor);
456                return builder.build();
457        }
458
459        protected void setChunk(SimpleStepBuilder<I, O> builder) {
460                if (commitInterval != null) {
461                        builder.chunk(commitInterval);
462                }
463                builder.chunk(chunkCompletionPolicy);
464        }
465
466        protected CompletionPolicy getCompletionPolicy() {
467                return this.chunkCompletionPolicy;
468        }
469
470        protected SimpleStepBuilder<I, O> getSimpleStepBuilder(String stepName) {
471                return new SimpleStepBuilder<I, O>(new StepBuilder(stepName));
472        }
473
474        /**
475         * @return a new {@link TaskletStep}
476         */
477        protected TaskletStep createTaskletStep() {
478                TaskletStepBuilder builder = new StepBuilder(name).tasklet(tasklet);
479                enhanceTaskletStepBuilder(builder);
480                return builder.build();
481        }
482
483        @SuppressWarnings("serial")
484        protected void enhanceTaskletStepBuilder(AbstractTaskletStepBuilder<?> builder) {
485
486                enhanceCommonStep(builder);
487                for (ChunkListener listener : chunkListeners) {
488                        if(listener instanceof PartitionCollectorAdapter) {
489                                ((PartitionCollectorAdapter) listener).setPartitionLock(partitionLock);
490                        }
491
492                        builder.listener(listener);
493
494                }
495                builder.taskExecutor(taskExecutor);
496                if (throttleLimit != null) {
497                        builder.throttleLimit(throttleLimit);
498                }
499                builder.transactionManager(transactionManager);
500                if (transactionTimeout != null || propagation != null || isolation != null
501                                || noRollbackExceptionClasses != null) {
502                        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
503                        if (propagation != null) {
504                                attribute.setPropagationBehavior(propagation.value());
505                        }
506                        if (isolation != null) {
507                                attribute.setIsolationLevel(isolation.value());
508                        }
509                        if (transactionTimeout != null) {
510                                attribute.setTimeout(transactionTimeout);
511                        }
512                        Collection<Class<? extends Throwable>> exceptions = noRollbackExceptionClasses == null ? new HashSet<Class<? extends Throwable>>()
513                                        : noRollbackExceptionClasses;
514                        final BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(exceptions, false);
515                        builder.transactionAttribute(new DefaultTransactionAttribute(attribute) {
516                                @Override
517                                public boolean rollbackOn(Throwable ex) {
518                                        return classifier.classify(ex);
519                                }
520                        });
521                }
522                if (streams != null) {
523                        for (ItemStream stream : streams) {
524                                builder.stream(stream);
525                        }
526                }
527
528        }
529
530        protected Step createFlowStep() {
531                FlowStepBuilder builder = new StepBuilder(name).flow(flow);
532                enhanceCommonStep(builder);
533                return builder.build();
534        }
535
536        private Step createJobStep() throws Exception {
537
538                JobStepBuilder builder = new StepBuilder(name).job(job);
539                enhanceCommonStep(builder);
540                builder.parametersExtractor(jobParametersExtractor);
541                builder.launcher(jobLauncher);
542                return builder.build();
543
544        }
545
546        /**
547         * Validates that all components required to build a fault tolerant step are set
548         */
549        protected void validateFaultTolerantSettings() {
550                validateDependency("skippable-exception-classes", skippableExceptionClasses, "skip-limit", skipLimit, true);
551                validateDependency("retryable-exception-classes", retryableExceptionClasses, "retry-limit", retryLimit, true);
552                validateDependency("retry-listeners", retryListeners, "retry-limit", retryLimit, false);
553                if (isPresent(processorTransactional) && !processorTransactional && isPresent(readerTransactionalQueue)
554                                && readerTransactionalQueue) {
555                        throw new IllegalArgumentException(
556                                        "The field 'processor-transactional' cannot be false if 'reader-transactional-queue' is true");
557                }
558        }
559
560        /**
561         * Check if a field is present then a second is also. If the twoWayDependency flag is set then the opposite must
562         * also be true: if the second value is present, the first must also be.
563         *
564         * @param dependentName the name of the first field
565         * @param dependentValue the value of the first field
566         * @param name the name of the other field (which should be absent if the first is present)
567         * @param value the value of the other field
568         * @param twoWayDependency true if both depend on each other
569         * @throws IllegalArgumentException if either condition is violated
570         */
571        private void validateDependency(String dependentName, Object dependentValue, String name, Object value,
572                        boolean twoWayDependency) {
573                if (isPresent(dependentValue) && !isPresent(value)) {
574                        throw new IllegalArgumentException("The field '" + dependentName + "' is not permitted on the step ["
575                                        + this.name + "] because there is no '" + name + "'.");
576                }
577                if (twoWayDependency && isPresent(value) && !isPresent(dependentValue)) {
578                        throw new IllegalArgumentException("The field '" + name + "' is not permitted on the step [" + this.name
579                                        + "] because there is no '" + dependentName + "'.");
580                }
581        }
582
583        /**
584         * Is the object non-null (or if an Integer, non-zero)?
585         *
586         * @param o an object
587         * @return true if the object has a value
588         */
589        private boolean isPresent(Object o) {
590                if (o instanceof Integer) {
591                        return isPositive((Integer) o);
592                }
593                if (o instanceof Collection) {
594                        return !((Collection<?>) o).isEmpty();
595                }
596                if (o instanceof Map) {
597                        return !((Map<?, ?>) o).isEmpty();
598                }
599                return o != null;
600        }
601
602        /**
603         * @return true if the step is configured with any components that require fault tolerance
604         */
605        protected boolean isFaultTolerant() {
606                return backOffPolicy != null || skipPolicy != null || retryPolicy != null || isPositive(skipLimit)
607                                || isPositive(retryLimit) || isPositive(cacheCapacity) || isTrue(readerTransactionalQueue);
608        }
609
610        private boolean isTrue(Boolean b) {
611                return b != null && b.booleanValue();
612        }
613
614        private boolean isPositive(Integer n) {
615                return n != null && n > 0;
616        }
617
618        @Override
619        public Class<TaskletStep> getObjectType() {
620                return TaskletStep.class;
621        }
622
623        @Override
624        public boolean isSingleton() {
625                return true;
626        }
627
628        // =========================================================
629        // Step Attributes
630        // =========================================================
631
632        /**
633         * Set the bean name property, which will become the name of the {@link Step} when it is created.
634         *
635         * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
636         */
637        @Override
638        public void setBeanName(String name) {
639                if (this.name == null) {
640                        this.name = name;
641                }
642        }
643
644        /**
645         * @param name the name to set
646         */
647        public void setName(String name) {
648                this.name = name;
649        }
650
651        public String getName() {
652                return this.name;
653        }
654
655        // =========================================================
656        // Flow Attributes
657        // =========================================================
658
659        /**
660         * @param flow the flow to set
661         */
662        public void setFlow(Flow flow) {
663                this.flow = flow;
664        }
665
666        // =========================================================
667        // Job Attributes
668        // =========================================================
669
670        public void setJob(Job job) {
671                this.job = job;
672        }
673
674        public void setJobParametersExtractor(JobParametersExtractor jobParametersExtractor) {
675                this.jobParametersExtractor = jobParametersExtractor;
676        }
677
678        public void setJobLauncher(JobLauncher jobLauncher) {
679                this.jobLauncher = jobLauncher;
680        }
681
682        // =========================================================
683        // Partition Attributes
684        // =========================================================
685
686        /**
687         * @param partitioner the partitioner to set
688         */
689        public void setPartitioner(Partitioner partitioner) {
690                this.partitioner = partitioner;
691        }
692
693        /**
694         * @param stepExecutionAggregator the stepExecutionAggregator to set
695         */
696        public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) {
697                this.stepExecutionAggregator = stepExecutionAggregator;
698        }
699
700        /**
701         * @return stepExecutionAggregator the current step's {@link StepExecutionAggregator}
702         */
703        protected StepExecutionAggregator getStepExecutionAggergator() {
704                return this.stepExecutionAggregator;
705        }
706
707        /**
708         * @param partitionHandler the partitionHandler to set
709         */
710        public void setPartitionHandler(PartitionHandler partitionHandler) {
711                this.partitionHandler = partitionHandler;
712        }
713
714        /**
715         * @return partitionHandler the current step's {@link PartitionHandler}
716         */
717        protected PartitionHandler getPartitionHandler() {
718                return this.partitionHandler;
719        }
720
721        /**
722         * @param gridSize the gridSize to set
723         */
724        public void setGridSize(int gridSize) {
725                this.gridSize = gridSize;
726        }
727
728        /**
729         * @param step the step to set
730         */
731        public void setStep(Step step) {
732                this.step = step;
733        }
734
735        // =========================================================
736        // Tasklet Attributes
737        // =========================================================
738
739        /**
740         * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the
741         * first time.
742         *
743         * @param allowStartIfComplete the shouldAllowStartIfComplete to set
744         */
745        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
746                this.allowStartIfComplete = allowStartIfComplete;
747
748        }
749
750        /**
751         * @return jobRepository
752         */
753        public JobRepository getJobRepository() {
754                return jobRepository;
755        }
756
        /**
         * Public setter for the {@link JobRepository}. The stored value is what {@link #getJobRepository()} returns.
         *
         * @param jobRepository {@link JobRepository} instance to be used by the step.
         */
        public void setJobRepository(JobRepository jobRepository) {
                this.jobRepository = jobRepository;
        }
765
        /**
         * Public setter for the number of times that the step should be allowed to start.
         *
         * @param startLimit int containing the number of times a step should be allowed to start.
         */
        public void setStartLimit(int startLimit) {
                this.startLimit = startLimit;
        }
774
        /**
         * Public setter for a preconfigured {@link Tasklet} to use. While the stored tasklet is non-null,
         * {@link #hasTasklet()} returns {@code true}.
         *
         * @param tasklet {@link Tasklet} instance to be used by step.
         */
        public void setTasklet(Tasklet tasklet) {
                this.tasklet = tasklet;
        }
783
784        protected Tasklet getTasklet() {
785                return this.tasklet;
786        }
787
788        /**
789         * @return transactionManager instance of {@link PlatformTransactionManager}
790         * used by the step.
791         */
792        public PlatformTransactionManager getTransactionManager() {
793                return transactionManager;
794        }
795
        /**
         * Public setter for the {@link PlatformTransactionManager}. The stored value is what
         * {@link #getTransactionManager()} returns.
         *
         * @param transactionManager the transaction manager to set
         */
        public void setTransactionManager(PlatformTransactionManager transactionManager) {
                this.transactionManager = transactionManager;
        }
802
803        // =========================================================
804        // Tasklet Elements
805        // =========================================================
806
807        /**
808         * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then
809         * receive callbacks at the appropriate stage in the step.
810         *
811         * @param listeners an array of listeners
812         */
813        @SuppressWarnings("unchecked")
814        public void setListeners(Object[] listeners) {
815                for (Object listener : listeners) {
816                        if (listener instanceof SkipListener) {
817                                SkipListener<I, O> skipListener = (SkipListener<I, O>) listener;
818                                skipListeners.add(skipListener);
819                        }
820                        if(listener instanceof SkipReadListener) {
821                                SkipListener<I, O> skipListener = new SkipListenerAdapter<I, O>((SkipReadListener) listener, null, null);
822                                skipListeners.add(skipListener);
823                        }
824                        if(listener instanceof SkipProcessListener) {
825                                SkipListener<I, O> skipListener = new SkipListenerAdapter<I, O>(null,(SkipProcessListener) listener, null);
826                                skipListeners.add(skipListener);
827                        }
828                        if(listener instanceof SkipWriteListener) {
829                                SkipListener<I, O> skipListener = new SkipListenerAdapter<I, O>(null, null, (SkipWriteListener) listener);
830                                skipListeners.add(skipListener);
831                        }
832                        if (listener instanceof StepExecutionListener) {
833                                StepExecutionListener stepExecutionListener = (StepExecutionListener) listener;
834                                stepExecutionListeners.add(stepExecutionListener);
835                        }
836                        if(listener instanceof javax.batch.api.listener.StepListener) {
837                                StepExecutionListener stepExecutionListener = new StepListenerAdapter((javax.batch.api.listener.StepListener) listener);
838                                stepExecutionListeners.add(stepExecutionListener);
839                        }
840                        if (listener instanceof ChunkListener) {
841                                ChunkListener chunkListener = (ChunkListener) listener;
842                                chunkListeners.add(chunkListener);
843                        }
844                        if(listener instanceof javax.batch.api.chunk.listener.ChunkListener) {
845                                ChunkListener chunkListener = new ChunkListenerAdapter((javax.batch.api.chunk.listener.ChunkListener) listener);
846                                chunkListeners.add(chunkListener);
847                        }
848                        if (listener instanceof ItemReadListener) {
849                                ItemReadListener<I> readListener = (ItemReadListener<I>) listener;
850                                readListeners.add(readListener);
851                        }
852                        if(listener instanceof javax.batch.api.chunk.listener.ItemReadListener) {
853                                ItemReadListener<I> itemListener = new ItemReadListenerAdapter<I>((javax.batch.api.chunk.listener.ItemReadListener) listener);
854                                readListeners.add(itemListener);
855                        }
856                        if (listener instanceof ItemWriteListener) {
857                                ItemWriteListener<O> writeListener = (ItemWriteListener<O>) listener;
858                                writeListeners.add(writeListener);
859                        }
860                        if(listener instanceof javax.batch.api.chunk.listener.ItemWriteListener) {
861                                ItemWriteListener<O> itemListener = new ItemWriteListenerAdapter<O>((javax.batch.api.chunk.listener.ItemWriteListener) listener);
862                                writeListeners.add(itemListener);
863                        }
864                        if (listener instanceof ItemProcessListener) {
865                                ItemProcessListener<I, O> processListener = (ItemProcessListener<I, O>) listener;
866                                processListeners.add(processListener);
867                        }
868                        if(listener instanceof javax.batch.api.chunk.listener.ItemProcessListener) {
869                                ItemProcessListener<I,O> itemListener = new ItemProcessListenerAdapter<I, O>((javax.batch.api.chunk.listener.ItemProcessListener) listener);
870                                processListeners.add(itemListener);
871                        }
872                        if(listener instanceof RetryReadListener) {
873                                jsrRetryListeners.add(new RetryReadListenerAdapter((RetryReadListener) listener));
874                        }
875                        if(listener instanceof RetryProcessListener) {
876                                jsrRetryListeners.add(new RetryProcessListenerAdapter((RetryProcessListener) listener));
877                        }
878                        if(listener instanceof RetryWriteListener) {
879                                jsrRetryListeners.add(new RetryWriteListenerAdapter((RetryWriteListener) listener));
880                        }
881                        if(listener instanceof PartitionCollector) {
882                                PartitionCollectorAdapter adapter = new PartitionCollectorAdapter(partitionQueue, (PartitionCollector) listener);
883                                chunkListeners.add(adapter);
884                        }
885                }
886        }
887
        /**
         * Public setter for the exception classes that may not cause a rollback if encountered in the right place. The
         * supplied collection is stored as is (no copy is made).
         *
         * @param noRollbackExceptionClasses the noRollbackExceptionClasses to set
         */
        public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) {
                this.noRollbackExceptionClasses = noRollbackExceptionClasses;
        }
896
        /**
         * Public setter for the transaction timeout.
         *
         * @param transactionTimeout the transactionTimeout to set
         */
        public void setTransactionTimeout(int transactionTimeout) {
                this.transactionTimeout = transactionTimeout;
        }
903
        /**
         * Public setter for the transaction {@link Isolation} level.
         *
         * @param isolation the isolation to set
         */
        public void setIsolation(Isolation isolation) {
                this.isolation = isolation;
        }
910
        /**
         * Public setter for the transaction {@link Propagation} behaviour.
         *
         * @param propagation the propagation to set
         */
        public void setPropagation(Propagation propagation) {
                this.propagation = propagation;
        }
917
918        // =========================================================
919        // Parent Attributes - can be provided in parent bean but not namespace
920        // =========================================================
921
        /**
         * Public setter for the back-off policy to be applied to the retry process.
         *
         * @param backOffPolicy the {@link BackOffPolicy} to set
         */
        public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
                this.backOffPolicy = backOffPolicy;
        }
930
        /**
         * Public setter for the retry policy to apply when exceptions occur. If this is specified, then the retry limit
         * and the retryable exceptions will be ignored.
         *
         * @param retryPolicy the {@link RetryPolicy} to set
         */
        public void setRetryPolicy(RetryPolicy retryPolicy) {
                this.retryPolicy = retryPolicy;
        }
940
        /**
         * Public setter for the {@link RetryContextCache}.
         *
         * @param retryContextCache the {@link RetryContextCache} to set
         */
        public void setRetryContextCache(RetryContextCache retryContextCache) {
                this.retryContextCache = retryContextCache;
        }
947
        /**
         * Public setter for a key generator that can be used to compare items with previously recorded items in a
         * retry. Only used if the reader is a transactional queue.
         *
         * @param keyGenerator the {@link KeyGenerator} to set
         */
        public void setKeyGenerator(KeyGenerator keyGenerator) {
                this.keyGenerator = keyGenerator;
        }
957
958        // =========================================================
959        // Chunk Attributes
960        // =========================================================
961
        /**
         * Public setter for the capacity of the cache in the retry policy. If more items than this fail without being
         * skipped or recovered, an exception will be thrown. This guards against inadvertent infinite loops generated
         * by item identity problems.<br>
         * <br>
         * The default value should be more than high enough for most purposes. To breach the limit in a single-threaded
         * step you typically have to have this many failures in a single transaction. Defaults to the value in the
         * {@link MapRetryContextCache}.<br>
         *
         * @param cacheCapacity the cache capacity to set (greater than 0 else ignored)
         */
        public void setCacheCapacity(int cacheCapacity) {
                this.cacheCapacity = cacheCapacity;
        }
976
        /**
         * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed
         * when this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to
         * the commitInterval property. Set either this or the commit interval, but not both.
         *
         * @param chunkCompletionPolicy the chunkCompletionPolicy to set
         */
        public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) {
                this.chunkCompletionPolicy = chunkCompletionPolicy;
        }
987
        /**
         * Public setter for the commit interval. Either set this or the chunkCompletionPolicy but not both. The stored
         * value is what {@link #getCommitInterval()} returns.
         *
         * @param commitInterval 1 by default
         */
        public void setCommitInterval(int commitInterval) {
                this.commitInterval = commitInterval;
        }
996
997        protected Integer getCommitInterval() {
998                return this.commitInterval;
999        }
1000
        /**
         * Public setter for the flag signalling that the reader is transactional (usually a JMS consumer), so that
         * items are re-presented after a rollback. The default is false and readers are assumed to be forward-only.
         *
         * @param isReaderTransactionalQueue the value of the flag
         */
        public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) {
                this.readerTransactionalQueue = isReaderTransactionalQueue;
        }
1010
        /**
         * Public setter for the flag signalling that the processor is transactional, in which case it should be called
         * for every item in every transaction. If false, the processor results can be cached between transactions in
         * the case of a rollback.
         *
         * @param processorTransactional the value to set
         */
        public void setProcessorTransactional(Boolean processorTransactional) {
                this.processorTransactional = processorTransactional;
        }
1020
        /**
         * Public setter for the retry limit. Each item can be retried up to this limit. Note that this limit includes
         * the initial attempt to process the item, therefore <code>retryLimit == 1</code> by default.
         *
         * @param retryLimit the retry limit to set, must be greater than or equal to 1.
         */
        public void setRetryLimit(int retryLimit) {
                this.retryLimit = retryLimit;
        }
1030
        /**
         * Public setter for a limit that determines the skip policy. If this value is positive, then an exception in
         * chunk processing will cause the item to be skipped and no exception will be propagated until the limit is
         * reached. If it is zero, then all exceptions will be propagated from the chunk and cause the step to abort.
         *
         * @param skipLimit the value to set. Default is 0 (never skip).
         */
        public void setSkipLimit(int skipLimit) {
                this.skipLimit = skipLimit;
        }
1041
        /**
         * Public setter for a skip policy. If this value is set, then the skip limit and the skippable exceptions are
         * ignored.
         *
         * @param skipPolicy the {@link SkipPolicy} to set
         */
        public void setSkipPolicy(SkipPolicy skipPolicy) {
                this.skipPolicy = skipPolicy;
        }
1050
        /**
         * Public setter for the {@link TaskExecutor}. If this is set, it will be used to execute the chunk processing
         * inside the {@link Step}.
         *
         * @param taskExecutor the taskExecutor to set
         */
        public void setTaskExecutor(TaskExecutor taskExecutor) {
                this.taskExecutor = taskExecutor;
        }
1060
        /**
         * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing, to
         * prevent thread pools from being overwhelmed. Defaults to
         * {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}.
         *
         * @param throttleLimit the throttle limit to set.
         */
        public void setThrottleLimit(Integer throttleLimit) {
                this.throttleLimit = throttleLimit;
        }
1070
        /**
         * Public setter for the {@link ItemReader}.
         *
         * @param itemReader the {@link ItemReader} to set
         */
        public void setItemReader(ItemReader<? extends I> itemReader) {
                this.itemReader = itemReader;
        }
1077
        /**
         * Public setter for the {@link ItemProcessor}.
         *
         * @param itemProcessor the {@link ItemProcessor} to set
         */
        public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
                this.itemProcessor = itemProcessor;
        }
1084
        /**
         * Public setter for the {@link ItemWriter}.
         *
         * @param itemWriter the {@link ItemWriter} to set
         */
        public void setItemWriter(ItemWriter<? super O> itemWriter) {
                this.itemWriter = itemWriter;
        }
1091
1092        // =========================================================
1093        // Chunk Elements
1094        // =========================================================
1095
        /**
         * Public setter for the {@link RetryListener}s. The supplied array is stored as is (no copy is made).
         *
         * @param retryListeners the {@link RetryListener}s to set
         */
        public void setRetryListeners(RetryListener... retryListeners) {
                this.retryListeners = retryListeners;
        }
1104
        /**
         * Public setter for exception classes that, when raised, won't crash the job but will result in a transaction
         * rollback; the item whose handling caused the exception will be skipped.
         *
         * @param exceptionClasses {@link Map} with the {@link Throwable} classes as keys and {@link Boolean}s as
         * values; if the value is true, the item is skipped.
         */
        public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) {
                this.skippableExceptionClasses = exceptionClasses;
        }
1115
        /**
         * Public setter for exception classes that will cause the item to be retried when raised. The supplied map is
         * stored as is (no copy is made).
         *
         * @param retryableExceptionClasses the retryableExceptionClasses to set
         */
        public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) {
                this.retryableExceptionClasses = retryableExceptionClasses;
        }
1124
        /**
         * Public setter for the streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be
         * used, and will then receive callbacks at the appropriate stage in the step.
         *
         * @param streams an array of streams
         */
        public void setStreams(ItemStream[] streams) {
                this.streams = streams;
        }
1134
1135        // =========================================================
1136        // Additional
1137        // =========================================================
1138
        /**
         * Public setter for the flag recording whether the step has a &lt;chunk/&gt; element. The stored value is what
         * {@link #hasChunkElement()} returns.
         *
         * @param hasChunkElement true if step has &lt;chunk/&gt; element.
         */
        public void setHasChunkElement(boolean hasChunkElement) {
                this.hasChunkElement = hasChunkElement;
        }
1145
1146        /**
1147         * @return true if the defined step has a &lt;chunk/&gt; element
1148         */
1149        protected boolean hasChunkElement() {
1150                return this.hasChunkElement;
1151        }
1152
1153        /**
1154         * @return true if the defined step has a &lt;tasklet/&gt; element
1155         */
1156        protected boolean hasTasklet() {
1157                return this.tasklet != null;
1158        }
1159
1160        /**
1161         * @return true if the defined step has a &lt;partition/&gt; element
1162         */
1163        protected boolean hasPartitionElement() {
1164                return this.partitionHandler != null;
1165        }
1166}