3. 批处理的域语言

对于任何经验丰富的批处理设计师来说,Spring Batch 中使用的批处理的总体概念应该是熟悉且舒适的。有“作业”和“步骤”,以及开发人员提供的称为 ItemReaders 和 ItemWriters 的处理单元。但是,由于存在 Spring 模式,操作,模板,回调和惯用语,因此有以下机会:

  • 遵守明确的关注点方面的重大改进

  • 清楚地描述了作为接口提供的体系结构层和服务

  • 简单和默认的实现方式,可以快速采用和开箱即用

  • 大大增强了可扩展性

下图是已使用了数十年的批处理参考体系结构的简化版本。它概述了组成批处理领域语言的组件。该体系结构框架是一个蓝图,已经在最后几代平台(COBOL/Mainframe,C/Unix 和现在的 Java /任何地方)上数十年的实现中得到了证明。 JCL 和 COBOL 开发人员可能像 C,C#和 Java 开发人员一样熟悉这些概念。 Spring Batch 提供了层,组件和技术服务的物理实现,这些层,组件和技术服务通常用于强大,可维护的系统中,这些系统用于解决从简单到复杂的批处理应用程序的创建,其基础结构和扩展可以满足非常复杂的处理需求。

图 2.1:批量定型

上图突出显示了组成批处理域语言的关键概念。作业有一个到多个步骤,其中只有一个 ItemReader,ItemProcessor 和 ItemWriter。需要启动一个作业(JobLauncher),并且需要存储有关当前正在运行的进程的元数据(JobRepository)。

3.1 Job

本节描述与批处理作业的概念有关的构造型。 Job是封装整个批处理过程的实体。与其他 SpringItem 一样,Job将通过 XML 配置文件或基于 Java 的配置连接在一起。该配置可以被称为“作业配置”。但是,Job只是整体层次结构的顶部:

在 Spring Batch 中,作业只是“步骤”的容器。它组合了逻辑上属于流程的多个步骤,并允许配置所有步骤全局的属性,例如可重新启动性。作业配置包含:

  • 工作的简单名称

  • 步骤的定义和 Sequences

  • 作业是否可重新启动

Spring Batch 以SimpleJob类的形式提供了Job接口的默认简单实现,该类在Job之上创建了一些标准功能,但是 batch 名称空间消除了直接实例化它的需要。而是可以使用<job>标签:

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

3.1.1 JobInstance

JobInstance表示逻辑作业运行的概念。让我们考虑一个应该在一天结束时运行一次的批处理作业,例如上图中的“ EndOfDay”作业。有一个'EndOfDay'Job,但是必须分别跟踪Job的每个运行。对于这项工作,每天将有一个逻辑JobInstance。例如,将有 1 月 1 日运行和 1 月 2 日运行。如果 1 月 1 日运行第一次失败并在第二天再次运行,则仍是 1 月 1 日运行。 (通常,这也与它正在处理的数据相对应,这意味着 1 月 1 日运行将处理 1 月 1 日的数据,依此类推)。因此,每个JobInstance可以有多个执行(下面将更详细地讨论JobExecution),并且在给定的时间只能运行一个与特定Job并标识JobParameterJobInstance

JobInstance的定义绝对与将要加载的数据无关。完全取决于用于确定如何加载数据的ItemReader实现。例如,在 EndOfDay 场景中,数据上可能有一列指示该数据所属的“生效日期”或“计划日期”。因此,1 月 1 日的运行只会加载来自 1 号的数据,而 1 月 2 日的运行只会使用来自 2 号的数据。由于此确定可能是业务决策,因此应由ItemReader来决定。但是,使用相同的JobInstance将确定是否使用先前执行中的“状态”(即下面讨论的ExecutionContext)。使用新的JobInstance表示“从头开始”,而使用现有实例通常表示“从上次中断的地方开始”。

3.1.2 JobParameters

在讨论了JobInstance及其与Job的区别之后,自然要问的问题是:“一个JobInstance与另一个JobInstance有什么区别?”答案是:JobParametersJobParameters是用于启动批处理作业的一组参数。在运行期间,它们可以用于标识甚至用作参考数据:

在上面的示例中,有两个实例,一个实例是 1 月 1 日,另一个实例是 1 月 2 日,实际上只有一个 Job,一个实例的工作参数为 01-01-2008,另一个实例的工作参数为。参数 01-02-2008.因此,Contract 可以定义为:JobInstance = Job标识JobParameters。这使开发人员可以有效地控制JobInstance的定义方式,因为他们可以控制传入的参数。

