/*
 * Copyright 2006-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.core.partition.support;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

/**
 * A {@link StepExecutionSplitter} that obtains one {@link ExecutionContext}
 * per partition from a {@link Partitioner} and turns each of them into a
 * {@link StepExecution} of the current job execution. Earlier runs of the same
 * job instance are looked up through the {@link JobRepository} so that a
 * restart reuses the previous grid size and execution contexts.
 * <p>
 * Every generated step execution is named after the target step followed by a
 * colon and the key supplied by the {@link Partitioner}, e.g.
 * <code>{step1:partition0, step1:partition1, ...}</code>.
 *
 * @author Dave Syer
 * @author Mahmoud Ben Hassine
 * @since 2.0
 */
public class SimpleStepExecutionSplitter implements StepExecutionSplitter, InitializingBean {

    /** Separates the target step name from the partition key. */
    private static final String STEP_NAME_SEPARATOR = ":";

    private String stepName;

    private Partitioner partitioner;

    /** Left at the JVM default ({@code false}) unless a constructor or setter changes it. */
    private boolean allowStartIfComplete;

    private JobRepository jobRepository;

    /**
     * No-argument constructor for setter-based configuration; the mandatory
     * properties are verified in {@link #afterPropertiesSet()}.
     */
    public SimpleStepExecutionSplitter() {
    }

    /**
     * Create a splitter with all of its mandatory properties.
     *
     * @param jobRepository the {@link JobRepository}
     * @param allowStartIfComplete flag specifying preferences on restart
     * @param stepName the target step name
     * @param partitioner a {@link Partitioner} to use for generating input
     * parameters
     */
    public SimpleStepExecutionSplitter(JobRepository jobRepository, boolean allowStartIfComplete, String stepName,
            Partitioner partitioner) {
        this.stepName = stepName;
        this.partitioner = partitioner;
        this.allowStartIfComplete = allowStartIfComplete;
        this.jobRepository = jobRepository;
    }

    /**
     * Create a splitter, reading the step name and the allowStartIfComplete
     * flag from the given step.
     *
     * @param jobRepository the {@link JobRepository}
     * @param step the target step (a local version of it), used to extract the
     * name and allowStartIfComplete flags
     * @param partitioner a {@link Partitioner} to use for generating input
     * parameters
     *
     * @deprecated use {@link #SimpleStepExecutionSplitter(JobRepository, boolean, String, Partitioner)} instead
     */
    @Deprecated
    public SimpleStepExecutionSplitter(JobRepository jobRepository, Step step, Partitioner partitioner) {
        this.stepName = step.getName();
        this.partitioner = partitioner;
        this.allowStartIfComplete = step.isAllowStartIfComplete();
        this.jobRepository = jobRepository;
    }

    /**
     * Verify that the job repository, the step name and the partitioner have
     * all been provided (checked in that order).
     *
     * @see InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(jobRepository != null, "A JobRepository is required");
        Assert.state(stepName != null, "A step name is required");
        Assert.state(partitioner != null, "A Partitioner is required");
    }

    /**
     * Flag to indicate that the partition target step is allowed to start even
     * if a previous execution is complete. Use this setter to override the
     * value supplied through a constructor.
     *
     * @see Step#isAllowStartIfComplete()
     *
     * @param allowStartIfComplete the value to set
     */
    public void setAllowStartIfComplete(boolean allowStartIfComplete) {
        this.allowStartIfComplete = allowStartIfComplete;
    }

    /**
     * The job repository used to persist the partition step executions and to
     * look up executions from previous runs.
     *
     * @param jobRepository the JobRepository to set
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * The {@link Partitioner} that supplies the partition keys and execution
     * contexts for the target step.
     *
     * @param partitioner the partitioner to set
     */
    public void setPartitioner(Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    /**
     * The name of the target step that will be executed across the partitions.
     * Mandatory with no default.
     *
     * @param stepName the step name to set
     */
    public void setStepName(String stepName) {
        this.stepName = stepName;
    }

    /**
     * @see StepExecutionSplitter#getStepName()
     */
    @Override
    public String getStepName() {
        return this.stepName;
    }

    /**
     * Create one step execution per partition, keep those that are startable,
     * hand them to the job repository and return them.
     *
     * @see StepExecutionSplitter#split(StepExecution, int)
     */
    @Override
    public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException {

        JobExecution jobExecution = stepExecution.getJobExecution();
        Map<String, ExecutionContext> partitionContexts = getContexts(stepExecution, gridSize);

        Set<StepExecution> startableExecutions = new HashSet<StepExecution>(partitionContexts.size());
        for (Entry<String, ExecutionContext> partition : partitionContexts.entrySet()) {

            // Target step name + separator + partition key: unique within the
            // partition and identical from one run to the next.
            StepExecution candidate = jobExecution
                    .createStepExecution(this.stepName + STEP_NAME_SEPARATOR + partition.getKey());

            if (isStartable(candidate, partition.getValue())) {
                startableExecutions.add(candidate);
            }
        }

        jobRepository.addAll(startableExecutions);

        // NOTE(review): the result is deliberately a fresh set built after the
        // repository call - presumably because saving may change how the
        // executions hash (e.g. by assigning ids). Confirm before removing.
        return new HashSet<StepExecution>(startableExecutions);

    }

    /**
     * Work out the partition keys and execution contexts for this run.
     * <p>
     * The grid size is kept in the step execution context under
     * {@code SimpleStepExecutionSplitter.GRID_SIZE}; a value already stored
     * there takes precedence over the {@code gridSize} argument.
     *
     * @param stepExecution the step execution being split
     * @param gridSize the grid size to use when none has been stored yet
     * @return the execution contexts keyed by partition name
     */
    private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {

        ExecutionContext stepContext = stepExecution.getExecutionContext();
        String gridSizeKey = SimpleStepExecutionSplitter.class.getSimpleName() + ".GRID_SIZE";

        // On a restart the grid size has to stay the same, so a stored value
        // wins over the one passed in.
        int effectiveGridSize = (int) stepContext.getLong(gridSizeKey, gridSize);
        stepContext.putLong(gridSizeKey, effectiveGridSize);

        if (stepContext.isDirty()) {
            // Storing the grid size changed the context, so the partitions
            // were not known beforehand.
            jobRepository.updateExecutionContext(stepExecution);
            return partitioner.partition(effectiveGridSize);
        }

        if (!(partitioner instanceof PartitionNameProvider)) {
            // No way to ask for the names alone: partition again.
            return partitioner.partition(effectiveGridSize);
        }

        // Only the keys of the original (failed) execution are needed here;
        // the contexts are discarded later on, so empty ones are enough.
        Map<String, ExecutionContext> emptyContexts = new HashMap<String, ExecutionContext>();
        Collection<String> partitionNames = ((PartitionNameProvider) partitioner)
                .getPartitionNames(effectiveGridSize);
        for (String partitionName : partitionNames) {
            emptyContexts.put(partitionName, new ExecutionContext());
        }
        return emptyContexts;
    }

    /**
     * Check if a step execution is startable. Delegates to
     * {@link #getStartable(StepExecution, ExecutionContext)}.
     * @param stepExecution the step execution to check
     * @param context the execution context of the step
     * @return true if the step execution is startable, false otherwise
     * @throws JobExecutionException if unable to check if the step execution is startable
     */
    protected boolean isStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
        return getStartable(stepExecution, context);
    }

