001/* 002 * Copyright 2006-2014 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.job.flow.support; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Comparator; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.LinkedHashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Set; 027import java.util.TreeSet; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import org.springframework.batch.core.Step; 033import org.springframework.batch.core.StepExecution; 034import org.springframework.batch.core.job.flow.Flow; 035import org.springframework.batch.core.job.flow.FlowExecution; 036import org.springframework.batch.core.job.flow.FlowExecutionException; 037import org.springframework.batch.core.job.flow.FlowExecutionStatus; 038import org.springframework.batch.core.job.flow.FlowExecutor; 039import org.springframework.batch.core.job.flow.State; 040import org.springframework.beans.factory.InitializingBean; 041 042/** 043 * A {@link Flow} that branches conditionally depending on the exit status of 044 * the last {@link State}. The input parameters are the state transitions (in no 045 * particular order). The start state name can be specified explicitly (and must 046 * exist in the set of transitions), or computed from the existing transitions, 047 * if unambiguous. 048 * 049 * @author Dave Syer 050 * @author Michael Minella 051 * @since 2.0 052 */ 053public class SimpleFlow implements Flow, InitializingBean { 054 055 private static final Log logger = LogFactory.getLog(SimpleFlow.class); 056 057 private State startState; 058 059 private Map<String, Set<StateTransition>> transitionMap = new HashMap<String, Set<StateTransition>>(); 060 061 private Map<String, State> stateMap = new HashMap<String, State>(); 062 063 private List<StateTransition> stateTransitions = new ArrayList<StateTransition>(); 064 065 private final String name; 066 067 private Comparator<StateTransition> stateTransitionComparator; 068 069 public void setStateTransitionComparator(Comparator<StateTransition> stateTransitionComparator) { 070 this.stateTransitionComparator = stateTransitionComparator; 071 } 072 073 /** 074 * Create a flow with the given name. 075 * 076 * @param name the name of the flow 077 */ 078 public SimpleFlow(String name) { 079 this.name = name; 080 } 081 082 public State getStartState() { 083 return this.startState; 084 } 085 086 /** 087 * Get the name for this flow. 088 * 089 * @see Flow#getName() 090 */ 091 @Override 092 public String getName() { 093 return name; 094 } 095 096 /** 097 * Public setter for the stateTransitions. 098 * 099 * @param stateTransitions the stateTransitions to set 100 */ 101 public void setStateTransitions(List<StateTransition> stateTransitions) { 102 103 this.stateTransitions = stateTransitions; 104 } 105 106 /** 107 * {@inheritDoc} 108 */ 109 @Override 110 public State getState(String stateName) { 111 return stateMap.get(stateName); 112 } 113 114 /** 115 * {@inheritDoc} 116 */ 117 @Override 118 public Collection<State> getStates() { 119 return new HashSet<State>(stateMap.values()); 120 } 121 122 /** 123 * Locate start state and pre-populate data structures needed for execution. 124 * 125 * @see InitializingBean#afterPropertiesSet() 126 */ 127 @Override 128 public void afterPropertiesSet() throws Exception { 129 if (startState == null) { 130 initializeTransitions(); 131 } 132 } 133 134 /** 135 * @see Flow#start(FlowExecutor) 136 */ 137 @Override 138 public FlowExecution start(FlowExecutor executor) throws FlowExecutionException { 139 if (startState == null) { 140 initializeTransitions(); 141 } 142 State state = startState; 143 String stateName = state.getName(); 144 return resume(stateName, executor); 145 } 146 147 /** 148 * @see Flow#resume(String, FlowExecutor) 149 */ 150 @Override 151 public FlowExecution resume(String stateName, FlowExecutor executor) throws FlowExecutionException { 152 153 FlowExecutionStatus status = FlowExecutionStatus.UNKNOWN; 154 State state = stateMap.get(stateName); 155 156 if (logger.isDebugEnabled()) { 157 logger.debug("Resuming state="+stateName+" with status="+status); 158 } 159 StepExecution stepExecution = null; 160 161 // Terminate if there are no more states 162 while (isFlowContinued(state, status, stepExecution)) { 163 stateName = state.getName(); 164 165 try { 166 if (logger.isDebugEnabled()) { 167 logger.debug("Handling state="+stateName); 168 } 169 status = state.handle(executor); 170 stepExecution = executor.getStepExecution(); 171 } 172 catch (FlowExecutionException e) { 173 executor.close(new FlowExecution(stateName, status)); 174 throw e; 175 } 176 catch (Exception e) { 177 executor.close(new FlowExecution(stateName, status)); 178 throw new FlowExecutionException(String.format("Ended flow=%s at state=%s with exception", name, 179 stateName), e); 180 } 181 182 if (logger.isDebugEnabled()) { 183 logger.debug("Completed state="+stateName+" with status="+status); 184 } 185 186 state = nextState(stateName, status, stepExecution); 187 } 188 189 FlowExecution result = new FlowExecution(stateName, status); 190 executor.close(result); 191 return result; 192 193 } 194 195 protected Map<String, Set<StateTransition>> getTransitionMap() { 196 return transitionMap; 197 } 198 199 protected Map<String, State> getStateMap() { 200 return stateMap; 201 } 202 203 /** 204 * @param stateName the name of the next state. 205 * @param status {@link FlowExecutionStatus} instance. 206 * @param stepExecution {@link StepExecution} instance. 207 * @return the next {@link Step} (or null if this is the end) 208 * @throws FlowExecutionException thrown if error occurs during nextState processing. 209 */ 210 protected State nextState(String stateName, FlowExecutionStatus status, StepExecution stepExecution) throws FlowExecutionException { 211 Set<StateTransition> set = transitionMap.get(stateName); 212 213 if (set == null) { 214 throw new FlowExecutionException(String.format("No transitions found in flow=%s for state=%s", getName(), 215 stateName)); 216 } 217 218 String next = null; 219 String exitCode = status.getName(); 220 221 for (StateTransition stateTransition : set) { 222 if (stateTransition.matches(exitCode) || (exitCode.equals("PENDING") && stateTransition.matches("STOPPED"))) { 223 if (stateTransition.isEnd()) { 224 // End of job 225 return null; 226 } 227 next = stateTransition.getNext(); 228 break; 229 } 230 } 231 232 if (next == null) { 233 throw new FlowExecutionException(String.format("Next state not found in flow=%s for state=%s with exit status=%s", getName(), stateName, status.getName())); 234 } 235 236 if (!stateMap.containsKey(next)) { 237 throw new FlowExecutionException(String.format("Next state not specified in flow=%s for next=%s", 238 getName(), next)); 239 } 240 241 return stateMap.get(next); 242 243 } 244 245 protected boolean isFlowContinued(State state, FlowExecutionStatus status, StepExecution stepExecution) { 246 boolean continued = true; 247 248 continued = state != null && status!=FlowExecutionStatus.STOPPED; 249 250 if(stepExecution != null) { 251 Boolean reRun = (Boolean) stepExecution.getExecutionContext().get("batch.restart"); 252 Boolean executed = (Boolean) stepExecution.getExecutionContext().get("batch.executed"); 253 254 if((executed == null || !executed) && reRun != null && reRun && status == FlowExecutionStatus.STOPPED && !state.getName().endsWith(stepExecution.getStepName()) ) { 255 continued = true; 256 } 257 } 258 259 return continued; 260 } 261 262 private boolean stateNameEndsWithStepName(State state, StepExecution stepExecution) { 263 return !(stepExecution == null || state == null) && !state.getName().endsWith(stepExecution.getStepName()); 264 } 265 266 /** 267 * Analyse the transitions provided and generate all the information needed 268 * to execute the flow. 269 */ 270 private void initializeTransitions() { 271 startState = null; 272 transitionMap.clear(); 273 stateMap.clear(); 274 boolean hasEndStep = false; 275 276 if (stateTransitions.isEmpty()) { 277 throw new IllegalArgumentException("No start state was found. You must specify at least one step in a job."); 278 } 279 280 for (StateTransition stateTransition : stateTransitions) { 281 State state = stateTransition.getState(); 282 String stateName = state.getName(); 283 stateMap.put(stateName, state); 284 } 285 286 for (StateTransition stateTransition : stateTransitions) { 287 288 State state = stateTransition.getState(); 289 290 if (!stateTransition.isEnd()) { 291 292 String next = stateTransition.getNext(); 293 294 if (!stateMap.containsKey(next)) { 295 throw new IllegalArgumentException("Missing state for [" + stateTransition + "]"); 296 } 297 298 } 299 else { 300 hasEndStep = true; 301 } 302 303 String name = state.getName(); 304 305 Set<StateTransition> set = transitionMap.get(name); 306 if (set == null) { 307 // If no comparator is provided, we will maintain the order of insertion 308 if(stateTransitionComparator == null) { 309 set = new LinkedHashSet<StateTransition>(); 310 } else { 311 set = new TreeSet<StateTransition>(stateTransitionComparator); 312 } 313 314 transitionMap.put(name, set); 315 } 316 set.add(stateTransition); 317 318 } 319 320 if (!hasEndStep) { 321 throw new IllegalArgumentException( 322 "No end state was found. You must specify at least one transition with no next state."); 323 } 324 325 startState = stateTransitions.get(0).getState(); 326 327 } 328}