5. 配置 Step

正如批处理 Domain 语言中所讨论的,Step是一个 domain object,它封装了批处理 job 的独立顺序阶段,并包含定义和控制实际批处理所需的所有信息。这是一个必然模糊的描述,因为任何给定Step的内容由开发人员自行决定编写Job。 Step 可以像开发人员所希望的那样简单或复杂。一个简单的Step可能会将数据从文件加载到数据库中,几乎不需要 code。 (取决于使用的 implementations)更复杂的Step可能有复杂的业务规则,作为处理的一部分应用。

5.1 Chunk-Oriented 处理

Spring Batch 在其最常见的 implementation 中使用了“Chunk Oriented”处理方式。面向块的处理是指在 time 读取数据,并在 transaction 边界内创建将写出的“块”。一个 item 从ItemReader读入,传递给ItemProcessor,并聚合。一旦读取的项目数等于提交间隔,整个块就会通过 ItemWriter 写出,然后提交 transaction。

下面是上面显示的相同概念的 code 表示:

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

5.1.1 配置 Step

尽管Step所需的依赖项列表相对较短,但它是一个非常复杂的 class,它可能包含许多协作者。为了简化 configuration,可以使用 Spring Batch 命名空间:

<job id="sampleJob" job-repository="jobRepository">
    <step id="step1">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

上面的 configuration 表示创建 item-oriented step 所需的唯一依赖项:

  • reader - 提供处理项目的ItemReader

  • writer - 处理ItemReader提供的项目的ItemWriter

  • transaction-manager - Spring 的PlatformTransactionManager将用于在处理期间开始和提交 transactions。

  • job-repository - JobRepository将用于在处理期间(就在提交之前)定期存储StepExecutionExecutionContext。对于 in-line <step/>(在<job/>中定义的一个),它是<job/>元素的属性;对于独立的 step,它被定义为<tasklet/>的一个属性。

  • commit-interval - 提交 transaction 之前将处理的项目数。

需要注意的是,job-repository 默认为“jobRepository”,transaction-manager 默认为“transactionManger”。此外,ItemProcessor是可选的,不是必需的,因为 item 可以直接从 reader 传递给 writer。

5.1.2 从 Parent Step 继承

如果Step的 group 共享相似的配置,那么定义一个“parent”Step可能会有所帮助,具体Step可以从中继承 properties。与 Java 中的 class 继承类似,“ child”Step将其元素和属性与 parent 组合在一起。 child 还将覆盖 parent 的Step中的任何一个。

在下面的示例中,Step“concreteStep1”将从“parentStep”继承。它将使用'itemReader','itemProcessor','itemWriter',startLimit=5 和 allowStartIfComplete=true 进行实例化。此外,commitInterval 将为'5',因为它被“concreteStep1”覆盖:

<step id="parentStep">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep1" parent="parentStep">
    <tasklet start-limit="5">
        <chunk processor="itemProcessor" commit-interval="5"/>
    </tasklet>
</step>

job 元素中的 step 仍然需要 id 属性。这有两个原因:

  • 持久化 StepExecution 时,id 将用作 step name。如果 job 中的多个 step 中引用了相同的独立 step,则会发生错误。

  • 当 creating job 流动时,如本章后面所述,下一个属性应该是指流中的 step,而不是独立的 step。

Abstract Step

有时可能需要定义 parent Step,它不是完整的Step configuration。例如,如果 reader,writer 和 tasklet 属性不在Step configuration 中,则初始化将失败。如果必须在没有这些 properties 的情况下定义 parent,则应使用“abstract”属性。 “抽象”Step将不会被实例化;它仅用于扩展。

在下面的示例中,如果未将Step“abstractParentStep”声明为抽象,则不会实例化它。 Step“concreteStep2”将包含“itemReader”,“itemWriter”和 commitInterval=10。

<step id="abstractParentStep" abstract="true">
    <tasklet>
        <chunk commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep2" parent="abstractParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter"/>
    </tasklet>
</step>

合并 Lists

Step上的一些可配置元素是 lists;例如<listeners/>元素。如果 parent 和 child Step都声明<listeners/>元素,那么 child 的列表将覆盖 parent。在 order 中允许 child 将其他 listeners 添加到 parent 定义的列表中,每个列表元素都有一个“merge”属性。如果元素指定 merge =“ true”,那么 child 的列表将与 parent 组合而不是覆盖它。

在下面的例子中,将创建Step“concreteStep3”两个 listeners:listenerOnelistenerTwo

<step id="listenersParentStep" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    <listeners>
</step>

<step id="concreteStep3" parent="listenersParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
    </tasklet>
    <listeners merge="true">
        <listener ref="listenerTwo"/>
    <listeners>
</step>

5.1.3 提交间隔

