5. 配置步骤

批处理域语言所讨论的,Step是一个域对象,该域对象封装了批处理作业的一个独立的 Sequences 阶段,并包含定义和控制实际批处理所需的所有信息。这是一个模糊的描述,因为任何给定Step的内容都由开发人员自行编写Job来决定。步骤可以像开发人员所希望的那样简单或复杂。简单的Step可能会将文件中的数据加载到数据库中,几乎不需要代码。 (取决于所使用的实现方式)较复杂的Step可能具有复杂的业务规则,这些规则将在处理过程中应用。

5.1 块处理

Spring Batch 在其最常见的实现中使用“面向块的”处理风格。面向块的处理是指一次读取一个数据,并在事务边界内创建要写出的“块”。从ItemReader读取一项,将其交给ItemProcessor并进行汇总。一旦读取的 Item 数等于提交间隔,就通过 ItemWriter 写入整个块,然后提交事务。

下面是上述相同概念的代码表示:

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

5.1.1 配置步骤

尽管Step所需的依赖项列表相对较短,但它是一个极其复杂的类,可能包含许多协 Writer。为了简化配置,可以使用 Spring Batch 名称空间:

<job id="sampleJob" job-repository="jobRepository">
    <step id="step1">
        <tasklet transaction-manager="transactionManager">
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

上面的配置代表创建面向 Item 的步骤所需的唯一依赖项:

  • reader-提供处理 Item 的ItemReader

  • writer-处理ItemReader提供的 Item 的ItemWriter

  • transaction-manager-Spring 的PlatformTransactionManager,将用于在处理期间开始和提交事务。

  • job-repository-JobRepository,将用于在处理过程中(即在提交之前)定期存储StepExecutionExecutionContext。对于内联\ (在 内定义),它是\ 元素上的一个属性;对于独立步骤,它定义为\ 的属性。

  • commit-interval-提交事务之前将要处理的 Item 数。

应当注意,作业存储库默认为“ jobRepository”,而事务 Management 器默认为“ transactionManger”。此外,ItemProcessor是可选的,不是必需的,因为该 Item 可以直接从 Reader 传递给编写器。

5.1.2 从父步骤继承

如果一组Step共享相似的配置,则定义一个“父” Step可能会有所帮助,具体的Step可以从中继承属性。类似于 Java 中的类继承,“子” Step将其元素和属性与父类结合在一起。子代还将覆盖父代的任何Step

在下面的示例中,Step“ concreteStep1”将从“ parentStep”继承。它将使用'itemReader','itemProcessor','itemWriter',startLimit = 5 和 allowStartIfComplete = true 实例化。另外,由于被“ concreteStep1”覆盖,所以 commitInterval 将为“ 5”:

<step id="parentStep">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep1" parent="parentStep">
    <tasklet start-limit="5">
        <chunk processor="itemProcessor" commit-interval="5"/>
    </tasklet>
</step>

在 job 元素内的步骤上,仍然需要 id 属性。这有两个原因:

  • 保留 StepExecution 时,该 ID 将用作步骤名称。如果在一个作业中的多个步骤中引用了同一独立步骤,则将发生错误。

  • 如本章稍后所述,在创建作业流程时,下一个属性应引用流程中的步骤,而不是独立步骤。

Abstract Step

有时可能有必要定义不是完整的Step配置的父级Step。例如,如果将 Reader,Writer 和 Tasklet 属性保留为Step配置,则初始化将失败。如果必须定义一个没有这些属性的父对象,则应使用“抽象”属性。 “抽象” Step将不会被实例化;它仅用于扩展。

在下面的示例中,如果Step“ abstractParentStep”未声明为抽象,则不会实例化。 Step“ concreteStep2”将具有“ itemReader”,“ itemWriter”和 commitInterval = 10.

<step id="abstractParentStep" abstract="true">
    <tasklet>
        <chunk commit-interval="10"/>
    </tasklet>
</step>

<step id="concreteStep2" parent="abstractParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter"/>
    </tasklet>
</step>

Merging Lists

Step上的一些可配置元素是列表;例如,\ 元素。如果父级和子级Step都声明了\ 元素,则子级的列表将覆盖父级的列表。为了允许孩子将其他侦听器添加到 parent 定义的列表中,每个列表元素都具有“合并”属性。如果该元素指定 merge =“ true”,则子级列表将与父级列表合并,而不是覆盖它。

在以下示例中,将创建Step“ concreteStep3”,并将两个侦听器:listenerOnelistenerTwo

<step id="listenersParentStep" abstract="true">
    <listeners>
        <listener ref="listenerOne"/>
    <listeners>
</step>

<step id="concreteStep3" parent="listenersParentStep">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
    </tasklet>
    <listeners merge="true">
        <listener ref="listenerTwo"/>
    <listeners>
</step>

