3. 批处理的 Domain 语言

对于任何经验丰富的批处理架构师来说,Spring Batch 中使用的批处理的整体概念应该是熟悉和舒适的。有“Jobs”和“Steps”以及开发人员提供的称为 ItemReaders 和 ItemWriters 的处理单元。但是,由于 Spring 模式,操作,模板,回调和习语,有以下机会:

  • 坚持明确分离关注点的重大改进

  • 清楚地描述了作为接口提供的架构层和服务

  • 简单和默认的 implementations,允许快速采用和易用 out-of-the-box

  • 显着增强的可扩展性

下图是已经使用了几十年的 batch reference architecture 的简化 version。它概述了构成批处理的 domain 语言的组件。这个 architecture framework 是一个蓝图,已经在过去几代平台(COBOL/Mainframe,C /Unix,现在 Java/anywhere)中经过数十年的实现证明。 JCL 和 COBOL 开发人员可能对 C,C#和 Java 开发人员的概念感到满意。 Spring Batch 提供了在健壮,可维护的系统中常见的层,组件和技术服务的物理实现,用于解决简单到复杂的批处理应用程序的创建,使用基础结构和 extensions 来满足非常复杂的处理需求。

图 2.1:批量刻板印象

上图突出显示了构成批处理的 domain 语言的 key 概念。 Job 有一个到多个步骤,它们只有一个 ItemReader,ItemProcessor 和 ItemWriter。需要启动 job(JobLauncher),并且需要存储有关当前 running process 的元数据(JobRepository)。

3.1 Job

本节描述了与批 job 概念相关的构造型。 Job是封装整个批处理 process 的实体。与 common 与其他 Spring 项目一样,Job将通过 XML configuration 文件或基于 Java 的 configuration 连接在一起。该 configuration 可以称为“job configuration”。但是,Job只是整体层次结构的顶部:

在 Spring Batch 中,Job 只是 Steps 的容器。它结合了逻辑上属于流的多个步骤,并允许 properties global 配置到所有步骤,例如可重启性。 job configuration 包含:

  • job 的简单 name

  • 步骤的定义和 ordering

  • job 是否可重启

接口的默认简单 implementation 由 Spring Batch 以SimpleJob class 的形式提供,它在Job之上创建了一些标准功能,但批处理命名空间抽象了直接实例化它的需要。相反,可以使用<job>标签:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

3.1.1 JobInstance

指的是逻辑 job run 的概念。让我们考虑批处理 job,它应该在一天结束时运行一次,例如上图中的'EndOfDay'job。有一个'EndOfDay'Job,但必须单独跟踪Job的每个 run。在这个 job 的情况下,每天将有一个逻辑JobInstance。例如,将有 1 月 1 日 run 和 1 月 2 日 run。如果 1 月 1 日 run 在第一个 time 失败并且在第二天再次运行,那么它仍然是 1 月 1 日 run。 (通常这与它正在处理的数据相对应,这意味着 1 月 1 日 run 处理 1 月 1 日的数据,等等)。因此,每个JobInstance可以有多个执行(JobExecution将在下面更详细地讨论),并且只有一个JobInstance对应于特定的Job并且识别JobParameters 可以在给定的 time 运行 running。

JobInstance的定义与将要加载的数据完全无关。完全由ItemReader implementation 决定如何加载数据。例如,在 EndOfDay 场景中,数据上可能有一列指示数据所属的'effective date'或'schedule date'。因此,1 月 1 日 run 只会加载第 1 个数据,而 1 月 2 日 run 只会使用第 2 个数据。因为这个决定可能是一个商业决策,所以由ItemReader决定。然而,使用相同JobInstance的内容将决定是否使用先前执行中的'state'(i.e.ExecutionContext,将在下面讨论)。使用新的JobInstance将意味着“从头开始”并使用现有实例通常意味着“从您离开的地方开始”。

3.1.2 JobParameters

在讨论了JobInstance以及它与Job的区别之后,要问的一个自然问题是:“一个JobInstance如何区别于另一个?”答案是:JobParametersJobParameters是用于启动批 job 的一组参数。在 run 期间,它们可用于识别甚至作为 reference 数据:

在上面的例子中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个 Job,一个用__ob 参数 01-01-2008 启动,另一个用 01-02-2008 参数启动。因此,contract 可以定义为:JobInstance = Job标识JobParameters。这允许开发人员有效地控制JobInstance的定义方式,因为它们控制传入的参数。