如上所述,step 读入和写出项目,使用提供的PlatformTransactionManager定期提交。如果 commit-interval 为 1,它将在写完每个 item 之后提交。这在许多情况下都不太理想,因为开始和提交 transaction 是昂贵的。理想情况下,最好在每个 transaction 中处理尽可能多的项目,这完全取决于正在处理的数据类型和 step 与之交互的资源。因此,可以配置在提交中处理的项目数。

<job id="sampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

在上面的 example 中,每个 transaction 中将处理 10 个项目。在处理开始时,transaction 开始,并且在ItemReader上调用每个 time 读取,计数器递增。当它达到 10 时,聚合项目列表将传递给ItemWriter,transaction 将被提交。

5.1.4 为重启配置 Step

第 4 章,配置和运行 Job中,讨论了重新启动Job。重启对步骤有很多影响,因此可能需要一些特定的 configuration。

设置 StartLimit

在许多情况下,您可能希望控制Step可以启动的次数。例如,可能需要配置一个特定的Step,以便它只运行一次,因为它会使某些必须手动修复的资源无效,然后再运行 run。这可以在 step level 上配置,因为不同的步骤可能有不同的要求。只能执行一次的Step可以作为Job的一部分存在,而可以无限地运行Step。以下是 example 起始限制 configuration:

<step id="step1">
    <tasklet start-limit="1">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

上面的简单 step 只能运行一次。试图再次运行它会导致抛出 exception。应该注意 start-limit 的默认 value 是Integer.MAX_VALUE

重新启动已完成的 step

在可重新启动的 job 的情况下,可能有一个或多个步骤应始终为 run,无论它们是否在第一个 time 成功。 example 可以是验证 step,也可以是在处理之前清理资源的Step。在重新启动的 job 的正常处理期间,将跳过任何状态为“COMPLETED”的 step,意味着它已成功完成。将 allow-start-if-complete 设置为“true”会覆盖此项,以便 step 始终为 run:

<step id="step1">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

Step 重新启动 Configuration Example

<job id="footballJob" restartable="true">
    <step id="playerload" next="gameLoad">
        <tasklet>
            <chunk reader="playerFileItemReader" writer="playerWriter"
                   commit-interval="10" />
        </tasklet>
    </step>
    <step id="gameLoad" next="playerSummarization">
        <tasklet allow-start-if-complete="true">
            <chunk reader="gameFileItemReader" writer="gameWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
    <step id="playerSummarization">
        <tasklet start-limit="3">
            <chunk reader="playerSummarizationSource" writer="summaryWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
</job>

上面的 example configuration 用于 job,它加载有关足球比赛的信息并对其进行总结。它包含三个步骤:playerLoad,gameLoad 和 playerSummarization。 playerLoad Step从平面文件加载玩家信息,而 gameLoad Step对游戏也是如此。最终的Step,playerSummarization,然后根据提供的游戏总结每个玩家的统计数据。假设'playerLoad'加载的文件只能加载一次,但'gameLoad'将加载在特定目录中找到的任何游戏,在成功加载到数据库后删除它们。因此,playerLoad Step不包含其他 configuration。它可以几乎无限启动,如果完成将被跳过。然而,'gameLoad'Step需要每 time 运行一次,以防自上次执行以来删除了额外的 files。在 order 中将'allow-start-if-complete'设置为'true'以始终启动。 (假设加载的数据库表游戏上有一个 process 指示符,以确保摘要 step 可以正确找到新游戏)。汇总Step,这是Job中最重要的,被配置为具有 3 的起始限制。这很有用,因为如果 step 连续失败,则会将新的 exit code 返回给控制 job 执行的 operators,并且在进行人工干预之前,不允许重新开始。

这个 job 纯粹是出于示例目的,与 samples 项目中找到的 footballJob 不同。

Run 1:

  • playerLoad 被执行并成功完成,将 400 名玩家添加到'PLAYERS'table。

  • gameLoad 被执行并处理 11 个 files 值的游戏数据,将其内容加载到'GAMES'table 中。

  • playerSummarization 开始处理并在 5 分钟后失败。

Run 2:

  • playerLoad 不是 run,因为它已经成功完成,allow-start-if-complete 是'false'(默认值)。

  • gameLoad 再次执行并处理另外 2 个 files,将它们的内容加载到'GAMES'table 中(带有 process 指示符,指示它们尚未被处理)

  • playerSummarization 开始处理所有剩余的游戏数据(使用 process 指标过滤)并在 30 分钟后再次失败。

Run 3:

  • playerLoad 不是 run,因为它已经成功完成,allow-start-if-complete 是'false'(默认值)。

  • gameLoad 再次执行并处理另外 2 个 files,将它们的内容加载到'GAMES'table 中(带有 process 指示符,指示它们尚未被处理)

  • playerSummarization 没有启动,job 被立即杀死,因为这是 playerSummarization 的第三次执行,它的限制只有 2.必须提高限制,或者Job必须作为新的JobInstance执行。

