001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.step.builder;
017
018import org.springframework.batch.core.Step;
019import org.springframework.batch.core.partition.PartitionHandler;
020import org.springframework.batch.core.partition.StepExecutionSplitter;
021import org.springframework.batch.core.partition.support.PartitionStep;
022import org.springframework.batch.core.partition.support.Partitioner;
023import org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter;
024import org.springframework.batch.core.partition.support.StepExecutionAggregator;
025import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
026import org.springframework.core.task.SyncTaskExecutor;
027import org.springframework.core.task.TaskExecutor;
028
029/**
030 * Step builder for {@link PartitionStep} instances. A partition step executes the same step (possibly remotely)
031 * multiple times with different input parameters (in the form of execution context). Useful for parallelization.
032 *
033 * @author Dave Syer
034 * @author Mahmoud Ben Hassine
035 * @author Dimitrios Liapis
036 *
037 * @since 2.2
038 */
039public class PartitionStepBuilder extends StepBuilderHelper<PartitionStepBuilder> {
040
041        private TaskExecutor taskExecutor;
042
043        private Partitioner partitioner;
044
045        private static final int DEFAULT_GRID_SIZE = 6;
046
047        private Step step;
048
049        private PartitionHandler partitionHandler;
050
051        private int gridSize = DEFAULT_GRID_SIZE;
052
053        private StepExecutionSplitter splitter;
054
055        private StepExecutionAggregator aggregator;
056
057        private String stepName;
058
059        /**
060         * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
061         *
062         * @param parent a parent helper containing common step properties
063         */
064        public PartitionStepBuilder(StepBuilderHelper<?> parent) {
065                super(parent);
066        }
067
068        /**
069         * Add a partitioner which can be used to create a {@link StepExecutionSplitter}. Use either this or an explicit
070         * {@link #splitter(StepExecutionSplitter)} but not both.
071         *
072         * @param slaveStepName the name of the slave step (used to construct step execution names)
073         * @param partitioner a partitioner to use
074         * @return this for fluent chaining
075         */
076        public PartitionStepBuilder partitioner(String slaveStepName, Partitioner partitioner) {
077                this.stepName = slaveStepName;
078                this.partitioner = partitioner;
079                return this;
080        }
081
082        /**
083         * Provide an actual step instance to execute in parallel. If an explicit
084         * {@link #partitionHandler(PartitionHandler)} is provided, the step is optional and is only used to extract
085         * configuration data (name and other basic properties of a step).
086         *
087         * @param step a step to execute in parallel
088         * @return this for fluent chaining
089         */
090        public PartitionStepBuilder step(Step step) {
091                this.step = step;
092                return this;
093        }
094
095        /**
096         * Provide a task executor to use when constructing a {@link PartitionHandler} from the {@link #step(Step)}. Mainly
097         * used for running a step locally in parallel, but can be used to execute remotely if the step is remote. Not used
098         * if an explicit {@link #partitionHandler(PartitionHandler)} is provided.
099         *
100         * @param taskExecutor a task executor to use when executing steps in parallel
101         * @return this for fluent chaining
102         */
103        public PartitionStepBuilder taskExecutor(TaskExecutor taskExecutor) {
104                this.taskExecutor = taskExecutor;
105                return this;
106        }
107
108        /**
109         * Provide an explicit partition handler that will carry out the work of the partition step. The partition handler
110         * is the main SPI for adapting a partition step to a specific distributed computation environment. Optional if you
111         * only need local or remote processing through the Step interface.
112         *
113         * @see #step(Step) for setting up a default handler that works with a local or remote Step
114         *
115         * @param partitionHandler a partition handler
116         * @return this for fluent chaining
117         */
118        public PartitionStepBuilder partitionHandler(PartitionHandler partitionHandler) {
119                this.partitionHandler = partitionHandler;
120                return this;
121        }
122
123        /**
124         * A hint to the {@link #splitter(StepExecutionSplitter)} about how many step executions are required. If running
125         * locally or remotely through a {@link #taskExecutor(TaskExecutor)} determines precisely the number of step
126         * executions in the first attempt at a partition step execution.
127         *
128         * @param gridSize the grid size
129         * @return this for fluent chaining
130         */
131        public PartitionStepBuilder gridSize(int gridSize) {
132                this.gridSize = gridSize;
133                return this;
134        }
135
136        /**
137         * Provide an explicit {@link StepExecutionSplitter} instead of having one build from the
138         * {@link #partitioner(String, Partitioner)}. Useful if you need more control over the splitting.
139         *
140         * @param splitter a step execution splitter
141         * @return this for fluent chaining
142         */
143        public PartitionStepBuilder splitter(StepExecutionSplitter splitter) {
144                this.splitter = splitter;
145                return this;
146        }
147
148        /**
149         * Provide a step execution aggregator for aggregating partitioned step executions into a single result for the
150         * {@link PartitionStep} itself.  Default is a simple implementation that works in most cases.
151         *
152         * @param aggregator a step execution aggregator
153         * @return this for fluent chaining
154         */
155        public PartitionStepBuilder aggregator(StepExecutionAggregator aggregator) {
156                this.aggregator = aggregator;
157                return this;
158        }
159
160        public Step build() {
161                PartitionStep step = new PartitionStep();
162                step.setName(getName());
163                super.enhance(step);
164
165                if (partitionHandler != null) {
166                        step.setPartitionHandler(partitionHandler);
167                }
168                else {
169                        TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler();
170                        partitionHandler.setStep(this.step);
171                        if (taskExecutor == null) {
172                                taskExecutor = new SyncTaskExecutor();
173                        }
174                        partitionHandler.setGridSize(gridSize);
175                        partitionHandler.setTaskExecutor(taskExecutor);
176                        step.setPartitionHandler(partitionHandler);
177                }
178
179                if (splitter != null) {
180                        step.setStepExecutionSplitter(splitter);
181                }
182                else {
183
184                        boolean allowStartIfComplete = isAllowStartIfComplete();
185                        String name = stepName;
186                        if (this.step != null) {
187                                try {
188                                        allowStartIfComplete = this.step.isAllowStartIfComplete();
189                                        name = this.step.getName();
190                                }
191                                catch (Exception e) {
192                                        logger.info("Ignored exception from step asking for name and allowStartIfComplete flag. "
193                                                        + "Using default from enclosing PartitionStep (" + name + "," + allowStartIfComplete + ").");
194                                }
195                        }
196                        SimpleStepExecutionSplitter splitter = new SimpleStepExecutionSplitter();
197                        splitter.setPartitioner(partitioner);
198                        splitter.setJobRepository(getJobRepository());
199                        splitter.setAllowStartIfComplete(allowStartIfComplete);
200                        splitter.setStepName(name);
201                        this.splitter = splitter;
202                        step.setStepExecutionSplitter(splitter);
203
204                }
205
206                if (aggregator != null) {
207                        step.setStepExecutionAggregator(aggregator);
208                }
209
210                try {
211                        step.afterPropertiesSet();
212                }
213                catch (Exception e) {
214                        throw new StepBuilderException(e);
215                }
216
217                return step;
218
219        }
220
221        protected TaskExecutor getTaskExecutor() {
222                return taskExecutor;
223        }
224
225        protected Partitioner getPartitioner() {
226                return partitioner;
227        }
228
229        protected Step getStep() {
230                return step;
231        }
232
233        protected PartitionHandler getPartitionHandler() {
234                return partitionHandler;
235        }
236
237        protected int getGridSize() {
238                return gridSize;
239        }
240
241        protected StepExecutionSplitter getSplitter() {
242                return splitter;
243        }
244
245        protected StepExecutionAggregator getAggregator() {
246                return aggregator;
247        }
248
249        protected String getStepName() {
250                return stepName;
251        }
252}