1. 配置 Step


XML

Java

正如domain 章节中所讨论的,Step是一个 domain object,它封装了批处理 job 的独立顺序阶段,并包含定义和控制实际批处理所需的所有信息。这是一个必然模糊的描述,因为任何给定Step的内容由开发人员自行决定编写JobStep可以像开发者所希望的那样简单或复杂。一个简单的Step可能会将数据从文件加载到数据库中,几乎不需要 code(取决于所使用的 implementations)。更复杂的Step可能具有复杂的业务规则,这些规则作为处理的一部分应用,如下图所示:

图 1. Step

1.1. Chunk-oriented 处理

Spring Batch 在其最常见的 implementation 中使用'Chunk-oriented'处理样式。面向块的处理是指在 time 读取数据并在 transaction 边界内写出创建“块”。一个 item 从ItemReader读入,传递给ItemProcessor,并聚合。一旦读取的项目数等于提交间隔,整个块将由ItemWriter写出,然后提交 transaction。下图显示了 process:

图 2. Chunk-oriented 处理

以下 code 显示了相同的概念:

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

1.1.1. 配置 Step

尽管Step所需的依赖项列表相对较短,但它是一个非常复杂的 class,它可能包含许多协作者。

为了简化 configuration,可以使用 Spring Batch 命名空间,如下面的示例所示:

XML Configuration

<job id="sampleJob" job-repository="jobRepository">
    <step id="step1">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

使用 java configuration 时,可以使用 Spring Batch 构建器,如下面的示例所示:

Java Configuration