5.1.5 配置跳过逻辑

在许多情况下,处理过程中遇到的错误不应导致Step失败,但应该跳过。这通常是一个必须由了解数据本身及其含义的人做出的决定。例如,财务数据可能无法跳过,因为它会导致转账,这需要完全准确。另一方面,加载供应商列表可能允许跳过。如果由于格式不正确或缺少必要信息而未加载供应商,则可能不会出现问题。通常也会记录这些不良记录,稍后在讨论 listeners 时会对此进行介绍。

<step id="step1">
   <tasklet>
      <chunk reader="flatFileItemReader" writer="itemWriter"
             commit-interval="10" skip-limit="10">
         <skippable-exception-classes>
            <include class="org.springframework.batch.item.file.FlatFileParseException"/>
         </skippable-exception-classes>
      </chunk>
   </tasklet>
</step>

在这个 example 中,使用FlatFileItemReader,如果在任何时候抛出FlatFileParseException,它将被跳过并计算总跳过限制 10.在读取,process 和 step 执行内写入时跳过单独的计数,并且限制适用于所有人。达到跳过限制后,找到的下一个 exception 将导致 step 失败。

上面的 example 的一个问题是除了FlatFileParseException之外的任何其他 exception 都会导致Job失败。在某些情况下,这可能是正确的行为。但是,在其他情况下,可能更容易识别哪些 exceptions 应该导致失败并跳过其他所有内容:

<step id="step1">
    <tasklet>
        <chunk reader="flatFileItemReader" writer="itemWriter"
               commit-interval="10" skip-limit="10">
            <skippable-exception-classes>
                <include class="java.lang.Exception"/>
                <exclude class="java.io.FileNotFoundException"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>

通过'包含'java.lang.Exception作为可跳过的 exception class,configuration 指示所有Exception都是可跳过的。但是,通过'排除'java.io.FileNotFoundException,configuration 将可跳过的 exception classes 列表细化为除FileNotFoundException之外的所有Exception。遇到任何排除的 exception classes 都会致命(i.e.不会被跳过)。

对于遇到的任何 exception,可跳过性将由 class 层次结构中最近的超类确定。任何未分类的 exception 都将被视为“致命”。 <include/><exclude/>元素的 order 无关紧要。

5.1.6 配置重试逻辑

在大多数情况下,您希望 exception 导致跳过或Step失败。但是,并非所有 exceptions 都是确定性的。如果在阅读时遇到FlatFileParseException,则会始终为该 record 抛出;重置ItemReader无济于事。但是,对于其他 exceptions,例如DeadlockLoserDataAccessException,表示当前 process 已尝试更新另一个 process 持有锁的 record,等待并再次尝试可能会导致成功。在这种情况下,应配置重试:

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter"
             commit-interval="2" retry-limit="3">
         <retryable-exception-classes>
            <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
         </retryable-exception-classes>
      </chunk>
   </tasklet>
</step>

Step允许限制单个 item 可以重试的次数,以及一个“可重试”的 exceptions 列表。有关重试如何工作的更多详细信息,请参见第 9 章,重试

5.1.7 控制回滚

默认情况下,无论是重试还是跳过,从ItemWriter抛出的任何 exceptions 都将导致Step控制的 transaction 回滚。如果如上所述配置 skip,则从ItemReader抛出的 exceptions 不会导致回滚。但是,在许多情况下,从ItemWriter抛出的 exceptions 不应导致回滚,因为没有执行任何操作来使 transaction 无效。因此,Step可以配置一个不应导致回滚的 exceptions 列表。

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
      <no-rollback-exception-classes>
         <include class="org.springframework.batch.item.validator.ValidationException"/>
      </no-rollback-exception-classes>
   </tasklet>
</step>

Transactional Readers

ItemReader的基本 contract 是它仅向前。 step 缓冲 reader 输入,因此在回滚的情况下,项目不需要_re来自 reader。但是,在某些情况下,reader 构建在 transactional 资源之上,例如 JMS 队列。在这种情况下,由于队列与回滚的 transaction 绑定,因此将从队列中提取的消息将重新打开。因此,step 可以配置为不缓冲项目:

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"
               is-reader-transactional-queue="true"/>
    </tasklet>
</step>

5.1.8 Transaction 属性

Transaction 属性可用于控制隔离,传播和超时设置。有关设置 transaction 属性的更多信息,请参阅 spring 核心文档。

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        <transaction-attributes isolation="DEFAULT"
                                propagation="REQUIRED"
                                timeout="30"/>
    </tasklet>
