001package org.springframework.batch.integration.partition; 002 003import org.springframework.batch.core.BatchStatus; 004import org.springframework.batch.core.JobInterruptedException; 005import org.springframework.batch.core.Step; 006import org.springframework.batch.core.StepExecution; 007import org.springframework.batch.core.explore.JobExplorer; 008import org.springframework.batch.core.step.NoSuchStepException; 009import org.springframework.batch.core.step.StepLocator; 010import org.springframework.integration.annotation.MessageEndpoint; 011import org.springframework.integration.annotation.ServiceActivator; 012 013/** 014 * A {@link MessageEndpoint} that can handle a {@link StepExecutionRequest} and 015 * return a {@link StepExecution} as the result. Typically these need to be 016 * aggregated into a response to a partition handler. 017 * 018 * @author Dave Syer 019 * 020 */ 021@MessageEndpoint 022public class StepExecutionRequestHandler { 023 024 private JobExplorer jobExplorer; 025 026 private StepLocator stepLocator; 027 028 /** 029 * Used to locate a {@link Step} to execute for each request. 030 * @param stepLocator a {@link StepLocator} 031 */ 032 public void setStepLocator(StepLocator stepLocator) { 033 this.stepLocator = stepLocator; 034 } 035 036 /** 037 * An explorer that should be used to check for {@link StepExecution} 038 * completion. 039 * 040 * @param jobExplorer a {@link JobExplorer} that is linked to the shared 041 * repository used by all remote workers. 042 */ 043 public void setJobExplorer(JobExplorer jobExplorer) { 044 this.jobExplorer = jobExplorer; 045 } 046 047 @ServiceActivator 048 public StepExecution handle(StepExecutionRequest request) { 049 050 Long jobExecutionId = request.getJobExecutionId(); 051 Long stepExecutionId = request.getStepExecutionId(); 052 StepExecution stepExecution = jobExplorer.getStepExecution(jobExecutionId, stepExecutionId); 053 if (stepExecution == null) { 054 throw new NoSuchStepException("No StepExecution could be located for this request: " + request); 055 } 056 057 String stepName = request.getStepName(); 058 Step step = stepLocator.getStep(stepName); 059 if (step == null) { 060 throw new NoSuchStepException(String.format("No Step with name [%s] could be located.", stepName)); 061 } 062 063 try { 064 step.execute(stepExecution); 065 } 066 catch (JobInterruptedException e) { 067 stepExecution.setStatus(BatchStatus.STOPPED); 068 // The receiver should update the stepExecution in repository 069 } 070 catch (Throwable e) { 071 stepExecution.addFailureException(e); 072 stepExecution.setStatus(BatchStatus.FAILED); 073 // The receiver should update the stepExecution in repository 074 } 075 076 return stepExecution; 077 078 } 079 080}