    /**
     * Check if a step execution is startable. As a side effect the execution
     * context of {@code stepExecution} is set: to the context of the last
     * execution with the same step name when that one exists and is not
     * COMPLETED (a restart), otherwise to {@code context}.
     * @param stepExecution the step execution to check
     * @param context the execution context of the step
     * @return true if the step execution is startable, false otherwise
     * @throws JobExecutionException if unable to check if the step execution is startable
     * @deprecated This method is deprecated in favor of
     * {@link SimpleStepExecutionSplitter#isStartable} and will be removed in a
     * future version.
     */
    @Deprecated
    protected boolean getStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {

        JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
        StepExecution previousExecution = jobRepository.getLastStepExecution(jobInstance,
                stepExecution.getStepName());

        boolean restart = previousExecution != null && previousExecution.getStatus() != BatchStatus.COMPLETED;

        stepExecution.setExecutionContext(restart ? previousExecution.getExecutionContext() : context);

        // shouldStart runs first so that its status checks (and exceptions)
        // apply to restarts as well.
        if (shouldStart(allowStartIfComplete, stepExecution, previousExecution)) {
            return true;
        }
        return restart;

    }

    /**
     * Decide from the status of the last execution whether the step may start.
     *
     * @param allowStartIfComplete whether a COMPLETED step may be run again
     * @param stepExecution the step execution about to be started
     * @param lastStepExecution the last execution of the same step, or
     * {@code null} if there is none
     * @return true if the step should start, false if it is complete and must
     * not be run again
     * @throws JobExecutionException if the last status is neither COMPLETED,
     * STOPPED nor FAILED
     */
    private boolean shouldStart(boolean allowStartIfComplete, StepExecution stepExecution,
            StepExecution lastStepExecution) throws JobExecutionException {

        if (lastStepExecution == null) {
            return true;
        }

        BatchStatus lastStatus = lastStepExecution.getStatus();

        if (lastStatus == BatchStatus.UNKNOWN) {
            throw new JobExecutionException("Cannot restart step from UNKNOWN status. "
                    + "The last execution ended with a failure that could not be rolled back, "
                    + "so it may be dangerous to proceed. " + "Manual intervention is probably necessary.");
        }

        if (lastStatus == BatchStatus.COMPLETED) {
            // A complete step runs again only when explicitly allowed, or when
            // the last execution belongs to the same JobExecution (always OK).
            return allowStartIfComplete || isSameJobExecution(stepExecution, lastStepExecution);
        }

        if (lastStatus == BatchStatus.STOPPED || lastStatus == BatchStatus.FAILED) {
            return true;
        }

        if (lastStatus == BatchStatus.STARTED || lastStatus == BatchStatus.STARTING
                || lastStatus == BatchStatus.STOPPING) {
            throw new JobExecutionException(
                    "Cannot restart step from "
                            + lastStatus
                            + " status. "
                            + "The old execution may still be executing, so you may need to verify manually that this is the case.");
        }

        throw new JobExecutionException("Cannot restart step from " + lastStatus + " status. "
                + "We believe the old execution was abandoned and therefore has been marked as un-restartable.");

    }

    /**
     * Compare the job execution ids of two step executions; two {@code null}
     * ids count as the same job execution.
     *
     * @param stepExecution the step execution about to be started
     * @param lastStepExecution the last execution of the same step
     * @return true if both belong to the same job execution
     */
    private boolean isSameJobExecution(StepExecution stepExecution, StepExecution lastStepExecution) {
        return stepExecution.getJobExecutionId() == null
                ? lastStepExecution.getJobExecutionId() == null
                : stepExecution.getJobExecutionId().equals(lastStepExecution.getJobExecutionId());
    }

}