1. 批处理的 Domain 语言


XML

Java

对于任何经验丰富的批处理架构师来说,Spring Batch 中使用的批处理的整体概念应该是熟悉和舒适的。有“作业”和“步骤”以及 developer-supplied 处理单元,称为ItemReaderItemWriter。但是,由于 Spring 模式,操作,模板,回调和习语,有以下机会:

  • 坚持明确分离关注点的重大改进。

  • 清楚地描述了作为接口提供的架构层和服务。

  • 简单和默认的 implementations,允许快速采用和易用 out-of-the-box。

  • 显着增强的可扩展性。

下图是批量 reference architecture 的简化 version,已经使用了数十年。它概述了构成批处理的 domain 语言的组件。这个 architecture framework 是一个蓝图,已经在过去几代平台(COBOL/Mainframe,C/Unix,现在 Java/anywhere)中经过数十年的 implementations 证明。 JCL 和 COBOL 开发人员可能对 C,C#和 Java 开发人员的概念感到满意。 Spring Batch 提供了健壮,可维护系统中常见的层,组件和技术服务的物理实现,这些系统用于解决简单到复杂的批处理应用程序的创建问题,使用基础结构和 extensions 来满足非常复杂的处理需求。

图 1.批量刻板印象

上图突出显示构成 Spring Batch 的 domain 语言的 key 概念。 Job 有一个到多个步骤,每个步骤只有一个ItemReader,一个ItemProcessor和一个ItemWriter。需要启动 job(使用JobLauncher),并且需要存储有关当前 running process 的元数据(在JobRepository中)。

1.1. 工作

本节描述了与批 job 概念相关的构造型。 Job是封装整个批处理 process 的实体。与 common 与其他 Spring 项目一样,Job与 XML configuration 文件或 Java-based configuration 连接在一起。该 configuration 可以称为“job configuration”。但是,Job只是整个层次结构的顶部,如下图所示:

图 2. Job 层次结构

在 Spring Batch 中,Job只是Step实例的容器。它结合了逻辑上属于流的多个步骤,并允许 properties global 配置到所有步骤,例如可重启性。 job configuration 包含:

  • job 的简单 name。

  • Step实例的定义和 ordering。

  • job 是否可重启。

Job 接口的默认简单 implementation 由 Spring Batch 以SimpleJob class 的形式提供,它在Job之上创建了一些标准功能。使用基于 java 的 configuration 时,可以使用一组构建器来实例化Job,如下面的示例所示:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

Sp _Batch 以SimpleJob class 的形式提供了Job接口的默认简单 implementation,它在Job之上创建了一些标准功能。但是,批处理命名空间抽象了直接实例化它的需要。相反,可以使用<job>标记,如下面的示例所示:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

1.1.1. JobInstance

指的是逻辑 job run 的概念。考虑一个批处理 job,它应该在一天结束时运行一次,例如上图中的'EndOfDay'Job。有一个'EndOfDay'job,但必须单独跟踪Job的每个 run。在这个 job 的情况下,每天有一个逻辑JobInstance。例如,有 1 月 1 日 run,1 月 2 日 run,依此类推。如果 1 月 1 日 run 在第一个 time 失败并且在第二天再次运行,那么它仍然是 1 月 1 日 run。 (通常,这与它正在处理的数据相对应,这意味着 1 月 1 日 run 处理 1 月 1 日的数据)。因此,每个JobInstance可以有多个执行(JobExecution将在本章后面详细讨论),并且只有一个JobInstance对应于特定的Job并且识别JobParameters可以在给定的 time 运行。

JobInstance的定义绝对不会影响要加载的数据。完全取决于ItemReader implementation 来确定如何加载数据。例如,在 EndOfDay 场景中,数据上可能有一列指示数据所属的'effective date'或'schedule date'。因此,1 月 1 日 run 将仅加载来自 1st 的数据,而 1 月 2 日 run 将仅使用来自 2nd 的数据。因为这个决定很可能是一个商业决策,所以由ItemReader决定。但是,使用相同的JobInstance确定是否使用先前执行中的'state'(即ExecutionContext,将在本章后面讨论)。使用新的JobInstance意味着“从头开始”,并且使用现有实例通常意味着“从您离开的地方开始”。

1.1.2. JobParameters

在讨论了JobInstance以及它与 Job 的区别之后,我们要问的一个自然问题是:“一个人如何与另一个人区别开来?”答案是:JobParametersJobParameters object 包含一组用于启动批 job 的参数。在 run 期间,它们可用于识别甚至作为 reference 数据,如下图所示:

图 3. Job 参数

在前面的例子中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个Job,但它有两个JobParameter objects:一个用 job job 参数启动,另一个是 01-01-2017 以 01-02-2017 参数开头。因此,contract 可以定义为:JobInstance = Job识别JobParameters。这允许开发人员有效地控制JobInstance的定义方式,因为它们控制传入的参数。

并非所有 job 参数都需要有助于识别JobInstance。默认情况下,他们会这样做。但是,framework 还允许提交带有参数的Job,这些参数不会影响JobInstance的身份。

1.1.3. JobExecution

A JobExecution指的是单次尝试_Jun Job 的技术概念。执行可能以失败或成功结束,但除非执行成功完成,否则对应于给定执行的JobInstance不被视为完成。使用前面描述的 EndOfDay Job作为 example,考虑为 01-01-2017,它在 run 的第一个 time 时失败。如果使用与第一个 run(01-01-2017)相同的标识 job 参数再次 run,则会创建一个新的JobExecution。但是,仍然只有一个JobInstance

Job定义 job 是什么以及如何执行它,JobInstance是一个纯粹的组织 object 到 group 执行,主要是为了启用正确的重启语义。但是,JobExecution是 run 期间实际发生的事件的主要存储机制,并且包含许多必须被控制和持久化的 properties,如下面的 table 所示:

属性定义
状态一个BatchStatus object,指示执行的状态。在 running 时,它是BatchStatus#STARTED。如果失败,则为BatchStatus#FAILED。如果成功完成,则为BatchStatus#COMPLETED
开始时间一个java.util.Date表示执行开始时的当前系统 time。如果 job 尚未启动,则此字段为空。
时间结束一个java.util.Date表示执行完成时的当前系统 time,无论它是否成功。如果 job 尚未完成,则该字段为空。
退出状态ExitStatus,表示 run 的结果。这是最重要的,因为它包含一个返回给调用者的 exit code。有关详细信息,请参阅第 5 章。如果 job 尚未完成,则该字段为空。
createTime表示第一次持久保存JobExecution时的当前系统 time。 job 可能尚未启动(因此没有 start time),但它总是有一个 createTime,framework 需要管理 job level ExecutionContexts
最近更新时间一个java.util.Date代表最后一次JobExecution持续存在。如果 job 尚未启动,则此字段为空。
执行上下文“property bag”包含需要在执行之间保留的任何用户数据。
failureExceptions执行Job期间遇到的 exceptions 列表。如果在Job失败期间遇到多个 exception,这些可能很有用。

这些 properties 非常重要,因为它们是持久的,可用于完全确定执行的状态。例如,如果 01-01 的 EndOfDay job 在 9:00 PM 执行而在 9:30 处失败,则在批处理元数据表中生成以下条目:

JOB_INST_IDJOBNAME
1EndOfDayJob
JOB_EXECUTION_IDTYPE_CDKEYNAMEDATE_VALIDENTIFYING
1日期schedule.Date2017-01-01真正
JOB_EXEC_IDJOB_INST_IDSTARTTIMEENDTIME状态
112017-01-01 21:002017-01-01 21:30失败

为清晰起见和格式化,列名可能已缩写或删除。

既然 job 已经失败,那么假设确定问题需要整整一个晚上才能确定,因此“批处理窗口”现在已经关闭。进一步假设窗口从 9:00 PM 开始,job 再次从 01-01 开始,从它停止并从 9:30 成功完成开始。因为它现在是第二天,01-02 job 也必须 run,并且它在 9:31 之后立即开始并在 10:30 的正常一小时 time 完成。除非两个作业有可能尝试访问相同的数据,否则不要求一个JobInstance一个接一个地启动,导致锁定数据库 level 的问题。完全由调度程序决定Job应该 run 的时间。由于它们是分开的JobInstances,Spring Batch 不会试图阻止它们同时运行。 (试图 run 相同的JobInstance而另一个已经 running 导致JobExecutionAlreadyRunningException被抛出)。现在,JobInstanceJobParameters表中应该有一个额外的条目,JobExecution table 中有两个额外的条目,如下表所示:

JOB_INST_IDJOBNAME
1EndOfDayJob
2EndOfDayJob
JOB_EXECUTION_IDTYPE_CDKEYNAMEDATE_VALIDENTIFYING
1日期schedule.Date2017-01-01 00:00:00真正
2日期schedule.Date2017-01-01 00:00:00真正
3日期schedule.Date2017-01-02 00:00:00真正
JOB_EXEC_IDJOB_INST_IDSTARTTIMEENDTIME状态
112017-01-01 21:002017-01-01 21:30失败
212017-01-02 21:002017-01-02 21:30已完成
322017-01-02 21:312017-01-02 22:29已完成

为清晰起见和格式化,列名可能已缩写或删除。

1.2. 步

Step是一个 domain object,它封装了批 job 的独立顺序阶段。因此,每个 Job 完全由一个或多个步骤组成。 Step包含定义和控制实际批处理所需的所有信息。这是一个必然模糊的描述,因为任何给定Step的内容由开发人员自行决定编写JobStep可以像开发者所希望的那样简单或复杂。一个简单的Step可能会将数据从文件加载到数据库中,几乎不需要 code(取决于所使用的 implementations)。更复杂的Step可能具有复杂的业务规则,这些规则作为处理的一部分应用。与Job一样,Step具有与唯一JobExecution相关的个体StepExecution,如下图所示:

图 4.带步骤的 Job 层次结构

1.2.1. StepExecution

StepExecution表示执行Step的单次尝试。每创建一个新的StepExecution Step是 run,类似于JobExecution。但是,如果 step 无法执行,因为它之前的 step 失败,则不会继续执行它。只有在实际启动Step时才会创建StepExecution

Step执行由StepExecution class 的 objects 表示。每个执行都包含对其相应的 step 和JobExecution和 transaction 相关数据的 reference,例如提交和回滚计数以及开始和结束时间。此外,每个 step 执行都包含一个ExecutionContext,其中包含开发人员需要在批处理运行中保留的任何数据,例如重新启动所需的统计信息或 state 信息。以下 table _l 为StepExecution的 properties:

属性定义
状态一个BatchStatus object,指示执行的状态。在 running 时,状态为BatchStatus.STARTED。如果失败,则状态为BatchStatus.FAILED。如果成功完成,状态为BatchStatus.COMPLETED
开始时间一个java.util.Date表示执行开始时的当前系统 time。如果 step 尚未启动,则此字段为空。
时间结束一个java.util.Date表示执行完成时的当前系统 time,无论它是否成功。如果 step 尚未退出,则此字段为空。
退出状态ExitStatus表示执行结果。这是最重要的,因为它包含一个返回给调用者的 exit code。有关详细信息,请参阅第 5 章。如果 job 尚未退出,则此字段为空。
执行上下文“property bag”包含需要在执行之间保留的任何用户数据。
readCount已成功读取的项目数。
writeCount已成功写入的项目数。
COMMITCOUNT已为此执行提交的 transactions 数。
rollbackCountStep控制的业务 transaction 已回滚的次数。
readSkipCountread失败的次数,导致跳过 item。
processSkipCountprocess失败的次数,导致跳过 item。
filterCountItemProcessor“过滤”的项目数。
writeSkipCountwrite失败的次数,导致跳过 item。

1.3. 执行上下文

ExecutionContext表示 key/value 对的集合,由 order 中的 framework 持久化和控制,以允许开发人员放置 store 持久 state,其范围限定为StepExecution object 或JobExecution object。对于熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。最佳用法 example 是为了方便重启。使用平面文件输入作为 example,在处理单个 lines 时,framework 会定期在提交点保持ExecutionContext。这样做允许ItemReader store 存储 state,以防在 run 期间发生致命错误或者即使断电也是如此。所需要的只是将 lines 的当前数量读入 context,如下面的 example 所示,framework 将执行 rest:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

使用Job Stereotypes 部分中的 EndOfDay example 作为 example,假设有一个 step,'loadData',它将文件加载到数据库中。在第一次失败的 run 之后,元数据表看起来像以下 example:

JOB_INST_IDJOBNAME
1EndOfDayJob
JOB_INST_IDTYPE_CDKEYNAMEDATE_VAL
1日期schedule.Date2017-01-01
JOB_EXEC_IDJOB_INST_IDSTARTTIMEENDTIME状态
112017-01-01 21:002017-01-01 21:30失败
STEP_EXEC_IDJOB_EXEC_IDSTEPNAMESTARTTIMEENDTIME状态
11loadData2017-01-01 21:002017-01-01 21:30失败
STEP_EXEC_IDSHORTCONTEXT
1

