001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.job.flow; 018 019import org.springframework.batch.core.BatchStatus; 020import org.springframework.batch.core.ExitStatus; 021import org.springframework.batch.core.JobExecution; 022import org.springframework.batch.core.JobInterruptedException; 023import org.springframework.batch.core.StartLimitExceededException; 024import org.springframework.batch.core.Step; 025import org.springframework.batch.core.StepExecution; 026import org.springframework.batch.core.job.StepHandler; 027import org.springframework.batch.core.repository.JobRepository; 028import org.springframework.batch.core.repository.JobRestartException; 029import org.springframework.lang.Nullable; 030 031/** 032 * Implementation of {@link FlowExecutor} for use in components that need to 033 * execute a flow related to a {@link JobExecution}. 034 * 035 * @author Dave Syer 036 * @author Michael Minella 037 * @author Mahmoud Ben Hassine 038 * 039 */ 040public class JobFlowExecutor implements FlowExecutor { 041 042 private final ThreadLocal<StepExecution> stepExecutionHolder = new ThreadLocal<StepExecution>(); 043 044 private final JobExecution execution; 045 046 protected ExitStatus exitStatus = ExitStatus.EXECUTING; 047 048 private final StepHandler stepHandler; 049 050 private final JobRepository jobRepository; 051 052 /** 053 * @param jobRepository instance of {@link JobRepository}. 054 * @param stepHandler instance of {@link StepHandler}. 055 * @param execution instance of {@link JobExecution}. 056 */ 057 public JobFlowExecutor(JobRepository jobRepository, StepHandler stepHandler, JobExecution execution) { 058 this.jobRepository = jobRepository; 059 this.stepHandler = stepHandler; 060 this.execution = execution; 061 stepExecutionHolder.set(null); 062 } 063 064 @Override 065 public String executeStep(Step step) throws JobInterruptedException, JobRestartException, 066 StartLimitExceededException { 067 boolean isRerun = isStepRestart(step); 068 StepExecution stepExecution = stepHandler.handleStep(step, execution); 069 stepExecutionHolder.set(stepExecution); 070 071 if (stepExecution == null) { 072 return ExitStatus.COMPLETED.getExitCode(); 073 } 074 if (stepExecution.isTerminateOnly()) { 075 throw new JobInterruptedException("Step requested termination: "+stepExecution, stepExecution.getStatus()); 076 } 077 078 if(isRerun) { 079 stepExecution.getExecutionContext().put("batch.restart", true); 080 } 081 082 return stepExecution.getExitStatus().getExitCode(); 083 } 084 085 private boolean isStepRestart(Step step) { 086 int count = jobRepository.getStepExecutionCount(execution.getJobInstance(), step.getName()); 087 088 return count > 0; 089 } 090 091 @Override 092 public void abandonStepExecution() { 093 StepExecution lastStepExecution = stepExecutionHolder.get(); 094 if (lastStepExecution != null && lastStepExecution.getStatus().isGreaterThan(BatchStatus.STOPPING)) { 095 lastStepExecution.upgradeStatus(BatchStatus.ABANDONED); 096 jobRepository.update(lastStepExecution); 097 } 098 } 099 100 @Override 101 public void updateJobExecutionStatus(FlowExecutionStatus status) { 102 execution.setStatus(findBatchStatus(status)); 103 exitStatus = exitStatus.and(new ExitStatus(status.getName())); 104 execution.setExitStatus(exitStatus); 105 } 106 107 @Override 108 public JobExecution getJobExecution() { 109 return execution; 110 } 111 112 @Override 113 @Nullable 114 public StepExecution getStepExecution() { 115 return stepExecutionHolder.get(); 116 } 117 118 @Override 119 public void close(FlowExecution result) { 120 stepExecutionHolder.set(null); 121 } 122 123 @Override 124 public boolean isRestart() { 125 if (getStepExecution() != null && getStepExecution().getStatus() == BatchStatus.ABANDONED) { 126 /* 127 * This is assumed to be the last step execution and it was marked 128 * abandoned, so we are in a restart of a stopped step. 129 */ 130 // TODO: mark the step execution in some more definitive way? 131 return true; 132 } 133 return execution.getStepExecutions().isEmpty(); 134 } 135 136 @Override 137 public void addExitStatus(String code) { 138 exitStatus = exitStatus.and(new ExitStatus(code)); 139 } 140 141 /** 142 * @param status {@link FlowExecutionStatus} to convert. 143 * @return A {@link BatchStatus} appropriate for the {@link FlowExecutionStatus} provided 144 */ 145 protected BatchStatus findBatchStatus(FlowExecutionStatus status) { 146 for (BatchStatus batchStatus : BatchStatus.values()) { 147 if (status.getName().startsWith(batchStatus.toString())) { 148 return batchStatus; 149 } 150 } 151 return BatchStatus.UNKNOWN; 152 } 153 154}