/**
 * Note the JobRepository is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
                            .repository(jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * Note the TransactionManager is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
        return this.stepBuilderFactory.get("sampleStep")
                                .transactionManager(transactionManager)
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build();
}

上面的 configuration 包含了创建 item-oriented step 所需的唯一依赖项:

  • reader:提供处理项目的ItemReader

  • writer:处理ItemReader提供的项目的ItemWriter

  • transaction-manager:Spring 的PlatformTransactionManager开始并在处理过程中提交 transactions。

  • transactionManager:Spring 的PlatformTransactionManager开始并在处理过程中提交 transactions。

  • job-repositoryJobRepository在处理期间(就在提交之前)定期存储StepExecutionExecutionContext。对于 in-line <step/>(在<job/>中定义的一个),它是<job/>元素的属性。对于独立的 step,它被定义为<tasklet/>的属性。

  • repositoryJobRepository在处理期间(就在提交之前)定期存储StepExecutionExecutionContext

  • commit-interval:提交 transaction 之前要处理的项目数。

  • chunk:表示这是基于 item 的 step 以及 transaction 提交之前要处理的项目数。

应该注意的是job-repository默认为jobRepositorytransaction-manager默认为transactionManger。此外,ItemProcessor是可选的,因为 item 可以直接从 reader 传递给 writer。

应该注意的是repository默认为jobRepositorytransactionManager默认为transactionManger(所有这些都是通过基础设施从@EnableBatchProcessing提供的)。此外,ItemProcessor是可选的,因为 item 可以直接从 reader 传递给 writer。

1.1.2. 继承 Parent Step

如果Steps的 group 共享相似的配置,那么定义具体Steps可以继承 properties 的“parent”Step可能会有所帮助。与 Java 中的 class 继承类似,“child”Step将其元素和属性与 parent 组合在一起。孩子也会覆盖任何 parent 的Steps

在下面的示例中,Step,“concreteStep1”继承自“parentStep”。它使用'itemReader','itemProcessor','itemWriter',startLimit=5allowStartIfComplete=true进行实例化。另外,commitInterval是'5',因为它被“concreteStep1”Step覆盖,如下面的示例所示:

<step id="parentStep">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep1" parent="parentStep">
    <tasklet start-limit="5">
        <chunk processor="itemProcessor" commit-interval="5"/>
    </tasklet>
</step>

job 元素中的 step 仍然需要id属性。这有两个原因:

  • 持久化StepExecution时,id用作 step name。如果 job 中的多个 step 中引用了相同的独立 step,则会发生错误。

  • 当 creating job 流动时,如本章后面所述,next属性应该引用流中的 step,而不是独立的 step。

Abstract Step

有时,可能需要定义 parent Step,它不是完整的Step configuration。例如,如果readerwritertasklet属性不在Step configuration 中,则初始化失败。如果必须在没有这些 properties 的情况下定义 parent,则应使用abstract属性。 abstract Step只是扩展,从未实例化。

在下面的示例中,如果Step abstractParentStep未声明为抽象,则不会实例化它。 Step,“concreteStep2”具有'itemReader','itemWriter'和 commit-interval=10。

<step id="abstractParentStep" abstract="true">
    <tasklet>
        <chunk commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep2" parent="abstractParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter"/>
    </tasklet>
</step>
合并 Lists

Steps上的一些可配置元素是 lists,例如<listeners/>元素。如果 parent 和 child Steps都声明<listeners/>元素,那么子列表将覆盖 parent。在 order 中允许子项将其他 listeners 添加到 parent 定义的列表中,每个列表元素都具有merge属性。如果元素指定merge="true",那么子列表将与 parent 组合而不是覆盖它。

在下面的示例中,Step“concreteStep3”是使用两个 listeners 创建的:listenerOnelistenerTwo

<step id="listenersParentStep" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    <listeners>
</step>

<step id="concreteStep3" parent="listenersParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
    </tasklet>
    <listeners merge="true">
        <listener ref="listenerTwo"/>
    <listeners>
</step>

1.1.3. 提交间隔

如前所述,step 读入和写出项目,使用提供的PlatformTransactionManager定期提交。如果commit-interval为 1,它会在写完每个 item 之后提交。这在许多情况下都不太理想,因为开始和提交 transaction 是昂贵的。理想情况下,最好在每个 transaction 中处理尽可能多的项目,这完全取决于正在处理的数据类型和 step 与之交互的资源。因此,可以配置在提交中处理的项目数。以下 example 显示steptaskletcommit-interval value 为 10。

XML Configuration

<job id="sampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

Java Configuration

@Bean
public Job sampleJob() {
    return this.jobBuilderFactory.get("sampleJob")
                     .start(step1())
                     .end()
                     .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build();
}

在前面的 example 中,每个 transaction 中处理了 10 个项目。在处理开始时,开始 transaction。此外,在ItemReader上调用每个 time read,计数器递增。当它达到 10 时,聚合项列表将传递给ItemWriter,transaction 将被提交。

1.1.4. 配置 Step 以重新启动

在“配置和 Running 一个 Job”部分中,讨论了重新启动Job。重启对步骤有很多影响,因此可能需要一些特定的 configuration。

设置开始限制

在许多情况下,您可能希望控制Step可以启动的次数。例如,可能需要配置一个特定的Step,以便它只运行一次,因为它会使某些必须手动修复的资源无效,然后再运行 run。这可以在 step level 上配置,因为不同的步骤可能有不同的要求。只能执行一次的Step可以作为Job的一部分存在,而可以无限地运行Step。以下 code 片段显示了启动限制 configuration 的 example:

XML Configuration

<step id="step1">
    <tasklet start-limit="1">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .startLimit(1)
                                .build();
}

上面的 step 只能运行一次。试图再次运行它会导致StartLimitExceededException被抛出。请注意 start-limit 的默认 value 是Integer.MAX_VALUE

重新启动已完成的 Step

在可重新启动的 job 的情况下,可能有一个或多个步骤应始终为 run,无论它们是否在第一个 time 成功。 example 可能是验证 step 或Step,它在处理之前清理资源。在重新启动的 job 的正常处理期间,跳过状态为“COMPLETED”的任何 step,意味着它已成功完成。将allow-start-if-complete设置为“true”会覆盖此项,以便 step 始终运行,如下面的 example 所示:

XML Configuration

<step id="step1">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .allowStartIfComplete(true)
                                .build();
}
Step 重新启动 Configuration Example

以下 example 显示了如何配置 job 以具有可以重新启动的步骤:

XML Configuration

<job id="footballJob" restartable="true">
    <step id="playerload" next="gameLoad">
        <tasklet>
            <chunk reader="playerFileItemReader" writer="playerWriter"
                   commit-interval="10" />
        </tasklet>
    </step>
    <step id="gameLoad" next="playerSummarization">
        <tasklet allow-start-if-complete="true">
            <chunk reader="gameFileItemReader" writer="gameWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
    <step id="playerSummarization">
        <tasklet start-limit="2">
            <chunk reader="playerSummarizationSource" writer="summaryWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
</job>

Java Configuration

@Bean
public Job footballJob() {
        return this.jobBuilderFactory.get("footballJob")
                                .start(playerLoad())
                                .next(gameLoad())
                                .next(playerSummarization())
                                .end()
                                .build();
}

@Bean
public Step playerLoad() {
        return this.stepBuilderFactory.get("playerLoad")
                        .<String, String>chunk(10)
                        .reader(playerFileItemReader())
                        .writer(playerWriter())
                        .build();
}

@Bean
public Step gameLoad() {
        return this.stepBuilderFactory.get("gameLoad")
                        .allowStartIfComplete(true)
                        .<String, String>chunk(10)
                        .reader(gameFileItemReader())
                        .writer(gameWriter())
                        .build();
}

@Bean
public Step playerSummarization() {
        return this.stepBuilderFactor.get("playerSummarization")
                        .startLimit(2)
                        .<String, String>chunk(10)
                        .reader(playerSummarizationSource())
                        .writer(summaryWriter())
                        .build();
}

前面的 example configuration 用于 job,它加载有关足球比赛的信息并对其进行总结。它包含三个步骤:playerLoadgameLoadplayerSummarizationplayerLoad step 从平面文件加载玩家信息,而gameLoad step 对游戏执行相同的操作。最后的 step,playerSummarization然后根据提供的游戏总结每个玩家的统计数据。假设playerLoad加载的文件只能加载一次,但gameLoad可以加载在特定目录中找到的任何游戏,在成功加载到数据库后删除它们。因此,playerLoad step 不包含其他 configuration。它可以启动任意次,如果完成,则跳过。但是,gameLoad step 需要每 time 运行一次,以防自上次运行以来添加了额外的 files。在 order 中将'allow-start-if-complete'设置为'true'以始终启动。 (假设加载的数据库表游戏上有一个 process 指示符,以确保摘要 step 可以正确找到新游戏)。摘要 step 是 job 中最重要的,它被配置为具有 2 的起始限制。这很有用,因为如果 step 不断失败,则会将新的 exit code 返回给控制 job 执行的 operators,它可以在手动干预之前不要重新开始。

此 job 为此文档提供了一个 example,与 samples 项目中的footballJob不同。

本节的其余部分描述了footballJob example 的三次运行中发生的情况。

Run 1:

  • playerLoad运行并成功完成,将 400 名玩家添加到'PLAYERS'table。

  • gameLoad运行并处理 11 个 files 值的游戏数据,将其内容加载到'GAMES'table 中。

  • playerSummarization开始处理并在 5 分钟后失败。

Run 2:

  • playerLoad没有 run,因为它已经成功完成,allow-start-if-complete是'false'(默认值)。

  • gameLoad再次运行并处理另外 2 个 files,同时将它们的内容加载到'GAMES'table 中(使用 process 指示符指示它们尚未被处理)。

  • playerSummarization开始处理所有剩余的游戏数据(使用 process 指标过滤)并在 30 分钟后再次失败。

Run 3:

  • playerLoad没有 run,因为它已经成功完成,allow-start-if-complete是'false'(默认值)。

  • gameLoad再次运行并处理另外 2 个 files,同时将它们的内容加载到'GAMES'table 中(使用 process 指示符指示它们尚未被处理)。

  • playerSummarization未启动且 job 立即被终止,因为这是playerSummarization的第三次执行,其限制仅为 2.必须提高限制或Job必须作为新的JobInstance执行。

1.1.5. 配置跳过逻辑

在许多情况下,处理过程中遇到的错误不应导致Step失败,但应该跳过。这通常是一个必须由了解数据本身及其含义的人做出的决定。例如,财务数据可能无法跳过,因为它会导致转账,这需要完全准确。另一方面,加载供应商列表可能允许跳过。如果由于格式不正确或缺少必要信息而未加载供应商,则可能没有问题。通常,这些不良记录也会被记录下来,这在以后讨论 listeners 时会有所介绍。

以下 example 显示了使用跳过限制的示例:

XML Configuration

<step id="step1">
   <tasklet>
      <chunk reader="flatFileItemReader" writer="itemWriter"
             commit-interval="10" skip-limit="10">
         <skippable-exception-classes>
            <include class="org.springframework.batch.item.file.FlatFileParseException"/>
         </skippable-exception-classes>
      </chunk>
   </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(flatFileItemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .skipLimit(10)
                                .skip(FlatFileParseException.class)
                                .build();
}

在前面的 example 中,使用FlatFileItemReader。如果在任何时候抛出FlatFileParseException,则跳过 item 并计算总跳过限制为 10.在 step 执行中读取,process 和写入时跳过单独计数,但该限制适用于所有跳过。达到跳过限制后,找到的下一个 exception 会导致 step 失败。换句话说,第十一次跳过会触发 exception,而不是第十次。

前面的 example 的一个问题是除了FlatFileParseException之外的任何其他 exception 都会导致Job失败。在某些情况下,这可能是正确的行为。但是,在其他情况下,可能更容易识别哪些 exceptions 应该导致失败并跳过其他所有内容,如下面的示例所示:

XML Configuration

<step id="step1">
    <tasklet>
        <chunk reader="flatFileItemReader" writer="itemWriter"
               commit-interval="10" skip-limit="10">
            <skippable-exception-classes>
                <include class="java.lang.Exception"/>
                <exclude class="java.io.FileNotFoundException"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(flatFileItemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .skipLimit(10)
                                .skip(Exception.class)
                                .noSkip(FileNotFoundException.class)
                                .build();
}

通过将java.lang.Exception标识为可跳过的 exception class,configuration 指示所有Exceptions都是可跳过的。但是,通过'排除'java.io.FileNotFoundException,configuration 将可跳过的 exception classes 列表细化为除FileNotFoundException之外的所有Exceptions。遇到任何排除的 exception classes 都是致命的(也就是说,它们不被跳过)。

对于遇到的任何 exception,可跳过性由 class 层次结构中最近的超类确定。任何未分类的 exception 都被视为“致命”。

<include/><exclude/>元素的 order 无关紧要。

skipnoSkip calls 的 order 无关紧要。

1.1.6. 配置重试逻辑

在大多数情况下,您希望 exception 导致跳过或Step失败。但是,并非所有 exceptions 都是确定性的。如果在读取时遇到FlatFileParseException,则始终为该 record 抛出。重置ItemReader无济于事。但是,对于其他 exceptions,例如DeadlockLoserDataAccessException,表示当前 process 已尝试更新另一个 process 持有锁的 record,等待并再次尝试可能会导致成功。在这种情况下,重试应配置如下:

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter"
             commit-interval="2" retry-limit="3">
         <retryable-exception-classes>
            <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
         </retryable-exception-classes>
      </chunk>
   </tasklet>
</step>
@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .retryLimit(3)
                                .retry(DeadlockLoserDataAccessException.class)
                                .build();
}

Step允许限制单个 item 可以重试的次数和一个“可重试”的 exceptions 列表。有关重试如何工作的更多详细信息,请参见重试。

1.1.7. 控制回滚

默认情况下,无论重试还是跳过,从ItemWriter抛出的任何 exceptions 都会导致Step控制的 transaction 回滚。如果如前所述配置了 skip,则从ItemReader抛出的 exceptions 不会导致回滚。但是,在许多情况下,从ItemWriter抛出的 exceptions 不应导致回滚,因为没有执行任何操作来使 transaction 无效。出于这个原因,Step可以配置一个不应该导致回滚的 exceptions 列表,如下面的 example 所示:

XML Configuration

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
      <no-rollback-exception-classes>
         <include class="org.springframework.batch.item.validator.ValidationException"/>
      </no-rollback-exception-classes>
   </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .faultTolerant()
                                .noRollback(ValidationException.class)
                                .build();
}
Transactional Readers

ItemReader的基本 contract 是它仅向前。 step 缓冲 reader 输入,因此在回滚的情况下,项目不需要 reader 中的 re-read。但是,在某些情况下,reader 构建在 transactional 资源之上,例如 JMS 队列。在这种情况下,由于队列与回滚的 transaction 绑定,因此将从队列中提取的消息重新打开。因此,step 可以配置为不缓冲项目,如下面的示例所示:

XML Configuration

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"
               is-reader-transactional-queue="true"/>
    </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .readerIsTransactionalQueue()
                                .build();
}

1.1.8. Transaction 属性

Transaction 属性可用于控制isolationpropagationtimeout设置。有关设置 transaction 属性的更多信息可以在Spring 核心文档中找到。以下 example _set isolationpropagationtimeout transaction 属性:

XML Configuration

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        <transaction-attributes isolation="DEFAULT"
                                propagation="REQUIRED"
                                timeout="30"/>
    </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.DEFAULT.value());
        attribute.setTimeout(30);

        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .transactionAttribute(attribute)
                                .build();
}

1.1.9. 使用 Step 注册 ItemStream

step 必须在其生命周期的必要点处理ItemStream回调(有关ItemStream接口的更多信息,请参阅ItemStream)。如果 step 失败并且可能需要重新启动,这是至关重要的,因为ItemStream接口是 step 获取执行之间持久 state 所需的信息。

如果ItemReaderItemProcessorItemWriter本身实现ItemStream接口,则会自动注册这些接口。任何其他流需要单独注册。这通常是将间接依赖项(如委托)注入 reader 和 writer 的情况。可以通过'streams'元素在Step上注册流,如下面的示例所示:

XML Configuration

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
            <streams>
                <stream ref="fileItemWriter1"/>
                <stream ref="fileItemWriter2"/>
            </streams>
        </chunk>
    </tasklet>
</step>

<beans:bean id="compositeWriter"
            class="org.springframework.batch.item.support.CompositeItemWriter">
    <beans:property name="delegates">
        <beans:list>
            <beans:ref bean="fileItemWriter1" />
            <beans:ref bean="fileItemWriter2" />
        </beans:list>
    </beans:property>
</beans:bean>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(itemReader())
                                .writer(compositeItemWriter())
                                .stream(fileItemWriter1())
                                .stream(fileItemWriter2())
                                .build();
}

/**
 * In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
 * necessary, but used for an example.
 */