Note

并非所有作业参数都需要有助于JobInstance的标识。默认情况下,它们会这样做,但是该框架允许使用参数也不会对JobInstance的身份作出贡献的Job提交。

3.1.3 JobExecution

JobExecution是指一次尝试运行Job的技术概念。执行可能以失败或成功结束,但是与给定执行相对应的JobInstance不会被视为完成,除非执行成功完成。以上面描述的 EndOfDay Job为例,考虑到 01-01-2008 的JobInstance首次运行失败。如果使用与第一次运行相同的标识作业参数再次运行(01-01-2008),将创建一个新的JobExecution。但是,仍然只有一个JobInstance

Job定义什么是作业及其执行方式,而JobInstance是纯粹的组织对象,用于将执行分组在一起,主要是为了实现正确的重启语义。 JobExecution是运行期间实际发生情况的主要存储机制,因此包含许多必须控制和持久化的属性:

表 3.1. JobExecution 属性

statusBatchStatus对象,指示执行状态。运行时为 BatchStatus.STARTED,如果失败,则为 BatchStatus.FAILED,如果成功完成,则为 BatchStatus.COMPLETED。
startTimejava.util.Date代表执行开始时的当前系统时间。
endTimejava.util.Date代表执行完成时的当前系统时间,无论执行是否成功。
exitStatusExitStatus表示运行结果。这是最重要的,因为它包含将返回给调用方的退出代码。有关更多详细信息,请参见第 5 章。
createTimejava.util.Date代表首次保留JobExecution时的当前系统时间。作业可能尚未启动(因此没有启动时间),但是它将始终具有 createTime,这是 Management 作业级别ExecutionContext的框架所要求的。
lastUpdatedjava.util.Date代表上一次保留JobExecution的时间。
executionContext“属性袋”包含两次执行之间需要保留的所有用户数据。
failureExceptions在执行Job时遇到的异常列表。如果在Job失败期间遇到多个异常,这些将很有用。

这些属性很重要,因为它们将被保留并可以用来完全确定执行状态。例如,如果 01-01 的 EndOfDay 作业在晚上 9:00 执行,但在 9:30 失败,则将在批处理元数据表中进行以下 Importing:

表 3.2. BATCH_JOB_INSTANCE

JOB_INST_IDJOB_NAME
1EndOfDayJob

表 3.3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_IDTYPE_CDKEY_NAMEDATE_VALIDENTIFYING
1DATEschedule.Date2008-01-01TRUE

表 3.4. BATCH_JOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112008-01-01 21:002008-01-01 21:30FAILED

Note

为了清楚和格式化,列名可能已缩写或删除

现在,该工作已失败,我们假设确定问题已花费了一整夜,因此现在已关闭“批处理窗口”。假设窗口在 9:00 PM 开始,则该作业将在 01-01 再次开始,从停止的地方开始,并在 9:30 成功完成。因为现在是第二天,所以还必须运行 01-02 作业,此作业随后在 9:31 开始,并在正常的一小时时间内在 10:30 完成。无需先将一个JobInstance依次启动,除非这两个作业有可能尝试访问相同的数据,从而导致锁定数据库级别的问题。何时应运行Job完全取决于调度程序。由于它们是单独的JobInstance,因此 Spring Batch 将不会尝试阻止它们同时运行。 (尝试在另一个已经运行的同一个JobInstance上运行将导致抛出JobExecutionAlreadyRunningException)。 JobInstanceJobParameters表中现在都应该有一个额外的条目,而JobExecution表中应该有两个额外的条目:

表 3.5. BATCH_JOB_INSTANCE

JOB_INST_IDJOB_NAME
1EndOfDayJob
2EndOfDayJob

表 3.6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_IDTYPE_CDKEY_NAMEDATE_VALIDENTIFYING
1DATEschedule.Date2008-01-01 00:00:00TRUE
2DATEschedule.Date2008-01-01 00:00:00TRUE
3DATEschedule.Date2008-01-02 00:00:00TRUE

表 3.7. BATCH_JOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112008-01-01 21:002008-01-01 21:30FAILED
212008-01-02 21:002008-01-02 21:30COMPLETED
322008-01-02 21:312008-01-02 22:29COMPLETED

