001/* 002 * Copyright 2014 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.jsr.job; 017 018import java.util.List; 019 020import org.apache.commons.logging.Log; 021import org.apache.commons.logging.LogFactory; 022import org.springframework.batch.core.BatchStatus; 023import org.springframework.batch.core.JobExecution; 024import org.springframework.batch.core.StartLimitExceededException; 025import org.springframework.batch.core.Step; 026import org.springframework.batch.core.StepExecution; 027import org.springframework.batch.core.explore.JobExplorer; 028import org.springframework.batch.core.job.SimpleStepHandler; 029import org.springframework.batch.core.repository.JobRepository; 030import org.springframework.batch.core.repository.JobRestartException; 031import org.springframework.batch.item.ExecutionContext; 032import org.springframework.util.Assert; 033import org.springframework.util.CollectionUtils; 034import org.springframework.util.StringUtils; 035 036/** 037 * Extends {@link SimpleStepHandler} to apply JSR-352 specific logic for whether to 038 * start a step. 039 * 040 * @author Michael Minella 041 * @since 3.0 042 */ 043public class JsrStepHandler extends SimpleStepHandler { 044 045 private static final Log logger = LogFactory.getLog(JsrStepHandler.class); 046 047 private JobExplorer jobExplorer; 048 049 /** 050 * @param jobRepository instance of {@link JobRepository}. 051 * @param jobExplorer instance of {@link JobExplorer}. 052 */ 053 public JsrStepHandler(JobRepository jobRepository, JobExplorer jobExplorer) { 054 super(jobRepository, new ExecutionContext()); 055 this.jobExplorer = jobExplorer; 056 } 057 058 @Override 059 public void afterPropertiesSet() throws Exception { 060 super.afterPropertiesSet(); 061 Assert.state(jobExplorer != null, "A JobExplorer must be provided"); 062 } 063 064 065 /** 066 * Given a step and configuration, return true if the step should start, 067 * false if it should not, and throw an exception if the job should finish. 068 * @param lastStepExecution the last step execution 069 * @param jobExecution instance of {@link JobExecution} 070 * @param step instance of {@link Step} 071 * 072 * @throws StartLimitExceededException if the start limit has been exceeded 073 * for this step 074 * @throws JobRestartException if the job is in an inconsistent state from 075 * an earlier failure 076 */ 077 @Override 078 protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step) 079 throws JobRestartException, StartLimitExceededException { 080 BatchStatus stepStatus; 081 String restartStep = null; 082 if (lastStepExecution == null) { 083 jobExecution.getExecutionContext().put("batch.startedStep", step.getName()); 084 stepStatus = BatchStatus.STARTING; 085 } 086 else { 087 stepStatus = lastStepExecution.getStatus(); 088 089 JobExecution lastJobExecution = getLastJobExecution(jobExecution); 090 091 if(lastJobExecution.getExecutionContext().containsKey("batch.restartStep")) { 092 restartStep = lastJobExecution.getExecutionContext().getString("batch.restartStep"); 093 094 if(CollectionUtils.isEmpty(jobExecution.getStepExecutions()) && lastJobExecution.getStatus() == BatchStatus.STOPPED && StringUtils.hasText(restartStep)) { 095 if(!restartStep.equals(step.getName()) && !jobExecution.getExecutionContext().containsKey("batch.startedStep")) { 096 logger.info("Job was stopped and should restart at step " + restartStep + ". The current step is " + step.getName()); 097 return false; 098 } else { 099 // Indicates the starting point for execution evaluation per JSR-352 100 jobExecution.getExecutionContext().put("batch.startedStep", step.getName()); 101 } 102 } 103 } 104 } 105 106 if (stepStatus == BatchStatus.UNKNOWN) { 107 throw new JobRestartException("Cannot restart step from UNKNOWN status. " 108 + "The last execution ended with a failure that could not be rolled back, " 109 + "so it may be dangerous to proceed. Manual intervention is probably necessary."); 110 } 111 112 if ((stepStatus == BatchStatus.COMPLETED && step.isAllowStartIfComplete() == false) 113 || stepStatus == BatchStatus.ABANDONED) { 114 // step is complete, false should be returned, indicating that the 115 // step should not be started 116 logger.info("Step already complete or not restartable, so no action to execute: " + lastStepExecution); 117 return false; 118 } 119 120 if (getJobRepository().getStepExecutionCount(jobExecution.getJobInstance(), step.getName()) < step.getStartLimit()) { 121 // step start count is less than start max, return true 122 return true; 123 } 124 else { 125 // start max has been exceeded, throw an exception. 126 throw new StartLimitExceededException("Maximum start limit exceeded for step: " + step.getName() 127 + "StartMax: " + step.getStartLimit()); 128 } 129 } 130 131 /** 132 * Since all JSR-352 jobs are run asynchronously, {@link JobRepository#getLastJobExecution(String, org.springframework.batch.core.JobParameters)} 133 * could return the currently running {@link JobExecution}. To get around this, we use the {@link JobExplorer} 134 * to get a list of the executions and get the most recent one that is <em>not</em> the currently running 135 * {@link JobExecution}. 136 * 137 * @param jobExecution 138 * @return the last executed JobExecution. 139 */ 140 private JobExecution getLastJobExecution(JobExecution jobExecution) { 141 List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobExecution.getJobInstance()); 142 JobExecution lastJobExecution = null; 143 144 for (JobExecution curJobExecution : jobExecutions) { 145 if(lastJobExecution == null && curJobExecution.getId().longValue() != jobExecution.getId().longValue()) { 146 lastJobExecution = curJobExecution; 147 } else if(curJobExecution.getId().longValue() != jobExecution.getId().longValue() && (lastJobExecution == null || curJobExecution.getId().longValue() > lastJobExecution.getId().longValue())) { 148 lastJobExecution = curJobExecution; 149 } 150 } 151 return lastJobExecution; 152 } 153}