001/*
002 * Copyright 2012-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.job.builder;
017
018import java.util.ArrayList;
019import java.util.Arrays;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026
027import org.springframework.batch.core.ExitStatus;
028import org.springframework.batch.core.Step;
029import org.springframework.batch.core.job.flow.Flow;
030import org.springframework.batch.core.job.flow.FlowExecutionStatus;
031import org.springframework.batch.core.job.flow.JobExecutionDecider;
032import org.springframework.batch.core.job.flow.State;
033import org.springframework.batch.core.job.flow.support.DefaultStateTransitionComparator;
034import org.springframework.batch.core.job.flow.support.SimpleFlow;
035import org.springframework.batch.core.job.flow.support.StateTransition;
036import org.springframework.batch.core.job.flow.support.state.DecisionState;
037import org.springframework.batch.core.job.flow.support.state.EndState;
038import org.springframework.batch.core.job.flow.support.state.FlowState;
039import org.springframework.batch.core.job.flow.support.state.SplitState;
040import org.springframework.batch.core.job.flow.support.state.StepState;
041import org.springframework.core.task.TaskExecutor;
042
043/**
044 * A builder for a flow of steps that can be executed as a job or as part of a job. Steps can be linked together with
045 * conditional transitions that depend on the exit status of the previous step.
046 *
047 * @author Dave Syer
048 * @author Michael Minella
049 *
050 * @since 2.2
051 *
052 * @param <Q> the type of object returned by the builder (by default a Flow)
053 *
054 */
055public class FlowBuilder<Q> {
056
057        private String name;
058
059        private String prefix;
060
061        private List<StateTransition> transitions = new ArrayList<StateTransition>();
062
063        private Map<String, State> tos = new HashMap<String, State>();
064
065        private State currentState;
066
067        private EndState failedState;
068
069        private EndState completedState;
070
071        private EndState stoppedState;
072
073        private int decisionCounter = 0;
074
075        private int splitCounter = 0;
076
077        private int endCounter = 0;
078
079        private Map<Object, State> states = new HashMap<Object, State>();
080
081        private SimpleFlow flow;
082
083        private boolean dirty = true;
084
085        public FlowBuilder(String name) {
086                this.name = name;
087                this.prefix = name + ".";
088                this.failedState = new EndState(FlowExecutionStatus.FAILED, prefix + "FAILED");
089                this.completedState = new EndState(FlowExecutionStatus.COMPLETED, prefix + "COMPLETED");
090                this.stoppedState = new EndState(FlowExecutionStatus.STOPPED, prefix + "STOPPED");
091        }
092
093        /**
094         * Validate the current state of the builder and build a flow. Subclasses may override this to build an object of a
095         * different type that itself depends on the flow.
096         *
097         * @return a flow
098         */
099        public Q build() {
100                @SuppressWarnings("unchecked")
101                Q result = (Q) flow();
102                return result;
103        }
104
105        /**
106         * Transition to the next step on successful completion of the current step. All other outcomes are treated as
107         * failures.
108         *
109         * @param step the next step
110         * @return this to enable chaining
111         */
112        public FlowBuilder<Q> next(Step step) {
113                doNext(step);
114                return this;
115        }
116
117        /**
118         * Start a flow. If some steps are already registered, just a synonym for {@link #from(Step)}.
119         *
120         * @param step the step to start with
121         * @return this to enable chaining
122         */
123        public FlowBuilder<Q> start(Step step) {
124                doStart(step);
125                return this;
126        }
127
128        /**
129         * Go back to a previously registered step and start a new path. If no steps are registered yet just a synonym for
130         * {@link #start(Step)}.
131         *
132         * @param step the step to start from (already registered)
133         * @return this to enable chaining
134         */
135        public FlowBuilder<Q> from(Step step) {
136                doFrom(step);
137                return this;
138        }
139
140        /**
141         * Transition to the decider on successful completion of the current step. All other outcomes are treated as
142         * failures.
143         *
144         * @param decider the JobExecutionDecider to determine the next step to execute
145         * @return this to enable chaining
146         */
147        public UnterminatedFlowBuilder<Q> next(JobExecutionDecider decider) {
148                doNext(decider);
149                return new UnterminatedFlowBuilder<Q>(this);
150        }
151
152        /**
153         * If a flow should start with a decision use this as the first state.
154         *
155         * @param decider the to start from
156         * @return a builder to enable chaining
157         */
158        public UnterminatedFlowBuilder<Q> start(JobExecutionDecider decider) {
159                doStart(decider);
160                return new UnterminatedFlowBuilder<Q>(this);
161        }
162
163        /**
164         * Start again from a decision that was already registered.
165         *
166         * @param decider the decider to start from (already registered)
167         * @return a builder to enable chaining
168         */
169        public UnterminatedFlowBuilder<Q> from(JobExecutionDecider decider) {
170                doFrom(decider);
171                return new UnterminatedFlowBuilder<Q>(this);
172        }
173
174        /**
175         * Go next on successful completion to a subflow.
176         *
177         * @param flow the flow to go to
178         * @return a builder to enable chaining
179         */
180        public FlowBuilder<Q> next(Flow flow) {
181                doNext(flow);
182                return this;
183        }
184
185        /**
186         * Start again from a subflow that was already registered.
187         *
188         * @param flow the flow to start from (already registered)
189         * @return a builder to enable chaining
190         */
191        public FlowBuilder<Q> from(Flow flow) {
192                doFrom(flow);
193                return this;
194        }
195
196        /**
197         * If a flow should start with a subflow use this as the first state.
198         *
199         * @param flow the flow to start from
200         * @return a builder to enable chaining
201         */
202        public FlowBuilder<Q> start(Flow flow) {
203                doStart(flow);
204                return this;
205        }
206
207        /**
208         * @param executor a task executor to execute the split flows
209         * @return a builder to enable fluent chaining
210         */
211        public SplitBuilder<Q> split(TaskExecutor executor) {
212                return new SplitBuilder<Q>(this, executor);
213        }
214
215        /**
216         * Start a transition to a new state if the exit status from the previous state matches the pattern given.
217         * Successful completion normally results in an exit status equal to (or starting with by convention) "COMPLETED".
218         * See {@link ExitStatus} for commonly used values.
219         *
220         * @param pattern the pattern of exit status on which to take this transition
221         * @return a builder to enable fluent chaining
222         */
223        public TransitionBuilder<Q> on(String pattern) {
224                return new TransitionBuilder<Q>(this, pattern);
225        }
226
227        /**
228         * A synonym for {@link #build()} which callers might find useful. Subclasses can override build to create an object
229         * of the desired type (e.g. a parent builder or an actual flow).
230         *
231         * @return the result of the builder
232         */
233        public final Q end() {
234                return build();
235        }
236
237        protected Flow flow() {
238                if (!dirty) {
239                        // optimization in case this method is called consecutively
240                        return flow;
241                }
242                flow = new SimpleFlow(name);
243                // optimization for flows that only have one state that itself is a flow:
244                if (currentState instanceof FlowState && states.size() == 1) {
245                        return ((FlowState) currentState).getFlows().iterator().next();
246                }
247                addDanglingEndStates();
248                flow.setStateTransitions(transitions);
249                flow.setStateTransitionComparator(new DefaultStateTransitionComparator());
250                dirty = false;
251                return flow;
252        }
253
254        private void doNext(Object input) {
255                if (this.currentState == null) {
256                        doStart(input);
257                }
258                State next = createState(input);
259                addTransition("COMPLETED", next);
260                addTransition("*", failedState);
261                this.currentState = next;
262        }
263
264        private void doStart(Object input) {
265                if (this.currentState != null) {
266                        doFrom(input);
267                }
268                this.currentState = createState(input);
269        }
270
271        private void doFrom(Object input) {
272                if (currentState == null) {
273                        doStart(input);
274                }
275                State state = createState(input);
276                tos.put(currentState.getName(), currentState);
277                this.currentState = state;
278        }
279
280        private State createState(Object input) {
281                State result;
282                if (input instanceof Step) {
283                        if (!states.containsKey(input)) {
284                                Step step = (Step) input;
285                                states.put(input, new StepState(prefix + step.getName(), step));
286                        }
287                        result = states.get(input);
288                }
289                else if (input instanceof JobExecutionDecider) {
290                        if (!states.containsKey(input)) {
291                                states.put(input, new DecisionState((JobExecutionDecider) input, prefix + "decision"
292                                                + (decisionCounter++)));
293                        }
294                        result = states.get(input);
295                }
296                else if (input instanceof Flow) {
297                        if (!states.containsKey(input)) {
298                                states.put(input, new FlowState((Flow) input, prefix + ((Flow) input).getName()));
299                        }
300                        result = states.get(input);
301                }
302                else {
303                        throw new FlowBuilderException("No state can be created for: " + input);
304                }
305                dirty = true;
306                return result;
307        }
308
309        private SplitState createState(Collection<Flow> flows, TaskExecutor executor) {
310                if (!states.containsKey(flows)) {
311                        states.put(flows, new SplitState(flows, prefix + "split" + (splitCounter++)));
312                }
313                SplitState result = (SplitState) states.get(flows);
314                if (executor != null) {
315                        result.setTaskExecutor(executor);
316                }
317                dirty = true;
318                return result;
319        }
320
321        private void addDanglingEndStates() {
322                Set<String> froms = new HashSet<String>();
323                for (StateTransition transition : transitions) {
324                        froms.add(transition.getState().getName());
325                }
326                if (tos.isEmpty() && currentState != null) {
327                        tos.put(currentState.getName(), currentState);
328                }
329                Map<String, State> copy = new HashMap<String, State>(tos);
330                // Find all the states that are really end states but not explicitly declared as such
331                for (String to : copy.keySet()) {
332                        if (!froms.contains(to)) {
333                                currentState = copy.get(to);
334                                if (!currentState.isEndState()) {
335                                        addTransition("COMPLETED", completedState);
336                                        addTransition("*", failedState);
337                                }
338                        }
339                }
340                copy = new HashMap<String, State>(tos);
341                // Then find the states that do not have a default transition
342                for (String from : copy.keySet()) {
343                        currentState = copy.get(from);
344                        if (!currentState.isEndState()) {
345                                if (!hasFail(from)) {
346                                        addTransition("*", failedState);
347                                }
348                                if (!hasCompleted(from)) {
349                                        addTransition("*", completedState);
350                                }
351                        }
352                }
353        }
354
355        private boolean hasFail(String from) {
356                return matches(from, "FAILED");
357        }
358
359        private boolean hasCompleted(String from) {
360                return matches(from, "COMPLETED");
361        }
362
363        private boolean matches(String from, String status) {
364                for (StateTransition transition : transitions) {
365                        if (from.equals(transition.getState().getName()) && transition.matches(status)) {
366                                return true;
367                        }
368                }
369                return false;
370        }
371
372        private void addTransition(String pattern, State next) {
373                tos.put(next.getName(), next);
374                transitions.add(StateTransition.createStateTransition(currentState, pattern, next.getName()));
375                if (transitions.size() == 1) {
376                        transitions.add(StateTransition.createEndStateTransition(failedState));
377                        transitions.add(StateTransition.createEndStateTransition(completedState));
378                        transitions.add(StateTransition.createEndStateTransition(stoppedState));
379                }
380                if (next.isEndState()) {
381                        transitions.add(StateTransition.createEndStateTransition(next));
382                }
383                dirty = true;
384        }
385
386        protected void stop(String pattern) {
387                addTransition(pattern, stoppedState);
388        }
389
390        protected void stop(String pattern, State restart) {
391                EndState next = new EndState(FlowExecutionStatus.STOPPED, "STOPPED", prefix + "stop" + (endCounter++), true);
392                addTransition(pattern, next);
393                currentState = next;
394                addTransition("*", restart);
395        }
396
397        private void end(String pattern) {
398                addTransition(pattern, completedState);
399        }
400
401        private void end(String pattern, String code) {
402                addTransition(pattern, new EndState(FlowExecutionStatus.COMPLETED, code, prefix + "end" + (endCounter++)));
403        }
404
405        private void fail(String pattern) {
406                addTransition(pattern, failedState);
407        }
408
409        /**
410         * A builder for continuing a flow from a decision state.
411         *
412         * @author Dave Syer
413         *
414         * @param <Q> the result of the builder's build()
415         */
416        public static class UnterminatedFlowBuilder<Q> {
417
418                private final FlowBuilder<Q> parent;
419
420                public UnterminatedFlowBuilder(FlowBuilder<Q> parent) {
421                        this.parent = parent;
422                }
423
424                /**
425                 * Start a transition to a new state if the exit status from the previous state matches the pattern given.
426                 * Successful completion normally results in an exit status equal to (or starting with by convention)
427                 * "COMPLETED". See {@link ExitStatus} for commonly used values.
428                 *
429                 * @param pattern the pattern of exit status on which to take this transition
430                 * @return a TransitionBuilder
431                 */
432                public TransitionBuilder<Q> on(String pattern) {
433                        return new TransitionBuilder<Q>(parent, pattern);
434                }
435
436        }
437
438        /**
439         * A builder for transitions within a flow.
440         *
441         * @author Dave Syer
442         *
443         * @param <Q> the result of the parent builder's build()
444         */
445        public static class TransitionBuilder<Q> {
446
447                private final FlowBuilder<Q> parent;
448
449                private final String pattern;
450
451                public TransitionBuilder(FlowBuilder<Q> parent, String pattern) {
452                        this.parent = parent;
453                        this.pattern = pattern;
454                }
455
456                /**
457                 * Specify the next step.
458                 *
459                 * @param step the next step after this transition
460                 * @return a FlowBuilder
461                 */
462                public FlowBuilder<Q> to(Step step) {
463                        State next = parent.createState(step);
464                        parent.addTransition(pattern, next);
465                        parent.currentState = next;
466                        return parent;
467                }
468
469                /**
470                 * Specify the next state as a complete flow.
471                 *
472                 * @param flow the next flow after this transition
473                 * @return a FlowBuilder
474                 */
475                public FlowBuilder<Q> to(Flow flow) {
476                        State next = parent.createState(flow);
477                        parent.addTransition(pattern, next);
478                        parent.currentState = next;
479                        return parent;
480                }
481
482                /**
483                 * Specify the next state as a decision.
484                 *
485                 * @param decider the decider to determine the next step
486                 * @return a FlowBuilder
487                 */
488                public FlowBuilder<Q> to(JobExecutionDecider decider) {
489                        State next = parent.createState(decider);
490                        parent.addTransition(pattern, next);
491                        parent.currentState = next;
492                        return parent;
493                }
494
495                /**
496                 * Signal the successful end of the flow.
497                 *
498                 * @return a FlowBuilder
499                 */
500                public FlowBuilder<Q> stop() {
501                        parent.stop(pattern);
502                        return parent;
503                }
504
505                /**
506                 * Stop the flow and provide a flow to start with if the flow is restarted.
507                 *
508                 * @param flow the flow to restart with
509                 * @return a FlowBuilder
510                 */
511                public FlowBuilder<Q> stopAndRestart(Flow flow) {
512                        State next = parent.createState(flow);
513                        parent.stop(pattern, next);
514                        return parent;
515                }
516
517                /**
518                 * Stop the flow and provide a decider to start with if the flow is restarted.
519                 *
520                 * @param decider a decider to restart with
521                 * @return a FlowBuilder
522                 */
523                public FlowBuilder<Q> stopAndRestart(JobExecutionDecider decider) {
524                        State next = parent.createState(decider);
525                        parent.stop(pattern, next);
526                        return parent;
527                }
528
529                /**
530                 * Stop the flow and provide a step to start with if the flow is restarted.
531                 *
532                 * @param restart the step to restart with
533                 * @return a FlowBuilder
534                 */
535                public FlowBuilder<Q> stopAndRestart(Step restart) {
536                        State next = parent.createState(restart);
537                        parent.stop(pattern, next);
538                        return parent;
539                }
540
541                /**
542                 * Signal the successful end of the flow.
543                 *
544                 * @return a FlowBuilder
545                 */
546                public FlowBuilder<Q> end() {
547                        parent.end(pattern);
548                        return parent;
549                }
550
551                /**
552                 * Signal the end of the flow with the status provided.
553                 *
554                 * @param status {@link String} containing the status.
555                 * @return a FlowBuilder
556                 */
557                public FlowBuilder<Q> end(String status) {
558                        parent.end(pattern, status);
559                        return parent;
560                }
561
562                /**
563                 * Signal the end of the flow with an error condition.
564                 *
565                 * @return a FlowBuilder
566                 */
567                public FlowBuilder<Q> fail() {
568                        parent.fail(pattern);
569                        return parent;
570                }
571        }
572
573        /**
574         * A builder for building a split state. Example (<code>builder</code> is a {@link FlowBuilder}):
575         *
576         * <pre>
577         * Flow splitFlow = builder.start(flow1).split(new SyncTaskExecutor()).add(flow2).build();
578         * </pre>
579         *
580         * where <code>flow1</code> and <code>flow2</code> will be executed (one after the other because of the task
581         * executor that was added). Another example
582         *
583         * <pre>
584         * Flow splitFlow = builder.start(step1).split(new SimpleAsyncTaskExecutor()).add(flow).build();
585         * </pre>
586         *
587         * In this example, a flow consisting of <code>step1</code> will be executed in parallel with <code>flow</code>.
588         *
589         * <em>Note:</em> Adding a split to a chain of states is not supported.  For example, the following configuration
590         * is not supported.  Instead, the configuration would need to create a flow3 that was the split flow and assemble
591         * them separately.
592         *
593         * <pre>
594         * // instead of this
595         * Flow complexFlow = new FlowBuilder&lt;SimpleFlow&gt;("ComplexParallelFlow")
596         *                       .start(flow1)
597         *                       .next(flow2)
598         *                       .split(new SimpleAsyncTaskExecutor())
599         *                       .add(flow3, flow4)
600         *                       .build();
601         *
602         * // do this
603         * Flow splitFlow = new FlowBuilder&lt;SimpleFlow&gt;("parallelFlow")
604         *                       .start(flow3)
605         *                       .split(new SimpleAsyncTaskExecutor())
606         *                       .add(flow4).build();
607         *
608         * Flow complexFlow = new FlowBuilder&lt;SimpleFlow&gt;("ComplexParallelFlow")
609         *                       .start(flow1)
610         *                       .next(flow2)
611         *                       .next(splitFlow)
612         *                       .build();
613         * </pre>
614         *
615         * @author Dave Syer
616         * @author Michael Minella
617         *
618         * @param <Q> the result of the parent builder's build()
619         */
620        public static class SplitBuilder<Q> {
621
622                private final FlowBuilder<Q> parent;
623
624                private TaskExecutor executor;
625
626                /**
627                 * @param parent the parent builder
628                 * @param executor the task executor to use in the split
629                 */
630                public SplitBuilder(FlowBuilder<Q> parent, TaskExecutor executor) {
631                        this.parent = parent;
632                        this.executor = executor;
633                }
634
635                /**
636                 * Add flows to the split, in addition to the current state already present in the parent builder.
637                 *
638                 * @param flows more flows to add to the split
639                 * @return the parent builder
640                 */
641                public FlowBuilder<Q> add(Flow... flows) {
642                        Collection<Flow> list = new ArrayList<Flow>(Arrays.asList(flows));
643                        String name = "split" + (parent.splitCounter++);
644                        int counter = 0;
645                        State one = parent.currentState;
646                        Flow flow = null;
647                        if (!(one == null || one instanceof FlowState)) {
648                                FlowBuilder<Flow> stateBuilder = new FlowBuilder<Flow>(name + "_" + (counter++));
649                                stateBuilder.currentState = one;
650                                flow = stateBuilder.build();
651                        } else if (one instanceof FlowState && parent.states.size() == 1) {
652                                list.add(((FlowState) one).getFlows().iterator().next());
653                        }
654
655                        if (flow != null) {
656                                list.add(flow);
657                        }
658                        State next = parent.createState(list, executor);
659                        parent.currentState = next;
660                        return parent;
661                }
662
663        }
664
665}