1. 批处理的域语言

对于任何经验丰富的批处理设计师而言,Spring Batch 中使用的批处理的总体概念应该是熟悉且舒适的。有“工作”和“步骤”以及开发人员提供的称为ItemReaderItemWriter的处理单元。但是,由于存在 Spring 模式,操作,模板,回调和惯用语,因此有以下机会:

下图是已使用了数十年的批处理参考体系结构的简化版本。它概述了组成批处理领域语言的组件。该体系结构框架是一个蓝图,已经在最后几代平台(COBOL/Mainframe,C/Unix,现在是 Java /任何地方)上数十年的实现中得到了证明。 JCL 和 COBOL 开发人员可能会像 C,C#和 Java 开发人员一样熟悉这些概念。 Spring Batch 提供了层,组件和技术服务的物理实现,这些层,组件和技术服务通常在健壮,可维护的系统中找到,这些系统用于解决从简单到复杂的批处理应用程序的创建,其基础结构和扩展可以满足非常复杂的处理需求。

图 1.批处理原型

上图突出显示了构成 Spring Batch 领域语言的关键概念。作业有一个到多个步骤,每个步骤恰好具有一个ItemReader,一个ItemProcessor和一个ItemWriter。需要启动一个作业(使用JobLauncher),并且需要存储有关当前正在运行的进程的元数据(在JobRepository中)。

1.1. Job

本节描述与批处理作业的概念有关的构造型。 Job是封装整个批处理过程的实体。与其他 Spring Item 一样,Job与 XML 配置文件或基于 Java 的配置连接在一起。该配置可以被称为“作业配置”。但是,Job只是整个层次结构的顶部,如下图所示:

图 2.作业层次结构

在 Spring Batch 中,Job只是Step实例的容器。它组合了逻辑上属于流程的多个步骤,并允许配置所有步骤全局的属性,例如可重新启动性。作业配置包含:

Spring Batch 以SimpleJob类的形式提供了 Job 接口的默认简单实现,它在Job之上创建了一些标准功能。使用基于 Java 的配置时,将生成一个生成器集合以实例化Job,如以下示例所示:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

Spring Batch 以SimpleJob类的形式提供了Job接口的默认简单实现,该实现在Job之上创建了一些标准功能。但是,批处理名称空间消除了直接实例化它的需要。相反,可以使用<job>标签,如以下示例所示:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

1.1.1. JobInstance

JobInstance表示逻辑作业运行的概念。考虑一个应该在一天结束时运行一次的批处理作业,例如上图中的“ EndOfDay” Job。有一个“ EndOfDay”作业,但是必须单独跟踪Job的每个单独运行。对于这项工作,每天只有逻辑JobInstance。例如,有 1 月 1 日运行,1 月 2 日运行,依此类推。如果 1 月 1 日运行第一次失败并在第二天再次运行,则仍是 1 月 1 日运行。 (通常,这也与它正在处理的数据相对应,这意味着 1 月 1 日运行处理 1 月 1 日的数据)。因此,每个JobInstance可以有多个执行(本章后面将更详细地讨论JobExecution),并且在给定的时间只能运行一个对应于特定Job并标识JobParametersJobInstance

JobInstance的定义绝对与要加载的数据无关。确定如何加载数据完全取决于ItemReader实现。例如,在 EndOfDay 场景中,数据上可能有一列指示该数据所属的“生效日期”或“计划日期”。因此,1 月 1 日的运行将仅加载第 1 次的数据,而 1 月 2 日的运行将仅使用第 2 次的数据。由于此确定可能是业务决策,因此应由ItemReader来决定。但是,使用相同的JobInstance将确定是否使用先前执行中的“状态”(即ExecutionContext,本章后面将讨论)。使用新的JobInstance意味着“从头开始”,而使用现有实例通常意味着“从您离开的地方开始”。

1.1.2. JobParameters

在讨论了JobInstance及其与 Job 的区别之后,自然要问的问题是:“一个JobInstance与另一个有何区别?”答案是:JobParametersJobParameters对象包含一组用于启动批处理作业的参数。它们可以在运行期间用于标识甚至用作参考数据,如下图所示:

图 3.作业参数

在前面的示例中,有两个实例,一个实例是 1 月 1 日,另一个实例是 1 月 2 日,实际上只有一个Job,但是它有两个JobParameter对象:一个是用 job 参数 01-01-2017 启动的另一个以参数 01-02-2017 开头。因此,Contract 可以定义为:JobInstance = Job标识JobParameters。这使开发人员可以有效地控制JobInstance的定义方式,因为他们可以控制传入的参数。

