001/*
002 * Copyright 2006-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.job.flow.support;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Comparator;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.LinkedHashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027import java.util.TreeSet;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import org.springframework.batch.core.Step;
033import org.springframework.batch.core.StepExecution;
034import org.springframework.batch.core.job.flow.Flow;
035import org.springframework.batch.core.job.flow.FlowExecution;
036import org.springframework.batch.core.job.flow.FlowExecutionException;
037import org.springframework.batch.core.job.flow.FlowExecutionStatus;
038import org.springframework.batch.core.job.flow.FlowExecutor;
039import org.springframework.batch.core.job.flow.State;
040import org.springframework.beans.factory.InitializingBean;
041
042/**
043 * A {@link Flow} that branches conditionally depending on the exit status of
044 * the last {@link State}. The input parameters are the state transitions (in no
045 * particular order). The start state name can be specified explicitly (and must
046 * exist in the set of transitions), or computed from the existing transitions,
047 * if unambiguous.
048 *
049 * @author Dave Syer
050 * @author Michael Minella
051 * @since 2.0
052 */
053public class SimpleFlow implements Flow, InitializingBean {
054
055        private static final Log logger = LogFactory.getLog(SimpleFlow.class);
056
057        private State startState;
058
059        private Map<String, Set<StateTransition>> transitionMap = new HashMap<String, Set<StateTransition>>();
060
061        private Map<String, State> stateMap = new HashMap<String, State>();
062
063        private List<StateTransition> stateTransitions = new ArrayList<StateTransition>();
064
065        private final String name;
066
067        private Comparator<StateTransition> stateTransitionComparator;
068
069        public void setStateTransitionComparator(Comparator<StateTransition> stateTransitionComparator) {
070                this.stateTransitionComparator = stateTransitionComparator;
071        }
072
073        /**
074         * Create a flow with the given name.
075         *
076         * @param name the name of the flow
077         */
078        public SimpleFlow(String name) {
079                this.name = name;
080        }
081
082        public State getStartState() {
083                return this.startState;
084        }
085
086        /**
087         * Get the name for this flow.
088         *
089         * @see Flow#getName()
090         */
091        @Override
092        public String getName() {
093                return name;
094        }
095
096        /**
097         * Public setter for the stateTransitions.
098         *
099         * @param stateTransitions the stateTransitions to set
100         */
101        public void setStateTransitions(List<StateTransition> stateTransitions) {
102
103                this.stateTransitions = stateTransitions;
104        }
105
106        /**
107         * {@inheritDoc}
108         */
109        @Override
110        public State getState(String stateName) {
111                return stateMap.get(stateName);
112        }
113
114        /**
115         * {@inheritDoc}
116         */
117        @Override
118        public Collection<State> getStates() {
119                return new HashSet<State>(stateMap.values());
120        }
121
122        /**
123         * Locate start state and pre-populate data structures needed for execution.
124         *
125         * @see InitializingBean#afterPropertiesSet()
126         */
127        @Override
128        public void afterPropertiesSet() throws Exception {
129                if (startState == null) {
130                        initializeTransitions();
131                }
132        }
133
134        /**
135         * @see Flow#start(FlowExecutor)
136         */
137        @Override
138        public FlowExecution start(FlowExecutor executor) throws FlowExecutionException {
139                if (startState == null) {
140                        initializeTransitions();
141                }
142                State state = startState;
143                String stateName = state.getName();
144                return resume(stateName, executor);
145        }
146
147        /**
148         * @see Flow#resume(String, FlowExecutor)
149         */
150        @Override
151        public FlowExecution resume(String stateName, FlowExecutor executor) throws FlowExecutionException {
152
153                FlowExecutionStatus status = FlowExecutionStatus.UNKNOWN;
154                State state = stateMap.get(stateName);
155
156                if (logger.isDebugEnabled()) {
157                        logger.debug("Resuming state="+stateName+" with status="+status);
158                }
159                StepExecution stepExecution = null;
160
161                // Terminate if there are no more states
162                while (isFlowContinued(state, status, stepExecution)) {
163                        stateName = state.getName();
164
165                        try {
166                                if (logger.isDebugEnabled()) {
167                                        logger.debug("Handling state="+stateName);
168                                }
169                                status = state.handle(executor);
170                                stepExecution = executor.getStepExecution();
171                        }
172                        catch (FlowExecutionException e) {
173                                executor.close(new FlowExecution(stateName, status));
174                                throw e;
175                        }
176                        catch (Exception e) {
177                                executor.close(new FlowExecution(stateName, status));
178                                throw new FlowExecutionException(String.format("Ended flow=%s at state=%s with exception", name,
179                                                                                                                                          stateName), e);
180                        }
181
182                        if (logger.isDebugEnabled()) {
183                                logger.debug("Completed state="+stateName+" with status="+status);
184                        }
185
186                        state = nextState(stateName, status, stepExecution);
187                }
188
189                FlowExecution result = new FlowExecution(stateName, status);
190                executor.close(result);
191                return result;
192
193        }
194
195        protected Map<String, Set<StateTransition>> getTransitionMap() {
196                return transitionMap;
197        }
198
199        protected Map<String, State> getStateMap() {
200                return stateMap;
201        }
202
203        /**
204         * @param stateName the name of the next state.
205         * @param status {@link FlowExecutionStatus} instance.
206         * @param stepExecution {@link StepExecution} instance.
207         * @return the next {@link Step} (or null if this is the end)
208         * @throws FlowExecutionException thrown if error occurs during nextState processing.
209         */
210        protected State nextState(String stateName, FlowExecutionStatus status, StepExecution stepExecution) throws FlowExecutionException {
211                Set<StateTransition> set = transitionMap.get(stateName);
212
213                if (set == null) {
214                        throw new FlowExecutionException(String.format("No transitions found in flow=%s for state=%s", getName(),
215                                                                                                                                  stateName));
216                }
217
218                String next = null;
219                String exitCode = status.getName();
220
221                for (StateTransition stateTransition : set) {
222                        if (stateTransition.matches(exitCode) || (exitCode.equals("PENDING") && stateTransition.matches("STOPPED"))) {
223                                if (stateTransition.isEnd()) {
224                                        // End of job
225                                        return null;
226                                }
227                                next = stateTransition.getNext();
228                                break;
229                        }
230                }
231
232                if (next == null) {
233                        throw new FlowExecutionException(String.format("Next state not found in flow=%s for state=%s with exit status=%s", getName(), stateName, status.getName()));
234                }
235
236                if (!stateMap.containsKey(next)) {
237                        throw new FlowExecutionException(String.format("Next state not specified in flow=%s for next=%s",
238                                                                                                                                  getName(), next));
239                }
240
241                return stateMap.get(next);
242
243        }
244
245        protected boolean isFlowContinued(State state, FlowExecutionStatus status, StepExecution stepExecution) {
246                boolean continued = true;
247
248                continued = state != null && status!=FlowExecutionStatus.STOPPED;
249
250                if(stepExecution != null) {
251                        Boolean reRun = (Boolean) stepExecution.getExecutionContext().get("batch.restart");
252                        Boolean executed = (Boolean) stepExecution.getExecutionContext().get("batch.executed");
253
254                        if((executed == null || !executed) && reRun != null && reRun && status == FlowExecutionStatus.STOPPED && !state.getName().endsWith(stepExecution.getStepName()) ) {
255                                continued = true;
256                        }
257                }
258
259                return continued;
260        }
261
262        private boolean stateNameEndsWithStepName(State state, StepExecution stepExecution) {
263                return !(stepExecution == null || state == null) && !state.getName().endsWith(stepExecution.getStepName());
264        }
265
266        /**
267         * Analyse the transitions provided and generate all the information needed
268         * to execute the flow.
269         */
270        private void initializeTransitions() {
271                startState = null;
272                transitionMap.clear();
273                stateMap.clear();
274                boolean hasEndStep = false;
275
276                if (stateTransitions.isEmpty()) {
277                        throw new IllegalArgumentException("No start state was found. You must specify at least one step in a job.");
278                }
279
280                for (StateTransition stateTransition : stateTransitions) {
281                        State state = stateTransition.getState();
282                        String stateName = state.getName();
283                        stateMap.put(stateName, state);
284                }
285
286                for (StateTransition stateTransition : stateTransitions) {
287
288                        State state = stateTransition.getState();
289
290                        if (!stateTransition.isEnd()) {
291
292                                String next = stateTransition.getNext();
293
294                                if (!stateMap.containsKey(next)) {
295                                        throw new IllegalArgumentException("Missing state for [" + stateTransition + "]");
296                                }
297
298                        }
299                        else {
300                                hasEndStep = true;
301                        }
302
303                        String name = state.getName();
304
305                        Set<StateTransition> set = transitionMap.get(name);
306                        if (set == null) {
307                                // If no comparator is provided, we will maintain the order of insertion
308                                if(stateTransitionComparator == null) {
309                                        set = new LinkedHashSet<StateTransition>();
310                                } else {
311                                        set = new TreeSet<StateTransition>(stateTransitionComparator);
312                                }
313
314                                transitionMap.put(name, set);
315                        }
316                        set.add(stateTransition);
317
318                }
319
320                if (!hasEndStep) {
321                        throw new IllegalArgumentException(
322                                                                                                          "No end state was found.  You must specify at least one transition with no next state.");
323                }
324
325                startState = stateTransitions.get(0).getState();
326
327        }
328}