001/*
002 * Copyright 2013-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.step;
017
018import java.util.Collection;
019
020import javax.batch.api.partition.PartitionReducer;
021import javax.batch.api.partition.PartitionReducer.PartitionStatus;
022
023import org.springframework.batch.core.BatchStatus;
024import org.springframework.batch.core.JobExecutionException;
025import org.springframework.batch.core.Step;
026import org.springframework.batch.core.StepExecution;
027import org.springframework.batch.core.jsr.partition.JsrPartitionHandler;
028import org.springframework.batch.core.jsr.partition.support.JsrStepExecutionAggregator;
029import org.springframework.batch.core.partition.PartitionHandler;
030import org.springframework.batch.core.partition.StepExecutionSplitter;
031import org.springframework.batch.core.partition.support.StepExecutionAggregator;
032import org.springframework.batch.core.step.NoSuchStepException;
033import org.springframework.batch.core.step.StepLocator;
034import org.springframework.batch.item.ExecutionContext;
035
036/**
037 * An extension of the {@link PartitionStep} that provides additional semantics
038 * required by JSR-352.  Specifically, this implementation adds the required
039 * lifecycle calls to the {@link PartitionReducer} if it is used.
040 *
041 * @author Michael Minella
042 * @since 3.0
043 */
044public class PartitionStep extends org.springframework.batch.core.partition.support.PartitionStep implements StepLocator {
045
046        private PartitionReducer reducer;
047        private boolean hasReducer = false;
048        private StepExecutionAggregator stepExecutionAggregator = new JsrStepExecutionAggregator();
049
050        public void setPartitionReducer(PartitionReducer reducer) {
051                this.reducer = reducer;
052                hasReducer = reducer != null;
053        }
054
055        /**
056         * Delegate execution to the {@link PartitionHandler} provided. The
057         * {@link StepExecution} passed in here becomes the parent or master
058         * execution for the partition, summarizing the status on exit of the
059         * logical grouping of work carried out by the {@link PartitionHandler}. The
060         * individual step executions and their input parameters (through
061         * {@link ExecutionContext}) for the partition elements are provided by the
062         * {@link StepExecutionSplitter}.
063         *
064         * @param stepExecution the master step execution for the partition
065         *
066         * @see Step#execute(StepExecution)
067         */
068        @Override
069        protected void doExecute(StepExecution stepExecution) throws Exception {
070
071                if(hasReducer) {
072                        reducer.beginPartitionedStep();
073                }
074
075                // Wait for task completion and then aggregate the results
076                Collection<StepExecution> stepExecutions = getPartitionHandler().handle(null, stepExecution);
077                stepExecution.upgradeStatus(BatchStatus.COMPLETED);
078                stepExecutionAggregator.aggregate(stepExecution, stepExecutions);
079
080                if (stepExecution.getStatus().isUnsuccessful()) {
081                        if (hasReducer) {
082                                reducer.rollbackPartitionedStep();
083                                reducer.afterPartitionedStepCompletion(PartitionStatus.ROLLBACK);
084                        }
085                        throw new JobExecutionException("Partition handler returned an unsuccessful step");
086                }
087
088                if (hasReducer) {
089                        reducer.beforePartitionedStepCompletion();
090                        reducer.afterPartitionedStepCompletion(PartitionStatus.COMMIT);
091                }
092        }
093
094        /* (non-Javadoc)
095         * @see org.springframework.batch.core.step.StepLocator#getStepNames()
096         */
097        @Override
098        public Collection<String> getStepNames() {
099                return ((JsrPartitionHandler) getPartitionHandler()).getPartitionStepNames();
100        }
101
102        /* (non-Javadoc)
103         * @see org.springframework.batch.core.step.StepLocator#getStep(java.lang.String)
104         */
105        @Override
106        public Step getStep(String stepName) throws NoSuchStepException {
107                JsrPartitionHandler partitionHandler =  (JsrPartitionHandler) getPartitionHandler();
108                Collection<String> names = partitionHandler.getPartitionStepNames();
109
110                if(names.contains(stepName)) {
111                        return partitionHandler.getStep();
112                } else {
113                        throw new NoSuchStepException(stepName + " was not found");
114                }
115        }
116}