001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.partition.support; 018 019import org.springframework.batch.core.BatchStatus; 020import org.springframework.batch.core.ExitStatus; 021import org.springframework.batch.core.Step; 022import org.springframework.batch.core.StepExecution; 023import org.springframework.batch.core.partition.PartitionHandler; 024import org.springframework.batch.core.step.StepHolder; 025import org.springframework.beans.factory.InitializingBean; 026import org.springframework.beans.factory.annotation.Required; 027import org.springframework.core.task.SyncTaskExecutor; 028import org.springframework.core.task.TaskExecutor; 029import org.springframework.core.task.TaskRejectedException; 030import org.springframework.util.Assert; 031 032import java.util.HashSet; 033import java.util.Set; 034import java.util.concurrent.Callable; 035import java.util.concurrent.Future; 036import java.util.concurrent.FutureTask; 037 038/** 039 * A {@link PartitionHandler} that uses a {@link TaskExecutor} to execute the 040 * partitioned {@link Step} locally in multiple threads. This can be an 041 * effective approach for scaling batch steps that are IO intensive, like 042 * directory and filesystem scanning and copying. 043 * <br> 044 * By default, the thread pool is synchronous. 045 * 046 * @author Sebastien Gerard 047 * @author Dave Syer 048 * @since 2.0 049 */ 050public class TaskExecutorPartitionHandler extends AbstractPartitionHandler implements StepHolder, InitializingBean { 051 052 private TaskExecutor taskExecutor = new SyncTaskExecutor(); 053 054 private Step step; 055 056 @Override 057 public void afterPropertiesSet() throws Exception { 058 } 059 060 /** 061 * Setter for the {@link TaskExecutor} that is used to farm out step 062 * executions to multiple threads. 063 * @param taskExecutor a {@link TaskExecutor} 064 */ 065 public void setTaskExecutor(TaskExecutor taskExecutor) { 066 this.taskExecutor = taskExecutor; 067 } 068 069 /** 070 * Setter for the {@link Step} that will be used to execute the partitioned 071 * {@link StepExecution}. This is a regular Spring Batch step, with all the 072 * business logic required to complete an execution based on the input 073 * parameters in its {@link StepExecution} context. 074 * 075 * @param step the {@link Step} instance to use to execute business logic 076 */ 077 @Required 078 public void setStep(Step step) { 079 this.step = step; 080 } 081 082 /** 083 * The step instance that will be executed in parallel by this handler. 084 * 085 * @return the step instance that will be used 086 * @see StepHolder#getStep() 087 */ 088 @Override 089 public Step getStep() { 090 return this.step; 091 } 092 093 @Override 094 protected Set<StepExecution> doHandle(StepExecution masterStepExecution, 095 Set<StepExecution> partitionStepExecutions) throws Exception { 096 Assert.notNull(step, "A Step must be provided."); 097 final Set<Future<StepExecution>> tasks = new HashSet<Future<StepExecution>>(getGridSize()); 098 final Set<StepExecution> result = new HashSet<StepExecution>(); 099 100 for (final StepExecution stepExecution : partitionStepExecutions) { 101 final FutureTask<StepExecution> task = createTask(step, stepExecution); 102 103 try { 104 taskExecutor.execute(task); 105 tasks.add(task); 106 } catch (TaskRejectedException e) { 107 // couldn't execute one of the tasks 108 ExitStatus exitStatus = ExitStatus.FAILED 109 .addExitDescription("TaskExecutor rejected the task for this step."); 110 /* 111 * Set the status in case the caller is tracking it through the 112 * JobExecution. 113 */ 114 stepExecution.setStatus(BatchStatus.FAILED); 115 stepExecution.setExitStatus(exitStatus); 116 result.add(stepExecution); 117 } 118 } 119 120 for (Future<StepExecution> task : tasks) { 121 result.add(task.get()); 122 } 123 124 return result; 125 } 126 127 /** 128 * Creates the task executing the given step in the context of the given execution. 129 * 130 * @param step the step to execute 131 * @param stepExecution the given execution 132 * @return the task executing the given step 133 */ 134 protected FutureTask<StepExecution> createTask(final Step step, 135 final StepExecution stepExecution) { 136 return new FutureTask<StepExecution>(new Callable<StepExecution>() { 137 @Override 138 public StepExecution call() throws Exception { 139 step.execute(stepExecution); 140 return stepExecution; 141 } 142 }); 143 } 144 145}