001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.step.builder; 017 018import org.springframework.batch.core.Step; 019import org.springframework.batch.core.partition.PartitionHandler; 020import org.springframework.batch.core.partition.StepExecutionSplitter; 021import org.springframework.batch.core.partition.support.PartitionStep; 022import org.springframework.batch.core.partition.support.Partitioner; 023import org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter; 024import org.springframework.batch.core.partition.support.StepExecutionAggregator; 025import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler; 026import org.springframework.core.task.SyncTaskExecutor; 027import org.springframework.core.task.TaskExecutor; 028 029/** 030 * Step builder for {@link PartitionStep} instances. A partition step executes the same step (possibly remotely) 031 * multiple times with different input parameters (in the form of execution context). Useful for parallelization. 032 * 033 * @author Dave Syer 034 * @author Mahmoud Ben Hassine 035 * @author Dimitrios Liapis 036 * 037 * @since 2.2 038 */ 039public class PartitionStepBuilder extends StepBuilderHelper<PartitionStepBuilder> { 040 041 private TaskExecutor taskExecutor; 042 043 private Partitioner partitioner; 044 045 private static final int DEFAULT_GRID_SIZE = 6; 046 047 private Step step; 048 049 private PartitionHandler partitionHandler; 050 051 private int gridSize = DEFAULT_GRID_SIZE; 052 053 private StepExecutionSplitter splitter; 054 055 private StepExecutionAggregator aggregator; 056 057 private String stepName; 058 059 /** 060 * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. 061 * 062 * @param parent a parent helper containing common step properties 063 */ 064 public PartitionStepBuilder(StepBuilderHelper<?> parent) { 065 super(parent); 066 } 067 068 /** 069 * Add a partitioner which can be used to create a {@link StepExecutionSplitter}. Use either this or an explicit 070 * {@link #splitter(StepExecutionSplitter)} but not both. 071 * 072 * @param slaveStepName the name of the slave step (used to construct step execution names) 073 * @param partitioner a partitioner to use 074 * @return this for fluent chaining 075 */ 076 public PartitionStepBuilder partitioner(String slaveStepName, Partitioner partitioner) { 077 this.stepName = slaveStepName; 078 this.partitioner = partitioner; 079 return this; 080 } 081 082 /** 083 * Provide an actual step instance to execute in parallel. If an explicit 084 * {@link #partitionHandler(PartitionHandler)} is provided, the step is optional and is only used to extract 085 * configuration data (name and other basic properties of a step). 086 * 087 * @param step a step to execute in parallel 088 * @return this for fluent chaining 089 */ 090 public PartitionStepBuilder step(Step step) { 091 this.step = step; 092 return this; 093 } 094 095 /** 096 * Provide a task executor to use when constructing a {@link PartitionHandler} from the {@link #step(Step)}. Mainly 097 * used for running a step locally in parallel, but can be used to execute remotely if the step is remote. Not used 098 * if an explicit {@link #partitionHandler(PartitionHandler)} is provided. 099 * 100 * @param taskExecutor a task executor to use when executing steps in parallel 101 * @return this for fluent chaining 102 */ 103 public PartitionStepBuilder taskExecutor(TaskExecutor taskExecutor) { 104 this.taskExecutor = taskExecutor; 105 return this; 106 } 107 108 /** 109 * Provide an explicit partition handler that will carry out the work of the partition step. The partition handler 110 * is the main SPI for adapting a partition step to a specific distributed computation environment. Optional if you 111 * only need local or remote processing through the Step interface. 112 * 113 * @see #step(Step) for setting up a default handler that works with a local or remote Step 114 * 115 * @param partitionHandler a partition handler 116 * @return this for fluent chaining 117 */ 118 public PartitionStepBuilder partitionHandler(PartitionHandler partitionHandler) { 119 this.partitionHandler = partitionHandler; 120 return this; 121 } 122 123 /** 124 * A hint to the {@link #splitter(StepExecutionSplitter)} about how many step executions are required. If running 125 * locally or remotely through a {@link #taskExecutor(TaskExecutor)} determines precisely the number of step 126 * executions in the first attempt at a partition step execution. 127 * 128 * @param gridSize the grid size 129 * @return this for fluent chaining 130 */ 131 public PartitionStepBuilder gridSize(int gridSize) { 132 this.gridSize = gridSize; 133 return this; 134 } 135 136 /** 137 * Provide an explicit {@link StepExecutionSplitter} instead of having one build from the 138 * {@link #partitioner(String, Partitioner)}. Useful if you need more control over the splitting. 139 * 140 * @param splitter a step execution splitter 141 * @return this for fluent chaining 142 */ 143 public PartitionStepBuilder splitter(StepExecutionSplitter splitter) { 144 this.splitter = splitter; 145 return this; 146 } 147 148 /** 149 * Provide a step execution aggregator for aggregating partitioned step executions into a single result for the 150 * {@link PartitionStep} itself. Default is a simple implementation that works in most cases. 151 * 152 * @param aggregator a step execution aggregator 153 * @return this for fluent chaining 154 */ 155 public PartitionStepBuilder aggregator(StepExecutionAggregator aggregator) { 156 this.aggregator = aggregator; 157 return this; 158 } 159 160 public Step build() { 161 PartitionStep step = new PartitionStep(); 162 step.setName(getName()); 163 super.enhance(step); 164 165 if (partitionHandler != null) { 166 step.setPartitionHandler(partitionHandler); 167 } 168 else { 169 TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); 170 partitionHandler.setStep(this.step); 171 if (taskExecutor == null) { 172 taskExecutor = new SyncTaskExecutor(); 173 } 174 partitionHandler.setGridSize(gridSize); 175 partitionHandler.setTaskExecutor(taskExecutor); 176 step.setPartitionHandler(partitionHandler); 177 } 178 179 if (splitter != null) { 180 step.setStepExecutionSplitter(splitter); 181 } 182 else { 183 184 boolean allowStartIfComplete = isAllowStartIfComplete(); 185 String name = stepName; 186 if (this.step != null) { 187 try { 188 allowStartIfComplete = this.step.isAllowStartIfComplete(); 189 name = this.step.getName(); 190 } 191 catch (Exception e) { 192 logger.info("Ignored exception from step asking for name and allowStartIfComplete flag. " 193 + "Using default from enclosing PartitionStep (" + name + "," + allowStartIfComplete + ")."); 194 } 195 } 196 SimpleStepExecutionSplitter splitter = new SimpleStepExecutionSplitter(); 197 splitter.setPartitioner(partitioner); 198 splitter.setJobRepository(getJobRepository()); 199 splitter.setAllowStartIfComplete(allowStartIfComplete); 200 splitter.setStepName(name); 201 this.splitter = splitter; 202 step.setStepExecutionSplitter(splitter); 203 204 } 205 206 if (aggregator != null) { 207 step.setStepExecutionAggregator(aggregator); 208 } 209 210 try { 211 step.afterPropertiesSet(); 212 } 213 catch (Exception e) { 214 throw new StepBuilderException(e); 215 } 216 217 return step; 218 219 } 220 221 protected TaskExecutor getTaskExecutor() { 222 return taskExecutor; 223 } 224 225 protected Partitioner getPartitioner() { 226 return partitioner; 227 } 228 229 protected Step getStep() { 230 return step; 231 } 232 233 protected PartitionHandler getPartitionHandler() { 234 return partitionHandler; 235 } 236 237 protected int getGridSize() { 238 return gridSize; 239 } 240 241 protected StepExecutionSplitter getSplitter() { 242 return splitter; 243 } 244 245 protected StepExecutionAggregator getAggregator() { 246 return aggregator; 247 } 248 249 protected String getStepName() { 250 return stepName; 251 } 252}