/*
 * Copyright 2013-2014 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.jsr.step;

import java.util.Collection;

import javax.batch.api.partition.PartitionReducer;
import javax.batch.api.partition.PartitionReducer.PartitionStatus;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.jsr.partition.JsrPartitionHandler;
import org.springframework.batch.core.jsr.partition.support.JsrStepExecutionAggregator;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.batch.core.partition.support.StepExecutionAggregator;
import org.springframework.batch.core.step.NoSuchStepException;
import org.springframework.batch.core.step.StepLocator;
import org.springframework.batch.item.ExecutionContext;

/**
 * An extension of the
 * {@link org.springframework.batch.core.partition.support.PartitionStep} that
 * provides additional semantics required by JSR-352. Specifically, this
 * implementation adds the required lifecycle calls to the
 * {@link PartitionReducer} if it is used.
 *
 * <p>The {@link StepLocator} methods of this class cast the configured
 * {@link PartitionHandler} to {@link JsrPartitionHandler}.</p>
 *
 * @author Michael Minella
 * @since 3.0
 */
public class PartitionStep extends org.springframework.batch.core.partition.support.PartitionStep implements StepLocator {

	/** Optional reducer; {@code null} means no reducer callbacks are made. */
	private PartitionReducer reducer;

	/** Aggregator used by {@link #doExecute(StepExecution)} to fold partition results into the master execution. */
	private final StepExecutionAggregator stepExecutionAggregator = new JsrStepExecutionAggregator();

	/**
	 * Registers the {@link PartitionReducer} whose lifecycle methods are
	 * invoked around the partitioned execution.
	 *
	 * @param reducer the reducer to call back, or {@code null} to disable
	 * reducer callbacks
	 */
	public void setPartitionReducer(PartitionReducer reducer) {
		this.reducer = reducer;
	}

	/**
	 * Delegate execution to the {@link PartitionHandler} provided. The
	 * {@link StepExecution} passed in here becomes the parent or master
	 * execution for the partition, summarizing the status on exit of the
	 * logical grouping of work carried out by the {@link PartitionHandler}.
	 *
	 * <p>No {@link StepExecutionSplitter} is supplied to the handler
	 * ({@code null} is passed), so the handler must be able to create the
	 * partition executions (and any per-partition {@link ExecutionContext})
	 * without one.</p>
	 *
	 * <p>If a {@link PartitionReducer} is set, it is called in this order:
	 * {@code beginPartitionedStep()} before the handler runs; then, when the
	 * aggregated status is unsuccessful, {@code rollbackPartitionedStep()}
	 * followed by {@code afterPartitionedStepCompletion(ROLLBACK)}; otherwise
	 * {@code beforePartitionedStepCompletion()} followed by
	 * {@code afterPartitionedStepCompletion(COMMIT)}.</p>
	 *
	 * @param stepExecution the master step execution for the partition
	 * @throws JobExecutionException if the aggregated status of the master
	 * step execution is unsuccessful
	 * @throws Exception any exception raised by the handler, the aggregator
	 * or the reducer is propagated unchanged
	 *
	 * @see Step#execute(StepExecution)
	 */
	@Override
	protected void doExecute(StepExecution stepExecution) throws Exception {

		if (reducer != null) {
			reducer.beginPartitionedStep();
		}

		// Wait for task completion and then aggregate the results
		Collection<StepExecution> stepExecutions = getPartitionHandler().handle(null, stepExecution);
		stepExecution.upgradeStatus(BatchStatus.COMPLETED);
		stepExecutionAggregator.aggregate(stepExecution, stepExecutions);

		if (stepExecution.getStatus().isUnsuccessful()) {
			if (reducer != null) {
				reducer.rollbackPartitionedStep();
				reducer.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
			}
			throw new JobExecutionException("Partition handler returned an unsuccessful step");
		}

		if (reducer != null) {
			reducer.beforePartitionedStepCompletion();
			reducer.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
		}
	}

	/**
	 * Returns the partition step names reported by the configured
	 * {@link JsrPartitionHandler}.
	 *
	 * @return the names returned by
	 * {@code JsrPartitionHandler.getPartitionStepNames()}
	 * @throws ClassCastException if the configured handler is not a
	 * {@link JsrPartitionHandler}
	 * @see org.springframework.batch.core.step.StepLocator#getStepNames()
	 */
	@Override
	public Collection<String> getStepNames() {
		return jsrPartitionHandler().getPartitionStepNames();
	}

	/**
	 * Returns the handler's step when {@code stepName} is one of the
	 * partition step names it reports.
	 *
	 * @param stepName the name to look up
	 * @return the step returned by {@code JsrPartitionHandler.getStep()}
	 * @throws NoSuchStepException if {@code stepName} is not among the
	 * handler's partition step names
	 * @throws ClassCastException if the configured handler is not a
	 * {@link JsrPartitionHandler}
	 * @see org.springframework.batch.core.step.StepLocator#getStep(java.lang.String)
	 */
	@Override
	public Step getStep(String stepName) throws NoSuchStepException {
		JsrPartitionHandler partitionHandler = jsrPartitionHandler();

		if (!partitionHandler.getPartitionStepNames().contains(stepName)) {
			throw new NoSuchStepException(stepName + " was not found");
		}
		return partitionHandler.getStep();
	}

	/**
	 * Single place for the cast of the configured handler to the JSR-specific type.
	 */
	private JsrPartitionHandler jsrPartitionHandler() {
		return (JsrPartitionHandler) getPartitionHandler();
	}
}