@Bean
public CompositeItemWriter compositeItemWriter() {
        List<ItemWriter> writers = new ArrayList<>(2);
        writers.add(fileItemWriter1());
        writers.add(fileItemWriter2());

        CompositeItemWriter itemWriter = new CompositeItemWriter();

        itemWriter.setDelegates(writers);

        return itemWriter;
}

在上面的 example 中,CompositeItemWriter不是ItemStream,但它的两个代理都是。因此,必须在 framework 中将委托 writers 显式注册为 order 中的流才能正确处理它们。 ItemReader不需要显式注册为流,因为它是Step的直接 property。 step 现在可以重新启动,并且 reader 和 writer 的 state 在失败的 event 中正确保存。

1.1.10. 拦截 Step 执行

Job一样,在执行Step期间有许多 events,用户可能需要执行某些功能。对于 example,在 order 中写出需要页脚的平面文件,ItemWriter需要在Step完成时得到通知,以便可以写入页脚。这可以通过许多Step范围的 listener 中的一个来完成。

实现StepListener的 extensions 之一的任何 class(但不是那个接口本身,因为它是空的)可以通过listeners元素应用于 step。 listeners元素在 step,tasklet 或 chunk 声明中有效。建议您在其 function 应用的 level 处声明 listeners,或者,如果它是 multi-featured(例如StepExecutionListenerItemReadListener),则在它应用的最细粒度 level 处声明它。以下 example 显示了在块 level 上应用的 listener:

