001/*
002 * Copyright 2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.step.builder;
017
018import java.util.ArrayList;
019
020import org.springframework.batch.core.ChunkListener;
021import org.springframework.batch.core.Step;
022import org.springframework.batch.core.StepListener;
023import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
024import org.springframework.batch.core.jsr.step.BatchletStep;
025import org.springframework.batch.core.jsr.step.item.JsrChunkProcessor;
026import org.springframework.batch.core.jsr.step.item.JsrChunkProvider;
027import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
028import org.springframework.batch.core.step.builder.SimpleStepBuilder;
029import org.springframework.batch.core.step.builder.StepBuilder;
030import org.springframework.batch.core.step.builder.StepBuilderException;
031import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
032import org.springframework.batch.core.step.item.ChunkProcessor;
033import org.springframework.batch.core.step.item.ChunkProvider;
034import org.springframework.batch.core.step.tasklet.Tasklet;
035import org.springframework.batch.core.step.tasklet.TaskletStep;
036import org.springframework.batch.item.ItemStream;
037import org.springframework.batch.repeat.RepeatOperations;
038import org.springframework.batch.repeat.support.RepeatTemplate;
039import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
040import org.springframework.util.Assert;
041
042/**
043 * A step builder that extends the {@link FaultTolerantStepBuilder} to create JSR-352
044 * specific {@link ChunkProvider} and {@link ChunkProcessor} supporting the chunking
045 * pattern defined by the spec.
046 *
047 * @author Michael Minella
048 *
049 * @param <I> The input type for the step
050 * @param <O> The output type for the step
051 */
052public class JsrSimpleStepBuilder<I, O> extends SimpleStepBuilder<I, O> {
053
054        private BatchPropertyContext batchPropertyContext;
055
056        public JsrSimpleStepBuilder(StepBuilder parent) {
057                super(parent);
058        }
059
060        public JsrPartitionStepBuilder partitioner(Step step) {
061                return new JsrPartitionStepBuilder(this).step(step);
062        }
063
064        public void setBatchPropertyContext(BatchPropertyContext batchPropertyContext) {
065                this.batchPropertyContext = batchPropertyContext;
066        }
067
068        /**
069         * Build the step from the components collected by the fluent setters. Delegates first to {@link #enhance(Step)} and
070         * then to {@link #createTasklet()} in subclasses to create the actual tasklet.
071         *
072         * @return a tasklet step fully configured and read to execute
073         */
074        @Override
075        public TaskletStep build() {
076                registerStepListenerAsItemListener();
077                registerAsStreamsAndListeners(getReader(), getProcessor(), getWriter());
078                registerStepListenerAsChunkListener();
079
080                BatchletStep step = new BatchletStep(getName(), batchPropertyContext);
081
082                super.enhance(step);
083
084                step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0]));
085
086                if (getTransactionAttribute() != null) {
087                        step.setTransactionAttribute(getTransactionAttribute());
088                }
089
090                if (getStepOperations() == null) {
091
092                        stepOperations(new RepeatTemplate());
093
094                        if (getTaskExecutor() != null) {
095                                TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
096                                repeatTemplate.setTaskExecutor(getTaskExecutor());
097                                repeatTemplate.setThrottleLimit(getThrottleLimit());
098                                stepOperations(repeatTemplate);
099                        }
100
101                        ((RepeatTemplate) getStepOperations()).setExceptionHandler(getExceptionHandler());
102
103                }
104                step.setStepOperations(getStepOperations());
105                step.setTasklet(createTasklet());
106
107                ItemStream[] streams = getStreams().toArray(new ItemStream[0]);
108                step.setStreams(streams);
109
110                try {
111                        step.afterPropertiesSet();
112                }
113                catch (Exception e) {
114                        throw new StepBuilderException(e);
115                }
116
117                return step;
118
119        }
120
121        @Override
122        protected Tasklet createTasklet() {
123                Assert.state(getReader() != null, "ItemReader must be provided");
124                Assert.state(getProcessor() != null || getWriter() != null, "ItemWriter or ItemProcessor must be provided");
125                RepeatOperations repeatOperations = createRepeatOperations();
126                ChunkProvider<I> chunkProvider = new JsrChunkProvider<I>();
127                JsrChunkProcessor<I, O> chunkProcessor = new JsrChunkProcessor<I, O>(getReader(), getProcessor(), getWriter(), repeatOperations);
128                chunkProcessor.setListeners(new ArrayList<StepListener>(getItemListeners()));
129                ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor);
130                tasklet.setBuffering(!isReaderTransactionalQueue());
131                return tasklet;
132        }
133
134        private RepeatOperations createRepeatOperations() {
135                RepeatTemplate repeatOperations = new RepeatTemplate();
136                repeatOperations.setCompletionPolicy(getChunkCompletionPolicy());
137                repeatOperations.setExceptionHandler(getExceptionHandler());
138                return repeatOperations;
139        }
140}