001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.partition.support;
018
019import org.springframework.batch.core.BatchStatus;
020import org.springframework.batch.core.ExitStatus;
021import org.springframework.batch.core.Step;
022import org.springframework.batch.core.StepExecution;
023import org.springframework.batch.core.partition.PartitionHandler;
024import org.springframework.batch.core.step.StepHolder;
025import org.springframework.beans.factory.InitializingBean;
026import org.springframework.beans.factory.annotation.Required;
027import org.springframework.core.task.SyncTaskExecutor;
028import org.springframework.core.task.TaskExecutor;
029import org.springframework.core.task.TaskRejectedException;
030import org.springframework.util.Assert;
031
032import java.util.HashSet;
033import java.util.Set;
034import java.util.concurrent.Callable;
035import java.util.concurrent.Future;
036import java.util.concurrent.FutureTask;
037
038/**
039 * A {@link PartitionHandler} that uses a {@link TaskExecutor} to execute the
040 * partitioned {@link Step} locally in multiple threads. This can be an
041 * effective approach for scaling batch steps that are IO intensive, like
042 * directory and filesystem scanning and copying.
043 * <br>
044 * By default, the thread pool is synchronous.
045 *
046 * @author Sebastien Gerard
047 * @author Dave Syer
048 * @since 2.0
049 */
050public class TaskExecutorPartitionHandler extends AbstractPartitionHandler implements StepHolder, InitializingBean {
051
052        private TaskExecutor taskExecutor = new SyncTaskExecutor();
053
054        private Step step;
055
056    @Override
057        public void afterPropertiesSet() throws Exception {
058        }
059
060        /**
061         * Setter for the {@link TaskExecutor} that is used to farm out step
062         * executions to multiple threads.
063         * @param taskExecutor a {@link TaskExecutor}
064         */
065        public void setTaskExecutor(TaskExecutor taskExecutor) {
066                this.taskExecutor = taskExecutor;
067        }
068
069        /**
070         * Setter for the {@link Step} that will be used to execute the partitioned
071         * {@link StepExecution}. This is a regular Spring Batch step, with all the
072         * business logic required to complete an execution based on the input
073         * parameters in its {@link StepExecution} context.
074         *
075         * @param step the {@link Step} instance to use to execute business logic
076         */
077    @Required
078        public void setStep(Step step) {
079                this.step = step;
080        }
081
082        /**
083         * The step instance that will be executed in parallel by this handler.
084         *
085         * @return the step instance that will be used
086         * @see StepHolder#getStep()
087         */
088    @Override
089        public Step getStep() {
090                return this.step;
091        }
092
093    @Override
094    protected Set<StepExecution> doHandle(StepExecution masterStepExecution,
095                                          Set<StepExecution> partitionStepExecutions) throws Exception {
096        Assert.notNull(step, "A Step must be provided.");
097        final Set<Future<StepExecution>> tasks = new HashSet<Future<StepExecution>>(getGridSize());
098        final Set<StepExecution> result = new HashSet<StepExecution>();
099
100        for (final StepExecution stepExecution : partitionStepExecutions) {
101            final FutureTask<StepExecution> task = createTask(step, stepExecution);
102
103            try {
104                taskExecutor.execute(task);
105                tasks.add(task);
106            } catch (TaskRejectedException e) {
107                // couldn't execute one of the tasks
108                ExitStatus exitStatus = ExitStatus.FAILED
109                        .addExitDescription("TaskExecutor rejected the task for this step.");
110                /*
111                 * Set the status in case the caller is tracking it through the
112                 * JobExecution.
113                 */
114                stepExecution.setStatus(BatchStatus.FAILED);
115                stepExecution.setExitStatus(exitStatus);
116                result.add(stepExecution);
117            }
118        }
119
120        for (Future<StepExecution> task : tasks) {
121            result.add(task.get());
122        }
123
124        return result;
125        }
126
127    /**
128     * Creates the task executing the given step in the context of the given execution.
129     *
130     * @param step the step to execute
131     * @param stepExecution the given execution
132     * @return the task executing the given step
133     */
134    protected FutureTask<StepExecution> createTask(final Step step,
135                                                   final StepExecution stepExecution) {
136        return new FutureTask<StepExecution>(new Callable<StepExecution>() {
137            @Override
138            public StepExecution call() throws Exception {
139                step.execute(stepExecution);
140                return stepExecution;
141            }
142        });
143    }
144
145}