/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.job.flow.support.state;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.FlowExecution;
import org.springframework.batch.core.job.flow.FlowExecutionException;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.FlowExecutor;
import org.springframework.batch.core.job.flow.FlowHolder;
import org.springframework.batch.core.job.flow.State;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.TaskRejectedException;

/**
 * A {@link State} implementation that splits a {@link Flow} into multiple
 * parallel subflows.
 *
 * <p>
 * Each subflow is wrapped in a {@link FutureTask} and handed to the configured
 * {@link TaskExecutor}; the individual {@link FlowExecution} results are then
 * combined into a single {@link FlowExecutionStatus} by a
 * {@link FlowExecutionAggregator}.
 *
 * @author Dave Syer
 * @since 2.0
 */
public class SplitState extends AbstractState implements FlowHolder {

	/** The subflows started by this state; stored by reference, not copied. */
	private final Collection<Flow> flows;

	/**
	 * Executor the subflows are submitted to. Defaults to a
	 * {@link SyncTaskExecutor}; NOTE(review): per Spring's documentation that
	 * executor runs tasks in the calling thread, so real parallelism
	 * presumably requires {@link #setTaskExecutor(TaskExecutor)} to be called.
	 */
	private TaskExecutor taskExecutor = new SyncTaskExecutor();

	/** Strategy that folds the per-flow results into one status. */
	private FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator();

	/**
	 * @param flows collection of {@link Flow} instances.
	 * @param name the name of the state.
	 */
	public SplitState(Collection<Flow> flows, String name) {
		super(name);
		this.flows = flows;
	}

	/**
	 * Public setter for the taskExecutor.
	 * @param taskExecutor the taskExecutor to set
	 */
	public void setTaskExecutor(TaskExecutor taskExecutor) {
		this.taskExecutor = taskExecutor;
	}

	/**
	 * @return the flows (the same collection instance passed to the constructor)
	 */
	@Override
	public Collection<Flow> getFlows() {
		return flows;
	}

	/**
	 * Execute the flows in parallel by passing them to the {@link TaskExecutor}
	 * and wait for all of them to finish before proceeding.
	 *
	 * <p>
	 * If one or more flows fail, every submitted flow is still awaited and the
	 * failure of the first failing flow (in submission order) is then rethrown.
	 *
	 * @param executor the {@link FlowExecutor} passed to each flow's
	 * {@code start} method and to
	 * {@link #doAggregation(Collection, FlowExecutor)}
	 * @return the aggregated status of all subflow executions
	 * @throws FlowExecutionException if the {@link TaskExecutor} rejects a task
	 * @throws Exception the cause of the first failed flow when that cause is an
	 * {@link Exception}, otherwise the wrapping {@link ExecutionException};
	 * also any exception raised while waiting on a task (for example on
	 * interruption)
	 * @see State#handle(FlowExecutor)
	 */
	@Override
	public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception {

		// TODO: collect the last StepExecution from the flows as well, so they
		// can be abandoned if necessary
		Collection<Future<FlowExecution>> tasks = new ArrayList<Future<FlowExecution>>();

		for (final Flow flow : flows) {

			final FutureTask<FlowExecution> task = new FutureTask<FlowExecution>(new Callable<FlowExecution>() {
				@Override
				public FlowExecution call() throws Exception {
					return flow.start(executor);
				}
			});

			tasks.add(task);

			try {
				taskExecutor.execute(task);
			}
			catch (TaskRejectedException e) {
				// Keep the rejection as the cause so its stack trace is not lost
				throw new FlowExecutionException("TaskExecutor rejected task for flow=" + flow.getName(), e);
			}

		}

		Collection<FlowExecution> results = new ArrayList<FlowExecution>();

		// Only the first failure is reported, but it is rethrown after the
		// loop so that every submitted flow has been awaited first.
		Exception firstFailure = null;

		// Could use a CompletionService here?
		for (Future<FlowExecution> task : tasks) {
			try {
				results.add(task.get());
			}
			catch (ExecutionException e) {
				if (firstFailure == null) {
					// Unwrap the expected exceptions
					Throwable cause = e.getCause();
					if (cause instanceof Exception) {
						firstFailure = (Exception) cause;
					}
					else {
						firstFailure = e;
					}
				}
			}
		}

		if (firstFailure != null) {
			throw firstFailure;
		}

		return doAggregation(results, executor);
	}

	/**
	 * Combine the results of the subflows into a single status by delegating
	 * to the configured {@link FlowExecutionAggregator}. This implementation
	 * does not use the {@code executor} argument.
	 *
	 * @param results the {@link FlowExecution} of every subflow
	 * @param executor the {@link FlowExecutor} the flows were started with
	 * @return the aggregated {@link FlowExecutionStatus}
	 */
	protected FlowExecutionStatus doAggregation(Collection<FlowExecution> results, FlowExecutor executor) {
		return aggregator.aggregate(results);
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see org.springframework.batch.core.job.flow.State#isEndState()
	 */
	@Override
	public boolean isEndState() {
		return false;
	}

}