001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.partition.support;
018
019import java.util.ArrayList;
020import java.util.Collection;
021
022import org.springframework.batch.core.StepExecution;
023import org.springframework.batch.core.explore.JobExplorer;
024import org.springframework.beans.factory.InitializingBean;
025import org.springframework.util.Assert;
026
027/**
028 * Convenience class for aggregating a set of {@link StepExecution} instances
029 * when the input comes from remote steps, so the data need to be refreshed from
030 * the repository.
031 *
032 * @author Dave Syer
033 * @since 2.1
034 */
035public class RemoteStepExecutionAggregator implements StepExecutionAggregator, InitializingBean {
036
037        private StepExecutionAggregator delegate = new DefaultStepExecutionAggregator();
038
039        private JobExplorer jobExplorer;
040
041        /**
042         * Create a new instance (useful for configuration purposes).
043         */
044        public RemoteStepExecutionAggregator() {
045        }
046
047        /**
048         * Create a new instance with a job explorer that can be used to refresh the
049         * data when aggregating.
050         *
051         * @param jobExplorer the {@link JobExplorer} to use
052         */
053        public RemoteStepExecutionAggregator(JobExplorer jobExplorer) {
054                super();
055                this.jobExplorer = jobExplorer;
056        }
057
058        /**
059         * @param jobExplorer the jobExplorer to set
060         */
061        public void setJobExplorer(JobExplorer jobExplorer) {
062                this.jobExplorer = jobExplorer;
063        }
064
065        /**
066         * @param delegate the delegate to set
067         */
068        public void setDelegate(StepExecutionAggregator delegate) {
069                this.delegate = delegate;
070        }
071
072        /**
073         * @throws Exception if the job explorer is not provided
074         */
075        @Override
076        public void afterPropertiesSet() throws Exception {
077                Assert.state(jobExplorer != null, "A JobExplorer must be provided");
078        }
079
080        /**
081         * Aggregates the input executions into the result {@link StepExecution}
082         * delegating to the delegate aggregator once the input has been refreshed
083         * from the {@link JobExplorer}.
084         *
085         * @see StepExecutionAggregator #aggregate(StepExecution, Collection)
086         */
087        @Override
088        public void aggregate(StepExecution result, Collection<StepExecution> executions) {
089                Assert.notNull(result, "To aggregate into a result it must be non-null.");
090                if (executions == null) {
091                        return;
092                }
093                Collection<StepExecution> updates = new ArrayList<StepExecution>();
094                for (StepExecution stepExecution : executions) {
095                        Long id = stepExecution.getId();
096                        Assert.state(id != null, "StepExecution has null id. It must be saved first: " + stepExecution);
097                        StepExecution update = jobExplorer.getStepExecution(stepExecution.getJobExecutionId(), id);
098                        Assert.state(update != null, "Could not reload StepExecution from JobRepository: " + stepExecution);
099                        updates.add(update);
100                }
101                delegate.aggregate(result, updates);
102        }
103
104}