XML Configuration

<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

Java Configuration

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(reader())
                                .writer(writer())
                                .listener(chunkListener())
                                .build();
}

如果使用名称空间<step>元素或其中一个*StepFactoryBean工厂,,ItemWriterItemProcessor本身实现StepListener接口之一将自动注册Step。这仅适用于直接注入Step的组件。如果 listener 嵌套在另一个 component 中,则需要显式注册(如前面使用 Step 注册 ItemStream所述)。

除了StepListener接口之外,还提供了 annotations 来解决相同的问题。普通的旧 Java objects 可以使用带有这些注释的方法,然后将这些方法转换为相应的StepListener类型。注释ItemReaderItemWriterTasklet等块组件的自定义_implement 也是常见的。 XML 解析器为<listener/>元素分析注释,并在构建器中使用listener方法注册,因此您需要做的就是使用 XML 命名空间或构建器向 step 注册 listeners。

StepExecutionListener

StepExecutionListener代表Step执行的最通用的 listener。它允许在启动Step之前和_e之后通知,无论它是正常结束还是失败,如下面的示例所示:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus是 order 中_ret 的 return 类型,以允许 listeners 修改完成Step时返回的 exit code。

与此接口对应的注释是:

  • @BeforeStep

  • @AfterStep

ChunkListener

块被定义为在 transaction 范围内处理的项目。在每个提交间隔提交 transaction,提交一个“块”。 可用于在块开始处理之前或块成功完成之后执行逻辑,如以下接口定义所示:

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

在 transaction 启动之后但在ItemReader上调用 read 之前调用 beforeChunk 方法。相反,在提交了块之后调用afterChunk(如果存在回滚则根本不调用)。

与此接口对应的注释是:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

没有块声明时可以应用ChunkListenerTaskletStep负责调用ChunkListener,因此它也适用于 non-item-oriented tasklet(在 tasklet 之前和之后调用)。

ItemReadListener

在讨论之前的跳过逻辑时,有人提到,对跳过的记录进行 log 可能是有益的,以便以后可以处理它们。在读取错误的情况下,可以使用ItemReaderListener来完成,如以下接口定义所示:

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

在每次调用ItemReader之前调用beforeRead方法。每次成功调用 read 之后都会调用afterRead方法,并传递已读取的 item。如果读取时出错,则调用onReadError方法。提供遇到的 exception 以便可以记录它。

与此接口对应的注释是:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener一样,可以“监听”item 的处理,如以下接口定义所示:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

ItemProcessor方法在ItemProcessor上的process之前被调用,并被传递给要处理的 item。成功处理 item 后调用afterProcess方法。如果处理时出错,则调用onProcessError方法。遇到 exception 并尝试处理 item,以便记录它们。

与此接口对应的注释是:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

可以使用ItemWriteListener“监听”item 的写入,如以下接口定义所示:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

ItemWriter方法在ItemWriter上的write之前被调用,并被传递给写入的项目列表。成功编写 item 后调用afterWrite方法。如果写入时出错,则调用onWriteError方法。遇到 exception 并尝试写入 item,以便记录它们。

与此接口对应的注释是:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener都提供了通知错误的机制,但 none 通知您实际上已跳过 record。 ,对于 example,即使重试 item 并且成功,也会调用onWriteError。因此,有一个单独的界面用于跟踪跳过的项目,如以下界面定义所示:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

