001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.partition.support;
018
019import org.springframework.batch.core.BatchStatus;
020import org.springframework.batch.core.JobExecutionException;
021import org.springframework.batch.core.Step;
022import org.springframework.batch.core.StepExecution;
023import org.springframework.batch.core.partition.PartitionHandler;
024import org.springframework.batch.core.partition.StepExecutionSplitter;
025import org.springframework.batch.core.step.AbstractStep;
026import org.springframework.batch.item.ExecutionContext;
027import org.springframework.util.Assert;
028
029import java.util.Collection;
030
031/**
032 * Implementation of {@link Step} which partitions the execution and spreads the
033 * load using a {@link PartitionHandler}.
034 *
035 * @author Dave Syer
036 * @since 2.0
037 */
038public class PartitionStep extends AbstractStep {
039
040        private StepExecutionSplitter stepExecutionSplitter;
041
042        private PartitionHandler partitionHandler;
043
044        private StepExecutionAggregator stepExecutionAggregator = new DefaultStepExecutionAggregator();
045
046        /**
047         * A {@link PartitionHandler} which can send out step executions for remote
048         * processing and bring back the results.
049         *
050         * @param partitionHandler the {@link PartitionHandler} to set
051         */
052        public void setPartitionHandler(PartitionHandler partitionHandler) {
053                this.partitionHandler = partitionHandler;
054        }
055
056        /**
057         * A {@link StepExecutionAggregator} that can aggregate step executions when
058         * they come back from the handler. Defaults to a
059         * {@link DefaultStepExecutionAggregator}.
060         *
061         * @param stepExecutionAggregator the {@link StepExecutionAggregator} to set
062         */
063        public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) {
064                this.stepExecutionAggregator = stepExecutionAggregator;
065        }
066
067        /**
068         * Public setter for mandatory property {@link StepExecutionSplitter}.
069         * @param stepExecutionSplitter the {@link StepExecutionSplitter} to set
070         */
071        public void setStepExecutionSplitter(StepExecutionSplitter stepExecutionSplitter) {
072                this.stepExecutionSplitter = stepExecutionSplitter;
073        }
074
075        /**
076         * Assert that mandatory properties are set (stepExecutionSplitter,
077         * partitionHandler) and delegate top superclass.
078         *
079         * @see AbstractStep#afterPropertiesSet()
080         */
081        @Override
082        public void afterPropertiesSet() throws Exception {
083                Assert.notNull(stepExecutionSplitter, "StepExecutionSplitter must be provided");
084                Assert.notNull(partitionHandler, "PartitionHandler must be provided");
085                super.afterPropertiesSet();
086        }
087
088        /**
089         * Delegate execution to the {@link PartitionHandler} provided. The
090         * {@link StepExecution} passed in here becomes the parent or master
091         * execution for the partition, summarising the status on exit of the
092         * logical grouping of work carried out by the {@link PartitionHandler}. The
093         * individual step executions and their input parameters (through
094         * {@link ExecutionContext}) for the partition elements are provided by the
095         * {@link StepExecutionSplitter}.
096         *
097         * @param stepExecution the master step execution for the partition
098         *
099         * @see Step#execute(StepExecution)
100         */
101        @Override
102        protected void doExecute(StepExecution stepExecution) throws Exception {
103                stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
104
105                // Wait for task completion and then aggregate the results
106                Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
107                stepExecution.upgradeStatus(BatchStatus.COMPLETED);
108                stepExecutionAggregator.aggregate(stepExecution, executions);
109
110                // If anything failed or had a problem we need to crap out
111                if (stepExecution.getStatus().isUnsuccessful()) {
112                        throw new JobExecutionException("Partition handler returned an unsuccessful step");
113                }
114        }
115
116        protected StepExecutionSplitter getStepExecutionSplitter() {
117                return stepExecutionSplitter;
118        }
119
120        protected PartitionHandler getPartitionHandler() {
121                return partitionHandler;
122        }
123}