/*
 * Copyright 2006-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.batch.core.partition.support;
018
019import java.util.Collection;
020import java.util.HashMap;
021import java.util.HashSet;
022import java.util.Map;
023import java.util.Map.Entry;
024import java.util.Set;
025
026import org.springframework.batch.core.BatchStatus;
027import org.springframework.batch.core.JobExecution;
028import org.springframework.batch.core.JobExecutionException;
029import org.springframework.batch.core.JobInstance;
030import org.springframework.batch.core.Step;
031import org.springframework.batch.core.StepExecution;
032import org.springframework.batch.core.partition.StepExecutionSplitter;
033import org.springframework.batch.core.repository.JobRepository;
034import org.springframework.batch.item.ExecutionContext;
035import org.springframework.beans.factory.InitializingBean;
036import org.springframework.util.Assert;
037
038/**
039 * Generic implementation of {@link StepExecutionSplitter} that delegates to a
040 * {@link Partitioner} to generate {@link ExecutionContext} instances. Takes
041 * care of restartability and identifying the step executions from previous runs
042 * of the same job. The generated {@link StepExecution} instances have names
043 * that identify them uniquely in the partition. The name is constructed from a
044 * base (name of the target step) plus a suffix taken from the
045 * {@link Partitioner} identifiers, separated by a colon, e.g.
046 * <code>{step1:partition0, step1:partition1, ...}</code>.
047 *
048 * @author Dave Syer
049 * @author Mahmoud Ben Hassine
050 * @since 2.0
051 */
052public class SimpleStepExecutionSplitter implements StepExecutionSplitter, InitializingBean {
053
054        private static final String STEP_NAME_SEPARATOR = ":";
055
056        private String stepName;
057
058        private Partitioner partitioner;
059
060        private boolean allowStartIfComplete = false;
061
062        private JobRepository jobRepository;
063
064        /**
065         * Default constructor for convenience in configuration.
066         */
067        public SimpleStepExecutionSplitter() {
068        }
069
070        /**
071         * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
072         * properties.
073         *
074         * @param jobRepository the {@link JobRepository}
075         * @param allowStartIfComplete flag specifying preferences on restart
076         * @param stepName the target step name
077         * @param partitioner a {@link Partitioner} to use for generating input
078         * parameters
079         */
080        public SimpleStepExecutionSplitter(JobRepository jobRepository, boolean allowStartIfComplete, String stepName, Partitioner partitioner) {
081                this.jobRepository = jobRepository;
082                this.allowStartIfComplete = allowStartIfComplete;
083                this.partitioner = partitioner;
084                this.stepName = stepName;
085        }
086
087        /**
088         * Construct a {@link SimpleStepExecutionSplitter} from its mandatory
089         * properties.
090         *
091         * @param jobRepository the {@link JobRepository}
092         * @param step the target step (a local version of it), used to extract the
093         * name and allowStartIfComplete flags
094         * @param partitioner a {@link Partitioner} to use for generating input
095         * parameters
096         *
097         * @deprecated use {@link #SimpleStepExecutionSplitter(JobRepository, boolean, String, Partitioner)} instead
098         */
099        @Deprecated
100        public SimpleStepExecutionSplitter(JobRepository jobRepository, Step step, Partitioner partitioner) {
101                this.jobRepository = jobRepository;
102                this.allowStartIfComplete = step.isAllowStartIfComplete();
103                this.partitioner = partitioner;
104                this.stepName = step.getName();
105        }
106
107        /**
108         * Check mandatory properties (step name, job repository and partitioner).
109         *
110         * @see InitializingBean#afterPropertiesSet()
111         */
112        @Override
113        public void afterPropertiesSet() throws Exception {
114                Assert.state(jobRepository != null, "A JobRepository is required");
115                Assert.state(stepName != null, "A step name is required");
116                Assert.state(partitioner != null, "A Partitioner is required");
117        }
118
119        /**
120         * Flag to indicate that the partition target step is allowed to start if an
121         * execution is complete. Defaults to the same value as the underlying step.
122         * Set this manually to override the underlying step properties.
123         *
124         * @see Step#isAllowStartIfComplete()
125         *
126         * @param allowStartIfComplete the value to set
127         */
128        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
129                this.allowStartIfComplete = allowStartIfComplete;
130        }
131
132        /**
133         * The job repository that will be used to manage the persistence of the
134         * delegate step executions.
135         *
136         * @param jobRepository the JobRepository to set
137         */
138        public void setJobRepository(JobRepository jobRepository) {
139                this.jobRepository = jobRepository;
140        }
141
142        /**
143         * The {@link Partitioner} that will be used to generate step execution meta
144         * data for the target step.
145         *
146         * @param partitioner the partitioner to set
147         */
148        public void setPartitioner(Partitioner partitioner) {
149                this.partitioner = partitioner;
150        }
151
152        /**
153         * The name of the target step that will be executed across the partitions.
154         * Mandatory with no default.
155         *
156         * @param stepName the step name to set
157         */
158        public void setStepName(String stepName) {
159                this.stepName = stepName;
160        }
161
162        /**
163         * @see StepExecutionSplitter#getStepName()
164         */
165        @Override
166        public String getStepName() {
167                return this.stepName;
168        }
169
170        /**
171         * @see StepExecutionSplitter#split(StepExecution, int)
172         */
173        @Override
174        public Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException {
175
176                JobExecution jobExecution = stepExecution.getJobExecution();
177
178                Map<String, ExecutionContext> contexts = getContexts(stepExecution, gridSize);
179                Set<StepExecution> set = new HashSet<StepExecution>(contexts.size());
180
181                for (Entry<String, ExecutionContext> context : contexts.entrySet()) {
182
183                        // Make the step execution name unique and repeatable
184                        String stepName = this.stepName + STEP_NAME_SEPARATOR + context.getKey();
185
186                        StepExecution currentStepExecution = jobExecution.createStepExecution(stepName);
187
188                        boolean startable = isStartable(currentStepExecution, context.getValue());
189
190                        if (startable) {
191                                set.add(currentStepExecution);
192                        }
193                }
194
195                jobRepository.addAll(set);
196
197                Set<StepExecution> executions = new HashSet<StepExecution>(set.size());
198                executions.addAll(set);
199
200                return executions;
201
202        }
203
204        private Map<String, ExecutionContext> getContexts(StepExecution stepExecution, int gridSize) {
205
206                ExecutionContext context = stepExecution.getExecutionContext();
207                String key = SimpleStepExecutionSplitter.class.getSimpleName() + ".GRID_SIZE";
208
209                // If this is a restart we must retain the same grid size, ignoring the
210                // one passed in...
211                int splitSize = (int) context.getLong(key, gridSize);
212                context.putLong(key, splitSize);
213
214                Map<String, ExecutionContext> result;
215                if (context.isDirty()) {
216                        // The context changed so we didn't already know the partitions
217                        jobRepository.updateExecutionContext(stepExecution);
218                        result = partitioner.partition(splitSize);
219                }
220                else {
221                        if (partitioner instanceof PartitionNameProvider) {
222                                result = new HashMap<String, ExecutionContext>();
223                                Collection<String> names = ((PartitionNameProvider) partitioner).getPartitionNames(splitSize);
224                                for (String name : names) {
225                                        /*
226                                         * We need to return the same keys as the original (failed)
227                                         * execution, but the execution contexts will be discarded
228                                         * so they can be empty.
229                                         */
230                                        result.put(name, new ExecutionContext());
231                                }
232                        }
233                        else {
234                                // If no names are provided, grab the partition again.
235                                result = partitioner.partition(splitSize);
236                        }
237                }
238
239                return result;
240        }
241
242        /**
243         * Check if a step execution is startable.
244         * @param stepExecution the step execution to check
245         * @param context the execution context of the step
246         * @return true if the step execution is startable, false otherwise
247         * @throws JobExecutionException if unable to check if the step execution is startable
248         */
249        protected boolean isStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
250                return getStartable(stepExecution, context);
251        }
252
253        /**
254         * Check if a step execution is startable.
255         * @param stepExecution the step execution to check
256         * @param context the execution context of the step
257         * @return true if the step execution is startable, false otherwise
258         * @throws JobExecutionException if unable to check if the step execution is startable
259         * @deprecated This method is deprecated in favor of
260         * {@link SimpleStepExecutionSplitter#isStartable} and will be removed in a
261         * future version.
262         */
263        @Deprecated
264        protected boolean getStartable(StepExecution stepExecution, ExecutionContext context) throws JobExecutionException {
265
266                JobInstance jobInstance = stepExecution.getJobExecution().getJobInstance();
267                String stepName = stepExecution.getStepName();
268                StepExecution lastStepExecution = jobRepository.getLastStepExecution(jobInstance, stepName);
269
270                boolean isRestart = (lastStepExecution != null && lastStepExecution.getStatus() != BatchStatus.COMPLETED);
271
272                if (isRestart) {
273                        stepExecution.setExecutionContext(lastStepExecution.getExecutionContext());
274                }
275                else {
276                        stepExecution.setExecutionContext(context);
277                }
278
279                return shouldStart(allowStartIfComplete, stepExecution, lastStepExecution) || isRestart;
280
281        }
282
283        private boolean shouldStart(boolean allowStartIfComplete, StepExecution stepExecution, StepExecution lastStepExecution)
284                        throws JobExecutionException {
285
286                if (lastStepExecution == null) {
287                        return true;
288                }
289
290                BatchStatus stepStatus = lastStepExecution.getStatus();
291
292                if (stepStatus == BatchStatus.UNKNOWN) {
293                        throw new JobExecutionException("Cannot restart step from UNKNOWN status.  "
294                                        + "The last execution ended with a failure that could not be rolled back, "
295                                        + "so it may be dangerous to proceed.  " + "Manual intervention is probably necessary.");
296                }
297
298                if (stepStatus == BatchStatus.COMPLETED) {
299                        if (!allowStartIfComplete) {
300                                if (isSameJobExecution(stepExecution, lastStepExecution)) {
301                                        // it's always OK to start again in the same JobExecution
302                                        return true;
303                                }
304                                // step is complete, false should be returned, indicating that
305                                // the step should not be started
306                                return false;
307                        }
308                        else {
309                                return true;
310                        }
311                }
312
313                if (stepStatus == BatchStatus.STOPPED || stepStatus == BatchStatus.FAILED) {
314                        return true;
315                }
316
317                if (stepStatus == BatchStatus.STARTED || stepStatus == BatchStatus.STARTING
318                                || stepStatus == BatchStatus.STOPPING) {
319                        throw new JobExecutionException(
320                                        "Cannot restart step from "
321                                                        + stepStatus
322                                                        + " status.  "
323                                                        + "The old execution may still be executing, so you may need to verify manually that this is the case.");
324                }
325
326                throw new JobExecutionException("Cannot restart step from " + stepStatus + " status.  "
327                                + "We believe the old execution was abandoned and therefore has been marked as un-restartable.");
328
329        }
330
331        private boolean isSameJobExecution(StepExecution stepExecution, StepExecution lastStepExecution) {
332                if (stepExecution.getJobExecutionId()==null) {
333                        return lastStepExecution.getJobExecutionId()==null;
334                }
335                return stepExecution.getJobExecutionId().equals(lastStepExecution.getJobExecutionId());
336        }
337
338}