只要在读取时跳过 item,就会调用onSkipInRead。应该注意的是,回滚可能会导致相同的 item 被注册为多次跳过。在写入时跳过 item 时会调用onSkipInWrite。因为 item 已被成功读取(并且未被跳过),所以它也被提供 item 本身作为参数。

与此接口对应的注释是:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners 和 Transactions

SkipListener的最常见用例之一是 log 出跳过的 item,以便可以使用另一个批处理 process 甚至人 process 来评估和修复导致跳过的问题。因为很多情况下原始 transaction 可以回滚,Spring Batch 有两个保证:

  • 每个 item 只调用一次适当的跳过方法(取决于发生错误的时间)。

  • 始终在 transaction 提交之前调用SkipListener。这是为了确保 listener 中的任何 transactional 资源调用不会因ItemWriter内的故障而回滚。

1.2. TaskletStep

Chunk-oriented 处理不是Step中 process 的唯一方法。如果Step必须包含简单的存储过程调用怎么办?您可以在过程完成后将调用实现为ItemReader和 return null。但是,这样做有点不自然,因为需要 no-op ItemWriter。 Spring Batch 为此方案提供TaskletStep

Tasklet是一个简单的接口,它有一个方法execute,由TaskletStep重复调用,直到它返回RepeatStatus.FINISHED或抛出 exception 来表示失败。每次调用Tasklet都包含在 transaction 中。 Tasklet实现者可能会调用存储过程,脚本或简单的 SQL 更新语句。

要创建TaskletStep,<tasklet/>元素的'ref'属性应该 reference 一个定义Tasklet object 的 bean。 <tasklet/>中不应使用<chunk/>元素。以下 example 显示了一个简单的 tasklet:

<step id="step1">
    <tasklet ref="myTasklet"/>
</step>

要创建TaskletStep,传递给构建器的tasklet方法的 bean 应实现Tasklet接口。当 building a TaskletStep时,不应该调用chunk。以下 example 显示了一个简单的 tasklet:

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
                            .tasklet(myTasklet())
                            .build();
}

如果它实现了StepListener接口,TaskletStep会自动将 tasklet 注册为StepListener

1.2.1. TaskletAdapter

ItemReaderItemWriter接口的其他适配器一样,Tasklet接口包含 implementation,允许自身适应任何 pre-existing class:TaskletAdapter。这可能有用的示例是现有的 DAO,用于更新一组记录上的 flag。 TaskletAdapter可用于调用此 class,而无需为Tasklet接口编写适配器,如以下 example 所示:

XML Configuration

<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
    <property name="targetObject">
        <bean class="org.mycompany.FooDao"/>
    </property>
    <property name="targetMethod" value="updateFoo" />
</bean>

Java Configuration

@Bean
public MethodInvokingTaskletAdapter myTasklet() {
        MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

        adapter.setTargetObject(fooDao());
        adapter.setTargetMethod("updateFoo");

        return adapter;
}

1.2.2. Example Tasklet Implementation

许多批处理作业包含必须在主处理开始之前完成的步骤,以便在设置各种资源或处理完成后清理这些资源。如果 job 与 files 一起使用很多,则在将它们成功上载到另一个位置后,通常需要在本地删除某些 files。以下 example(取自Spring Batch samples 项目)是Tasklet implementation,只有这样的责任:

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

前面的Tasklet implementation 删除给定目录中的所有 files。应该注意,execute方法只被调用一次。剩下的就是从Step中引用Tasklet

XML Configuration

<job id="taskletJob">
    <step id="deleteFilesInDir">
       <tasklet ref="fileDeletingTasklet"/>
    </step>
</job>

<beans:bean id="fileDeletingTasklet"
            class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
    <beans:property name="directoryResource">
        <beans:bean id="directory"
                    class="org.springframework.core.io.FileSystemResource">
            <beans:constructor-arg value="target/test-outputs/test-dir" />
        </beans:bean>
    </beans:property>
</beans:bean>

Java Configuration

@Bean
public Job taskletJob() {
        return this.jobBuilderFactory.get("taskletJob")
                                .start(deleteFilesInDir())
                                .build();
}

@Bean
public Step deleteFilesInDir() {
        return this.stepBuilderFactory.get("deleteFilesInDir")
                                .tasklet(fileDeletingTasklet())
                                .build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
        FileDeletingTasklet tasklet = new FileDeletingTasklet();

        tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));

        return tasklet;
}

1.3. 控制 Step 流程

由于能够在拥有 job 中一起 group 步骤,因此需要能够控制 job 如何从一个 step“流动”到另一个 step。 Step的失败并不一定意味着Job应该失败。此外,可能存在多种类型的“成功”,其确定接下来应该执行哪个Step。根据Steps的 group 的配置方式,某些步骤甚至可能根本不会被处理。

1.3.1. 顺序流动

最简单的流程方案是 job,其中所有步骤按顺序执行,如下图所示:

图 3.顺序流

这可以通过使用 step 元素的'next'属性来实现,如下面的示例所示:

XML Configuration

<job id="job">
    <step id="stepA" parent="s1" next="stepB" />
    <step id="stepB" parent="s2" next="stepC"/>
    <step id="stepC" parent="s3" />
</job>

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(stepA())
                                .next(stepB())
                                .next(stepC())
                                .build();
}

在上面的场景中,'step A'首先运行,因为它是列出的第一个Step。如果'step A'正常完成,则'step B'运行,依此类推。但是,如果'step A'失败,则整个Job失败并且'step B'不执行。

使用 Spring Batch 命名空间,configuration 中列出的第一个 step 始终是Job的第一个 step run。其他 step 元素的 order 无关紧要,但第一个 step 必须始终首先出现在 xml 中。

1.3.2. 条件流

在上面的例子中,只有两种可能性:

  • Step成功,下一个Step应该执行。

  • Step失败,因此Job应该失败。