5.1.3 提交间隔

如上所述,一个步骤读取和写入 Item,并使用提供的PlatformTransactionManager定期提交。提交间隔为 1 时,它将在写入每个单独的 Item 后提交。在许多情况下,这并不理想,因为开始和提交事务非常昂贵。理想情况下,最好在每个事务中处理尽可能多的 Item,这完全取决于要处理的数据类型和与之交互的资源。因此,可以配置在提交中处理的 Item 数。

<job id="sampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
        </tasklet>
    </step>
</job>

在上面的示例中,每个事务中将处理 10 个 Item。在处理开始时,事务开始,并且每次在ItemReader上调用 read 时,计数器都会递增。当它达到 10 时,聚合项的列表将传递到ItemWriter,事务将被提交。

5.1.4 配置重新启动步骤

第 4 章,配置和运行作业中,讨论了重新启动Job。重新启动对步骤有很多影响,因此可能需要一些特定的配置。

设置 StartLimit

在许多情况下,您可能希望控制Step的启动次数。例如,可能需要配置特定的Step使其仅运行一次,因为它会使某些必须手动修复的资源无效,然后才能再次运行它。这可以在步骤级别上配置,因为不同的步骤可能有不同的要求。只能执行一次的Step与可以无限运行的Step作为同一Job的一部分存在。以下是启动限制配置示例:

<step id="step1">
    <tasklet start-limit="1">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

上面的简单步骤只能运行一次。尝试再次运行它会引发异常。应该注意的是,开始限制的默认值为Integer.MAX_VALUE

重新开始完成的步骤

对于可重新启动的作业,可能有一个或多个步骤应始终运行,无论它们是否第一次成功。一个示例可能是验证步骤,或者是Step在处理之前清理资源。在正常处理重新启动的作业期间,状态为“已完成”(表示已成功完成)的任何步骤都将被跳过。将 allow-start-if-complete 设置为“ true”将覆盖此设置,以便该步骤将始终运行:

<step id="step1">
    <tasklet allow-start-if-complete="true">
        <chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
    </tasklet>
</step>

步骤重新启动配置示例

<job id="footballJob" restartable="true">
    <step id="playerload" next="gameLoad">
        <tasklet>
            <chunk reader="playerFileItemReader" writer="playerWriter"
                   commit-interval="10" />
        </tasklet>
    </step>
    <step id="gameLoad" next="playerSummarization">
        <tasklet allow-start-if-complete="true">
            <chunk reader="gameFileItemReader" writer="gameWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
    <step id="playerSummarization">
        <tasklet start-limit="3">
            <chunk reader="playerSummarizationSource" writer="summaryWriter"
                   commit-interval="10"/>
        </tasklet>
    </step>
</job>

上面的示例配置适用于加载有关足球 match 的信息并对其进行汇总的作业。它包含三个步骤:playerLoad,gameLoad 和 playerSummarization。 playerLoad Step从平面文件中加载玩家信息,而 gameLoad Step对游戏也是如此。最后的Step,playerSummarization,然后根据提供的游戏汇总每个玩家的统计信息。假定“ playerLoad”加载的文件仅必须加载一次,但是“ gameLoad”将加载在特定目录中找到的所有游戏,并在将它们成功加载到数据库后将其删除。结果,playerLoad Step不包含其他配置。它几乎可以无限启动,如果完成,将被跳过。但是,'gameLoad'Step每次都需要运行,以防自上次执行以来删除了多余的文件。为了始终启动,将“ allow-start-if-complete”设置为“ true”。 (假设装入的数据库表游戏具有过程指示器,以确保可以通过汇总步骤正确找到新游戏)。摘要StepJob中最重要的摘要,其初始限制为 3.这很有用,因为如果步骤连续失败,则会将新的退出代码返回给控制作业执行的操作员,并且在进行人工干预之前,不允许再次启动。

Note

该工作仅出于示例目的,与示例 Item 中找到的 footballJob 不同。

Run 1:

  • playerLoad 已执行并成功完成,将 400 位玩家添加到“ PLAYERS”表中。

  • 执行 gameLoad 并处理 11 个值得游戏数据的文件,并将其内容加载到“ GAMES”表中。

  • playerSummarization 开始处理,并在 5 分钟后失败。

Run 2:

  • playerLoad 未运行,因为它已经成功完成,并且 allow-start-if-complete 为'false'(默认值)。

  • gameLoad 再次执行并处理另外 2 个文件,并将它们的内容也加载到“ GAMES”表中(带有过程指示器,指示它们尚未处理)

  • playerSummarization 开始处理所有剩余的游戏数据(使用过程指示器进行过滤),并在 30 分钟后再次失败。

