001/* 002 * Copyright 2006-2014 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.job; 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021import org.springframework.batch.core.BatchStatus; 022import org.springframework.batch.core.JobExecution; 023import org.springframework.batch.core.JobInstance; 024import org.springframework.batch.core.JobInterruptedException; 025import org.springframework.batch.core.StartLimitExceededException; 026import org.springframework.batch.core.Step; 027import org.springframework.batch.core.StepExecution; 028import org.springframework.batch.core.repository.JobRepository; 029import org.springframework.batch.core.repository.JobRestartException; 030import org.springframework.batch.item.ExecutionContext; 031import org.springframework.beans.factory.InitializingBean; 032import org.springframework.util.Assert; 033 034/** 035 * Implementation of {@link StepHandler} that manages repository and restart 036 * concerns. 037 * 038 * @author Dave Syer 039 * 040 */ 041public class SimpleStepHandler implements StepHandler, InitializingBean { 042 043 private static final Log logger = LogFactory.getLog(SimpleStepHandler.class); 044 045 private JobRepository jobRepository; 046 047 private ExecutionContext executionContext; 048 049 /** 050 * Convenient default constructor for configuration usage. 051 */ 052 public SimpleStepHandler() { 053 this(null); 054 } 055 056 /** 057 * @param jobRepository a {@link org.springframework.batch.core.repository.JobRepository} 058 */ 059 public SimpleStepHandler(JobRepository jobRepository) { 060 this(jobRepository, new ExecutionContext()); 061 } 062 063 /** 064 * @param jobRepository a {@link org.springframework.batch.core.repository.JobRepository} 065 * @param executionContext the {@link org.springframework.batch.item.ExecutionContext} for the current Step 066 */ 067 public SimpleStepHandler(JobRepository jobRepository, ExecutionContext executionContext) { 068 this.jobRepository = jobRepository; 069 this.executionContext = executionContext; 070 } 071 072 /** 073 * Check mandatory properties (jobRepository). 074 * 075 * @see InitializingBean#afterPropertiesSet() 076 */ 077 @Override 078 public void afterPropertiesSet() throws Exception { 079 Assert.state(jobRepository != null, "A JobRepository must be provided"); 080 } 081 082 /** 083 * @return the used jobRepository 084 */ 085 protected JobRepository getJobRepository() { 086 return this.jobRepository; 087 } 088 089 /** 090 * @param jobRepository the jobRepository to set 091 */ 092 public void setJobRepository(JobRepository jobRepository) { 093 this.jobRepository = jobRepository; 094 } 095 096 /** 097 * A context containing values to be added to the step execution before it 098 * is handled. 099 * 100 * @param executionContext the execution context to set 101 */ 102 public void setExecutionContext(ExecutionContext executionContext) { 103 this.executionContext = executionContext; 104 } 105 106 @Override 107 public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException, 108 JobRestartException, StartLimitExceededException { 109 if (execution.isStopping()) { 110 throw new JobInterruptedException("JobExecution interrupted."); 111 } 112 113 JobInstance jobInstance = execution.getJobInstance(); 114 115 StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName()); 116 if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) { 117 // If the last execution of this step was in the same job, it's 118 // probably intentional so we want to run it again... 119 logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. " 120 + "If either step fails, both will be executed again on restart.", step.getName(), jobInstance 121 .getJobName())); 122 lastStepExecution = null; 123 } 124 StepExecution currentStepExecution = lastStepExecution; 125 126 if (shouldStart(lastStepExecution, execution, step)) { 127 128 currentStepExecution = execution.createStepExecution(step.getName()); 129 130 boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals( 131 BatchStatus.COMPLETED)); 132 133 if (isRestart) { 134 currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext()); 135 136 if(lastStepExecution.getExecutionContext().containsKey("batch.executed")) { 137 currentStepExecution.getExecutionContext().remove("batch.executed"); 138 } 139 } 140 else { 141 currentStepExecution.setExecutionContext(new ExecutionContext(executionContext)); 142 } 143 144 jobRepository.add(currentStepExecution); 145 146 logger.info("Executing step: [" + step.getName() + "]"); 147 try { 148 step.execute(currentStepExecution); 149 currentStepExecution.getExecutionContext().put("batch.executed", true); 150 } 151 catch (JobInterruptedException e) { 152 // Ensure that the job gets the message that it is stopping 153 // and can pass it on to other steps that are executing 154 // concurrently. 155 execution.setStatus(BatchStatus.STOPPING); 156 throw e; 157 } 158 159 jobRepository.updateExecutionContext(execution); 160 161 if (currentStepExecution.getStatus() == BatchStatus.STOPPING 162 || currentStepExecution.getStatus() == BatchStatus.STOPPED) { 163 // Ensure that the job gets the message that it is stopping 164 execution.setStatus(BatchStatus.STOPPING); 165 throw new JobInterruptedException("Job interrupted by step execution"); 166 } 167 168 } 169 170 return currentStepExecution; 171 } 172 173 /** 174 * Detect whether a step execution belongs to this job execution. 175 * @param jobExecution the current job execution 176 * @param stepExecution an existing step execution 177 * @return true if the {@link org.springframework.batch.core.StepExecution} is part of the {@link org.springframework.batch.core.JobExecution} 178 */ 179 private boolean stepExecutionPartOfExistingJobExecution(JobExecution jobExecution, StepExecution stepExecution) { 180 return stepExecution != null && stepExecution.getJobExecutionId() != null 181 && stepExecution.getJobExecutionId().equals(jobExecution.getId()); 182 } 183 184 /** 185 * Given a step and configuration, return true if the step should start, 186 * false if it should not, and throw an exception if the job should finish. 187 * @param lastStepExecution the last step execution 188 * @param jobExecution the {@link JobExecution} instance to be evaluated. 189 * @param step the {@link Step} instance to be evaluated. 190 * @return true if step should start, false if it should not. 191 * 192 * @throws StartLimitExceededException if the start limit has been exceeded 193 * for this step 194 * @throws JobRestartException if the job is in an inconsistent state from 195 * an earlier failure 196 */ 197 protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step) 198 throws JobRestartException, StartLimitExceededException { 199 200 BatchStatus stepStatus; 201 if (lastStepExecution == null) { 202 stepStatus = BatchStatus.STARTING; 203 } 204 else { 205 stepStatus = lastStepExecution.getStatus(); 206 } 207 208 if (stepStatus == BatchStatus.UNKNOWN) { 209 throw new JobRestartException("Cannot restart step from UNKNOWN status. " 210 + "The last execution ended with a failure that could not be rolled back, " 211 + "so it may be dangerous to proceed. Manual intervention is probably necessary."); 212 } 213 214 if ((stepStatus == BatchStatus.COMPLETED && !step.isAllowStartIfComplete()) 215 || stepStatus == BatchStatus.ABANDONED) { 216 // step is complete, false should be returned, indicating that the 217 // step should not be started 218 logger.info("Step already complete or not restartable, so no action to execute: " + lastStepExecution); 219 return false; 220 } 221 222 if (jobRepository.getStepExecutionCount(jobExecution.getJobInstance(), step.getName()) < step.getStartLimit()) { 223 // step start count is less than start max, return true 224 return true; 225 } 226 else { 227 // start max has been exceeded, throw an exception. 228 throw new StartLimitExceededException("Maximum start limit exceeded for step: " + step.getName() 229 + "StartMax: " + step.getStartLimit()); 230 } 231 } 232 233}