Note

为了清楚和格式化,列名可能已缩写或删除

3.2 Step

Step是一个域对象,它封装了批处理作业的一个独立的 Sequences 阶段。因此,每个Job完全由一个或多个步骤组成。 Step包含定义和控制实际批处理所需的所有信息。这是一个模糊的描述,因为任何给定Step的内容都由开发人员自行编写Job来决定。步骤可以像开发人员所希望的那样简单或复杂。一个简单的Step可能会将文件中的数据加载到数据库中,几乎不需要代码。 (取决于所使用的实现方式)较复杂的Step可能具有复杂的业务规则,这些规则将在处理过程中应用。与Job一样,Step的个人StepExecution与唯一的JobExecution相对应:

3.2.1 StepExecution

StepExecution代表执行Step的单次尝试。每次运行Step都会创建一个新的StepExecution,类似于JobExecution。但是,如果某个步骤由于执行失败而无法执行,则该执行将不会 continue 执行。 StepExecution仅在其Step实际启动时创建。

步骤执行由StepExecution类的对象表示。每个执行都包含对其相应步骤和JobExecution的引用,以及与事务相关的数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都将包含ExecutionContext,其中包含开发人员在批处理运行期间需要保留的任何数据,例如重新启动所需的统计信息或状态信息。以下是StepExecution的属性列表:

表 3.8. StepExecution 属性

statusBatchStatus对象,指示执行状态。运行时,状态为 BatchStatus.STARTED,如果失败,则状态为 BatchStatus.FAILED,如果成功完成,则状态为 BatchStatus.COMPLETED
startTimejava.util.Date代表执行开始时的当前系统时间。
endTimejava.util.Date代表执行完成时的当前系统时间,无论执行是否成功。
exitStatusExitStatus表示执行结果。这是最重要的,因为它包含将返回给调用方的退出代码。有关更多详细信息,请参见第 5 章。
executionContext“属性袋”包含两次执行之间需要保留的所有用户数据。
readCount已成功读取的 Item 数
writeCount已成功写入的 Item 数
commitCount为此执行已提交的事务数
rollbackCountStep控制的业务事务已回滚的次数。
readSkipCountread失败的次数,导致 Item 被跳过。
processSkipCountprocess失败的次数,导致 Item 被跳过。
filterCountItemProcessor已“过滤”的 Item 数。
writeSkipCountwrite失败的次数,导致 Item 被跳过。

3.3 ExecutionContext

ExecutionContext表示键/值对的集合,这些键/值对由框架保留并控制,以便允许开发人员放置一个存储范围为StepExecutionJobExecution的持久状态。对于熟悉 Quartz 的人来说,它与JobDataMap非常相似。最佳用法示例是促进重新启动。以平面文件 Importing 为例,在处理各个行时,框架会定期在提交点保留ExecutionContext。如果运行期间发生致命错误,或者即使断电,这也允许ItemReader存储其状态。需要做的就是将当前读取的行数放入上下文中,框架将完成其余工作:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

以 Job Stereotypes 部分的 EndOfDay 示例为例,假设有一个步骤:“ loadData”,它将文件加载到数据库中。第一次失败运行后,元数据表将如下所示:

表 3.9. BATCH_JOB_INSTANCE

JOB_INST_IDJOB_NAME
1EndOfDayJob

表 3.10. BATCH_JOB_PARAMS

JOB_INST_IDTYPE_CDKEY_NAMEDATE_VAL
1DATEschedule.Date2008-01-01

表 3.11. BATCH_JOB_EXECUTION

JOB_EXEC_IDJOB_INST_IDSTART_TIMEEND_TIMESTATUS
112008-01-01 21:002008-01-01 21:30FAILED

表 3.12. BATCH_STEP_EXECUTION

STEP_EXEC_IDJOB_EXEC_IDSTEP_NAMESTART_TIMEEND_TIMESTATUS
11loadDate2008-01-01 21:002008-01-01 21:30FAILED

表 3.13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_IDSHORT_CONTEXT
1{piece.count=40321}

