001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.step;
017
018import java.util.Date;
019
020import org.apache.commons.logging.Log;
021import org.apache.commons.logging.LogFactory;
022import org.springframework.batch.core.BatchStatus;
023import org.springframework.batch.core.ExitStatus;
024import org.springframework.batch.core.JobInterruptedException;
025import org.springframework.batch.core.Step;
026import org.springframework.batch.core.StepExecution;
027import org.springframework.batch.core.StepExecutionListener;
028import org.springframework.batch.core.UnexpectedJobExecutionException;
029import org.springframework.batch.core.configuration.annotation.StepScope;
030import org.springframework.batch.core.launch.NoSuchJobException;
031import org.springframework.batch.core.launch.support.ExitCodeMapper;
032import org.springframework.batch.core.listener.CompositeStepExecutionListener;
033import org.springframework.batch.core.repository.JobRepository;
034import org.springframework.batch.core.scope.context.StepSynchronizationManager;
035import org.springframework.batch.item.ExecutionContext;
036import org.springframework.batch.repeat.RepeatException;
037import org.springframework.beans.factory.BeanNameAware;
038import org.springframework.beans.factory.InitializingBean;
039import org.springframework.util.Assert;
040import org.springframework.util.ClassUtils;
041
042/**
043 * A {@link Step} implementation that provides common behavior to subclasses, including registering and calling
044 * listeners.
045 *
046 * @author Dave Syer
047 * @author Ben Hale
048 * @author Robert Kasanicky
049 * @author Michael Minella
050 * @author Chris Schaefer
051 * @author Mahmoud Ben Hassine
052 */
053public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {
054
055        private static final Log logger = LogFactory.getLog(AbstractStep.class);
056
057        private String name;
058
059        private int startLimit = Integer.MAX_VALUE;
060
061        private boolean allowStartIfComplete = false;
062
063        private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener();
064
065        private JobRepository jobRepository;
066
067        /**
068         * Default constructor.
069         */
070        public AbstractStep() {
071                super();
072        }
073
074        @Override
075        public void afterPropertiesSet() throws Exception {
076                Assert.state(name != null, "A Step must have a name");
077                Assert.state(jobRepository != null, "JobRepository is mandatory");
078        }
079
080        @Override
081        public String getName() {
082                return this.name;
083        }
084
085        /**
086         * Set the name property. Always overrides the default value if this object is a Spring bean.
087         * @param name the name of the {@link Step}.
088         * @see #setBeanName(java.lang.String)
089         */
090        public void setName(String name) {
091                this.name = name;
092        }
093
094        /**
095         * Set the name property if it is not already set. Because of the order of the callbacks in a Spring container the
096         * name property will be set first if it is present. Care is needed with bean definition inheritance - if a parent
097         * bean has a name, then its children need an explicit name as well, otherwise they will not be unique.
098         *
099         * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
100         */
101        @Override
102        public void setBeanName(String name) {
103                if (this.name == null) {
104                        this.name = name;
105                }
106        }
107
108        @Override
109        public int getStartLimit() {
110                return this.startLimit;
111        }
112
113        /**
114         * Public setter for the startLimit.
115         *
116         * @param startLimit the startLimit to set
117         */
118        public void setStartLimit(int startLimit) {
119                this.startLimit = startLimit == 0 ? Integer.MAX_VALUE : startLimit;
120        }
121
122        @Override
123        public boolean isAllowStartIfComplete() {
124                return this.allowStartIfComplete;
125        }
126
127        /**
128         * Public setter for flag that determines whether the step should start again if it is already complete. Defaults to
129         * false.
130         *
131         * @param allowStartIfComplete the value of the flag to set
132         */
133        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
134                this.allowStartIfComplete = allowStartIfComplete;
135        }
136
137        /**
138         * Convenient constructor for setting only the name property.
139         *
140         * @param name Name of the step
141         */
142        public AbstractStep(String name) {
143                this.name = name;
144        }
145
146        /**
147         * Extension point for subclasses to execute business logic. Subclasses should set the {@link ExitStatus} on the
148         * {@link StepExecution} before returning.
149         *
150         * @param stepExecution the current step context
151         * @throws Exception checked exception thrown by implementation
152         */
153        protected abstract void doExecute(StepExecution stepExecution) throws Exception;
154
155        /**
156         * Extension point for subclasses to provide callbacks to their collaborators at the beginning of a step, to open or
157         * acquire resources. Does nothing by default.
158         *
159         * @param ctx the {@link ExecutionContext} to use
160         * @throws Exception checked exception thrown by implementation
161         */
162        protected void open(ExecutionContext ctx) throws Exception {
163        }
164
165        /**
166         * Extension point for subclasses to provide callbacks to their collaborators at the end of a step (right at the end
167         * of the finally block), to close or release resources. Does nothing by default.
168         *
169         * @param ctx the {@link ExecutionContext} to use
170         * @throws Exception checked exception thrown by implementation
171         */
172        protected void close(ExecutionContext ctx) throws Exception {
173        }
174
175        /**
176         * Template method for step execution logic - calls abstract methods for resource initialization (
177         * {@link #open(ExecutionContext)}), execution logic ({@link #doExecute(StepExecution)}) and resource closing (
178         * {@link #close(ExecutionContext)}).
179         */
180        @Override
181        public final void execute(StepExecution stepExecution) throws JobInterruptedException,
182        UnexpectedJobExecutionException {
183
184                Assert.notNull(stepExecution, "stepExecution must not be null");
185
186                if (logger.isDebugEnabled()) {
187                        logger.debug("Executing: id=" + stepExecution.getId());
188                }
189                stepExecution.setStartTime(new Date());
190                stepExecution.setStatus(BatchStatus.STARTED);
191                getJobRepository().update(stepExecution);
192
193                // Start with a default value that will be trumped by anything
194                ExitStatus exitStatus = ExitStatus.EXECUTING;
195
196                doExecutionRegistration(stepExecution);
197
198                try {
199                        getCompositeListener().beforeStep(stepExecution);
200                        open(stepExecution.getExecutionContext());
201
202                        try {
203                                doExecute(stepExecution);
204                        }
205                        catch (RepeatException e) {
206                                throw e.getCause();
207                        }
208                        exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
209
210                        // Check if someone is trying to stop us
211                        if (stepExecution.isTerminateOnly()) {
212                                throw new JobInterruptedException("JobExecution interrupted.");
213                        }
214
215                        // Need to upgrade here not set, in case the execution was stopped
216                        stepExecution.upgradeStatus(BatchStatus.COMPLETED);
217                        if (logger.isDebugEnabled()) {
218                                logger.debug("Step execution success: id=" + stepExecution.getId());
219                        }
220                }
221                catch (Throwable e) {
222                        stepExecution.upgradeStatus(determineBatchStatus(e));
223                        exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
224                        stepExecution.addFailureException(e);
225                        if (stepExecution.getStatus() == BatchStatus.STOPPED) {
226                                logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage()));
227                                if (logger.isDebugEnabled()) {
228                                        logger.debug("Full exception", e);
229                                }
230                        }
231                        else {
232                                logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
233                        }
234                }
235                finally {
236
237                        try {
238                                // Update the step execution to the latest known value so the
239                                // listeners can act on it
240                                exitStatus = exitStatus.and(stepExecution.getExitStatus());
241                                stepExecution.setExitStatus(exitStatus);
242                                exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
243                        }
244                        catch (Exception e) {
245                                logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
246                        }
247
248                        try {
249                                getJobRepository().updateExecutionContext(stepExecution);
250                        }
251                        catch (Exception e) {
252                                stepExecution.setStatus(BatchStatus.UNKNOWN);
253                                exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
254                                stepExecution.addFailureException(e);
255                                logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
256                                                + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
257                        }
258
259                        stepExecution.setEndTime(new Date());
260                        stepExecution.setExitStatus(exitStatus);
261
262                        try {
263                                getJobRepository().update(stepExecution);
264                        }
265                        catch (Exception e) {
266                                stepExecution.setStatus(BatchStatus.UNKNOWN);
267                                stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
268                                stepExecution.addFailureException(e);
269                                logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
270                                                + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
271                        }
272
273                        try {
274                                close(stepExecution.getExecutionContext());
275                        }
276                        catch (Exception e) {
277                                logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
278                                stepExecution.addFailureException(e);
279                        }
280
281                        doExecutionRelease();
282
283                        if (logger.isDebugEnabled()) {
284                                logger.debug("Step execution complete: " + stepExecution.getSummary());
285                        }
286                }
287        }
288
289        /**
290         * Releases the most recent {@link StepExecution}
291         */
292        protected void doExecutionRelease() {
293                StepSynchronizationManager.release();
294        }
295
296        /**
297         * Registers the {@link StepExecution} for property resolution via {@link StepScope}
298         *
299         * @param stepExecution StepExecution to use when hydrating the StepScoped beans
300         */
301        protected void doExecutionRegistration(StepExecution stepExecution) {
302                StepSynchronizationManager.register(stepExecution);
303        }
304
305        /**
306         * Determine the step status based on the exception.
307         */
308        private static BatchStatus determineBatchStatus(Throwable e) {
309                if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) {
310                        return BatchStatus.STOPPED;
311                }
312                else {
313                        return BatchStatus.FAILED;
314                }
315        }
316
317        /**
318         * Register a step listener for callbacks at the appropriate stages in a step execution.
319         *
320         * @param listener a {@link StepExecutionListener}
321         */
322        public void registerStepExecutionListener(StepExecutionListener listener) {
323                this.stepExecutionListener.register(listener);
324        }
325
326        /**
327         * Register each of the objects as listeners.
328         *
329         * @param listeners an array of listener objects of known types.
330         */
331        public void setStepExecutionListeners(StepExecutionListener[] listeners) {
332                for (int i = 0; i < listeners.length; i++) {
333                        registerStepExecutionListener(listeners[i]);
334                }
335        }
336
337        /**
338         * @return composite listener that delegates to all registered listeners.
339         */
340        protected StepExecutionListener getCompositeListener() {
341                return stepExecutionListener;
342        }
343
344        /**
345         * Public setter for {@link JobRepository}.
346         *
347         * @param jobRepository is a mandatory dependence (no default).
348         */
349        public void setJobRepository(JobRepository jobRepository) {
350                this.jobRepository = jobRepository;
351        }
352
353        protected JobRepository getJobRepository() {
354                return jobRepository;
355        }
356
357        @Override
358        public String toString() {
359                return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]";
360        }
361
362        /**
363         * Default mapping from throwable to {@link ExitStatus}. Clients can modify the exit code using a
364         * {@link StepExecutionListener}.
365         *
366         * @param ex the cause of the failure
367         * @return an {@link ExitStatus}
368         */
369        private ExitStatus getDefaultExitStatusForFailure(Throwable ex) {
370                ExitStatus exitStatus;
371                if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) {
372                        exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName());
373                }
374                else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) {
375                        exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName());
376                }
377                else {
378                        exitStatus = ExitStatus.FAILED.addExitDescription(ex);
379                }
380
381                return exitStatus;
382        }
383
384}