Note

并非所有作业参数都需要有助于JobInstance的标识。默认情况下,它们会这样做。但是,该框架还允许提交带有对JobInstance的身份无贡献的参数的Job

1.1.3. JobExecution

JobExecution是指一次尝试运行 Job 的技术概念。执行可能以失败或成功结束,但是与给定执行相对应的JobInstance除非执行成功完成,否则不认为是完整的。使用前面描述的 EndOfDay Job作为示例,考虑到 01-01-2017 的JobInstance首次运行失败。如果使用与第一次运行(01-01-2017)相同的标识作业参数再次运行,则会创建一个新的JobExecution。但是,仍然只有一个JobInstance

Job定义什么是作业及其执行方式,而JobInstance是纯粹的组织对象,用于将执行分组在一起,主要是为了实现正确的重新启动语义。 JobExecution是运行期间实际发生情况的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示:

表 1. JobExecution 属性

Property Definition
Status BatchStatus对象,指示执行状态。在运行时,它是BatchStatus#STARTED。如果失败,则为BatchStatus#FAILED。如果成功完成,则为BatchStatus#COMPLETED
startTime java.util.Date代表执行开始时的当前系统时间。如果作业尚未开始,则此字段为空。
endTime java.util.Date代表执行完成时的当前系统时间,无论执行是否成功。如果作业尚未完成,则该字段为空。
exitStatus ExitStatus,指示运行结果。这是最重要的,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未完成,则该字段为空。
createTime java.util.Date代表首次保留JobExecution时的当前系统时间。作业可能尚未启动(因此没有启动时间),但是它始终具有 createTime,这是 Management 作业级别ExecutionContexts的框架所需的。
lastUpdated java.util.Date代表上一次保留JobExecution的时间。如果作业尚未开始,则此字段为空。
executionContext 包含需要在两次执行之间保留的所有用户数据的“属性包”。
failureExceptions 在执行Job时遇到的异常列表。如果在Job失败期间遇到多个异常,这些将很有用。

这些属性很重要,因为它们可以持久保存并且可以用来完全确定执行状态。例如,如果 01-01 的 EndOfDay 作业在 9:00 PM 执行而在 9:30 失败,则在批处理元数据表中进行以下 Importing:

*表 2. BATCH_JOB_INSTANCE *

JOB_INST_ID JOB_NAME
1 EndOfDayJob

*表 3. BATCH_JOB_EXECUTION_PARAMS *

JOB_EXECUTION_ID TYPE_CD KEY_NAME DATE_VAL IDENTIFYING
1 DATE schedule.Date 2017-01-01 TRUE

*表 4. BATCH_JOB_EXECUTION *

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2017-01-01 21:00 2017-01-01 21:30 FAILED

Note

为了清楚和格式化,列名可能已被缩写或删除。

现在工作失败了,假设确定问题已花费了一整夜,因此“批处理窗口”现在关闭了。进一步假设该窗口在 9:00 PM 开始,该作业将在 01-01 再次开始,从停止的地方开始,并在 9:30 成功完成。因为现在是第二天,所以也必须运行 01-02 作业,此作业随后才在 9:31 开始,并在正常的一小时时间内在 10:30 完成。无需先将一个JobInstance依次启动,除非这两个作业有可能尝试访问相同的数据,从而导致锁定数据库级别的问题。何时应运行Job完全取决于调度程序。由于它们是单独的JobInstances,因此 Spring Batch 不会尝试阻止它们同时运行。 (尝试在同一JobInstance上运行而另一个已经在运行中会导致JobExecutionAlreadyRunningException抛出)。 JobInstanceJobParameters表中现在都应该有一个额外的条目,而JobExecution表中应该有两个额外的条目,如下表所示:

*表 5. BATCH_JOB_INSTANCE *

JOB_INST_ID JOB_NAME
1 EndOfDayJob
2 EndOfDayJob

*表 6. BATCH_JOB_EXECUTION_PARAMS *

JOB_EXECUTION_ID TYPE_CD KEY_NAME DATE_VAL IDENTIFYING
1 DATE schedule.Date 2017-01-01 00:00:00 TRUE
2 DATE schedule.Date 2017-01-01 00:00:00 TRUE
3 DATE schedule.Date 2017-01-02 00:00:00 TRUE