Run 3:

  • playerLoad 未运行,因为它已经成功完成,并且 allow-start-if-complete 为'false'(默认值)。

  • gameLoad 再次执行并处理另外 2 个文件,并将它们的内容也加载到“ GAMES”表中(带有过程指示器,指示它们尚未处理)

  • playerSummarization 没有开始,并且作业被立即终止,因为这是 playerSummarization 的第三次执行,并且其限制仅为 2.必须提高限制,或者必须将Job作为新的JobInstance执行。

5.1.5 配置跳过逻辑

在许多情况下,处理时遇到的错误不应导致Step失败,而应跳过这些错误。通常这是必须由了解数据本身及其含义的人做出的决定。例如,财务数据可能无法跳过,因为它会导致资金被转移,这需要完全准确。另一方面,加载供应商列表可能会导致跳过。如果由于格式错误或缺少必要的信息而未加载供应商,则可能不会出现问题。通常,这些不良记录也会被记录下来,稍后在讨论侦听器时将予以覆盖。

<step id="step1">
   <tasklet>
      <chunk reader="flatFileItemReader" writer="itemWriter"
             commit-interval="10" skip-limit="10">
         <skippable-exception-classes>
            <include class="org.springframework.batch.item.file.FlatFileParseException"/>
         </skippable-exception-classes>
      </chunk>
   </tasklet>
</step>

在此示例中,使用了FlatFileItemReader,并且在任何时候抛出FlatFileParseException都会被跳过并计入总跳过限制 10.在步骤执行过程中,分别对读取,处理和写入的跳过进行计数,该限制适用于所有人。一旦达到跳过限制,找到的下一个异常将导致该步骤失败。

上面的示例的一个问题是除FlatFileParseException之外的任何其他异常都将导致Job失败。在某些情况下,这可能是正确的行为。但是,在其他情况下,可能更容易确定哪些异常应导致失败并跳过其他所有操作:

<step id="step1">
    <tasklet>
        <chunk reader="flatFileItemReader" writer="itemWriter"
               commit-interval="10" skip-limit="10">
            <skippable-exception-classes>
                <include class="java.lang.Exception"/>
                <exclude class="java.io.FileNotFoundException"/>
            </skippable-exception-classes>
        </chunk>
    </tasklet>
</step>

通过“包含” java.lang.Exception作为可跳过的异常类,该配置指示所有Exception都是可跳过的。但是,通过“排除” java.io.FileNotFoundException,配置将可跳过的异常类的列表优化为全部Exception FileNotFoundException。如果遇到任何排除在外的异常类将是致命的(即不被跳过)。

对于遇到的任何异常,可跳过性将由类层次结构中最接近的超类确定。任何未分类的异常将被视为“致命”异常。 <include/><exclude/>元素的 Sequences 无关紧要。

5.1.6 配置重试逻辑

在大多数情况下,您希望异常导致跳过或Step失败。但是,并非所有 exception 都是确定性的。如果在读取时遇到FlatFileParseException,则该记录将始终被抛出;重置ItemReader将无济于事。但是,对于其他异常,例如DeadlockLoserDataAccessException,它指示当前进程已尝试更新另一个进程已锁定的记录,await 并重试可能会导致成功。在这种情况下,应配置重试:

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter"
             commit-interval="2" retry-limit="3">
         <retryable-exception-classes>
            <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
         </retryable-exception-classes>
      </chunk>
   </tasklet>
</step>

Step允许限制单个 Item 的重试次数,以及“可重试”的 exception 列表。有关重试工作方式的更多详细信息,请参见第 9 章,重试

5.1.7 控制回滚

默认情况下,无论重试还是跳过,从ItemWriter引发的任何异常都将导致由Step控制的事务回滚。如果如上所述配置了 skip,则从ItemReader引发的异常不会导致回滚。但是,在许多情况下,从ItemWriter引发的异常不应导致回滚,因为没有采取任何行动来使事务无效。因此,可以为Step配置一系列不应引起回滚的异常。

<step id="step1">
   <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
      <no-rollback-exception-classes>
         <include class="org.springframework.batch.item.validator.ValidationException"/>
      </no-rollback-exception-classes>
   </tasklet>
</step>

Transactional Readers

ItemReader的基本 Contract 是仅向前。该步骤可缓冲读取器的 Importing,因此在回滚的情况下,无需从读取器中重新读取 Item。但是,在某些情况下,Reader 是构建在诸如 JMS 队列之类的事务资源之上的。在这种情况下,由于队列与回滚的事务相关,因此将从队列中拉出的消息放回去。因此,可以将步骤配置为不缓冲 Item:

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"
               is-reader-transactional-queue="true"/>
    </tasklet>
</step>

5.1.8 事务属性

事务属性可用于控制隔离,传播和超时设置。可以在 spring 核心文档中找到有关设置事务属性的更多信息。

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        <transaction-attributes isolation="DEFAULT"
                                propagation="REQUIRED"
                                timeout="30"/>
    </tasklet>