并非所有 job 参数都需要有助于识别JobInstance。默认情况下它们会这样做,但 framework 允许提交带有参数的Job,这些参数也不会影响JobInstance的身份。

3.1.3 JobExecution

A JobExecution指的是单次尝试运行Job的技术概念。执行可能以失败或成功结束,但除非执行成功完成,否则与给定执行相对应的JobInstance将不会被视为完成。使用上面描述的 EndOfDay Job作为 example,考虑为 01-01-2008,它在第一次 time 时失败 run。如果使用与第一个 run(01-01-2008)相同的标识 job 参数再次 run,则将创建一个新的JobExecution。但是,仍然只有一个JobInstance

Job定义 job 是什么以及如何执行它,JobInstance是一个纯粹的组织 object 到 group 执行,主要是为了启用正确的重启语义。但是,JobExecution是 run 期间实际发生的事件的主要存储机制,因此包含许多必须被控制和持久化的 properties:

表格 1_.JobExecution Properties

状态一个BatchStatus object,指示执行的状态。 _运行时,它是 BatchStatus.STARTED,如果失败,它是 BatchStatus.FAILED,如果它成功完成,它是 BatchStatus.COMPLETED
开始时间一个java.util.Date表示执行开始时的当前系统 time。
时间结束一个java.util.Date表示执行完成时的当前系统 time,无论它是否成功。
退出状态ExitStatus表示 run 的结果。这是最重要的,因为它包含一个将返回给调用者的 exit code。有关详细信息,请参阅第 5 章。
createTime表示第一次持久保存JobExecution时的当前系统 time。 job 可能尚未启动(因此没有 start time),但它总是有一个 createTime,这是 framework 管理 job level ExecutionContext s 所需的。
最近更新时间一个java.util.Date代表最后一次JobExecution持续存在。
执行上下文'property bag'包含需要在执行之间保留的任何用户数据。
failureExceptions执行Job期间遇到的 exceptions 列表。如果在Job失败期间遇到多个 exception,这些可能很有用。

这些 properties 很重要,因为它们将被持久化并可用于完全确定执行的状态。例如,如果 01-01 的 EndOfDay job 在 9:00 PM 执行,并且在 9:30 处失败,则将在批处理元数据表中进行以下输入:

表格 1_.BATCHJOB_INSTANCE

JOB_INST_IDJOBNAME
1EndOfDayJob

表格 1_.BATCHJOB_EXECUTION_PARAMS

JOB_EXECUTION_IDTYPE_CDKEYNAMEDATE_VALIDENTIFYING
1日期schedule.Date2008-01-01真正

表格 1_.BATCHJOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTARTTIMEENDTIME状态
112008-01-01 21:002008-01-01 21:30失败

为清晰和格式化,列名可能已缩写或删除

既然 job 已经失败了,我们假设确定了问题的整个过程,以便“批处理窗口”现在关闭。假设窗口在 9:00 PM 开始,job 将再次从 01-01 开始,从它停止并从 9:30 成功完成开始。因为它现在是第二天,所以 01-02 job 也必须 run,之后在 9:31 开始,并在 10:30 正常的一小时 time 完成。除非两个作业有可能尝试访问相同的数据,否则不要求一个JobInstance一个接一个地启动,导致锁定数据库 level 的问题。完全由调度程序决定Job应该 run 的时间。由于它们是分开的JobInstance,Spring Batch 将不会试图阻止它们同时运行。 (尝试 run 相同JobInstance而另一个已经 running 将导致JobExecutionAlreadyRunningException被抛出)。现在,JobInstanceJobParameters表中应该有一个额外的条目,JobExecution table 中还有两个额外的条目:

表格 1_.BATCHJOB_INSTANCE

JOB_INST_IDJOBNAME
1EndOfDayJob
2EndOfDayJob

表格 1_.BATCHJOB_EXECUTION_PARAMS

JOB_EXECUTION_IDTYPE_CDKEYNAMEDATE_VALIDENTIFYING
1日期schedule.Date2008-01-01 00:00:00真正
2日期schedule.Date2008-01-01 00:00:00真正
3日期schedule.Date2008-01-02 00:00:00真正

表格 1_.BATCHJOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTARTTIMEENDTIME状态
112008-01-01 21:002008-01-01 21:30失败
212008-01-02 21:002008-01-02 21:30已完成
322008-01-02 21:312008-01-02 22:29已完成