*表 7. BATCH_JOB_EXECUTION *

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2017-01-01 21:00 2017-01-01 21:30 FAILED
2 1 2017-01-02 21:00 2017-01-02 21:30 COMPLETED
3 2 2017-01-02 21:31 2017-01-02 22:29 COMPLETED

Note

为了清楚和格式化,列名可能已被缩写或删除。

1.2. Step

Step是一个域对象,它封装了批处理作业的一个独立的 Sequences 阶段。因此,每个工作完全由一个或多个步骤组成。 Step包含定义和控制实际批处理所需的所有信息。这是一个模糊的描述,因为任何给定Step的内容都由开发人员自行编写Job来决定。 Step可以像开发人员期望的那样简单或复杂。简单的Step可能会将文件中的数据加载到数据库中,而只需要很少或不需要任何代码(取决于所使用的实现)。较复杂的Step可能具有复杂的业务规则,这些规则将在处理过程中应用。与Job一样,Step具有与唯一JobExecution相关联的单个StepExecution,如下图所示:

图 4.带有步骤的作业层次结构

1.2.1. StepExecution

StepExecution代表执行Step的单次尝试。每次运行Step都会创建一个新的StepExecution,类似于JobExecution。但是,如果某个步骤由于执行失败而无法执行,则不会 continue 执行。 StepExecution仅在其Step实际启动时创建。

Step执行由StepExecution类的对象表示。每个执行都包含对其相应步骤和JobExecution以及与事务相关的数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个ExecutionContext,其中包含开发人员在批处理运行中需要保留的所有数据,例如重新启动所需的统计信息或状态信息。下表列出了StepExecution的属性:

表 8. StepExecution 属性

Property Definition
Status BatchStatus对象,指示执行状态。在运行时,状态为BatchStatus.STARTED。如果失败,则状态为BatchStatus.FAILED。如果成功完成,则状态为BatchStatus.COMPLETED
startTime java.util.Date代表执行开始时的当前系统时间。如果步骤尚未开始,则此字段为空。
endTime java.util.Date代表执行完成时的当前系统时间,无论执行是否成功。如果步骤尚未退出,则此字段为空。
exitStatus ExitStatus表示执行结果。这是最重要的,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未退出,则此字段为空。
executionContext 包含需要在两次执行之间保留的所有用户数据的“属性包”。
readCount 已成功读取的 Item 数。
writeCount 已成功写入的 Item 数。
commitCount 为此执行已提交的事务数。
rollbackCount Step控制的业务事务已回滚的次数。
readSkipCount read失败的次数,导致 Item 被跳过。
processSkipCount read失败的次数,导致 Item 被跳过。
filterCount ItemProcessor已“过滤”的 Item 数。
writeSkipCount read失败的次数,导致 Item 被跳过。

1.3. ExecutionContext

ExecutionContext表示键/值对的集合,这些键/值对由框架保留并控制,以便允许开发人员放置一个范围为StepExecution对象或JobExecution对象的持久状态。对于那些熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。最佳用法示例是促进重新启动。以平面文件 Importing 为例,在处理单独的行时,框架会定期在提交点保留ExecutionContext。这样做使ItemReader可以存储其状态,以防在运行期间发生致命错误,甚至断电。所需要做的就是将当前读取的行数放入上下文中,如下面的示例所示,框架将完成其余工作:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

Job Stereotypes 部分中的 EndOfDay 示例为例,假设有一个步骤“ loadData”将文件加载到数据库中。第一次失败运行后,元数据表将类似于以下示例:

*表 9. BATCH_JOB_INSTANCE *

JOB_INST_ID JOB_NAME
1 EndOfDayJob

*表 10. BATCH_JOB_EXECUTION_PARAMS *

JOB_INST_ID TYPE_CD KEY_NAME DATE_VAL
1 DATE schedule.Date 2017-01-01

*表 11. BATCH_JOB_EXECUTION *

JOB_EXEC_ID JOB_INST_ID START_TIME END_TIME STATUS
1 1 2017-01-01 21:00 2017-01-01 21:30 FAILED

*表 12. BATCH_STEP_EXECUTION *

STEP_EXEC_ID JOB_EXEC_ID STEP_NAME START_TIME END_TIME STATUS
1 1 loadData 2017-01-01 21:00 2017-01-01 21:30 FAILED

*表 13. BATCH_STEP_EXECUTION_CONTEXT *

STEP_EXEC_ID SHORT_CONTEXT
1 {piece.count=40321}