</step>

5.1.9 在步骤中注册 ItemStreams

该步骤必须在其生命周期的必要时点处理ItemStream回调。 (有关ItemStream界面的更多信息,请参考第 6.4 节“ ItemStream”),这对于步骤失败至关重要,并且可能需要重新启动,因为ItemStream接口是该步骤获取有关两次执行之间的持久状态所需信息的地方。

如果ItemReaderItemProcessorItemWriter本身实现ItemStream接口,则将自动注册这些接口。任何其他流都需要单独注册。这通常是存在间接依赖性的情况,例如将委托注入到读取器和写入器中。可以通过'streams'元素在Step上注册流,如下所示:

<step id="step1">
    <tasklet>
        <chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
            <streams>
                <stream ref="fileItemWriter1"/>
                <stream ref="fileItemWriter2"/>
            </streams>
        </chunk>
    </tasklet>
</step>

<beans:bean id="compositeWriter"
            class="org.springframework.batch.item.support.CompositeItemWriter">
    <beans:property name="delegates">
        <beans:list>
            <beans:ref bean="fileItemWriter1" />
            <beans:ref bean="fileItemWriter2" />
        </beans:list>
    </beans:property>
</beans:bean>

在上面的示例中,CompositeItemWriter不是ItemStream,但是它的两个委托都是。因此,必须将两个委托 Writer 都明确注册为流,以便框架正确处理它们。 ItemReader不需要显式注册为流,因为它是Step的直接属性。现在,该步骤将可重新启动,并且如果发生故障,读取器和写入器的状态将正确保留。

5.1.10 拦截步骤执行

就像Job一样,在执行Step期间会发生许多事件,用户可能需要执行某些功能。例如,为了写出需要页脚的平面文件,必须在Step完成时通知ItemWriter,以便页脚可以写入。这可以通过许多Step作用域侦听器之一来完成。

任何实现StepListenerextensions 之一的类(但由于接口本身是空的,因此不能实现该接口本身)可以通过 listeners 元素应用于步骤。 listeners 元素在步骤,tasklet 或块声明中有效。建议您在其功能适用的级别声明监听器,或者如果它具有多种功能(例如StepExecutionListenerItemReadListener),则在其适用的最细粒度级别声明监听器(在给定的示例中为大块)。

<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

如果使用命名空间<step>元素或*StepFactoryBean工厂之一,则本身实现StepListener接口之一的ItemReaderItemWriterItemProcessor将自动向Step注册。这仅适用于直接注入Step的组件:如果侦听器嵌套在另一个组件中,则需要对其进行显式注册(如上所述)。

除了StepListener接口之外,还提供 Comments 来解决相同的问题。普通的旧 Java 对象可以使用带有这些注解的方法,然后将其转换为相应的StepListener类型。Comments 诸如ItemReaderItemWriterTasklet之类的块组件的自定义实现也是很常见的。 XML 解析器针对<listener/>元素对 Comments 进行了分析,因此您所需要做的就是使用 XML 名称空间通过一个步骤注册侦听器。

StepExecutionListener

StepExecutionListener代表Step执行的最通用的侦听器。它允许在Step开始之前和结束之后发出通知,无论它是正常结束还是失败:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatusafterStep的返回类型,以使侦听器有机会修改在Step完成时返回的退出代码。

与此接口对应的 Comments 为:

  • @BeforeStep

  • @AfterStep

ChunkListener

块定义为在事务范围内处理的 Item。在每个提交间隔提交事务都会提交一个“块”。 ChunkListener对于在块开始处理之前或块成功完成之后执行逻辑很有用:

public interface ChunkListener extends StepListener {

    void beforeChunk();
    void afterChunk();

}

在事务开始之后但在ItemReader上调用read之前,将调用beforeChunk方法。相反,afterChunk在提交块之后被调用(如果发生回滚则根本不调用)。

与此接口对应的 Comments 为:

  • @BeforeChunk

  • @AfterChunk

当没有块声明时,可以应用ChunkListener:它是TaskletStep负责调用ChunkListener,因此它也适用于非面向 Item 的 Tasklet(在 Tasklet 之前和之后调用)。

ItemReadListener

在上面讨论跳过逻辑时,曾提到记录跳过的记录可能会有所帮助,以便以后进行处理。如果出现读取错误,可以使用ItemReaderListener:完成

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

每次在ItemReader上调用read之前,将调用beforeRead方法。每次成功调用read之后,将调用afterRead方法,并将传递已读取的 Item。如果读取时出错,则将调用onReadError方法。将提供遇到的异常,以便可以将其记录下来。

与此接口对应的 Comments 为:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener一样,可以将 Item 的处理“侦听”为:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess方法将在ItemProcessor上的process之前调用,并交给将要处理的 Item。成功处理该 Item 后,将调用afterProcess方法。如果处理时出错,则将调用onProcessError方法。将提供遇到的异常和尝试处理的 Item,以便可以记录它们。