</step>

5.1.9 使用 Step 注册 ItemStreams

step 必须在其生命周期的必要点处理ItemStream回调。 (有关ItemStream接口的更多信息,请参阅第 6.4 节,“ItemStream”)如果 step 失败并且可能需要重新启动,这是至关重要的,因为ItemStream接口是 step 获取执行之间持久 state 所需的信息。

如果ItemReaderItemProcessorItemWriter本身实现ItemStream接口,则会自动注册这些接口。任何其他流需要单独注册。这通常是存在间接依赖关系的情况,例如委托被注入 reader 和 writer。可以通过'streams'元素在Step上注册流,如下所示:

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
            <streams>
                <stream ref="fileItemWriter1"/>
                <stream ref="fileItemWriter2"/>
            </streams>
        </chunk>
    </tasklet>
</step>

<beans:bean id="compositeWriter"
            class="org.springframework.batch.item.support.CompositeItemWriter">
    <beans:property name="delegates">
        <beans:list>
            <beans:ref bean="fileItemWriter1" />
            <beans:ref bean="fileItemWriter2" />
        </beans:list>
    </beans:property>
</beans:bean>

在上面的 example 中,CompositeItemWriter不是ItemStream,但它的两个代理都是。因此,必须在 framework 中将委托 writers 显式注册为 order 中的流才能正确处理它们。 ItemReader不需要显式注册为流,因为它是Step的直接 property。 step 现在可以重新启动,reader 和 writer 的 state 将在失败的 event 中正确保存。

5.1.10 拦截 Step 执行

Job一样,在执行Step期间有许多 events,用户可能需要执行某些功能。对于 example,在 order 中写出需要页脚的平面文件,ItemWriter需要在Step完成时得到通知,以便页脚可以写入。这可以通过许多Step范围的 listener 中的一个来完成。

实现StepListener的 extensions 之一的任何 class(但不是那个接口本身,因为它是空的)可以通过 listeners 元素应用于 step。 listeners 元素在 step,tasklet 或 chunk 声明中有效。建议您在 level 中声明_liste 这个 function 应用它,或者如果它是 multi-featured(e.g. StepExecutionListenerItemReadListener),然后在它应用的最细粒度 level 上声明它(给出 example 中的块)。

<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

如果使用命名空间<step>元素或*StepFactoryBean工厂之一,,ItemWriterItemProcessor本身实现StepListener接口之一将自动注册Step。这仅适用于直接注入Step的组件:如果 listener 嵌套在另一个 component 中,则需要显式注册(如上所述)。

除了StepListener接口之外,还提供了 annotations 来解决相同的问题。普通的旧 Java objects 可以使用带有这些注释的方法,然后将这些方法转换为相应的StepListener类型。注释ItemReaderItemWriterTasklet等块组件的自定义_implement 也是常见的。 XML 解析器为<listener/>元素分析 annotations,因此您需要做的就是使用 XML 命名空间向 step 注册 listeners。

StepExecutionListener

StepExecutionListener代表Step执行的最通用的 listener。它允许在Step开始之前和之后有_end 通知,无论它是正常结束还是失败:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus是 order 中_ret 的 return 类型,以允许 listeners 修改完成Step时返回的 exit code。

与此接口对应的注释是:

  • @BeforeStep

  • @AfterStep

ChunkListener

块被定义为在 transaction 范围内处理的项目。在每个提交间隔提交 transaction,提交一个“块”。在块开始处理之前或块成功完成之后,ChunkListener可用于执行逻辑:

public interface ChunkListener extends StepListener {

    void beforeChunk();
    void afterChunk();

}

方法在 transaction 启动后调用,但在ItemReader上调用read之前调用。相反,在提交了块之后调用afterChunk(如果存在回滚则根本不调用)。

与此接口对应的注释是:

  • @BeforeChunk

  • @AfterChunk

当没有块声明时,可以应用ChunkListenerTaskletStep负责调用ChunkListener,因此它也适用于 non-item-oriented tasklet(在 tasklet 之前和之后调用)。

ItemReadListener

在讨论上面的跳过逻辑时,有人提到过 log 跳过的记录可能是有益的,这样他们以后就可以处理了。在读取错误的情况下,可以使用ItemReaderListener:来完成

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

ItemReader上每次调用read之前都会调用beforeRead方法。每次成功调用read后都会调用afterRead方法,并将传递已读取的 item。如果在读取时出错,则会调用onReadError方法。将提供遇到的 exception 以便记录它。

与此接口对应的注释是:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener一样,item 的处理可以“收听”:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess方法将在ItemProcessor上的process之前调用,并传递将被处理的 item。成功处理 item 后将调用afterProcess方法。如果处理时出错,将调用onProcessError方法。遇到 exception 并且将提供尝试处理的 item,以便可以记录它们。