在前面的情况中,Step运行了 30 分钟并处理了 40,321 个“件”,这将代表此方案中文件中的 lines。此 value 在 framework 每次提交之前更新,并且可以包含与ExecutionContext中的条目对应的多个行。在提交之前得到通知需要各种StepListener __mplement(或ItemStream)之一,本指南后面将对此进行更详细的讨论。与前面的示例一样,假设Job在第二天重新启动。重新启动时,最后一个 run 的ExecutionContext中的值将从数据库重新构建。当ItemReader打开时,它可以检查 context 中是否有任何存储的 state 并从那里初始化自身,如下面的 example 所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,在上面的 code 运行之后,当前的 line 为 40,322,允许Step从它停止的位置再次启动。 ExecutionContext也可以用于需要持久保存 run 本身的统计信息。例如,如果平面文件包含跨多个 lines 存在的处理订单,则可能需要存储已处理的订单数量(这与读取的 lines 数量大不相同),以便可以在以下位置发送电子邮件Step的结尾与正文中处理的订单总数。 framework 处理为开发人员存储这个,在 order 中使用单独的JobInstance正确地对其进行范围化。要知道是否应该使用现有的ExecutionContext可能非常困难。例如,使用上面的'EndOfDay'example,当 01-01 run 再次启动第二个 time 时,framework 会识别它是相同的JobInstance,并且在Step的基础上,将ExecutionContext拉出数据库,然后交出它(作为StepExecution的一部分)到Step本身。相反,对于 01-02 run,framework 识别它是一个不同的实例,因此必须将空 context 传递给Step。 framework 为开发人员提供了许多类型的确定,以确保 state 在正确的 time 时被赋予它们。同样重要的是要注意,在任何给定的 time,每StepExecution只存在一个ExecutionContext。 的客户端应该小心,因为这会创建一个共享密钥空间。因此,在放入值时应小心,以确保不会覆盖任何数据。但是,Step stores 绝对没有 context 中的数据,所以没有办法对 framework 产生负面影响。

同样重要的是要注意每至少有一个ExecutionContext,每个StepExecution至少有一个ExecutionContext。对于 example,请考虑以下 code 代码段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如 comment 中所述,ecStep不等于ecJob。他们是两个不同的ExecutionContexts。作用于Step的一个保存在Step中的每个提交点,而作用于 Job 的一个保存在每个Step执行之间。

1.4. JobRepository

JobRepository是上面提到的所有原型的持久性机制。它为JobLauncherJobStep __mplement 提供 CRUD 操作。首次启动Job时,从 repository 获取JobExecution,并且在执行过程中,StepExecutionJobExecution_mplement 通过将它们传递给 repository 来保持。

批处理命名空间支持使用<job-repository>标记配置JobRepository实例,如以下 example 所示:

<job-repository id="jobRepository"/>

使用 java configuration 时,@EnableBatchProcessing annotation 提供JobRepository作为自动配置的组件之一。

1.5. JobLauncher

JobLauncher表示一个简单的接口,用于使用给定的JobParameters集启动Job,如下面的示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

期望 implementations 从JobRepository获得有效JobExecution并执行Job

1.6. Item Reader

ItemReader是一个抽象,表示的输入检索,一个 item 在 time。当ItemReader已经耗尽它可以提供的项目时,它通过返回null来指示这一点。有关ItemReader接口及其各种 implementations 的更多详细信息,请参见Readers 和 Writers。

1.7. Item Writer

ItemWriter是一个抽象,表示Step的输出,一个批次或一大块项目在 time。通常,ItemWriter不知道它接下来应该接收的输入,并且只知道在其当前调用中传递的 item。有关ItemWriter接口及其各种 implementations 的更多详细信息,请参见Readers 和 Writers。

1.8. Item 处理器

ItemProcessor是一个抽象,表示 item 的业务处理。当ItemReader读取一个 item,ItemWriter写入它们时,ItemProcessor提供了一个转换或应用其他业务处理的访问点。如果在处理 item 时确定 item 无效,则返回null表示不应写出 item。有关ItemProcessor接口的更多详细信息可以在Readers 和 Writers中找到。

1.9. 批处理命名空间

之前列出的许多 domain 概念需要在 Spring ApplicationContext中配置。虽然可以在标准 bean 定义中使用上述接口的 implementations,但是为了便于 configuration,已经提供了一个名称空间,如下面的示例所示:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

如果已声明批处理命名空间 long,则可以使用其任何元素。有关配置 Job 的更多信息可以在配置和 Running 一个 Job中找到。有关配置Step的更多信息可以在配置 Step中找到。