为清晰和格式化,列名可能已缩写或删除

3.2 Step

Step是一个 domain object,它封装了批 job 的独立顺序阶段。因此,每个Job完全由一个或多个步骤组成。 Step包含定义和控制实际批处理所需的所有信息。这是一个必然模糊的描述,因为任何给定Step的内容由开发人员自行决定编写Job。 Step 可以像开发人员所希望的那样简单或复杂。一个简单的Step可能会将文件中的数据加载到数据库中,几乎不需要 code。 (取决于所使用的 implementations)更复杂的Step可能具有复杂的业务规则,这些规则作为处理的一部分应用。与Job一样,Step具有与唯一JobExecution对应的单个StepExecution

3.2.1 StepExecution

StepExecution表示执行Step的单次尝试。每创建一个新的StepExecution Step是 run,类似于JobExecution。但是,如果 step 无法执行,因为它之前的 step 失败,则不会继续执行它。仅在实际启动时才会创建StepExecution

Step 执行由StepExecution class 的 objects 表示。每个执行包含对应的 step 和JobExecution的 reference,以及 transaction 相关的数据,例如提交和回滚计数以及开始和结束时间。此外,每个 step 执行都将包含ExecutionContext,其中包含开发人员在批处理运行中需要保留的任何数据,例如重新启动所需的统计信息或 state 信息。以下是StepExecution的 properties 列表:

表格 1_.StepExecution Properties

状态一个BatchStatus object,指示执行的状态。当它运行时,状态为 BatchStatus.STARTED,如果失败,状态为 BatchStatus.FAILED,如果成功完成,状态为 BatchStatus.COMPLETED
开始时间一个java.util.Date表示执行开始时的当前系统 time。
时间结束一个java.util.Date表示执行完成时的当前系统 time,无论它是否成功。
退出状态ExitStatus表示执行结果。这是最重要的,因为它包含一个将返回给调用者的 exit code。有关详细信息,请参阅第 5 章。
执行上下文'property bag'包含需要在执行之间保留的任何用户数据。
readCount已成功读取的项目数
writeCount已成功写入的项目数
COMMITCOUNT已为此执行提交的 transactions 数
rollbackCountStep控制的业务 transaction 已回滚的次数。
readSkipCountread失败的次数,导致跳过 item。
processSkipCountprocess失败的次数,导致跳过 item。
filterCountItemProcessor“过滤”的项目数。
writeSkipCountwrite失败的次数,导致跳过 item。

3.3 ExecutionContext

ExecutionContext表示 key/value 对的集合,由 order 中的 framework 持久化和控制,以允许开发人员放置 store 持久 state,其范围限定为StepExecutionJobExecution。对于熟悉 Quartz 的人来说,它与JobDataMap非常相似。最佳用法 example 是为了方便重启。使用平面文件输入作为 example,在处理单个 lines 时,framework 会定期在提交点保持ExecutionContext。如果在 run 期间发生致命错误,或者即使电源耗尽,这也允许ItemReader store 存储 state。所需要的只是将 lines 的当前数量读入 context,framework 将执行 rest:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

使用 Job Stereotypes 部分中的 EndOfDay example 作为 example,假设有一个 step:'loadData',它将文件加载到数据库中。在第一次失败的 run 之后,元数据表将如下所示:

表格 1_.BATCHJOB_INSTANCE

JOB_INST_IDJOBNAME
1EndOfDayJob

表格 1_.BATCHJOB_PARAMS

JOB_INST_IDTYPE_CDKEYNAMEDATE_VAL
1日期schedule.Date2008-01-01

表格 1_.BATCHJOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTARTTIMEENDTIME状态
112008-01-01 21:002008-01-01 21:30失败

表格 1_.BATCHSTEP_EXECUTION

STEP_EXEC_IDJOB_EXEC_IDSTEPNAMESTARTTIMEENDTIME状态
11loadDate2008-01-01 21:002008-01-01 21:30失败

表格 1_.BATCHSTEP_EXECUTIONCONTEXT

STEP_EXEC_IDSHORTCONTEXT
1