与此接口对应的注释是:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

使用ItemWriteListener可以“听”item 的写作:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite方法将在ItemWriter之前的write之前调用,并且将被传递给将要写入的 item。成功编写 item 后将调用afterWrite方法。如果写入时出错,则会调用onWriteError方法。遇到 exception 并且将提供尝试写入的 item,以便可以记录它们。

与此接口对应的注释是:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener都提供了通知错误的机制,但 none 将通知您实际上已跳过 record。 onWriteError,对于 example,即使重试 item 并且成功,也会调用onWriteError。因此,有一个单独的界面用于跟踪跳过的项目:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

只要在读取时跳过 item,就会调用onSkipInRead。应该注意的是,回滚可能会导致相同的 item 被注册为多次跳过。在写入时跳过 item 时将调用onSkipInWrite。因为 item 已被成功读取(并且未被跳过),所以它也被提供 item 本身作为参数。

与此接口对应的注释是:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners 和 Transactions

SkipListener的最常见用例之一是 log 出跳过的 item,以便可以使用另一个批处理 process 甚至人 process 来评估和修复导致跳过的问题。因为很多情况下原始 transaction 可以回滚,Spring Batch 有两个保证:

  • 适当的跳过方法(取决于发生错误的时间)将仅按 item 调用一次。

  • 始终在 transaction 提交之前调用SkipListener。这是为了确保 listener 中的任何 transactional 资源调用不会因ItemWriter内的故障而回滚。

5.2 TaskletStep

Chunk-oriented 处理不是Step中 process 的唯一方法。如果Step必须作为简单的存储过程调用,该怎么办?您可以在过程完成后将调用实现为ItemReader和 return null,但由于需要 no-op ItemWriter,因此有点不自然。 Spring Batch 为此方案提供了TaskletStep

Tasklet是一个简单的接口,它有一个方法execute,它将被TaskletStep重复调用,直到它返回RepeatStatus.FINISHED或抛出 exception 来表示失败。每次调用Tasklet都包含在 transaction 中。 Tasklet实现者可能会调用存储过程,脚本或简单的 SQL 更新语句。要创建TaskletStep,<tasklet/>元素的'ref'属性应该 reference 一个定义Tasklet object 的 bean;在<tasklet/>中不应使用<chunk/>元素:

<step id="step1">
    <tasklet ref="myTasklet"/>
</step>

如果实现此接口,TaskletStep将自动将 tasklet 注册为StepListener

5.2.1 TaskletAdapter

ItemReaderItemWriter接口的其他适配器一样,Tasklet接口包含 implementation,允许自身适应任何 pre-existing class:TaskletAdapter。这可能有用的示例是现有的 DAO,用于更新一组记录上的 flag。 TaskletAdapter可用于调用此 class,而无需为Tasklet接口编写适配器:

<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
    <property name="targetObject">
        <bean class="org.mycompany.FooDao"/>
    </property>
    <property name="targetMethod" value="updateFoo" />
</bean>

5.2.2 Example Tasklet Implementation

许多批处理作业包含必须在主处理开始之前完成的步骤,以便在设置各种资源或处理完成后清理这些资源。如果 job 与 files 一起使用很多,则在将它们成功上载到另一个位置后,通常需要在本地删除某些 files。下面的 example 取自 Spring Batch samples 项目,是一个Tasklet implementation,只有这样的责任:

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

上面的Tasklet implementation 将删除给定目录中的所有 files。应该注意的是,execute方法只会被调用一次。剩下的就是从Step中引用Tasklet

<job id="taskletJob">
    <step id="deleteFilesInDir">
       <tasklet ref="fileDeletingTasklet"/>
    </step>
</job>

<beans:bean id="fileDeletingTasklet"
            class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
    <beans:property name="directoryResource">
        <beans:bean id="directory"
                    class="org.springframework.core.io.FileSystemResource">
            <beans:constructor-arg value="target/test-outputs/test-dir" />
        </beans:bean>
    </beans:property>
</beans:bean>

5.3 控制步骤流程

由于能够在拥有 job 中将 group 步骤组合在一起,因此需要能够控制 job 如何从一个 step 流向另一个 step。 Step的失败并不一定意味着Job应该失败。此外,可能存在多种类型的“成功”,其确定接下来应该执行哪个Step。根据如何配置 group 步骤,某些步骤甚至可能根本不会被处理。

5.3.1 顺序流量

最简单的流程方案是 job,其中所有步骤按顺序执行:

这可以使用 step 元素的'next'属性来实现:

<job id="job">
    <step id="stepA" parent="s1" next="stepB" />
    <step id="stepB" parent="s2" next="stepC"/>
    <step id="stepC" parent="s3" />
