001package org.springframework.batch.integration.partition;
002
003import org.springframework.batch.core.BatchStatus;
004import org.springframework.batch.core.JobInterruptedException;
005import org.springframework.batch.core.Step;
006import org.springframework.batch.core.StepExecution;
007import org.springframework.batch.core.explore.JobExplorer;
008import org.springframework.batch.core.step.NoSuchStepException;
009import org.springframework.batch.core.step.StepLocator;
010import org.springframework.integration.annotation.MessageEndpoint;
011import org.springframework.integration.annotation.ServiceActivator;
012
013/**
014 * A {@link MessageEndpoint} that can handle a {@link StepExecutionRequest} and
015 * return a {@link StepExecution} as the result. Typically these need to be
016 * aggregated into a response to a partition handler.
017 * 
018 * @author Dave Syer
019 * 
020 */
021@MessageEndpoint
022public class StepExecutionRequestHandler {
023
024        private JobExplorer jobExplorer;
025
026        private StepLocator stepLocator;
027
028        /**
029         * Used to locate a {@link Step} to execute for each request.
030         * @param stepLocator a {@link StepLocator}
031         */
032        public void setStepLocator(StepLocator stepLocator) {
033                this.stepLocator = stepLocator;
034        }
035
036        /**
037         * An explorer that should be used to check for {@link StepExecution}
038         * completion.
039         * 
040         * @param jobExplorer a {@link JobExplorer} that is linked to the shared
041         * repository used by all remote workers.
042         */
043        public void setJobExplorer(JobExplorer jobExplorer) {
044                this.jobExplorer = jobExplorer;
045        }
046
047        @ServiceActivator
048        public StepExecution handle(StepExecutionRequest request) {
049
050                Long jobExecutionId = request.getJobExecutionId();
051                Long stepExecutionId = request.getStepExecutionId();
052                StepExecution stepExecution = jobExplorer.getStepExecution(jobExecutionId, stepExecutionId);
053                if (stepExecution == null) {
054                        throw new NoSuchStepException("No StepExecution could be located for this request: " + request);
055                }
056
057                String stepName = request.getStepName();
058                Step step = stepLocator.getStep(stepName);
059                if (step == null) {
060                        throw new NoSuchStepException(String.format("No Step with name [%s] could be located.", stepName));
061                }
062
063                try {
064                        step.execute(stepExecution);
065                }
066                catch (JobInterruptedException e) {
067                        stepExecution.setStatus(BatchStatus.STOPPED);
068                        // The receiver should update the stepExecution in repository
069                }
070                catch (Throwable e) {
071                        stepExecution.addFailureException(e);
072                        stepExecution.setStatus(BatchStatus.FAILED);
073                        // The receiver should update the stepExecution in repository
074                }
075
076                return stepExecution;
077
078        }
079
080}