在前面的情况下,Step运行了 30 分钟并处理了 40321 个“件”,在这种情况下,这代表了文件中的行。此值在框架每次提交之前更新,并且可以包含与ExecutionContext内的条目相对应的多个行。在提交之前被通知需要各种StepListener实现(或ItemStream)之一,本指南后面将对此进行详细讨论。与前面的示例一样,假定Job在第二天重新启动。重新启动后,将从数据库重新构建上次运行的ExecutionContext中的值。打开ItemReader时,它可以检查它是否在上下文中具有任何存储状态,并从那里对其进行初始化,如以下示例所示:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,运行上面的代码后,当前行为 40322,从而使Step从中断处重新开始。 ExecutionContext还可以用于需要保留有关运行本身的统计信息。例如,如果一个平面文件包含多行中存在的处理订单,则可能有必要存储已处理的订单数量(与读取的行数有很大不同),以便可以通过以下方式发送电子邮件: Step的末尾包含正文中已处理的订单总数。框架负责为开发人员存储此内容,以便使用单独的JobInstance正确确定范围。知道是否应使用现有的ExecutionContext可能非常困难。例如,使用上面的“ EndOfDay”示例,当 01-01 运行第二次再次开始时,框架将识别出它是相同的JobInstance并以单独的Step为基础,将ExecutionContext从数据库中拉出,并且将其(作为StepExecution的一部分)移交给Step本身。相反,对于 01-02 运行,框架会认识到它是一个不同的实例,因此必须将空上下文传递给Step。框架为开发人员做出了许多类型的确定,以确保在正确的时间将状态提供给开发人员。同样重要的是要注意,在任何给定时间,每个StepExecution仅存在一个ExecutionContextExecutionContext的 Client 端应该小心,因为这会创建共享的键空间。因此,在 Importing 值时应注意确保没有数据被覆盖。但是,Step绝对不会在上下文中存储任何数据,因此没有办法对框架产生不利影响。

同样重要的是要注意,每个JobExecution至少有一个ExecutionContext,每个StepExecution至少有一个ExecutionContext。例如,考虑以下代码片段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如 Comment 中所述,ecStep不等于ecJob。他们是两个不同的ExecutionContexts。范围为Step的一个保存在Step的每个提交点,而范围为 Job 的一个保存在每个Step执行之间。

1.4. JobRepository

JobRepository是上述所有构造型的持久性机制。它为JobLauncherJobStep实现提供 CRUD 操作。首次启动Job时,会从存储库中获取JobExecution,并且在执行过程中,通过将StepExecutionJobExecution实现传递到存储库中来实现它们。

批处理名称空间提供了对使用<job-repository>标记配置JobRepository实例的支持,如以下示例所示:

<job-repository id="jobRepository"/>

使用 Java 配置时,@EnableBatchProcessingComments 会提供JobRepository作为开箱即用自动配置的组件之一。

1.5. JobLauncher

JobLauncher代表一个简单的界面,用于使用给定的JobParameters集启动Job,如以下示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

期望实现从JobRepository获得有效的JobExecution并执行Job

1.6. itemReader

ItemReader是一种抽象,表示一次检索Step的 Importing。 ItemReader耗尽了它可以提供的 Item 后,将通过返回null来表明这一点。可以在Reader 和 Writer中找到有关ItemReader接口及其各种实现的更多详细信息。

1.7. ItemWriter

ItemWriter是一个抽象,表示一次Step,一个批次或大块 Item 的输出。通常,ItemWriter不知道接下来应该接收的 Importing,只知道当前调用中传递的 Item。可以在Reader 和 Writer中找到有关ItemWriter接口及其各种实现的更多详细信息。

1.8. Item 处理器

ItemProcessor是表示 Item 的业务处理的抽象。 ItemReader读取一个 Item,而ItemWriter写入一个 Item,而ItemProcessor提供了一个转换或应用其他业务处理的访问点。如果在处理 Item 时确定该 Item 无效,则返回null指示不应将该 Item 写出。有关ItemProcessor界面的更多详细信息,请参见Reader 和 Writer

1.9. 批处理命名空间

先前列出的许多域概念都需要在 Spring ApplicationContext中进行配置。尽管上述接口的实现可以在标准 bean 定义中使用,但已提供了命名空间以简化配置,如以下示例所示:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要声明了批处理名称空间,就可以使用其任何元素。有关配置作业的更多信息,请参见配置和运行作业。可以在配置步骤中找到有关配置Step的更多信息。

首页