</job>

在上面的场景中,'step A'将首先执行,因为它是列出的第一个Step。如果'step A'正常完成,那么'step B'将执行,依此类推。但是,如果'step A'失败,那么整个Job将失败并且'step B'将不会执行。

使用 Spring Batch 命名空间,configuration 中列出的第一个 step 将始终是Job执行的第一个 step。其他 step 元素的 order 无关紧要,但第一个 step 必须始终首先出现在 xml 中。

5.3.2 条件流

在上面的例子中,只有两种可能性:

  • Step成功,下一个Step应该执行。

  • Step失败,因此Job应该失败。

在许多情况下,这可能就足够了。但是,如果Step的失败应该触发不同的Step,而不是导致失败?


为了处理更复杂的场景,Spring Batch 命名空间允许在 step 元素中定义过渡元素。一个这样的过渡是“下一个”元素。与“next”属性一样,“next”元素将告诉Job下一个Step执行哪个。但是,与属性不同,在给定的Step上允许任意数量的“next”元素,并且在失败的情况下没有默认行为。这意味着如果使用过渡元素,则必须明确定义Step过渡的所有行为。另请注意,单个 step 不能同时具有“next”属性和 transition 元素。

下一个元素指定 pattern 到 match 和 step 来执行 next:

<job id="job">
    <step id="stepA" parent="s1">
        <next on="*" to="stepB" />
        <next on="FAILED" to="stepC" />
    </step>
    <step id="stepB" parent="s2" next="stepC" />
    <step id="stepC" parent="s3" />
</job>

过渡元素的“on”属性使用简单的 pattern-matching scheme 来匹配Step的执行所产生的ExitStatus。 pattern 中只允许使用两个特殊字符:

  • “*”将为零个或多个字符

  • “?”将匹配一个字符

例如,“c * t”将 match“cat”和“count”,而“c?t”将 match“cat”但不是“count”。

虽然Step上的转换元素数量没有限制,但如果Step的执行导致ExitStatus未被元素覆盖,那么 framework 将抛出 exception 并且Job将失败。 framework 将自动 order 从最具体到最不具体的转换。这意味着即使在上面的 example 中将元素换成“stepA”,ExitStatus的“FAILED”仍会转到“stepC”。

批次状态与退出状态

为条件流配置Job时,了解BatchStatusExitStatus之间的区别非常重要。 BatchStatus是枚举,它是JobExecutionStepExecution的 property,framework 用于_记录JobStep的状态。它可以是以下值之一:已完成,启动,启动,停止,停止,失败,放弃或未知。其中大多数是自解释的:COMPLETED 是 step 或 job 成功完成时设置的状态,FAILED 失败时设置,等等。上面的 example 包含以下'next'元素:

<next on="FAILED" to="stepB" />

乍一看,似乎'on'属性引用它所属的StepBatchStatus。但是,它实际上是的ExitStatus。正如 name 所暗示的那样,ExitStatus表示完成执行后Step的状态。更具体地说,上面的'next'元素引用了ExitStatus的 exit code。要用英文写,它会说:“如果退出 code 失败,请转到步骤 B”。默认情况下,exit code 始终与 Step 的BatchStatus相同,这就是上面的条目有效的原因。但是,如果 exit code 需要不同呢?一个好的 example 来自 samples 项目中的 skip sample job:

<step id="step1" parent="s1">
    <end on="FAILED" />
    <next on="COMPLETED WITH SKIPS" to="errorPrint1" />
    <next on="*" to="step2" />
</step>

上述 step 有三种可能:

  • Step失败,在这种情况下 job 应该失败。

  • Step成功完成。

  • Step已成功完成,但退出 code 为'COMPLETED WITH SKIPS'。在这种情况下,应该使用不同的 step 来处理错误。

上面的 configuration 会有效。但是,有些东西需要根据跳过记录的执行条件来更改 exit code:

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

上面的 code 是一个StepExecutionListener,首先检查以确保Step成功,然后如果StepExecution上的跳过计数高于 0.如果满足两个条件,则返回一个新的ExitStatus,其退出 code 为“COMPLETED WITH SKIPS”退回。

5.3.3 配置停止

在讨论BatchStatus 和 ExitStatus之后,人们可能想知道如何为Job确定BatchStatusExitStatus。虽然这些状态是由 code 执行的Step确定的,但Job的状态将根据 configuration 确定。

到目前为止,所讨论的所有 job 配置至少有一个没有转换的Step。对于 example,在执行以下 step 之后,Job将结束:

<step id="stepC" parent="s3"/>

如果没有为Step定义转换,那么Job的状态将定义如下:

  • 如果Step ends 与ExitStatus失败,则JobBatchStatusExitStatus都将失败。

  • 否则,JobBatchStatusExitStatus都将被完成。

