001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.job.flow;
018
019import org.springframework.batch.core.BatchStatus;
020import org.springframework.batch.core.ExitStatus;
021import org.springframework.batch.core.JobExecution;
022import org.springframework.batch.core.JobInterruptedException;
023import org.springframework.batch.core.StartLimitExceededException;
024import org.springframework.batch.core.Step;
025import org.springframework.batch.core.StepExecution;
026import org.springframework.batch.core.job.StepHandler;
027import org.springframework.batch.core.repository.JobRepository;
028import org.springframework.batch.core.repository.JobRestartException;
029import org.springframework.lang.Nullable;
030
031/**
032 * Implementation of {@link FlowExecutor} for use in components that need to
033 * execute a flow related to a {@link JobExecution}.
034 *
035 * @author Dave Syer
036 * @author Michael Minella
037 * @author Mahmoud Ben Hassine
038 *
039 */
040public class JobFlowExecutor implements FlowExecutor {
041
042        private final ThreadLocal<StepExecution> stepExecutionHolder = new ThreadLocal<StepExecution>();
043
044        private final JobExecution execution;
045
046        protected ExitStatus exitStatus = ExitStatus.EXECUTING;
047
048        private final StepHandler stepHandler;
049
050        private final JobRepository jobRepository;
051
052        /**
053         * @param jobRepository instance of {@link JobRepository}.
054         * @param stepHandler instance of {@link StepHandler}.
055         * @param execution instance of {@link JobExecution}.
056         */
057        public JobFlowExecutor(JobRepository jobRepository, StepHandler stepHandler, JobExecution execution) {
058                this.jobRepository = jobRepository;
059                this.stepHandler = stepHandler;
060                this.execution = execution;
061                stepExecutionHolder.set(null);
062        }
063
064        @Override
065        public String executeStep(Step step) throws JobInterruptedException, JobRestartException,
066        StartLimitExceededException {
067                boolean isRerun = isStepRestart(step);
068                StepExecution stepExecution = stepHandler.handleStep(step, execution);
069                stepExecutionHolder.set(stepExecution);
070
071                if (stepExecution == null) {
072                        return  ExitStatus.COMPLETED.getExitCode();
073                }
074                if (stepExecution.isTerminateOnly()) {
075                        throw new JobInterruptedException("Step requested termination: "+stepExecution, stepExecution.getStatus());
076                }
077
078                if(isRerun) {
079                        stepExecution.getExecutionContext().put("batch.restart", true);
080                }
081
082                return stepExecution.getExitStatus().getExitCode();
083        }
084
085        private boolean isStepRestart(Step step) {
086                int count = jobRepository.getStepExecutionCount(execution.getJobInstance(), step.getName());
087
088                return count > 0;
089        }
090
091        @Override
092        public void abandonStepExecution() {
093                StepExecution lastStepExecution = stepExecutionHolder.get();
094                if (lastStepExecution != null && lastStepExecution.getStatus().isGreaterThan(BatchStatus.STOPPING)) {
095                        lastStepExecution.upgradeStatus(BatchStatus.ABANDONED);
096                        jobRepository.update(lastStepExecution);
097                }
098        }
099
100        @Override
101        public void updateJobExecutionStatus(FlowExecutionStatus status) {
102                execution.setStatus(findBatchStatus(status));
103                exitStatus = exitStatus.and(new ExitStatus(status.getName()));
104                execution.setExitStatus(exitStatus);
105        }
106
107        @Override
108        public JobExecution getJobExecution() {
109                return execution;
110        }
111
112        @Override
113        @Nullable
114        public StepExecution getStepExecution() {
115                return stepExecutionHolder.get();
116        }
117
118        @Override
119        public void close(FlowExecution result) {
120                stepExecutionHolder.set(null);
121        }
122
123        @Override
124        public boolean isRestart() {
125                if (getStepExecution() != null && getStepExecution().getStatus() == BatchStatus.ABANDONED) {
126                        /*
127                         * This is assumed to be the last step execution and it was marked
128                         * abandoned, so we are in a restart of a stopped step.
129                         */
130                        // TODO: mark the step execution in some more definitive way?
131                        return true;
132                }
133                return execution.getStepExecutions().isEmpty();
134        }
135
136        @Override
137        public void addExitStatus(String code) {
138                exitStatus = exitStatus.and(new ExitStatus(code));
139        }
140
141        /**
142         * @param status {@link FlowExecutionStatus} to convert.
143         * @return A {@link BatchStatus} appropriate for the {@link FlowExecutionStatus} provided
144         */
145        protected BatchStatus findBatchStatus(FlowExecutionStatus status) {
146                for (BatchStatus batchStatus : BatchStatus.values()) {
147                        if (status.getName().startsWith(batchStatus.toString())) {
148                                return batchStatus;
149                        }
150                }
151                return BatchStatus.UNKNOWN;
152        }
153
154}