001/*
002 * Copyright 2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.step.builder;
017
018import java.util.ArrayList;
019import java.util.List;
020
021import org.springframework.batch.core.ChunkListener;
022import org.springframework.batch.core.Step;
023import org.springframework.batch.core.StepListener;
024import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
025import org.springframework.batch.core.jsr.step.BatchletStep;
026import org.springframework.batch.core.jsr.step.item.JsrChunkProvider;
027import org.springframework.batch.core.jsr.step.item.JsrFaultTolerantChunkProcessor;
028import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
029import org.springframework.batch.core.step.builder.StepBuilder;
030import org.springframework.batch.core.step.builder.StepBuilderException;
031import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
032import org.springframework.batch.core.step.item.ChunkProcessor;
033import org.springframework.batch.core.step.item.ChunkProvider;
034import org.springframework.batch.core.step.skip.SkipPolicy;
035import org.springframework.batch.core.step.tasklet.TaskletStep;
036import org.springframework.batch.item.ItemStream;
037import org.springframework.batch.repeat.support.RepeatTemplate;
038import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
039
040/**
041 * A step builder that extends the {@link FaultTolerantStepBuilder} to create JSR-352
042 * specific {@link ChunkProvider} and {@link ChunkProcessor} supporting both the chunking
043 * pattern defined by the spec as well as skip/retry logic.
044 *
045 * @author Michael Minella
046 * @author Chris Schaefer
047 *
048 * @param <I> The input type for the step
049 * @param <O> The output type for the step
050 */
051public class JsrFaultTolerantStepBuilder<I, O> extends FaultTolerantStepBuilder<I, O> {
052
053        private BatchPropertyContext batchPropertyContext;
054
055        public void setBatchPropertyContext(BatchPropertyContext batchPropertyContext) {
056                this.batchPropertyContext = batchPropertyContext;
057        }
058
059        public JsrFaultTolerantStepBuilder(StepBuilder parent) {
060                super(parent);
061        }
062
063        @Override
064        public FaultTolerantStepBuilder<I, O> faultTolerant() {
065                return this;
066        }
067
068
069        /**
070         * Build the step from the components collected by the fluent setters. Delegates first to {@link #enhance(Step)} and
071         * then to {@link #createTasklet()} in subclasses to create the actual tasklet.
072         *
073         * @return a tasklet step fully configured and read to execute
074         */
075        @Override
076        public TaskletStep build() {
077                registerStepListenerAsSkipListener();
078                registerAsStreamsAndListeners(getReader(), getProcessor(), getWriter());
079
080                registerStepListenerAsChunkListener();
081
082                BatchletStep step = new BatchletStep(getName(), batchPropertyContext);
083
084                super.enhance(step);
085
086                step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0]));
087
088                if (getTransactionAttribute() != null) {
089                        step.setTransactionAttribute(getTransactionAttribute());
090                }
091
092                if (getStepOperations() == null) {
093
094                        stepOperations(new RepeatTemplate());
095
096                        if (getTaskExecutor() != null) {
097                                TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
098                                repeatTemplate.setTaskExecutor(getTaskExecutor());
099                                repeatTemplate.setThrottleLimit(getThrottleLimit());
100                                stepOperations(repeatTemplate);
101                        }
102
103                        ((RepeatTemplate) getStepOperations()).setExceptionHandler(getExceptionHandler());
104
105                }
106                step.setStepOperations(getStepOperations());
107                step.setTasklet(createTasklet());
108
109                step.setStreams(getStreams().toArray(new ItemStream[0]));
110
111                try {
112                        step.afterPropertiesSet();
113                }
114                catch (Exception e) {
115                        throw new StepBuilderException(e);
116                }
117
118                return step;
119
120        }
121
122        @Override
123        protected ChunkProvider<I> createChunkProvider() {
124                return new JsrChunkProvider<I>();
125        }
126
127        /**
128         * Provides a JSR-352 specific implementation of a {@link ChunkProcessor} for use
129         * within the {@link ChunkOrientedTasklet}
130         *
131         * @return a JSR-352 implementation of the {@link ChunkProcessor}
132         * @see JsrFaultTolerantChunkProcessor
133         */
134        @Override
135        protected ChunkProcessor<I> createChunkProcessor() {
136                SkipPolicy skipPolicy = getFatalExceptionAwareProxy(createSkipPolicy());
137                JsrFaultTolerantChunkProcessor<I, O> chunkProcessor = 
138                                new JsrFaultTolerantChunkProcessor<I, O>(getReader(), getProcessor(),
139                                getWriter(), createChunkOperations(), createRetryOperations());
140                chunkProcessor.setSkipPolicy(skipPolicy);
141                chunkProcessor.setRollbackClassifier(getRollbackClassifier());
142                detectStreamInReader();
143                chunkProcessor.setChunkMonitor(getChunkMonitor());
144                chunkProcessor.setListeners(getChunkListeners());
145
146                return chunkProcessor;
147        }
148
149        private List<StepListener> getChunkListeners() {
150                List<StepListener> listeners = new ArrayList<StepListener>();
151                listeners.addAll(getItemListeners());
152                listeners.addAll(getSkipListeners());
153                listeners.addAll(getJsrRetryListeners());
154
155                return listeners;
156        }
157}