虽然这种终止批处理 job 的方法足以用于某些批处理作业,例如简单的顺序 step job,但可能需要自定义 job-stopping 场景。为此,Spring Batch 提供了三个过渡元素来停止Job(除了我们之前讨论过的“下一个”元素)。这些停止元素中的每一个都将使用特定的BatchStatus来停止Job。重要的是要注意,stop 过渡元素对Job中的任何StepBatchStatusExitStatus都没有影响:这些元素只会影响Job的最终状态。例如,job 中的每个 step 都可能具有 FAILED 状态但 job 的状态为 COMPLETED,反之亦然。

'结束'元素

'end'元素指示JobBatchStatus的 COMPLETED 停止。已完成状态为 COMPLETED 的Job无法重新启动(framework 将抛出JobInstanceAlreadyCompleteException)。 'end'元素还允许可选的'exit-code'属性,可用于自定义JobExitStatus。如果没有给出'exit-code'属性,那么ExitStatus默认为“COMPLETED”,以匹配BatchStatus

在以下场景中,如果 step2 失败,那么Job将以BatchStatus的 COMPLETED 停止并且ExitStatus为“COMPLETED”并且 step3 将不会执行;否则,执行将转到步骤 3。请注意,如果 step2 失败,则Job将无法重新启动(因为状态为 COMPLETED)。

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <end on="FAILED"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

'失败'元素

'fail'元素指示JobBatchStatus FAILED 停止。与'end'元素不同,'fail'元素不会阻止Job重启。 'fail'元素还允许使用可选的'exit-code'属性,该属性可用于自定义JobExitStatus。如果没有给出'exit-code'属性,那么ExitStatus默认为“FAILED”,以_匹配BatchStatus

在以下场景中,如果 step2 失败,那么Job将以BatchStatus的 FAILED 和ExitStatus的“EARLY TERMINATION”停止,并且 step3 将不会执行;否则,执行将转到步骤 3。此外,如果 step2 失败,并且Job重新启动,则将在 step2 上再次执行。

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <fail on="FAILED" exit-code="EARLY TERMINATION"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

'停止'元素

'stop'元素指示JobBatchStatus STOPPED 停止。停止Job可以在处理过程中提供临时 break,以便 operator 可以在重新启动Job之前采取一些操作。 'stop'元素需要一个'restart'属性,该属性指定执行应该在Job is restarted时拾取的 step。

在以下场景中,如果 step1 以 COMPLETE 结束,则 job 将停止。重新启动后,将在第 2 步开始执行。

<step id="step1" parent="s1">
    <stop on="COMPLETED" restart="step2"/>
</step>

<step id="step2" parent="s2"/>

5.3.4 程序化流程决策

在某些情况下,可能需要比ExitStatus更多的信息来决定接下来要执行哪个 step。在这种情况下,JobExecutionDecider可用于协助决策。

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (someCondition) {
            return "FAILED";
        }
        else {
            return "COMPLETED";
        }
    }
}

在 job configuration 中,“decision”标签将指定要使用的决策程序以及所有转换。

<job id="job">
    <step id="step1" parent="s1" next="decision" />

    <decision id="decision" decider="decider">
        <next on="FAILED" to="step2" />
        <next on="COMPLETED" to="step3" />
    </decision>

    <step id="step2" parent="s2" next="step3"/>
    <step id="step3" parent="s3" />
</job>

<beans:bean id="decider" class="com.MyDecider"/>

5.3.5 分流

到目前为止描述的每个场景都涉及一个,它在 time 以线性方式执行Step s。除了这种典型的样式之外,Spring Batch 命名空间还允许使用'split'元素为 job 配置 parallel 流。如下所示,'split'元素包含一个或多个'flow'元素,其中可以定义整个单独的流。 'split'元素还可以包含任何前面讨论的过渡元素,例如'next'属性或'next','end','fail'或'pause'元素。

<split id="split1" next="step4">
    <flow>
        <step id="step1" parent="s1" next="step2"/>
        <step id="step2" parent="s2"/>
    </flow>
    <flow>
        <step id="step3" parent="s3"/>
    </flow>
</split>
<step id="step4" parent="s4"/>

5.3.6 外部化作业之间的流定义和依赖关系

job 中的部分流可以外部化为单独的 bean 定义,然后是 re-used。有两种方法可以做到这一点,第一种方法是简单地将流声明为对其他地方定义的流的 reference:

<job id="job">
    <flow id="job1.flow1" parent="flow1" next="step3"/>
    <step id="step3" parent="s3"/>
</job>

<flow id="flow1">
    <step id="step1" parent="s1" next="step2"/>
    <step id="step2" parent="s2"/>
</flow>