与此接口对应的 Comments 为:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

可以使用ItemWriteListener来“听”Item 的书写:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite方法将在ItemWriter上的write之前被调用,并且将被写入的内容交给该方法。成功写入 Item 后,将调用afterWrite方法。如果写入时出错,则将调用onWriteError方法。将提供遇到的异常和尝试写入的 Item,以便可以记录它们。

与此接口对应的 Comments 为:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener都提供了用于通知错误的机制,但没有一个机制会通知您实际上已跳过了一条记录。例如,即使重试并成功执行onWriteError也会被调用。因此,有一个单独的界面可用于跟踪跳过的 Item:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

每次在阅读过程中跳过某项都会调用onSkipInRead。应当注意,回滚可能导致同一项被注册为多次跳过。写入时跳过某项时,将调用onSkipInWrite。由于已成功读取(而不是跳过)该 Item,因此还提供了该 Item 本身作为参数。

与此接口对应的 Comments 为:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

跳过侦听器和事务

SkipListener最常见的用例之一是注销跳过的 Item,以便可以使用另一个批处理甚至人工流程来评估和解决导致跳过的问题。由于在很多情况下原始事务可能会被回滚,因此 Spring Batch 提供了两个保证:

  • 适当的跳过方法(取决于错误发生的时间)将仅对每个 Item 调用一次。

  • SkipListener将始终在事务提交之前被调用。这是为了确保ItemWriter内的故障不会使侦听器调用的任何事务资源回滚。

5.2 TaskletStep

块处理不是在Step中处理的唯一方法。如果Step必须包含一个简单的存储过程调用怎么办?您可以将调用实现为ItemReader,并在过程完成后返回 null,但这有点不自然,因为需要使用无操作ItemWriter。 Spring Batch 为此场景提供了TaskletStep

Tasklet是一个简单的接口,具有一个方法execute,它将由TaskletStep反复调用,直到它返回RepeatStatus.FINISHED或引发异常以指示失败。对Tasklet的每次调用都包装在一个事务中。 Tasklet实现者可以调用存储过程,脚本或简单的 SQL 更新语句。要创建TaskletStep,\ 元素的'ref'属性应引用一个定义Tasklet对象的 bean。在\ 内不应使用\ 元素:

<step id="step1">
    <tasklet ref="myTasklet"/>
</step>

Note

如果TaskletStep实现了此接口,它将自动将 Tasklet 注册为StepListener

5.2.1 TaskletAdapter

ItemReaderItemWriter接口的其他适配器一样,Tasklet接口包含一个实现,可以使其自身适应任何现有的类:TaskletAdapter。一个可能有用的示例是现有的 DAO,用于更新一组记录上的标志。 TaskletAdapter可用于调用此类,而不必为Tasklet接口编写适配器:

<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
    <property name="targetObject">
        <bean class="org.mycompany.FooDao"/>
    </property>
    <property name="targetMethod" value="updateFoo" />
</bean>

5.2.2 Tasklet 实现示例

许多批处理作业包含必须在主处理开始之前执行的步骤,以设置各种资源,或者在处理完成后清理这些资源。对于需要大量处理文件的工作,通常需要在成功将文件上载到其他位置后在本地删除某些文件。下面的示例摘自 Spring Batch 示例 Item,是一个Tasklet实现,它具有以下职责:

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

上面的Tasklet实现将删除给定目录中的所有文件。应当注意,execute方法将仅被调用一次。剩下的就是从Step引用Tasklet

<job id="taskletJob">
    <step id="deleteFilesInDir">
       <tasklet ref="fileDeletingTasklet"/>
    </step>
</job>

<beans:bean id="fileDeletingTasklet"
            class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
    <beans:property name="directoryResource">
        <beans:bean id="directory"
                    class="org.springframework.core.io.FileSystemResource">
            <beans:constructor-arg value="target/test-outputs/test-dir" />
        </beans:bean>
    </beans:property>
</beans:bean>

5.3 控制步骤流程

由于能够将一个拥有的工作中的各个步骤组合在一起,因此需要能够控制工作如何从一个步骤“流向”另一个步骤。 Step的失败并不一定意味着Job应该会失败。此外,可能有不止一种类型的“成功”确定接下来应执行哪个Step。根据一组步骤的配置方式,某些步骤甚至可能根本无法处理。

5.3.1Sequences 流

最简单的流程场景是一项工作,其中所有步骤都按 Sequences 执行:

这可以使用 step 元素的'next'属性来实现:

<job id="job">
    <step id="stepA" parent="s1" next="stepB" />
    <step id="stepB" parent="s2" next="stepC"/>
    <step id="stepC" parent="s3" />
</job>