在许多情况下,这可能就足够了。但是,如果Step的失败应该触发不同的Step,而不是导致失败?下图显示了这样的流程:

图 4.条件流

为了处理更复杂的场景,Spring Batch 命名空间允许在 step 元素中定义过渡元素。一个这样的过渡是next元素。与next属性一样,next元素告诉Job下一个Step执行。但是,与属性不同,在给定的Step上允许任意数量的next元素,并且在失败的情况下没有默认行为。这意味着,如果使用过渡元素,则必须明确定义Step过渡的所有行为。另请注意,单个 step 不能同时具有next属性和transition元素。

next元素指定 pattern 到 match 和 step 接下来执行,如下面的 example 所示:

XML Configuration

<job id="job">
    <step id="stepA" parent="s1">
        <next on="*" to="stepB" />
        <next on="FAILED" to="stepC" />
    </step>
    <step id="stepB" parent="s2" next="stepC" />
    <step id="stepC" parent="s3" />
</job>

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(stepA())
                                .on("*").to(stepB())
                                .from(stepA()).on("FAILED").to(stepC())
                                .end()
                                .build();
}

使用 XML configuration 时,transition 元素的on属性使用简单的 pattern-matching scheme 来匹配Step执行的ExitStatus

当使用 java configuration 时,on方法使用一个简单的 pattern-matching scheme 来匹配Step执行的ExitStatus

pattern 中只允许使用两个特殊字符:

  • “*”匹配零个或多个字符

  • “?”恰好匹配一个字符

例如,“c * t”匹配“cat”和“count”,而“c?t”匹配“cat”但不匹配“count”。

虽然Step上的过渡元素数量没有限制,但如果Step执行导致ExitStatus未被元素覆盖,则 framework 抛出 exception 并且Job失败。 framework 自动将从最具体到最不具体的转换命令。这意味着,即使在上面的 example 中将 ordering 交换为“stepA”,ExitStatus的“FAILED”仍会转到“stepC”。

批次状态与退出状态

为条件流配置Job时,了解BatchStatusExitStatus之间的区别非常重要。 BatchStatus是枚举,它是JobExecutionStepExecution的 property,framework 用于_记录JobStep的状态。它可以是以下值之一:COMPLETEDSTARTINGSTARTEDSTOPPINGSTOPPEDFAILEDABANDONEDUNKNOWN。其中大多数是自解释的:COMPLETED是 step 或 job 成功完成时设置的状态,FAILED在失败时设置,等等。

使用 XML configuration 时,以下 example 包含'next'元素:

<next on="FAILED" to="stepB" />

使用 Java Configuration 时,以下 example 包含'on'元素:

...
.from(stepA()).on("FAILED").to(stepB())
...

乍一看,它似乎'on'_re _se 它所属的BatchStatus。但是,它实际上是的ExitStatus。正如 name 所暗示的那样,ExitStatus表示完成执行后Step的状态。

更具体地说,当使用 XML configuration 时,前面的 XML configuration example 中显示的'next'元素会引用ExitStatus的 exit code。

使用 Java configuration 时,前面的 Java configuration example 中显示的'on'方法会引用ExitStatus的 exit code。

在英语中,它说:“如果退出 code 是FAILED,则转到步骤 B”。默认情况下,exit code 始终与StepBatchStatus相同,这就是上面的条目有效的原因。但是,如果 exit code 需要不同呢?一个好的 example 来自 samples 项目中的 skip sample job:

XML Configuration

<step id="step1" parent="s1">
    <end on="FAILED" />
    <next on="COMPLETED WITH SKIPS" to="errorPrint1" />
    <next on="*" to="step2" />
</step>

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1()).on("FAILED").end()
                        .from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
                        .from(step1()).on("*").to(step2())
                        .end()
                        .build();
}

step1有三种可能性:

  • Step失败,在这种情况下 job 应该失败。

  • Step成功完成。

  • Step已成功完成,但退出 code 为'COMPLETED WITH SKIPS'。在这种情况下,应该使用不同的 step 来处理错误。

上面的 configuration 工作。但是,有些东西需要根据跳过记录的执行条件来更改 exit code,如下例所示:

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

上面的 code 是一个StepExecutionListener,首先检查以确保Step成功,然后检查StepExecution上的跳过计数是否高于 0.如果满足这两个条件,则返回一个退出 code 为COMPLETED WITH SKIPS的新ExitStatus

1.3.3. 配置停止

在讨论BatchStatus 和 ExitStatus之后,人们可能想知道如何为Job确定BatchStatusExitStatus。虽然这些状态是由 code 执行的Step确定的,但Job的状态是根据 configuration 确定的。

到目前为止,所讨论的所有 job 配置至少有一个没有转换的Step。对于 example,在执行以下 step 之后,Job ends,如下面的 example 所示:

<step id="stepC" parent="s3"/>
@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .build();
}

如果没有为Step定义转换,则Job的状态定义如下:

  • 如果Step ends 与ExitStatus FAILED,那么JobBatchStatusExitStatus都是FAILED

  • 否则,JobBatchStatusExitStatus都是COMPLETED

虽然这种终止批处理 job 的方法足以用于某些批处理作业,例如简单的顺序 step job,但可能需要自定义 job-stopping 场景。为此,Spring Batch 提供了三个过渡元素来停止Job(除了我们之前讨论过的下一个元素)。这些停止元素中的每一个都使用特定的BatchStatus来停止Job。重要的是要注意,stop 过渡元素对Job中任何StepsBatchStatusExitStatus都没有影响。这些元素仅影响Job的最终状态。例如,job 中的每个 step 都可能具有FAILED的状态,但 job 的状态为COMPLETED

结束 Step

配置 step 结尾指示JobBatchStatus COMPLETED停止。已完成状态COMPLETEDJob无法重新启动(framework 抛出JobInstanceAlreadyCompleteException)。

