001/*
002 * Copyright 2006-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.job;
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import org.springframework.batch.core.BatchStatus;
022import org.springframework.batch.core.JobExecution;
023import org.springframework.batch.core.JobInstance;
024import org.springframework.batch.core.JobInterruptedException;
025import org.springframework.batch.core.StartLimitExceededException;
026import org.springframework.batch.core.Step;
027import org.springframework.batch.core.StepExecution;
028import org.springframework.batch.core.repository.JobRepository;
029import org.springframework.batch.core.repository.JobRestartException;
030import org.springframework.batch.item.ExecutionContext;
031import org.springframework.beans.factory.InitializingBean;
032import org.springframework.util.Assert;
033
034/**
035 * Implementation of {@link StepHandler} that manages repository and restart
036 * concerns.
037 *
038 * @author Dave Syer
039 *
040 */
041public class SimpleStepHandler implements StepHandler, InitializingBean {
042
043        private static final Log logger = LogFactory.getLog(SimpleStepHandler.class);
044
045        private JobRepository jobRepository;
046
047        private ExecutionContext executionContext;
048
049        /**
050         * Convenient default constructor for configuration usage.
051         */
052        public SimpleStepHandler() {
053                this(null);
054        }
055
056        /**
057         * @param jobRepository a {@link org.springframework.batch.core.repository.JobRepository}
058         */
059        public SimpleStepHandler(JobRepository jobRepository) {
060                this(jobRepository, new ExecutionContext());
061        }
062
063        /**
064         * @param jobRepository a {@link org.springframework.batch.core.repository.JobRepository}
065         * @param executionContext the {@link org.springframework.batch.item.ExecutionContext} for the current Step
066         */
067        public SimpleStepHandler(JobRepository jobRepository, ExecutionContext executionContext) {
068                this.jobRepository = jobRepository;
069                this.executionContext = executionContext;
070        }
071
072        /**
073         * Check mandatory properties (jobRepository).
074         *
075         * @see InitializingBean#afterPropertiesSet()
076         */
077        @Override
078        public void afterPropertiesSet() throws Exception {
079                Assert.state(jobRepository != null, "A JobRepository must be provided");
080        }
081
082        /**
083         * @return the used jobRepository
084         */
085        protected JobRepository getJobRepository() {
086                return this.jobRepository;
087        }
088
089        /**
090         * @param jobRepository the jobRepository to set
091         */
092        public void setJobRepository(JobRepository jobRepository) {
093                this.jobRepository = jobRepository;
094        }
095
096        /**
097         * A context containing values to be added to the step execution before it
098         * is handled.
099         *
100         * @param executionContext the execution context to set
101         */
102        public void setExecutionContext(ExecutionContext executionContext) {
103                this.executionContext = executionContext;
104        }
105
106        @Override
107        public StepExecution handleStep(Step step, JobExecution execution) throws JobInterruptedException,
108        JobRestartException, StartLimitExceededException {
109                if (execution.isStopping()) {
110                        throw new JobInterruptedException("JobExecution interrupted.");
111                }
112
113                JobInstance jobInstance = execution.getJobInstance();
114
115                StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, step.getName());
116                if (stepExecutionPartOfExistingJobExecution(execution, lastStepExecution)) {
117                        // If the last execution of this step was in the same job, it's
118                        // probably intentional so we want to run it again...
119                        logger.info(String.format("Duplicate step [%s] detected in execution of job=[%s]. "
120                                        + "If either step fails, both will be executed again on restart.", step.getName(), jobInstance
121                                        .getJobName()));
122                        lastStepExecution = null;
123                }
124                StepExecution currentStepExecution = lastStepExecution;
125
126                if (shouldStart(lastStepExecution, execution, step)) {
127
128                        currentStepExecution = execution.createStepExecution(step.getName());
129
130                        boolean isRestart = (lastStepExecution != null && !lastStepExecution.getStatus().equals(
131                                        BatchStatus.COMPLETED));
132
133                        if (isRestart) {
134                                currentStepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
135
136                                if(lastStepExecution.getExecutionContext().containsKey("batch.executed")) {
137                                        currentStepExecution.getExecutionContext().remove("batch.executed");
138                                }
139                        }
140                        else {
141                                currentStepExecution.setExecutionContext(new ExecutionContext(executionContext));
142                        }
143
144                        jobRepository.add(currentStepExecution);
145
146                        logger.info("Executing step: [" + step.getName() + "]");
147                        try {
148                                step.execute(currentStepExecution);
149                                currentStepExecution.getExecutionContext().put("batch.executed", true);
150                        }
151                        catch (JobInterruptedException e) {
152                                // Ensure that the job gets the message that it is stopping
153                                // and can pass it on to other steps that are executing
154                                // concurrently.
155                                execution.setStatus(BatchStatus.STOPPING);
156                                throw e;
157                        }
158
159                        jobRepository.updateExecutionContext(execution);
160
161                        if (currentStepExecution.getStatus() == BatchStatus.STOPPING
162                                        || currentStepExecution.getStatus() == BatchStatus.STOPPED) {
163                                // Ensure that the job gets the message that it is stopping
164                                execution.setStatus(BatchStatus.STOPPING);
165                                throw new JobInterruptedException("Job interrupted by step execution");
166                        }
167
168                }
169
170                return currentStepExecution;
171        }
172
173        /**
174         * Detect whether a step execution belongs to this job execution.
175         * @param jobExecution the current job execution
176         * @param stepExecution an existing step execution
177         * @return true if the {@link org.springframework.batch.core.StepExecution} is part of the {@link org.springframework.batch.core.JobExecution}
178         */
179        private boolean stepExecutionPartOfExistingJobExecution(JobExecution jobExecution, StepExecution stepExecution) {
180                return stepExecution != null && stepExecution.getJobExecutionId() != null
181                                && stepExecution.getJobExecutionId().equals(jobExecution.getId());
182        }
183
184        /**
185         * Given a step and configuration, return true if the step should start,
186         * false if it should not, and throw an exception if the job should finish.
187         * @param lastStepExecution the last step execution
188         * @param jobExecution the {@link JobExecution} instance to be evaluated.
189         * @param step the {@link Step} instance to be evaluated.
190         * @return true if step should start, false if it should not.
191         *
192         * @throws StartLimitExceededException if the start limit has been exceeded
193         * for this step
194         * @throws JobRestartException if the job is in an inconsistent state from
195         * an earlier failure
196         */
197        protected boolean shouldStart(StepExecution lastStepExecution, JobExecution jobExecution, Step step)
198                        throws JobRestartException, StartLimitExceededException {
199
200                BatchStatus stepStatus;
201                if (lastStepExecution == null) {
202                        stepStatus = BatchStatus.STARTING;
203                }
204                else {
205                        stepStatus = lastStepExecution.getStatus();
206                }
207
208                if (stepStatus == BatchStatus.UNKNOWN) {
209                        throw new JobRestartException("Cannot restart step from UNKNOWN status. "
210                                        + "The last execution ended with a failure that could not be rolled back, "
211                                        + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
212                }
213
214                if ((stepStatus == BatchStatus.COMPLETED && !step.isAllowStartIfComplete())
215                                || stepStatus == BatchStatus.ABANDONED) {
216                        // step is complete, false should be returned, indicating that the
217                        // step should not be started
218                        logger.info("Step already complete or not restartable, so no action to execute: " + lastStepExecution);
219                        return false;
220                }
221
222                if (jobRepository.getStepExecutionCount(jobExecution.getJobInstance(), step.getName()) < step.getStartLimit()) {
223                        // step start count is less than start max, return true
224                        return true;
225                }
226                else {
227                        // start max has been exceeded, throw an exception.
228                        throw new StartLimitExceededException("Maximum start limit exceeded for step: " + step.getName()
229                                        + "StartMax: " + step.getStartLimit());
230                }
231        }
232
233}