在上述情况下,“步骤 A”将首先执行,因为它是列出的第一个Step。如果“步骤 A”正常完成,则将执行“步骤 B”,依此类推。但是,如果“步骤 A”失败,则整个Job将失败,并且“步骤 B”将不会执行。

Note

使用 Spring Batch 名称空间时,配置中列出的第一步将总是成为Job执行的第一步。其他步骤元素的 Sequences 无关紧要,但是第一步必须始终首先出现在 xml 中。

5.3.2 条件流

在上面的示例中,只有两种可能性:

  • Step成功,应执行下一个Step

  • Step失败,因此Job应该失败。

在许多情况下,这可能就足够了。但是,如果Step的故障应触发一个不同的Step而不是引起故障的情况呢?

为了处理更复杂的场景,Spring Batch 名称空间允许在 step 元素内定义转换元素。一种这样的过渡是“下一个”元素。与“ next”属性类似,“ next”元素将告诉Job接下来执行哪个Step。但是,与该属性不同,给定Step允许任何数量的“ next”元素,并且在失败的情况下没有默认行为。这意味着,如果使用过渡元素,则必须明确定义Step过渡的所有行为。还要注意,单个步骤不能同时具有“下一个”属性和过渡元素。

next 元素指定要匹配的模式以及下一步要执行的步骤:

<job id="job">
    <step id="stepA" parent="s1">
        <next on="*" to="stepB" />
        <next on="FAILED" to="stepC" />
    </step>
    <step id="stepB" parent="s2" next="stepC" />
    <step id="stepC" parent="s3" />
</job>

过渡元素的“ on”属性使用简单的模式匹配方案来匹配Step的执行所产生的ExitStatus。模式中仅允许使用两个特殊字符:

  • “ *”将零个或多个字符

  • “?”将完全匹配一个字符

例如,“ c * t”将匹配“ cat”和“ count”,而“ c?t”将匹配“ cat”但不匹配“ count”。

尽管Step上的过渡元素数量没有限制,但是如果Step的执行导致ExitStatus未被元素覆盖,则框架将引发异常,并且Job将会失败。该框架将自动排序从最具体到最不具体的过渡。这意味着即使在上面的示例中将元素替换为“ stepA”,“ _ FAILED”的ExitStatus仍将进入“ stepC”。

批次状态与退出状态

为条件流配置Job时,了解BatchStatusExitStatus之间的区别很重要。 BatchStatus是同时为JobExecutionStepExecution的属性的枚举,框架使用它来记录JobStep的状态。它可以是以下值之一:COMPLETED,STARTING,STARTED,STOPPING,STOPPED,FAILED,ABANDONED 或 UNKNOWN。其中大多数是不言自明的:COMPLETED 是在步骤或作业成功完成时设置的状态,在失败时设置 FAILED,等等。上面的示例包含以下“ next”元素:

<next on="FAILED" to="stepB" />

乍一 Watch,似乎“ on”属性引用了它所属的StepBatchStatus。但是,它实际上引用了StepExitStatus。顾名思义,ExitStatus代表Step完成执行后的状态。更具体地说,上面的“ next”元素引用ExitStatus的退出代码。要用英语编写,它说:“如果退出代码失败,则转到步骤 B”。默认情况下,退出代码始终与该步骤的BatchStatus相同,这就是上面的 Importing 起作用的原因。但是,如果退出代码需要不同怎么办?一个很好的例子来自 samplesItem 中的 skip sample 作业:

<step id="step1" parent="s1">
    <end on="FAILED" />
    <next on="COMPLETED WITH SKIPS" to="errorPrint1" />
    <next on="*" to="step2" />
</step>

上面的步骤有三种可能性:

  • Step失败,在这种情况下作业应该失败。

  • Step成功完成。

  • Step已成功完成,但退出代码为“ COMPLETED WITH SKIPS”。在这种情况下,应运行不同的步骤来处理错误。

以上配置将起作用。但是,需要根据跳过记录的执行条件来更改退出代码:

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

上面的代码是StepExecutionListener,首先检查以确保Step成功,然后检查StepExecution上的跳过计数是否大于 0.如果同时满足这两个条件,则新的ExitStatus的退出代码为“ COMPLETED WITH SKIPS”返回。

5.3.3 配置停止

在讨论BatchStatus 和 ExitStatus之后,您可能会想知道如何为Job确定BatchStatusExitStatus。通过已执行的代码确定Step的这些状态时,将基于配置确定Job的状态。

到目前为止,讨论的所有作业配置都至少具有一个没有过渡的最终Step。例如,执行以下步骤后,Job将结束:

<step id="stepC" parent="s3"/>

如果未为Step定义过渡,则Job的状态将定义如下:

  • 如果StepExitStatus FAILED 结尾,则JobBatchStatusExitStatus都将变为 FAILED。

  • 否则,JobBatchStatusExitStatus都将被完成。