使用 XML configuration 时,'end'元素用于此任务。 end元素还允许使用可选的“exit-code”属性,该属性可用于自定义JobExitStatus。如果没有给出'exit-code'属性,那么ExitStatus默认为COMPLETED,以_匹配BatchStatus

使用 Java configuration 时,'end'方法用于此任务。 end方法还允许使用可选的'exitStatus'参数,该参数可用于自定义JobExitStatus。如果没有提供'exitStatus'value,那么ExitStatus默认为COMPLETED,以_匹配BatchStatus

在以下场景中,如果step2失败,则Job将以BatchStatus COMPLETED停止,ExitStatus COMPLETEDstep3不会 run。否则,执行移至step3。请注意,如果step2失败,则Job不可重新启动(因为状态为COMPLETED)。

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <end on="FAILED"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">
@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .next(step2())
                                .on("FAILED").end()
                                .from(step2()).on("*").to(step3())
                                .end()
                                .build();
}
Step 失败

在给定点配置 step 失败指示JobBatchStatus FAILED停止。与 end 不同,Job的失败不会阻止Job重新启动。

使用 XML configuration 时,'fail'元素还允许使用可选的'exit-code'属性,该属性可用于自定义JobExitStatus。如果没有给出'exit-code'属性,那么ExitStatus默认为FAILED,以_匹配BatchStatus

在以下场景中,如果step2失败,则Job将以BatchStatus FAILED停止,ExitStatus EARLY TERMINATIONstep3不会执行。否则,执行移至step3。此外,如果step2失败并重新启动Job,则会在step2上再次开始执行。

XML Configuration

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <fail on="FAILED" exit-code="EARLY TERMINATION"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1())
                        .next(step2()).on("FAILED").fail()
                        .from(step2()).on("*").to(step3())
                        .end()
                        .build();
}
在给定 Step 处停止 Job

将 job 配置为在特定的 step 处停止指示JobBatchStatus STOPPED停止。停止Job可以在处理过程中提供临时 break,以便 operator 可以在重新启动Job之前采取一些操作。

使用 XML configuration 时,'stop'元素需要一个'restart'属性,该属性指定 step 在“Job 重新启动”时应该执行的步骤。

使用 java configuration 时,stopAndRestart方法需要一个'restart'属性,该属性指定 step 在“Job 重新启动”时应该执行的步骤。

在以下场景中,如果step1COMPLETE结束,则 job 将停止。重新启动后,将在step2上开始执行。

<step id="step1" parent="s1">
    <stop on="COMPLETED" restart="step2"/>
</step>

<step id="step2" parent="s2"/>
@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1()).on("COMPLETED").stopAndRestart(step2())
                        .end()
                        .build();
}

1.3.4. 程序化流程决策

在某些情况下,可能需要比ExitStatus更多的信息来决定接下来要执行哪个 step。在这种情况下,可以使用JobExecutionDecider来帮助做出决定,如下面的示例所示:

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String status;
        if (someCondition()) {
            status = "FAILED";
        }
        else {
            status = "COMPLETED";
        }
        return new FlowExecutionStatus(status);
    }
}

在下面的 sample job configuration 中,decision指定要使用的决策程序以及所有转换:

XML Configuration

<job id="job">
    <step id="step1" parent="s1" next="decision" />

    <decision id="decision" decider="decider">
        <next on="FAILED" to="step2" />
        <next on="COMPLETED" to="step3" />
    </decision>

    <step id="step2" parent="s2" next="step3"/>
    <step id="step3" parent="s3" />
</job>

<beans:bean id="decider" class="com.MyDecider"/>

在下面的示例中,使用 Java configuration 时,实现JobExecutionDecider的 bean 直接传递给next调用。

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                        .start(step1())
                        .next(decider()).on("FAILED").to(step2())
                        .from(decider()).on("COMPLETED").to(step3())
                        .end()
                        .build();
}

1.3.5. 分流

到目前为止描述的每个场景都涉及一个,它在 time 以线性方式执行其中的步骤。除了这种典型的样式,Spring Batch 还允许 job 配置 parallel 流。

XML 命名空间允许您使用'split'元素。如下面的示例所示,'split'元素包含一个或多个'flow'元素,其中可以定义整个单独的流。 “split”元素还可以包含任何前面讨论的过渡元素,例如“next”属性或“next”,“end”或“fail”元素。

<split id="split1" next="step4">
    <flow>
        <step id="step1" parent="s1" next="step2"/>
        <step id="step2" parent="s2"/>
    </flow>
    <flow>
        <step id="step3" parent="s3"/>
    </flow>
</split>
<step id="step4" parent="s4"/>

基于 Java 的 configuration 允许您通过提供的构建器配置拆分。如下面的示例所示,'split'元素包含一个或多个'flow'元素,其中可以定义整个单独的流。 “split”元素还可以包含任何前面讨论的过渡元素,例如“next”属性或“next”,“end”或“fail”元素。

@Bean
public Job job() {
        Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
                        .start(step1())
                        .next(step2())
                        .build();
        Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
                        .start(step3())
                        .build();

        return this.jobBuilderFactory.get("job")
                                .start(flow1)
                                .split(new SimpleAsyncTaskExecutor())
                                .add(flow2)
                                .next(step4())
                                .end()
                                .build();
}

1.3.6. 外部化流程定义和作业之间的依赖关系

job 中的部分流可以外部化为单独的 bean 定义,然后是 re-used。有两种方法可以做到这一点。第一种是简单地将流声明为对其他地方定义的流的 reference,如下面的示例所示:

XML Configuration

<job id="job">
    <flow id="job1.flow1" parent="flow1" next="step3"/>
    <step id="step3" parent="s3"/>
</job>

<flow id="flow1">
    <step id="step1" parent="s1" next="step2"/>
    <step id="step2" parent="s2"/>
</flow>

Java Configuration

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(flow1())
                                .next(step3())
                                .end()
                                .build();
}

