001/*
002 * Copyright 2013-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.partition;
017
018import java.io.Serializable;
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashSet;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Properties;
025import java.util.Queue;
026import java.util.Set;
027import java.util.concurrent.Callable;
028import java.util.concurrent.Future;
029import java.util.concurrent.FutureTask;
030import java.util.concurrent.LinkedBlockingQueue;
031import java.util.concurrent.locks.ReentrantLock;
032
033import javax.batch.api.partition.PartitionAnalyzer;
034import javax.batch.api.partition.PartitionCollector;
035import javax.batch.api.partition.PartitionMapper;
036import javax.batch.api.partition.PartitionPlan;
037
038import org.springframework.batch.core.BatchStatus;
039import org.springframework.batch.core.ExitStatus;
040import org.springframework.batch.core.JobExecutionException;
041import org.springframework.batch.core.Step;
042import org.springframework.batch.core.StepExecution;
043import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
044import org.springframework.batch.core.partition.PartitionHandler;
045import org.springframework.batch.core.partition.StepExecutionSplitter;
046import org.springframework.batch.core.repository.JobRepository;
047import org.springframework.batch.item.ExecutionContext;
048import org.springframework.beans.factory.InitializingBean;
049import org.springframework.core.task.TaskRejectedException;
050import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
051import org.springframework.util.Assert;
052
053/**
054 * Executes a step instance per thread using a {@link ThreadPoolTaskExecutor} in
055 * accordance with JSR-352.  The results from each step is aggregated into a
056 * cumulative result.
057 *
058 * @author Michael Minella
059 * @author Mahmoud Ben Hassine
060 * @since 3.0
061 */
062public class JsrPartitionHandler implements PartitionHandler, InitializingBean {
063
064        private static final int DEFAULT_POLLING_INTERVAL = 500;
065
066        // TODO: Replace with proper Channel and Messages once minimum support level for Spring is 4
067        private Queue<Serializable> partitionDataQueue;
068        private ReentrantLock lock;
069        private Step step;
070        private int partitions;
071        private PartitionAnalyzer analyzer;
072        private PartitionMapper mapper;
073        private int threads;
074        private BatchPropertyContext propertyContext;
075        private JobRepository jobRepository;
076        private boolean allowStartIfComplete = false;
077        private Set<String> partitionStepNames = new HashSet<String>();
078        private int pollingInterval = DEFAULT_POLLING_INTERVAL;
079
080        /**
081         * @return the step that will be executed by each partition
082         */
083        public Step getStep() {
084                return step;
085        }
086
087        /**
088         * @return the names of each partitioned step
089         */
090        public Collection<String> getPartitionStepNames() {
091                return partitionStepNames;
092        }
093
094        /**
095         * @param allowStartIfComplete flag stating if the step should restart if it
096         *      was complete in a previous run
097         */
098        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
099                this.allowStartIfComplete = allowStartIfComplete;
100        }
101
102        /**
103         * @param queue {@link Queue} to receive the output of the {@link PartitionCollector}
104         */
105        public void setPartitionDataQueue(Queue<Serializable> queue) {
106                this.partitionDataQueue = queue;
107        }
108
109        public void setPartitionLock(ReentrantLock lock) {
110                this.lock = lock;
111        }
112
113        /**
114         * @param context {@link BatchPropertyContext} to resolve partition level step properties
115         */
116        public void setPropertyContext(BatchPropertyContext context) {
117                this.propertyContext = context;
118        }
119
120        /**
121         * @param mapper {@link PartitionMapper} used to configure partitioning
122         */
123        public void setPartitionMapper(PartitionMapper mapper) {
124                this.mapper = mapper;
125        }
126
127        /**
128         * @param step the step to be executed as a partitioned step
129         */
130        public void setStep(Step step) {
131                this.step = step;
132        }
133
134        /**
135         * @param analyzer {@link PartitionAnalyzer}
136         */
137        public void setPartitionAnalyzer(PartitionAnalyzer analyzer) {
138                this.analyzer = analyzer;
139        }
140
141        /**
142         * @param threads the number of threads to execute the partitions to be run
143         * within.  The default is the number of partitions.
144         */
145        public void setThreads(int threads) {
146                this.threads = threads;
147        }
148
149        /**
150         * @param partitions the number of partitions to be executed
151         */
152        public void setPartitions(int partitions) {
153                this.partitions = partitions;
154        }
155
156        /**
157         * @param jobRepository {@link JobRepository}
158         */
159        public void setJobRepository(JobRepository jobRepository) {
160                this.jobRepository = jobRepository;
161        }
162
163        /**
164         * @param pollingInterval the duration of partitions completion polling interval
165         *                       (in milliseconds). The default value is 500ms.
166         */
167        public void setPollingInterval(int pollingInterval) {
168                this.pollingInterval = pollingInterval;
169        }
170
171        /* (non-Javadoc)
172         * @see org.springframework.batch.core.partition.PartitionHandler#handle(org.springframework.batch.core.partition.StepExecutionSplitter, org.springframework.batch.core.StepExecution)
173         */
174        @Override
175        public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
176                        StepExecution stepExecution) throws Exception {
177                final List<Future<StepExecution>> tasks = new ArrayList<Future<StepExecution>>();
178                final Set<StepExecution> result = new HashSet<StepExecution>();
179                final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
180
181                int stepExecutionCount = jobRepository.getStepExecutionCount(stepExecution.getJobExecution().getJobInstance(), stepExecution.getStepName());
182
183                boolean isRestart = stepExecutionCount > 1;
184
185                Set<StepExecution> partitionStepExecutions = splitStepExecution(stepExecution, isRestart);
186
187                for (StepExecution curStepExecution : partitionStepExecutions) {
188                        partitionStepNames.add(curStepExecution.getStepName());
189                }
190
191                taskExecutor.setCorePoolSize(threads);
192                taskExecutor.setMaxPoolSize(threads);
193
194                taskExecutor.initialize();
195
196                try {
197                        for (final StepExecution curStepExecution : partitionStepExecutions) {
198                                final FutureTask<StepExecution> task = createTask(step, curStepExecution);
199
200                                try {
201                                        taskExecutor.execute(task);
202                                        tasks.add(task);
203                                } catch (TaskRejectedException e) {
204                                        // couldn't execute one of the tasks
205                                        ExitStatus exitStatus = ExitStatus.FAILED
206                                                        .addExitDescription("TaskExecutor rejected the task for this step.");
207                                /*
208                                 * Set the status in case the caller is tracking it through the
209                                 * JobExecution.
210                                 */
211                                        curStepExecution.setStatus(BatchStatus.FAILED);
212                                        curStepExecution.setExitStatus(exitStatus);
213                                        result.add(stepExecution);
214                                }
215                        }
216
217                        processPartitionResults(tasks, result);
218                }
219                finally {
220                        taskExecutor.shutdown();
221                }
222
223                return result;
224        }
225
226        /**
227         * Blocks until all partitioned steps have completed.  As each step completes
228         * the PartitionAnalyzer analyzes the collector data received from each
229         * partition (if there is any).
230         *
231         * @param tasks The {@link Future} that contains the reference to the executing step
232         * @param result Set of completed {@link StepExecution}s
233         * @throws Exception
234         */
235        private void processPartitionResults(
236                        final List<Future<StepExecution>> tasks,
237                        final Set<StepExecution> result) throws Exception {
238                while(true) {
239                        Thread.sleep(pollingInterval);
240                        try {
241                                lock.lock();
242                                while(!partitionDataQueue.isEmpty()) {
243                                        analyzer.analyzeCollectorData(partitionDataQueue.remove());
244                                }
245
246                                processFinishedPartitions(tasks, result);
247
248                                if(tasks.size() == 0) {
249                                        break;
250                                }
251                        } finally {
252                                if(lock.isHeldByCurrentThread()) {
253                                        lock.unlock();
254                                }
255                        }
256                }
257        }
258
259        /**
260         * Uses either the {@link PartitionMapper} or the hard coded configuration to split
261         * the supplied master StepExecution into the slave StepExecutions.
262         *
263         * @param stepExecution master {@link StepExecution}
264         * @param isRestart true if this step is being restarted
265         * @return a {@link Set} of {@link StepExecution}s to be executed
266         * @throws Exception
267         * @throws JobExecutionException
268         */
269        private Set<StepExecution> splitStepExecution(StepExecution stepExecution,
270                        boolean isRestart) throws Exception, JobExecutionException {
271                Set<StepExecution> partitionStepExecutions = new HashSet<StepExecution>();
272                if(isRestart) {
273                        if(mapper != null) {
274                                PartitionPlan plan = mapper.mapPartitions();
275
276                                if(plan.getPartitionsOverride()) {
277                                        partitionStepExecutions = applyPartitionPlan(stepExecution, plan, false);
278
279                                        for (StepExecution curStepExecution : partitionStepExecutions) {
280                                                curStepExecution.setExecutionContext(new ExecutionContext());
281                                        }
282                                } else {
283                                        Properties[] partitionProps = plan.getPartitionProperties();
284
285                                        plan = (PartitionPlanState) stepExecution.getExecutionContext().get("partitionPlanState");
286                                        plan.setPartitionProperties(partitionProps);
287
288                                        partitionStepExecutions = applyPartitionPlan(stepExecution, plan, true);
289                                }
290
291                        } else {
292                                StepExecutionSplitter stepSplitter = new JsrStepExecutionSplitter(jobRepository, allowStartIfComplete, stepExecution.getStepName(), true);
293                                partitionStepExecutions = stepSplitter.split(stepExecution, partitions);
294                        }
295                } else {
296                        if(mapper != null) {
297                                PartitionPlan plan = mapper.mapPartitions();
298                                partitionStepExecutions = applyPartitionPlan(stepExecution, plan, true);
299                        } else {
300                                StepExecutionSplitter stepSplitter = new JsrStepExecutionSplitter(jobRepository, allowStartIfComplete, stepExecution.getStepName(), true);
301                                partitionStepExecutions = stepSplitter.split(stepExecution, partitions);
302                        }
303                }
304                return partitionStepExecutions;
305        }
306
307        private Set<StepExecution> applyPartitionPlan(StepExecution stepExecution,
308                        PartitionPlan plan, boolean restoreState) throws JobExecutionException {
309                StepExecutionSplitter stepSplitter;
310                Set<StepExecution> partitionStepExecutions;
311                if(plan.getThreads() > 0) {
312                        threads = plan.getThreads();
313                } else if(plan.getPartitions() > 0) {
314                        threads = plan.getPartitions();
315                } else {
316                        throw new IllegalArgumentException("Either a number of threads or partitions are required");
317                }
318
319                PartitionPlanState partitionPlanState = new PartitionPlanState();
320                partitionPlanState.setPartitionPlan(plan);
321
322                stepExecution.getExecutionContext().put("partitionPlanState", partitionPlanState);
323
324                stepSplitter = new JsrStepExecutionSplitter(jobRepository, allowStartIfComplete, stepExecution.getStepName(), restoreState);
325                partitionStepExecutions = stepSplitter.split(stepExecution, plan.getPartitions());
326                registerPartitionProperties(partitionStepExecutions, plan);
327                return partitionStepExecutions;
328        }
329
330        private void processFinishedPartitions(
331                        final List<Future<StepExecution>> tasks,
332                        final Set<StepExecution> result) throws Exception {
333                for(int i = 0; i < tasks.size(); i++) {
334                        Future<StepExecution> curTask = tasks.get(i);
335
336                        if(curTask.isDone()) {
337                                StepExecution curStepExecution = curTask.get();
338
339                                if(analyzer != null) {
340                                        analyzer.analyzeStatus(curStepExecution.getStatus().getBatchStatus(), curStepExecution.getExitStatus().getExitCode());
341                                }
342
343                                result.add(curStepExecution);
344
345                                tasks.remove(i);
346                                i--;
347                        }
348                }
349        }
350
351        private void registerPartitionProperties(
352                        Set<StepExecution> partitionStepExecutions, PartitionPlan plan) {
353                Properties[] partitionProperties = plan.getPartitionProperties();
354                if(partitionProperties != null) {
355                        Iterator<StepExecution> executions = partitionStepExecutions.iterator();
356
357                        int i = 0;
358                        while(executions.hasNext()) {
359                                StepExecution curExecution = executions.next();
360
361                                if(i < partitionProperties.length) {
362                                        Properties partitionPropertyValues = partitionProperties[i];
363                                        if(partitionPropertyValues != null) {
364                                                propertyContext.setStepProperties(curExecution.getStepName(), partitionPropertyValues);
365                                        }
366
367                                        i++;
368                                } else {
369                                        break;
370                                }
371                        }
372                }
373        }
374
375        /**
376         * Creates the task executing the given step in the context of the given execution.
377         *
378         * @param step the step to execute
379         * @param stepExecution the given execution
380         * @return the task executing the given step
381         */
382        protected FutureTask<StepExecution> createTask(final Step step,
383                        final StepExecution stepExecution) {
384                return new FutureTask<StepExecution>(new Callable<StepExecution>() {
385                        @Override
386                        public StepExecution call() throws Exception {
387                                step.execute(stepExecution);
388                                return stepExecution;
389                        }
390                });
391        }
392
393        /* (non-Javadoc)
394         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
395         */
396        @Override
397        public void afterPropertiesSet() throws Exception {
398                Assert.notNull(propertyContext, "A BatchPropertyContext is required");
399                Assert.isTrue(mapper != null || (threads > 0 || partitions > 0), "Either a mapper implementation or the number of partitions/threads is required");
400                Assert.notNull(jobRepository, "A JobRepository is required");
401                Assert.isTrue(pollingInterval >= 0, "The polling interval must be positive");
402
403                if(partitionDataQueue == null) {
404                        partitionDataQueue = new LinkedBlockingQueue<Serializable>();
405                }
406
407                if(lock == null) {
408                        lock = new ReentrantLock();
409                }
410        }
411
412        /**
413         * Since a {@link PartitionPlan} could provide dynamic data (different results from run to run),
414         * the batch runtime needs to save off the results for restarts.  This class serves as a container
415         * used to save off that state.
416         *
417         * @author Michael Minella
418         * @since 3.0
419         */
420        public static class PartitionPlanState implements PartitionPlan, Serializable {
421
422                private static final long serialVersionUID = 1L;
423                private Properties[] partitionProperties;
424                private int partitions;
425                private int threads;
426
427                /**
428                 * @param plan the {@link PartitionPlan} that is the source of the state
429                 */
430                public PartitionPlanState(PartitionPlan plan) {
431                        partitionProperties = plan.getPartitionProperties();
432                        partitions = plan.getPartitions();
433                        threads = plan.getThreads();
434                }
435
436                public PartitionPlanState() {
437                }
438
439                public void setPartitionPlan(PartitionPlan plan) {
440                        this.partitionProperties = plan.getPartitionProperties();
441                        this.partitions = plan.getPartitions();
442                        this.threads = plan.getThreads();
443                }
444
445                /* (non-Javadoc)
446                 * @see javax.batch.api.partition.PartitionPlan#getPartitionProperties()
447                 */
448                @Override
449                public Properties[] getPartitionProperties() {
450                        return partitionProperties;
451                }
452
453                /* (non-Javadoc)
454                 * @see javax.batch.api.partition.PartitionPlan#getPartitions()
455                 */
456                @Override
457                public int getPartitions() {
458                        return partitions;
459                }
460
461                /* (non-Javadoc)
462                 * @see javax.batch.api.partition.PartitionPlan#getThreads()
463                 */
464                @Override
465                public int getThreads() {
466                        return threads;
467                }
468
469                /* (non-Javadoc)
470                 * @see javax.batch.api.partition.PartitionPlan#setPartitions(int)
471                 */
472                @Override
473                public void setPartitions(int count) {
474                        this.partitions = count;
475                }
476
477                /* (non-Javadoc)
478                 * @see javax.batch.api.partition.PartitionPlan#setPartitionsOverride(boolean)
479                 */
480                @Override
481                public void setPartitionsOverride(boolean override) {
482                        // Intentional No-op
483                }
484
485                /* (non-Javadoc)
486                 * @see javax.batch.api.partition.PartitionPlan#getPartitionsOverride()
487                 */
488                @Override
489                public boolean getPartitionsOverride() {
490                        return false;
491                }
492
493                /* (non-Javadoc)
494                 * @see javax.batch.api.partition.PartitionPlan#setThreads(int)
495                 */
496                @Override
497                public void setThreads(int count) {
498                        this.threads = count;
499                }
500
501                /* (non-Javadoc)
502                 * @see javax.batch.api.partition.PartitionPlan#setPartitionProperties(java.util.Properties[])
503                 */
504                @Override
505                public void setPartitionProperties(Properties[] props) {
506                        this.partitionProperties = props;
507                }
508        }
509}