001/* 002 * Copyright 2012-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.job.builder; 017 018import java.util.ArrayList; 019import java.util.Arrays; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026 027import org.springframework.batch.core.ExitStatus; 028import org.springframework.batch.core.Step; 029import org.springframework.batch.core.job.flow.Flow; 030import org.springframework.batch.core.job.flow.FlowExecutionStatus; 031import org.springframework.batch.core.job.flow.JobExecutionDecider; 032import org.springframework.batch.core.job.flow.State; 033import org.springframework.batch.core.job.flow.support.DefaultStateTransitionComparator; 034import org.springframework.batch.core.job.flow.support.SimpleFlow; 035import org.springframework.batch.core.job.flow.support.StateTransition; 036import org.springframework.batch.core.job.flow.support.state.DecisionState; 037import org.springframework.batch.core.job.flow.support.state.EndState; 038import org.springframework.batch.core.job.flow.support.state.FlowState; 039import org.springframework.batch.core.job.flow.support.state.SplitState; 040import org.springframework.batch.core.job.flow.support.state.StepState; 041import org.springframework.core.task.TaskExecutor; 042 043/** 044 * A builder for a flow of steps that can be executed as a job or as part of a job. Steps can be linked together with 045 * conditional transitions that depend on the exit status of the previous step. 046 * 047 * @author Dave Syer 048 * @author Michael Minella 049 * 050 * @since 2.2 051 * 052 * @param <Q> the type of object returned by the builder (by default a Flow) 053 * 054 */ 055public class FlowBuilder<Q> { 056 057 private String name; 058 059 private String prefix; 060 061 private List<StateTransition> transitions = new ArrayList<StateTransition>(); 062 063 private Map<String, State> tos = new HashMap<String, State>(); 064 065 private State currentState; 066 067 private EndState failedState; 068 069 private EndState completedState; 070 071 private EndState stoppedState; 072 073 private int decisionCounter = 0; 074 075 private int splitCounter = 0; 076 077 private int endCounter = 0; 078 079 private Map<Object, State> states = new HashMap<Object, State>(); 080 081 private SimpleFlow flow; 082 083 private boolean dirty = true; 084 085 public FlowBuilder(String name) { 086 this.name = name; 087 this.prefix = name + "."; 088 this.failedState = new EndState(FlowExecutionStatus.FAILED, prefix + "FAILED"); 089 this.completedState = new EndState(FlowExecutionStatus.COMPLETED, prefix + "COMPLETED"); 090 this.stoppedState = new EndState(FlowExecutionStatus.STOPPED, prefix + "STOPPED"); 091 } 092 093 /** 094 * Validate the current state of the builder and build a flow. Subclasses may override this to build an object of a 095 * different type that itself depends on the flow. 096 * 097 * @return a flow 098 */ 099 public Q build() { 100 @SuppressWarnings("unchecked") 101 Q result = (Q) flow(); 102 return result; 103 } 104 105 /** 106 * Transition to the next step on successful completion of the current step. All other outcomes are treated as 107 * failures. 108 * 109 * @param step the next step 110 * @return this to enable chaining 111 */ 112 public FlowBuilder<Q> next(Step step) { 113 doNext(step); 114 return this; 115 } 116 117 /** 118 * Start a flow. If some steps are already registered, just a synonym for {@link #from(Step)}. 119 * 120 * @param step the step to start with 121 * @return this to enable chaining 122 */ 123 public FlowBuilder<Q> start(Step step) { 124 doStart(step); 125 return this; 126 } 127 128 /** 129 * Go back to a previously registered step and start a new path. If no steps are registered yet just a synonym for 130 * {@link #start(Step)}. 131 * 132 * @param step the step to start from (already registered) 133 * @return this to enable chaining 134 */ 135 public FlowBuilder<Q> from(Step step) { 136 doFrom(step); 137 return this; 138 } 139 140 /** 141 * Transition to the decider on successful completion of the current step. All other outcomes are treated as 142 * failures. 143 * 144 * @param decider the JobExecutionDecider to determine the next step to execute 145 * @return this to enable chaining 146 */ 147 public UnterminatedFlowBuilder<Q> next(JobExecutionDecider decider) { 148 doNext(decider); 149 return new UnterminatedFlowBuilder<Q>(this); 150 } 151 152 /** 153 * If a flow should start with a decision use this as the first state. 154 * 155 * @param decider the to start from 156 * @return a builder to enable chaining 157 */ 158 public UnterminatedFlowBuilder<Q> start(JobExecutionDecider decider) { 159 doStart(decider); 160 return new UnterminatedFlowBuilder<Q>(this); 161 } 162 163 /** 164 * Start again from a decision that was already registered. 165 * 166 * @param decider the decider to start from (already registered) 167 * @return a builder to enable chaining 168 */ 169 public UnterminatedFlowBuilder<Q> from(JobExecutionDecider decider) { 170 doFrom(decider); 171 return new UnterminatedFlowBuilder<Q>(this); 172 } 173 174 /** 175 * Go next on successful completion to a subflow. 176 * 177 * @param flow the flow to go to 178 * @return a builder to enable chaining 179 */ 180 public FlowBuilder<Q> next(Flow flow) { 181 doNext(flow); 182 return this; 183 } 184 185 /** 186 * Start again from a subflow that was already registered. 187 * 188 * @param flow the flow to start from (already registered) 189 * @return a builder to enable chaining 190 */ 191 public FlowBuilder<Q> from(Flow flow) { 192 doFrom(flow); 193 return this; 194 } 195 196 /** 197 * If a flow should start with a subflow use this as the first state. 198 * 199 * @param flow the flow to start from 200 * @return a builder to enable chaining 201 */ 202 public FlowBuilder<Q> start(Flow flow) { 203 doStart(flow); 204 return this; 205 } 206 207 /** 208 * @param executor a task executor to execute the split flows 209 * @return a builder to enable fluent chaining 210 */ 211 public SplitBuilder<Q> split(TaskExecutor executor) { 212 return new SplitBuilder<Q>(this, executor); 213 } 214 215 /** 216 * Start a transition to a new state if the exit status from the previous state matches the pattern given. 217 * Successful completion normally results in an exit status equal to (or starting with by convention) "COMPLETED". 218 * See {@link ExitStatus} for commonly used values. 219 * 220 * @param pattern the pattern of exit status on which to take this transition 221 * @return a builder to enable fluent chaining 222 */ 223 public TransitionBuilder<Q> on(String pattern) { 224 return new TransitionBuilder<Q>(this, pattern); 225 } 226 227 /** 228 * A synonym for {@link #build()} which callers might find useful. Subclasses can override build to create an object 229 * of the desired type (e.g. a parent builder or an actual flow). 230 * 231 * @return the result of the builder 232 */ 233 public final Q end() { 234 return build(); 235 } 236 237 protected Flow flow() { 238 if (!dirty) { 239 // optimization in case this method is called consecutively 240 return flow; 241 } 242 flow = new SimpleFlow(name); 243 // optimization for flows that only have one state that itself is a flow: 244 if (currentState instanceof FlowState && states.size() == 1) { 245 return ((FlowState) currentState).getFlows().iterator().next(); 246 } 247 addDanglingEndStates(); 248 flow.setStateTransitions(transitions); 249 flow.setStateTransitionComparator(new DefaultStateTransitionComparator()); 250 dirty = false; 251 return flow; 252 } 253 254 private void doNext(Object input) { 255 if (this.currentState == null) { 256 doStart(input); 257 } 258 State next = createState(input); 259 addTransition("COMPLETED", next); 260 addTransition("*", failedState); 261 this.currentState = next; 262 } 263 264 private void doStart(Object input) { 265 if (this.currentState != null) { 266 doFrom(input); 267 } 268 this.currentState = createState(input); 269 } 270 271 private void doFrom(Object input) { 272 if (currentState == null) { 273 doStart(input); 274 } 275 State state = createState(input); 276 tos.put(currentState.getName(), currentState); 277 this.currentState = state; 278 } 279 280 private State createState(Object input) { 281 State result; 282 if (input instanceof Step) { 283 if (!states.containsKey(input)) { 284 Step step = (Step) input; 285 states.put(input, new StepState(prefix + step.getName(), step)); 286 } 287 result = states.get(input); 288 } 289 else if (input instanceof JobExecutionDecider) { 290 if (!states.containsKey(input)) { 291 states.put(input, new DecisionState((JobExecutionDecider) input, prefix + "decision" 292 + (decisionCounter++))); 293 } 294 result = states.get(input); 295 } 296 else if (input instanceof Flow) { 297 if (!states.containsKey(input)) { 298 states.put(input, new FlowState((Flow) input, prefix + ((Flow) input).getName())); 299 } 300 result = states.get(input); 301 } 302 else { 303 throw new FlowBuilderException("No state can be created for: " + input); 304 } 305 dirty = true; 306 return result; 307 } 308 309 private SplitState createState(Collection<Flow> flows, TaskExecutor executor) { 310 if (!states.containsKey(flows)) { 311 states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++))); 312 } 313 SplitState result = (SplitState) states.get(flows); 314 if (executor != null) { 315 result.setTaskExecutor(executor); 316 } 317 dirty = true; 318 return result; 319 } 320 321 private void addDanglingEndStates() { 322 Set<String> froms = new HashSet<String>(); 323 for (StateTransition transition : transitions) { 324 froms.add(transition.getState().getName()); 325 } 326 if (tos.isEmpty() && currentState != null) { 327 tos.put(currentState.getName(), currentState); 328 } 329 Map<String, State> copy = new HashMap<String, State>(tos); 330 // Find all the states that are really end states but not explicitly declared as such 331 for (String to : copy.keySet()) { 332 if (!froms.contains(to)) { 333 currentState = copy.get(to); 334 if (!currentState.isEndState()) { 335 addTransition("COMPLETED", completedState); 336 addTransition("*", failedState); 337 } 338 } 339 } 340 copy = new HashMap<String, State>(tos); 341 // Then find the states that do not have a default transition 342 for (String from : copy.keySet()) { 343 currentState = copy.get(from); 344 if (!currentState.isEndState()) { 345 if (!hasFail(from)) { 346 addTransition("*", failedState); 347 } 348 if (!hasCompleted(from)) { 349 addTransition("*", completedState); 350 } 351 } 352 } 353 } 354 355 private boolean hasFail(String from) { 356 return matches(from, "FAILED"); 357 } 358 359 private boolean hasCompleted(String from) { 360 return matches(from, "COMPLETED"); 361 } 362 363 private boolean matches(String from, String status) { 364 for (StateTransition transition : transitions) { 365 if (from.equals(transition.getState().getName()) && transition.matches(status)) { 366 return true; 367 } 368 } 369 return false; 370 } 371 372 private void addTransition(String pattern, State next) { 373 tos.put(next.getName(), next); 374 transitions.add(StateTransition.createStateTransition(currentState, pattern, next.getName())); 375 if (transitions.size() == 1) { 376 transitions.add(StateTransition.createEndStateTransition(failedState)); 377 transitions.add(StateTransition.createEndStateTransition(completedState)); 378 transitions.add(StateTransition.createEndStateTransition(stoppedState)); 379 } 380 if (next.isEndState()) { 381 transitions.add(StateTransition.createEndStateTransition(next)); 382 } 383 dirty = true; 384 } 385 386 protected void stop(String pattern) { 387 addTransition(pattern, stoppedState); 388 } 389 390 protected void stop(String pattern, State restart) { 391 EndState next = new EndState(FlowExecutionStatus.STOPPED, "STOPPED", prefix + "stop" + (endCounter++), true); 392 addTransition(pattern, next); 393 currentState = next; 394 addTransition("*", restart); 395 } 396 397 private void end(String pattern) { 398 addTransition(pattern, completedState); 399 } 400 401 private void end(String pattern, String code) { 402 addTransition(pattern, new EndState(FlowExecutionStatus.COMPLETED, code, prefix + "end" + (endCounter++))); 403 } 404 405 private void fail(String pattern) { 406 addTransition(pattern, failedState); 407 } 408 409 /** 410 * A builder for continuing a flow from a decision state. 411 * 412 * @author Dave Syer 413 * 414 * @param <Q> the result of the builder's build() 415 */ 416 public static class UnterminatedFlowBuilder<Q> { 417 418 private final FlowBuilder<Q> parent; 419 420 public UnterminatedFlowBuilder(FlowBuilder<Q> parent) { 421 this.parent = parent; 422 } 423 424 /** 425 * Start a transition to a new state if the exit status from the previous state matches the pattern given. 426 * Successful completion normally results in an exit status equal to (or starting with by convention) 427 * "COMPLETED". See {@link ExitStatus} for commonly used values. 428 * 429 * @param pattern the pattern of exit status on which to take this transition 430 * @return a TransitionBuilder 431 */ 432 public TransitionBuilder<Q> on(String pattern) { 433 return new TransitionBuilder<Q>(parent, pattern); 434 } 435 436 } 437 438 /** 439 * A builder for transitions within a flow. 440 * 441 * @author Dave Syer 442 * 443 * @param <Q> the result of the parent builder's build() 444 */ 445 public static class TransitionBuilder<Q> { 446 447 private final FlowBuilder<Q> parent; 448 449 private final String pattern; 450 451 public TransitionBuilder(FlowBuilder<Q> parent, String pattern) { 452 this.parent = parent; 453 this.pattern = pattern; 454 } 455 456 /** 457 * Specify the next step. 458 * 459 * @param step the next step after this transition 460 * @return a FlowBuilder 461 */ 462 public FlowBuilder<Q> to(Step step) { 463 State next = parent.createState(step); 464 parent.addTransition(pattern, next); 465 parent.currentState = next; 466 return parent; 467 } 468 469 /** 470 * Specify the next state as a complete flow. 471 * 472 * @param flow the next flow after this transition 473 * @return a FlowBuilder 474 */ 475 public FlowBuilder<Q> to(Flow flow) { 476 State next = parent.createState(flow); 477 parent.addTransition(pattern, next); 478 parent.currentState = next; 479 return parent; 480 } 481 482 /** 483 * Specify the next state as a decision. 484 * 485 * @param decider the decider to determine the next step 486 * @return a FlowBuilder 487 */ 488 public FlowBuilder<Q> to(JobExecutionDecider decider) { 489 State next = parent.createState(decider); 490 parent.addTransition(pattern, next); 491 parent.currentState = next; 492 return parent; 493 } 494 495 /** 496 * Signal the successful end of the flow. 497 * 498 * @return a FlowBuilder 499 */ 500 public FlowBuilder<Q> stop() { 501 parent.stop(pattern); 502 return parent; 503 } 504 505 /** 506 * Stop the flow and provide a flow to start with if the flow is restarted. 507 * 508 * @param flow the flow to restart with 509 * @return a FlowBuilder 510 */ 511 public FlowBuilder<Q> stopAndRestart(Flow flow) { 512 State next = parent.createState(flow); 513 parent.stop(pattern, next); 514 return parent; 515 } 516 517 /** 518 * Stop the flow and provide a decider to start with if the flow is restarted. 519 * 520 * @param decider a decider to restart with 521 * @return a FlowBuilder 522 */ 523 public FlowBuilder<Q> stopAndRestart(JobExecutionDecider decider) { 524 State next = parent.createState(decider); 525 parent.stop(pattern, next); 526 return parent; 527 } 528 529 /** 530 * Stop the flow and provide a step to start with if the flow is restarted. 531 * 532 * @param restart the step to restart with 533 * @return a FlowBuilder 534 */ 535 public FlowBuilder<Q> stopAndRestart(Step restart) { 536 State next = parent.createState(restart); 537 parent.stop(pattern, next); 538 return parent; 539 } 540 541 /** 542 * Signal the successful end of the flow. 543 * 544 * @return a FlowBuilder 545 */ 546 public FlowBuilder<Q> end() { 547 parent.end(pattern); 548 return parent; 549 } 550 551 /** 552 * Signal the end of the flow with the status provided. 553 * 554 * @param status {@link String} containing the status. 555 * @return a FlowBuilder 556 */ 557 public FlowBuilder<Q> end(String status) { 558 parent.end(pattern, status); 559 return parent; 560 } 561 562 /** 563 * Signal the end of the flow with an error condition. 564 * 565 * @return a FlowBuilder 566 */ 567 public FlowBuilder<Q> fail() { 568 parent.fail(pattern); 569 return parent; 570 } 571 } 572 573 /** 574 * A builder for building a split state. Example (<code>builder</code> is a {@link FlowBuilder}): 575 * 576 * <pre> 577 * Flow splitFlow = builder.start(flow1).split(new SyncTaskExecutor()).add(flow2).build(); 578 * </pre> 579 * 580 * where <code>flow1</code> and <code>flow2</code> will be executed (one after the other because of the task 581 * executor that was added). Another example 582 * 583 * <pre> 584 * Flow splitFlow = builder.start(step1).split(new SimpleAsyncTaskExecutor()).add(flow).build(); 585 * </pre> 586 * 587 * In this example, a flow consisting of <code>step1</code> will be executed in parallel with <code>flow</code>. 588 * 589 * <em>Note:</em> Adding a split to a chain of states is not supported. For example, the following configuration 590 * is not supported. Instead, the configuration would need to create a flow3 that was the split flow and assemble 591 * them separately. 592 * 593 * <pre> 594 * // instead of this 595 * Flow complexFlow = new FlowBuilder<SimpleFlow>("ComplexParallelFlow") 596 * .start(flow1) 597 * .next(flow2) 598 * .split(new SimpleAsyncTaskExecutor()) 599 * .add(flow3, flow4) 600 * .build(); 601 * 602 * // do this 603 * Flow splitFlow = new FlowBuilder<SimpleFlow>("parallelFlow") 604 * .start(flow3) 605 * .split(new SimpleAsyncTaskExecutor()) 606 * .add(flow4).build(); 607 * 608 * Flow complexFlow = new FlowBuilder<SimpleFlow>("ComplexParallelFlow") 609 * .start(flow1) 610 * .next(flow2) 611 * .next(splitFlow) 612 * .build(); 613 * </pre> 614 * 615 * @author Dave Syer 616 * @author Michael Minella 617 * 618 * @param <Q> the result of the parent builder's build() 619 */ 620 public static class SplitBuilder<Q> { 621 622 private final FlowBuilder<Q> parent; 623 624 private TaskExecutor executor; 625 626 /** 627 * @param parent the parent builder 628 * @param executor the task executor to use in the split 629 */ 630 public SplitBuilder(FlowBuilder<Q> parent, TaskExecutor executor) { 631 this.parent = parent; 632 this.executor = executor; 633 } 634 635 /** 636 * Add flows to the split, in addition to the current state already present in the parent builder. 637 * 638 * @param flows more flows to add to the split 639 * @return the parent builder 640 */ 641 public FlowBuilder<Q> add(Flow... flows) { 642 Collection<Flow> list = new ArrayList<Flow>(Arrays.asList(flows)); 643 String name = "split" + (parent.splitCounter++); 644 int counter = 0; 645 State one = parent.currentState; 646 Flow flow = null; 647 if (!(one == null || one instanceof FlowState)) { 648 FlowBuilder<Flow> stateBuilder = new FlowBuilder<Flow>(name + "_" + (counter++)); 649 stateBuilder.currentState = one; 650 flow = stateBuilder.build(); 651 } else if (one instanceof FlowState && parent.states.size() == 1) { 652 list.add(((FlowState) one).getFlows().iterator().next()); 653 } 654 655 if (flow != null) { 656 list.add(flow); 657 } 658 State next = parent.createState(list, executor); 659 parent.currentState = next; 660 return parent; 661 } 662 663 } 664 665}