像这样定义外部流的效果就是将外部流中的步骤插入到 job 中,就像它们已经内联声明一样。通过这种方式,许多作业可以引用相同的模板流并将这些模板组合成不同的逻辑流。这也是分离各个流的 integration 测试的好方法。

外化流的另一种形式是使用JobStepJobStep类似于FlowStep,但实际上为指定的流中的步骤创建并启动单独的 job 执行。这是一个 example:

<job id="jobStepJob" restartable="true">
   <step id="jobStepJob.step1">
      <job ref="job" job-launcher="jobLauncher"
          job-parameters-extractor="jobParametersExtractor"/>
   </step>
</job>

<job id="job" restartable="true">...</job>

<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
   <property name="keys" value="input.file"/>
</bean>

job 参数提取器是一种策略,用于确定StepExecutionContext如何转换为JobParameters以执行 Job。当您希望有一些更精细的选项来监视和报告作业和步骤时,JobStep非常有用。使用JobStep通常也是一个很好的答案:“如何在作业之间创建依赖关系?”。这是一个很好的方法,可以将大型系统扩展为更小的模块并控制作业流。

5.4 Job 和 Step 属性的 Binding Binding

上面的 XML 和 Flat File 示例都使用 Spring Resource抽象来获取文件。这是有效的,因为Resource有一个 getFile 方法,它返回一个java.io.File。可以使用标准 Spring 结构配置 XML 和 Flat File 资源:

<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource"
              value="file://outputs/20070122.testStream.CustomerReportStep.TEMP.txt" />
</bean>

以上Resource将从指定的文件系统位置加载文件。请注意,绝对位置必须以 double 斜杠(“//”)开头。在大多数 spring applications 中,这个解决方案已经足够好了,因为这些名称在 compile time 时已知。但是,在批处理方案中,可能需要在运行时将文件 name 确定为 job 的参数。这可以使用'-D'参数来解决,i.e。一个系统 property:

<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="${input.file.name}" />
</bean>

此解决方案工作所需的全部内容都是系统参数(-Dinput.file.name =“file://file.txt”)。 (请注意,虽然这里可以使用PropertyPlaceholderConfigurer,但是如果系统 property 始终设置是没有必要的,因为 Spring 中的ResourceEditor已经过滤并在系统 properties.)上进行占位符替换

通常在批处理设置中,最好在 job 的JobParameters中参数化文件 name,而不是通过系统 properties,并以这种方式访问它们。为了实现这一点,Spring Batch 允许各种 Job 和 Step 属性的后期绑定:

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>

JobExecutionStepExecution level ExecutionContext都可以以相同的方式访问:

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>

任何使用 late-binding 的 bean 都必须使用 scope =“ step”声明。有关第 5.4.1 节,“步骤范围”的更多信息,请参阅。

如果您正在使用 Spring 3.0(或以上),则 step-scoped beans 中的表达式位于 Spring 表达式语言中,这是一种功能强大的通用语言,具有许多有趣的 features。为了提供向后兼容性,如果 Spring Batch 检测到存在旧版本的 Spring,则它使用功能较弱的本机表达式语言,并且具有稍微不同的解析规则。主要区别在于上面 example 中的 map 键不需要用 Spring 2.5 引用,但引号在 Spring 3.0 中是必需的。

5.4.1 步骤范围

上面所有后期的 binding 示例都在 bean 定义中声明了“step”的范围:

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>

在 order 中使用Step的范围来使用延迟 binding,因为在开始之前 bean 实际上不能被实例化,这允许找到属性。因为默认情况下它不是 Spring 容器的一部分,所以必须使用batch命名空间显式添加范围:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

或者通过为StepScope(但不是两者)明确包含 bean 定义:

<bean class="org.springframework.batch.core.scope.StepScope" />

5.4.2 Job 范围

在 Spring Batch 3.0 中引入的 Job 范围类似于 configuration 中的 Step 范围,但是 Job context 的范围,因此每个执行 job 只有一个这样的 bean 实例。此外,还提供了对使用#{从 JobContext 访问的 references 的后期绑定的支持。 452}占位符。使用此 feature,可以从 job 或 job 执行 context 和 job 参数中提取 bean properties。 E.g。

<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>

因为默认情况下它不是 Spring 容器的一部分,所以必须使用batch命名空间显式添加范围:

<beans xmlns="http://www.springframework.org/schema/beans"
		  xmlns:batch="http://www.springframework.org/schema/batch"
		  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		  xsi:schemaLocation="...">

		  <batch:job .../>
		  ...
		  </beans>

或者通过为JobScope(但不是两者)明确包含 bean 定义:

<bean class="org.springframework.batch.core.scope.JobScope" />
Updated at: 9 months ago
4.6.6. 中止 JobTable of content6. ItemReaders 和 ItemWriters