@Bean
public Flow flow1() {
        return new FlowBuilder<SimpleFlow>("flow1")
                        .start(step1())
                        .next(step2())
                        .build();
}

如前面的 example 所示,定义外部流的效果是将外部流中的步骤插入到 job 中,就像它们已被内联声明一样。通过这种方式,许多作业可以引用相同的模板流并将这些模板组合成不同的逻辑流。这也是分离各个流的 integration 测试的好方法。

外化流的另一种形式是使用JobStepJobStep类似于FlowStep但实际上为指定的流中的步骤创建并启动单独的 job 执行。

以下 XML 代码段显示JobStep的 example:

XML Configuration

<job id="jobStepJob" restartable="true">
   <step id="jobStepJob.step1">
      <job ref="job" job-launcher="jobLauncher"
          job-parameters-extractor="jobParametersExtractor"/>
   </step>
</job>

<job id="job" restartable="true">...</job>

<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
   <property name="keys" value="input.file"/>
</bean>

以下 Java 代码段显示JobStep的 example:

Java Configuration

@Bean
public Job jobStepJob() {
        return this.jobBuilderFactory.get("jobStepJob")
                                .start(jobStepJobStep1(null))
                                .build();
}

@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
        return this.stepBuilderFactory.get("jobStepJobStep1")
                                .job(job())
                                .launcher(jobLauncher)
                                .parametersExtractor(jobParametersExtractor())
                                .build();
}

@Bean
public Job job() {
        return this.jobBuilderFactory.get("job")
                                .start(step1())
                                .build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
        DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

        extractor.setKeys(new String[]{"input.file"});

        return extractor;
}

job 参数提取器是一种策略,用于确定StepExecutionContext如何转换为JobParameters,是 run。当您想要一些更精细的选项来监视和报告作业和步骤时,JobStep非常有用。使用JobStep通常也是一个很好的答案:“如何在作业之间创建依赖关系?”这是一个很好的方法,可以将大型系统扩展为更小的模块并控制作业流。

1.4. Job 和 Step 属性的晚 Binding

前面显示的 XML 和平面文件示例都使用 Spring Resource抽象来获取文件。这是有效的,因为ResourcegetFile方法,返回java.io.File。可以使用标准 Spring 构造配置 XML 和平面文件资源,如以下 example 所示:

XML Configuration

<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource"
              value="file://outputs/file.txt" />
</bean>

Java Configuration

@Bean
public FlatFileItemReader flatFileItemReader() {
        FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource("file://outputs/file.txt"))
                        ...
}

前面的Resource从指定的文件系统位置加载文件。请注意,绝对位置必须以 double 斜杠(//)开头。在大多数 Spring applications 中,这个解决方案已经足够好了,因为这些资源的名称在 compile time 时是已知的。但是,在批处理方案中,可能需要在运行时将文件 name 确定为 job 的参数。这可以使用'-D'参数来解决系统 property。

以下 XML 代码段显示了如何从 property 读取文件 name:

XML Configuration

<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="${input.file.name}" />
</bean>

以下 Java 代码段显示了如何从 property 读取文件 name:

Java Configuration

@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

此解决方案工作所需的全部内容都是系统参数(例如-Dinput.file.name="file://outputs/file.txt")。

虽然这里可以使用PropertyPlaceholderConfigurer,但是如果系统 property 始终设置是没有必要的,因为 Spring 中的ResourceEditor已经过滤并在系统 properties 上替换占位符。

通常,在批处理设置中,最好在 job 的JobParameters中对文件 name 进行参数化,而不是通过系统 properties,并以这种方式访问它们。为此,Spring Batch 允许各种JobStep属性的后期绑定,如下面的代码片段所示:

XML Configuration

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

JobExecutionStepExecution level ExecutionContext都可以以相同的方式访问,如以下示例所示:

XML Configuration

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>

XML Configuration

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

任何使用 late-binding 的 bean 都必须使用 scope =“ step”声明。有关更多信息,请参见Step 范围。

1.4.1. Step 范围

上面所有后期的 binding 示例都在 bean 定义中声明了“step”的范围,如下面的示例所示:

XML Configuration

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>

Java Configuration

@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

在 order 中使用Step的范围来使用延迟 binding,因为在Step启动之前 bean 实际上不能被实例化,以允许找到属性。因为默认情况下它不是 Spring 容器的一部分,所以必须通过使用batch命名空间或通过为StepScope显式包含 bean 定义或使用@EnableBatchProcessing annotation 显式添加范围。仅使用其中一种方法。以下 example 使用batch命名空间:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

以下 example 显式包含 bean 定义:

<bean class="org.springframework.batch.core.scope.StepScope" />

1.4.2. 工作范围

在 Spring Batch 3.0 中引入的Job范围类似于 configuration 中的Step范围,但是Job context 的范围,因此每个 running job 只有一个这样的 bean 实例。此外,还提供了对使用#{..}占位符从JobContext访问的 references 的后期绑定的支持。使用此 feature,可以从 job 或 job 执行 context 和 job 参数中提取 bean properties,如以下示例所示:

XML Configuration

<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobParameters[input]}" />
</bean>

XML Configuration

<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>

Java Configuration

@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

Java Configuration

@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
        return new FlatFileItemReaderBuilder<Foo>()
                        .name("flatFileItemReader")
                        .resource(new FileSystemResource(name))
                        ...
}

因为默认情况下它不是 Spring 容器的一部分,所以必须通过使用batch命名空间,通过为 JobScope 显式包含 bean 定义,或使用@EnableBatchProcessing annotation(但不是全部)来显式添加范围。以下 example 使用batch命名空间:

<beans xmlns="http://www.springframework.org/schema/beans"
                  xmlns:batch="http://www.springframework.org/schema/batch"
                  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xsi:schemaLocation="...">

<batch:job .../>
...
</beans>

以下 example 包含一个明确定义JobScope的 bean:

<bean class="org.springframework.batch.core.scope.JobScope" />