在这种情况下,Step运行了 30 分钟并处理了 40321 个“件”,在这种情况下,这代表了文件中的行。此值将在框架每次提交之前更新,并且可以包含与ExecutionContext内的条目相对应的多行。在提交之前被通知需要各种StepListenerItemStream之一,本指南后面将对此进行详细讨论。与前面的示例一样,假定Job在第二天重新启动。重新启动后,将从数据库中重新构建上次运行的ExecutionContext的值,打开ItemReader时,它可以检查其在上下文中是否具有任何存储状态,并从那里初始化自身:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,执行上述代码后,当前行将为 40,322,从而允许Step从中断处重新开始。 ExecutionContext还可以用于需要保留有关运行本身的统计信息。例如,如果平面文件包含跨多行存在的处理订单,则可能有必要存储已处理的订单数量(与读取的行数有很大不同),以便可以通过以下方式发送电子邮件: Step的末尾在正文中处理了总订单。框架负责为开发人员存储此内容,以便使用单独的JobInstance正确确定范围。知道是否应使用现有的ExecutionContext可能非常困难。例如,使用上面的“ EndOfDay”示例,当 01-01 运行第二次再次开始时,框架将识别出它是相同的JobInstance,并且在单个Step的基础上,将ExecutionContext从数据库中移出并动手它是StepExecutionStep本身的一部分。相反,对于 01-02 运行,框架会认识到它是一个不同的实例,因此必须将空上下文传递给Step。框架使开发人员做出许多这类确定,以确保在正确的时间将状态提供给开发人员。同样重要的是要注意,在任何给定时间,每个StepExecution仅存在一个ExecutionContextExecutionContext的 Client 应小心,因为这会创建共享的键空间,因此在放入值时应格外小心,以确保不会覆盖任何数据。但是,Step绝对不会在上下文中存储任何数据,因此没有办法对框架产生不利影响。

同样重要的是要注意,每个JobExecution至少有一个ExecutionContext,每个StepExecution至少有一个ExecutionContext。例如,考虑以下代码片段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如 Comment 中所述,ecStep 将不等于 ecJob;他们是两个不同的ExecutionContext。范围为Step的一个将保存在Step的每个提交点,而范围为Job的一个将保存在每次Step执行之间。

3.4 JobRepository

JobRepository是上述所有构造型的持久性机制。它为JobLauncherJobStep实现提供 CRUD 操作。首次启动Job时,会从存储库中获取JobExecution,并且在执行过程中StepExecutionJobExecution的实现通过将其传递到存储库中而得以保留:

<job-repository id="jobRepository"/>

3.5 JobLauncher

JobLauncher代表一个简单的界面,用于使用给定的JobParameters集启动Job

public interface JobLauncher {

    public JobExecution run(Job job, JobParameters jobParameters)
                throws JobExecutionAlreadyRunningException, JobRestartException;
}

预期实现将从JobRepository获得有效的JobExecution并执行Job

3.6itemReader

ItemReader是一种抽象,表示一次检索Step的 Importing。当ItemReader用尽了它可以提供的 Item 时,它将通过返回 null 来表明这一点。可以在第 6 章,ItemReader 和 ItemWriters中找到有关ItemReader接口及其各种实现的更多详细信息。

3.7ItemWriter

ItemWriter是一个抽象,表示一次Step,一个批次或大块 Item 的输出。通常,ItemWriter 不知道下一步将要接收的 Importing,仅知道在当前调用中传递的 Item。有关ItemWriter接口及其各种实现的更多详细信息,可以在第 6 章,ItemReader 和 ItemWriters中找到。

3.8Item 处理器

ItemProcessor是表示 Item 的业务处理的抽象。 ItemReader读取一项,ItemWriter写入一项,而ItemProcessor提供访问权以进行转换或应用其他业务处理。如果在处理 Item 时确定该 Item 无效,则返回 null 表示不应将该 Item 写出。有关 ItemProcessor 接口的更多详细信息,请参见第 6 章,ItemReader 和 ItemWriters

3.9 批处理命名空间

上面列出的许多域概念都需要在 Spring ApplicationContext中进行配置。尽管可以在标准 bean 定义中使用上述接口的实现,但提供了命名空间以简化配置:

<beans:beans xmlns="http://www.springframework.org/schema/batch"
     xmlns:beans="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.2.xsd">

    <job id="ioSampleJob">
        <step id="step1">
            <tasklet>
                <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
            </tasklet>
        </step>
    </job>

</beans:beans>

只要声明了批处理名称空间,就可以使用其任何元素。有关配置Job的更多信息,请参见第 4 章,配置和运行作业。可以在第 5 章,配置步骤中找到有关配置步骤的更多信息。