001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.job.flow.support.state;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.concurrent.Callable;
021import java.util.concurrent.ExecutionException;
022import java.util.concurrent.Future;
023import java.util.concurrent.FutureTask;
024
025import org.springframework.batch.core.job.flow.Flow;
026import org.springframework.batch.core.job.flow.FlowExecution;
027import org.springframework.batch.core.job.flow.FlowExecutionException;
028import org.springframework.batch.core.job.flow.FlowExecutionStatus;
029import org.springframework.batch.core.job.flow.FlowExecutor;
030import org.springframework.batch.core.job.flow.FlowHolder;
031import org.springframework.batch.core.job.flow.State;
032import org.springframework.core.task.SyncTaskExecutor;
033import org.springframework.core.task.TaskExecutor;
034import org.springframework.core.task.TaskRejectedException;
035
036/**
037 * A {@link State} implementation that splits a {@link Flow} into multiple
038 * parallel subflows.
039 *
040 * @author Dave Syer
041 * @since 2.0
042 */
043public class SplitState extends AbstractState implements FlowHolder {
044
045        private final Collection<Flow> flows;
046
047        private TaskExecutor taskExecutor = new SyncTaskExecutor();
048
049        private FlowExecutionAggregator aggregator = new MaxValueFlowExecutionAggregator();
050
051        /**
052         * @param flows collection of {@link Flow} instances.
053         * @param name the name of the state.
054         */
055        public SplitState(Collection<Flow> flows, String name) {
056                super(name);
057                this.flows = flows;
058        }
059
060        /**
061         * Public setter for the taskExecutor.
062         * @param taskExecutor the taskExecutor to set
063         */
064        public void setTaskExecutor(TaskExecutor taskExecutor) {
065                this.taskExecutor = taskExecutor;
066        }
067
068        /**
069         * @return the flows
070         */
071        @Override
072        public Collection<Flow> getFlows() {
073                return flows;
074        }
075
076        /**
077         * Execute the flows in parallel by passing them to the {@link TaskExecutor}
078         * and wait for all of them to finish before proceeding.
079         *
080         * @see State#handle(FlowExecutor)
081         */
082        @Override
083        public FlowExecutionStatus handle(final FlowExecutor executor) throws Exception {
084
085                // TODO: collect the last StepExecution from the flows as well, so they
086                // can be abandoned if necessary
087                Collection<Future<FlowExecution>> tasks = new ArrayList<Future<FlowExecution>>();
088
089                for (final Flow flow : flows) {
090
091                        final FutureTask<FlowExecution> task = new FutureTask<FlowExecution>(new Callable<FlowExecution>() {
092                                @Override
093                                public FlowExecution call() throws Exception {
094                                        return flow.start(executor);
095                                }
096                        });
097
098                        tasks.add(task);
099
100                        try {
101                                taskExecutor.execute(task);
102                        }
103                        catch (TaskRejectedException e) {
104                                throw new FlowExecutionException("TaskExecutor rejected task for flow=" + flow.getName());
105                        }
106
107                }
108
109                Collection<FlowExecution> results = new ArrayList<FlowExecution>();
110
111                // Could use a CompletionService here?
112                for (Future<FlowExecution> task : tasks) {
113                        try {
114                                results.add(task.get());
115                        }
116                        catch (ExecutionException e) {
117                                // Unwrap the expected exceptions
118                                Throwable cause = e.getCause();
119                                if (cause instanceof Exception) {
120                                        throw (Exception) cause;
121                                } else {
122                                        throw e;
123                                }
124                        }
125                }
126
127                return doAggregation(results, executor);
128        }
129
130        protected FlowExecutionStatus doAggregation(Collection<FlowExecution> results, FlowExecutor executor) {
131                return aggregator.aggregate(results);
132        }
133
134        /*
135         * (non-Javadoc)
136         *
137         * @see org.springframework.batch.core.job.flow.State#isEndState()
138         */
139        @Override
140        public boolean isEndState() {
141                return false;
142        }
143}