尽管这种终止批处理作业的方法对于某些批处理作业(例如简单的 Sequences 步骤作业)已足够,但是可能需要自定义定义的作业停止方案。为此,Spring Batch 提供了三个过渡元素来停止Job(除了我们之前讨论的"next" element之外)。这些停止元素中的每一个都将停止具有特定BatchStatusJob。重要的是要注意,停止过渡元素将不会对Job中任何StepBatchStatusExitStatus产生影响:这些元素只会影响Job的最终状态。例如,作业中的每个步骤都可能具有“已失败”状态,但是作业具有“已完成”状态,反之亦然。

“结束”元素

'end'元素指示JobBatchStatus的 COMPLETED 停止。状态为 COMPLETED 的Job无法重新启动(框架将抛出JobInstanceAlreadyCompleteException)。 'end'元素还允许使用可选的'exit-code'属性,该属性可用于自定义JobExitStatus。如果未提供'exit-code'属性,则默认情况下ExitStatus将为“ COMPLETED”,以匹配BatchStatus

在以下情况下,如果步骤 2 失败,则Job将以BatchStatus的 COMPLETED 停止,而ExitStatus的“ COMPLETED”停止,并且 step3 将不执行;否则,执行将移至步骤 3.请注意,如果步骤 2 失败,则Job将无法重新启动(因为状态为 COMPLETED)。

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <end on="FAILED"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

“失败”元素

“失败”元素指示JobBatchStatus的 FAILED 停止。与'end'元素不同,'fail'元素不会阻止Job重新启动。 'fail'元素还允许使用可选的'exit-code'属性,该属性可用于自定义JobExitStatus。如果未提供'exit-code'属性,则默认情况下ExitStatus为“ FAILED”,以匹配BatchStatus

在以下情况下,如果步骤 2 失败,则Job将以BatchStatus失败而停止,而ExitStatus则为“早期终止”而停止,步骤 3 将不执行;否则,执行将移至步骤 3.此外,如果步骤 2 失败,并且Job重新启动,则将在步骤 2 再次开始执行。

<step id="step1" parent="s1" next="step2">

<step id="step2" parent="s2">
    <fail on="FAILED" exit-code="EARLY TERMINATION"/>
    <next on="*" to="step3"/>
</step>

<step id="step3" parent="s3">

“停止”元素

'stop'元素指示JobBatchStatus的 STOPPED 停止。停止Job可以暂时中断处理,以便操作员可以在重新启动Job之前采取一些措施。 'stop'元素需要一个'restart'属性,该属性指定Job is restarted时应该执行的步骤。

在以下情况下,如果 step1 以 COMPLETE 完成,则作业将停止。重新启动后,将在步骤 2 开始执行。

<step id="step1" parent="s1">
    <stop on="COMPLETED" restart="step2"/>
</step>

<step id="step2" parent="s2"/>

5.3.4 程序流程决策

在某些情况下,可能需要比ExitStatus更多的信息来决定下一步执行哪个步骤。在这种情况下,可以使用JobExecutionDecider来辅助决策。

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (someCondition) {
            return "FAILED";
        }
        else {
            return "COMPLETED";
        }
    }
}

在作业配置中,“决策”标签将指定要使用的决策器以及所有转换。

<job id="job">
    <step id="step1" parent="s1" next="decision" />

    <decision id="decision" decider="decider">
        <next on="FAILED" to="step2" />
        <next on="COMPLETED" to="step3" />
    </decision>

    <step id="step2" parent="s2" next="step3"/>
    <step id="step3" parent="s3" />
</job>

<beans:bean id="decider" class="com.MyDecider"/>

5.3.5 分流

到目前为止,所描述的每种情况都涉及一个Job,它以线性方式一次执行其Step。除了这种典型的样式外,Spring Batch 命名空间还允许使用“ split”元素使用并行流来配置作业。如下所示,“ split”元素包含一个或多个“ flow”元素,可以在其中定义整个单独的流。 “拆分”元素还可以包含任何先前讨论的过渡元素,例如“下一个”属性或“下一个”,“结束”,“失败”或“暂停”元素。

<split id="split1" next="step4">
    <flow>
        <step id="step1" parent="s1" next="step2"/>
        <step id="step2" parent="s2"/>
    </flow>
    <flow>
        <step id="step3" parent="s3"/>
    </flow>
</split>
<step id="step4" parent="s4"/>

5.3.6 外部化作业之间的流程定义和依赖性

可以将作业中的部分流程外部化为单独的 Bean 定义,然后重新使用。有两种方法可以做到这一点,第一种是简单地将流声明为对其他地方定义的引用:

<job id="job">
    <flow id="job1.flow1" parent="flow1" next="step3"/>
    <step id="step3" parent="s3"/>
</job>