在这种情况下,Step运行了 30 分钟并处理了 40,321 个“件”,这将代表此方案中文件中的 lines。此 value 将在 framework 每次提交之前更新,并且可以包含与ExecutionContext中的条目对应的多个行。在提交之前得到通知需要各种StepListenerItemStream之一,本指南后面将对此进行更详细的讨论。与前面的示例一样,假设Job在第二天重新启动。重新启动时,最后一个 run 的ExecutionContext中的值是从数据库重构的,当ItemReader打开时,它可以检查 context 中是否有任何存储的 state,并从那里初始化自己:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,执行上述 code 后,当前 line 将为 40,322,允许Step从它停止的位置再次启动。 ExecutionContext也可以用于需要持久保存 run 本身的统计信息。例如,如果平面文件包含跨多个 lines 存在的处理订单,则可能需要存储已处理的订单数量(这与读取的 lines 数量大不相同),以便可以在以下位置发送电子邮件Step的结尾与正文中处理的总订单。 framework 处理为开发人员存储这个,在 order 中使用单独的JobInstance正确地对其进行范围化。要知道是否应该使用现有的ExecutionContext可能非常困难。例如,使用上面的'EndOfDay'example,当 01-01 run 再次启动第二个 time 时,framework 会识别它是相同的JobInstance,并且在Step的基础上,将ExecutionContext拉出数据库并将其作为StepExecutionStep本身的一部分。相反,对于 01-02 run,framework 识别它是一个不同的实例,因此必须将空 context 传递给Step。 framework 为开发人员提供了许多这类类型的确定,以确保在正确的 time 时为它们提供 state。同样重要的是要注意,在任何给定的 time,每StepExecution只存在一个ExecutionContext。 的客户端应该小心,因为这会创建一个共享键空间,因此在放入值时应小心,以确保不会覆盖任何数据。但是,Step stores 绝对没有 context 中的数据,所以没有办法对 framework 产生负面影响。

同样重要的是要注意每至少有一个ExecutionContext,每个StepExecution至少有一个ExecutionContext。对于 example,请考虑以下 code 代码段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如 comment 中所述,ecStep 将不等于 ecJob;他们是两个不同的ExecutionContext。作用于Step的那个将保存在Step中的每个提交点,而作用于Job的那个将保存在每个Step执行之间。

3.4 JobRepository

JobRepository是上面提到的所有原型的持久性机制。它为JobLauncherJobStep __mplement 提供 CRUD 操作。当首次启动Job时,从 repository 获取JobExecution,并且在执行过程中StepExecutionJobExecution_mplemplement 通过将它们传递给 repository 来持久化:

<job-repository id="jobRepository"/>

3.5 JobLauncher

JobLauncher表示一个简单的接口,用于使用给定的JobParameters集启动Job

public interface JobLauncher {

    public JobExecution run(Job job, JobParameters jobParameters)
                throws JobExecutionAlreadyRunningException, JobRestartException;
}

期望 implementations 将从JobRepository获得有效JobExecution并执行Job

3.6 Item Reader

ItemReader是一个抽象,表示的输入检索,一个 item 在 time。当ItemReader已经耗尽它可以提供的项目时,它将通过返回 null 来指示这一点。有关ItemReader接口及其各种 implementations 的更多详细信息,请参见第 6 章,ItemReaders 和 ItemWriters

3.7 Item Writer

ItemWriter是一个抽象,表示Step的输出,一个批次或一大块项目在 time。通常,item writer 不知道接下来会接收的输入,只知道在当前调用中传递的 item。有关ItemWriter接口及其各种 implementations 的更多详细信息,请参见第 6 章,ItemReaders 和 ItemWriters

3.8 Item Processor

ItemProcessor是一个抽象,表示 item 的业务处理。当ItemReader读取一个 item,ItemWriter写入它们时,ItemProcessor提供转换或应用其他业务处理的访问权限。如果在处理 item 时确定 item 无效,则返回 null 表示不应写出 item。有关 ItemProcessor 接口的更多详细信息,请参见第 6 章,ItemReaders 和 ItemWriters

3.9 批处理命名空间

上面列出的许多 domain 概念需要在 Spring ApplicationContext中配置。虽然可以在标准 bean 定义中使用上述接口的 implementations,但是为了便于 configuration,已经提供了一个名称空间:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
     xmlns:beans="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

    <job id="ioSampleJob">
        <step id="step1">
            <tasklet>
                <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
            </tasklet>
        </step>
    </job>

</beans:beans>

如果已声明批处理命名空间 long,则可以使用其任何元素。有关配置Job的更多信息可以在第 4 章,配置和运行 Job中找到。有关配置 Step 的更多信息,请参见第 5 章,配置 Step

Updated at: 9 months ago
2.5. SQLite 支持Table of content4. 配置和 Running 一个 Job