<flow id="flow1">
    <step id="step1" parent="s1" next="step2"/>
    <step id="step2" parent="s2"/>
</flow>

像这样定义外部流程的作用仅仅是将外部流程中的步骤插入作业中,就好像它们已被内联声明一样。这样,许多作业可以引用相同的模板流,并将这些模板组成不同的逻辑流。这也是分离单个流的集成测试的好方法。

外部化流程的另一种形式是使用JobStepJobStepFlowStep类似,但是实际上为指定流程中的步骤创建并启动了单独的作业执行。这是一个例子:

<job id="jobStepJob" restartable="true">
   <step id="jobStepJob.step1">
      <job ref="job" job-launcher="jobLauncher"
          job-parameters-extractor="jobParametersExtractor"/>
   </step>
</job>

<job id="job" restartable="true">...</job>

<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
   <property name="keys" value="input.file"/>
</bean>

作业参数提取器是一种策略,用于确定StepExecutionContext如何转换为要执行的 Job 的JobParametersJobStep在您希望有一些更精细的选项来监视和报告作业和步骤时很有用。使用JobStep通常也可以很好地回答以下问题:“如何在作业之间创建依赖关系?”。这是将大型系统分解为较小的模块并控制作业流程的好方法。

5.4 作业和步骤属性的后期绑定

上面的 XML 和 Flat File 示例都使用 Spring Resource抽象来获取文件。之所以有效,是因为Resource有一个 getFile 方法,该方法返回java.io.File。可以使用标准 Spring 构造来配置 XML 和平面文件资源:

<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource"
              value="file://outputs/20070122.testStream.CustomerReportStep.TEMP.txt" />
</bean>

上面的Resource将从指定的文件系统位置加载文件。请注意,绝对位置必须以双斜杠(“ //”)开头。在大多数 spring 应用程序中,此解决方案足够好,因为它们的名称在编译时就已知。但是,在批处理方案中,可能需要在运行时将文件名确定为作业的参数。可以使用-D 参数(即系统属性)解决此问题:

<bean id="flatFileItemReader"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="${input.file.name}" />
</bean>

要使该解决方案起作用,所需要做的只是一个系统参数(-Dinput.file.name =“ file://file.txt”)。 (请注意,尽管此处可以使用PropertyPlaceholderConfigurer,但始终设置系统属性不是必需的,因为 Spring 中的ResourceEditor已经过滤并在系统属性上进行了占位符替换.)

通常,在批处理设置中,最好在作业的JobParameters中参数化文件名,而不是通过系统属性来访问文件名。为此,Spring Batch 允许后期绑定各种 Job 和 Step 属性:

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>

可以通过相同的方式访问JobExecutionStepExecution级别ExecutionContext

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>

Note

任何使用后期绑定的 bean 必须使用 scope =“ step”声明。请参阅第 5.4.1 节“步骤范围”更多信息。

Note

如果您使用的是 Spring 3.0(或更高版本),则步进作用域 Bean 中的表达式将使用 Spring Expression Language,这是一种功能强大的通用语言,具有许多有趣的功能。为了提供向后兼容性,如果 Spring Batch 检测到存在较旧版本的 Spring,它将使用功能较弱且解析规则稍有不同的本机表达式语言。主要区别在于,上面示例中的 map 键不需要在 Spring 2.5 中用引号引起来,但是在 Spring 3.0 中引号是必需的。

5.4.1 步骤范围

上面所有的后期绑定示例都在 bean 定义中声明了“ step”的范围:

<bean id="flatFileItemReader" scope="step"
      class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>

为了使用后期绑定,需要使用Step范围,因为在Step启动之前实际上无法实例化 bean,这样才能找到属性。因为默认情况下它不是 Spring 容器的一部分,所以必须使用batch命名空间来显式添加范围:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

或通过显式包含StepScope的 bean 定义(但不能同时包含两者):

<bean class="org.springframework.batch.core.scope.StepScope" />

5.4.2 工作范围

Spring Batch 3.0 中引入的 Job 作用域与配置中的 Step 作用域类似,但是是 Job 上下文的作用域,因此每个执行的作业只有一个这样的 bean 实例。此外,还提供了对使用#\ {}占位符从 JobContext 访问的引用的后期绑定的支持。使用此功能,可以从作业或作业执行上下文以及作业参数中提取 Bean 属性。例如。

<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
    <property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>

因为默认情况下它不是 Spring 容器的一部分,所以必须使用batch名称空间显式添加范围:

<beans xmlns="http://www.springframework.org/schema/beans"
		  xmlns:batch="http://www.springframework.org/schema/batch"
		  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		  xsi:schemaLocation="...">

		  <batch:job .../>
		  ...
		  </beans>

或通过显式包括JobScope的 bean 定义(但不能同时包含两者):

<bean class="org.springframework.batch.core.scope.JobScope" />