On this page
1. 批处理的域语言
对于任何经验丰富的批处理设计师而言,Spring Batch 中使用的批处理的总体概念应该是熟悉且舒适的。有“工作”和“步骤”以及开发人员提供的称为ItemReader
和ItemWriter
的处理单元。但是,由于存在 Spring 模式,操作,模板,回调和惯用语,因此有以下机会:
遵守关注点明显分开的情况得到了显着改善。
清楚地描述了作为接口提供的体系结构层和服务。
简单和默认的实现,可以快速采用和开箱即用。
显着增强的可扩展性。
下图是已使用了数十年的批处理参考体系结构的简化版本。它概述了组成批处理领域语言的组件。该体系结构框架是一个蓝图,已经在最后几代平台(COBOL/Mainframe,C/Unix,现在是 Java /任何地方)上数十年的实现中得到了证明。 JCL 和 COBOL 开发人员可能会像 C,C#和 Java 开发人员一样熟悉这些概念。 Spring Batch 提供了层,组件和技术服务的物理实现,这些层,组件和技术服务通常在健壮,可维护的系统中找到,这些系统用于解决从简单到复杂的批处理应用程序的创建,其基础结构和扩展可以满足非常复杂的处理需求。
图 1.批处理原型
上图突出显示了构成 Spring Batch 领域语言的关键概念。作业有一个到多个步骤,每个步骤恰好具有一个ItemReader
,一个ItemProcessor
和一个ItemWriter
。需要启动一个作业(使用JobLauncher
),并且需要存储有关当前正在运行的进程的元数据(在JobRepository
中)。
1.1. Job
本节描述与批处理作业的概念有关的构造型。 Job
是封装整个批处理过程的实体。与其他 Spring Item 一样,Job
与 XML 配置文件或基于 Java 的配置连接在一起。该配置可以被称为“作业配置”。但是,Job
只是整个层次结构的顶部,如下图所示:
图 2.作业层次结构
在 Spring Batch 中,Job
只是Step
实例的容器。它组合了逻辑上属于流程的多个步骤,并允许配置所有步骤全局的属性,例如可重新启动性。作业配置包含:
作业的简单名称。
Step
个实例的定义和 Sequences。作业是否可重新启动。
Spring Batch 以SimpleJob
类的形式提供了 Job 接口的默认简单实现,它在Job
之上创建了一些标准功能。使用基于 Java 的配置时,将生成一个生成器集合以实例化Job
,如以下示例所示:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
Spring Batch 以SimpleJob
类的形式提供了Job
接口的默认简单实现,该实现在Job
之上创建了一些标准功能。但是,批处理名称空间消除了直接实例化它的需要。相反,可以使用<job>
标签,如以下示例所示:
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
1.1.1. JobInstance
JobInstance
表示逻辑作业运行的概念。考虑一个应该在一天结束时运行一次的批处理作业,例如上图中的“ EndOfDay” Job
。有一个“ EndOfDay”作业,但是必须单独跟踪Job
的每个单独运行。对于这项工作,每天只有逻辑JobInstance
。例如,有 1 月 1 日运行,1 月 2 日运行,依此类推。如果 1 月 1 日运行第一次失败并在第二天再次运行,则仍是 1 月 1 日运行。 (通常,这也与它正在处理的数据相对应,这意味着 1 月 1 日运行处理 1 月 1 日的数据)。因此,每个JobInstance
可以有多个执行(本章后面将更详细地讨论JobExecution
),并且在给定的时间只能运行一个对应于特定Job
并标识JobParameters
的JobInstance
。
JobInstance
的定义绝对与要加载的数据无关。确定如何加载数据完全取决于ItemReader
实现。例如,在 EndOfDay 场景中,数据上可能有一列指示该数据所属的“生效日期”或“计划日期”。因此,1 月 1 日的运行将仅加载第 1 次的数据,而 1 月 2 日的运行将仅使用第 2 次的数据。由于此确定可能是业务决策,因此应由ItemReader
来决定。但是,使用相同的JobInstance
将确定是否使用先前执行中的“状态”(即ExecutionContext
,本章后面将讨论)。使用新的JobInstance
意味着“从头开始”,而使用现有实例通常意味着“从您离开的地方开始”。
1.1.2. JobParameters
在讨论了JobInstance
及其与 Job 的区别之后,自然要问的问题是:“一个JobInstance
与另一个有何区别?”答案是:JobParameters
。 JobParameters
对象包含一组用于启动批处理作业的参数。它们可以在运行期间用于标识甚至用作参考数据,如下图所示:
图 3.作业参数
在前面的示例中,有两个实例,一个实例是 1 月 1 日,另一个实例是 1 月 2 日,实际上只有一个Job
,但是它有两个JobParameter
对象:一个是用 job 参数 01-01-2017 启动的另一个以参数 01-02-2017 开头。因此,Contract 可以定义为:JobInstance
= Job
标识JobParameters
。这使开发人员可以有效地控制JobInstance
的定义方式,因为他们可以控制传入的参数。
Note
并非所有作业参数都需要有助于JobInstance
的标识。默认情况下,它们会这样做。但是,该框架还允许提交带有对JobInstance
的身份无贡献的参数的Job
。
1.1.3. JobExecution
JobExecution
是指一次尝试运行 Job 的技术概念。执行可能以失败或成功结束,但是与给定执行相对应的JobInstance
除非执行成功完成,否则不认为是完整的。使用前面描述的 EndOfDay Job
作为示例,考虑到 01-01-2017 的JobInstance
首次运行失败。如果使用与第一次运行(01-01-2017)相同的标识作业参数再次运行,则会创建一个新的JobExecution
。但是,仍然只有一个JobInstance
。
Job
定义什么是作业及其执行方式,而JobInstance
是纯粹的组织对象,用于将执行分组在一起,主要是为了实现正确的重新启动语义。 JobExecution
是运行期间实际发生情况的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示:
表 1. JobExecution 属性
Property | Definition |
Status | BatchStatus 对象,指示执行状态。在运行时,它是BatchStatus#STARTED 。如果失败,则为BatchStatus#FAILED 。如果成功完成,则为BatchStatus#COMPLETED |
startTime | java.util.Date 代表执行开始时的当前系统时间。如果作业尚未开始,则此字段为空。 |
endTime | java.util.Date 代表执行完成时的当前系统时间,无论执行是否成功。如果作业尚未完成,则该字段为空。 |
exitStatus | ExitStatus ,指示运行结果。这是最重要的,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未完成,则该字段为空。 |
createTime | java.util.Date 代表首次保留JobExecution 时的当前系统时间。作业可能尚未启动(因此没有启动时间),但是它始终具有 createTime,这是 Management 作业级别ExecutionContexts 的框架所需的。 |
lastUpdated | java.util.Date 代表上一次保留JobExecution 的时间。如果作业尚未开始,则此字段为空。 |
executionContext | 包含需要在两次执行之间保留的所有用户数据的“属性包”。 |
failureExceptions | 在执行Job 时遇到的异常列表。如果在Job 失败期间遇到多个异常,这些将很有用。 |
这些属性很重要,因为它们可以持久保存并且可以用来完全确定执行状态。例如,如果 01-01 的 EndOfDay 作业在 9:00 PM 执行而在 9:30 失败,则在批处理元数据表中进行以下 Importing:
*表 2. BATCH_JOB_INSTANCE *
JOB_INST_ID | JOB_NAME |
1 | EndOfDayJob |
*表 3. BATCH_JOB_EXECUTION_PARAMS *
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
1 | DATE | schedule.Date | 2017-01-01 | TRUE |
*表 4. BATCH_JOB_EXECUTION *
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
Note
为了清楚和格式化,列名可能已被缩写或删除。
现在工作失败了,假设确定问题已花费了一整夜,因此“批处理窗口”现在关闭了。进一步假设该窗口在 9:00 PM 开始,该作业将在 01-01 再次开始,从停止的地方开始,并在 9:30 成功完成。因为现在是第二天,所以也必须运行 01-02 作业,此作业随后才在 9:31 开始,并在正常的一小时时间内在 10:30 完成。无需先将一个JobInstance
依次启动,除非这两个作业有可能尝试访问相同的数据,从而导致锁定数据库级别的问题。何时应运行Job
完全取决于调度程序。由于它们是单独的JobInstances
,因此 Spring Batch 不会尝试阻止它们同时运行。 (尝试在同一JobInstance
上运行而另一个已经在运行中会导致JobExecutionAlreadyRunningException
抛出)。 JobInstance
和JobParameters
表中现在都应该有一个额外的条目,而JobExecution
表中应该有两个额外的条目,如下表所示:
*表 5. BATCH_JOB_INSTANCE *
JOB_INST_ID | JOB_NAME |
1 | EndOfDayJob |
2 | EndOfDayJob |
*表 6. BATCH_JOB_EXECUTION_PARAMS *
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | DATE_VAL | IDENTIFYING |
1 | DATE | schedule.Date | 2017-01-01 00:00:00 | TRUE |
2 | DATE | schedule.Date | 2017-01-01 00:00:00 | TRUE |
3 | DATE | schedule.Date | 2017-01-02 00:00:00 | TRUE |
*表 7. BATCH_JOB_EXECUTION *
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
2 | 1 | 2017-01-02 21:00 | 2017-01-02 21:30 | COMPLETED |
3 | 2 | 2017-01-02 21:31 | 2017-01-02 22:29 | COMPLETED |
Note
为了清楚和格式化,列名可能已被缩写或删除。
1.2. Step
Step
是一个域对象,它封装了批处理作业的一个独立的 Sequences 阶段。因此,每个工作完全由一个或多个步骤组成。 Step
包含定义和控制实际批处理所需的所有信息。这是一个模糊的描述,因为任何给定Step
的内容都由开发人员自行编写Job
来决定。 Step
可以像开发人员期望的那样简单或复杂。简单的Step
可能会将文件中的数据加载到数据库中,而只需要很少或不需要任何代码(取决于所使用的实现)。较复杂的Step
可能具有复杂的业务规则,这些规则将在处理过程中应用。与Job
一样,Step
具有与唯一JobExecution
相关联的单个StepExecution
,如下图所示:
图 4.带有步骤的作业层次结构
1.2.1. StepExecution
StepExecution
代表执行Step
的单次尝试。每次运行Step
都会创建一个新的StepExecution
,类似于JobExecution
。但是,如果某个步骤由于执行失败而无法执行,则不会 continue 执行。 StepExecution
仅在其Step
实际启动时创建。
Step
执行由StepExecution
类的对象表示。每个执行都包含对其相应步骤和JobExecution
以及与事务相关的数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个ExecutionContext
,其中包含开发人员在批处理运行中需要保留的所有数据,例如重新启动所需的统计信息或状态信息。下表列出了StepExecution
的属性:
表 8. StepExecution 属性
Property | Definition |
Status | BatchStatus 对象,指示执行状态。在运行时,状态为BatchStatus.STARTED 。如果失败,则状态为BatchStatus.FAILED 。如果成功完成,则状态为BatchStatus.COMPLETED 。 |
startTime | java.util.Date 代表执行开始时的当前系统时间。如果步骤尚未开始,则此字段为空。 |
endTime | java.util.Date 代表执行完成时的当前系统时间,无论执行是否成功。如果步骤尚未退出,则此字段为空。 |
exitStatus | ExitStatus 表示执行结果。这是最重要的,因为它包含返回给调用者的退出代码。有关更多详细信息,请参见第 5 章。如果作业尚未退出,则此字段为空。 |
executionContext | 包含需要在两次执行之间保留的所有用户数据的“属性包”。 |
readCount | 已成功读取的 Item 数。 |
writeCount | 已成功写入的 Item 数。 |
commitCount | 为此执行已提交的事务数。 |
rollbackCount | Step 控制的业务事务已回滚的次数。 |
readSkipCount | read 失败的次数,导致 Item 被跳过。 |
processSkipCount | read 失败的次数,导致 Item 被跳过。 |
filterCount | ItemProcessor 已“过滤”的 Item 数。 |
writeSkipCount | read 失败的次数,导致 Item 被跳过。 |
1.3. ExecutionContext
ExecutionContext
表示键/值对的集合,这些键/值对由框架保留并控制,以便允许开发人员放置一个范围为StepExecution
对象或JobExecution
对象的持久状态。对于那些熟悉 Quartz 的人来说,它与 JobDataMap 非常相似。最佳用法示例是促进重新启动。以平面文件 Importing 为例,在处理单独的行时,框架会定期在提交点保留ExecutionContext
。这样做使ItemReader
可以存储其状态,以防在运行期间发生致命错误,甚至断电。所需要做的就是将当前读取的行数放入上下文中,如下面的示例所示,框架将完成其余工作:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以Job
Stereotypes 部分中的 EndOfDay 示例为例,假设有一个步骤“ loadData”将文件加载到数据库中。第一次失败运行后,元数据表将类似于以下示例:
*表 9. BATCH_JOB_INSTANCE *
JOB_INST_ID | JOB_NAME |
1 | EndOfDayJob |
*表 10. BATCH_JOB_EXECUTION_PARAMS *
JOB_INST_ID | TYPE_CD | KEY_NAME | DATE_VAL |
1 | DATE | schedule.Date | 2017-01-01 |
*表 11. BATCH_JOB_EXECUTION *
JOB_EXEC_ID | JOB_INST_ID | START_TIME | END_TIME | STATUS |
1 | 1 | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
*表 12. BATCH_STEP_EXECUTION *
STEP_EXEC_ID | JOB_EXEC_ID | STEP_NAME | START_TIME | END_TIME | STATUS |
1 | 1 | loadData | 2017-01-01 21:00 | 2017-01-01 21:30 | FAILED |
*表 13. BATCH_STEP_EXECUTION_CONTEXT *
STEP_EXEC_ID | SHORT_CONTEXT |
1 | {piece.count=40321} |
在前面的情况下,Step
运行了 30 分钟并处理了 40321 个“件”,在这种情况下,这代表了文件中的行。此值在框架每次提交之前更新,并且可以包含与ExecutionContext
内的条目相对应的多个行。在提交之前被通知需要各种StepListener
实现(或ItemStream
)之一,本指南后面将对此进行详细讨论。与前面的示例一样,假定Job
在第二天重新启动。重新启动后,将从数据库重新构建上次运行的ExecutionContext
中的值。打开ItemReader
时,它可以检查它是否在上下文中具有任何存储状态,并从那里对其进行初始化,如以下示例所示:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,运行上面的代码后,当前行为 40322,从而使Step
从中断处重新开始。 ExecutionContext
还可以用于需要保留有关运行本身的统计信息。例如,如果一个平面文件包含多行中存在的处理订单,则可能有必要存储已处理的订单数量(与读取的行数有很大不同),以便可以通过以下方式发送电子邮件: Step
的末尾包含正文中已处理的订单总数。框架负责为开发人员存储此内容,以便使用单独的JobInstance
正确确定范围。知道是否应使用现有的ExecutionContext
可能非常困难。例如,使用上面的“ EndOfDay”示例,当 01-01 运行第二次再次开始时,框架将识别出它是相同的JobInstance
并以单独的Step
为基础,将ExecutionContext
从数据库中拉出,并且将其(作为StepExecution
的一部分)移交给Step
本身。相反,对于 01-02 运行,框架会认识到它是一个不同的实例,因此必须将空上下文传递给Step
。框架为开发人员做出了许多类型的确定,以确保在正确的时间将状态提供给开发人员。同样重要的是要注意,在任何给定时间,每个StepExecution
仅存在一个ExecutionContext
。 ExecutionContext
的 Client 端应该小心,因为这会创建共享的键空间。因此,在 Importing 值时应注意确保没有数据被覆盖。但是,Step
绝对不会在上下文中存储任何数据,因此没有办法对框架产生不利影响。
同样重要的是要注意,每个JobExecution
至少有一个ExecutionContext
,每个StepExecution
至少有一个ExecutionContext
。例如,考虑以下代码片段:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
如 Comment 中所述,ecStep
不等于ecJob
。他们是两个不同的ExecutionContexts
。范围为Step
的一个保存在Step
的每个提交点,而范围为 Job 的一个保存在每个Step
执行之间。
1.4. JobRepository
JobRepository
是上述所有构造型的持久性机制。它为JobLauncher
,Job
和Step
实现提供 CRUD 操作。首次启动Job
时,会从存储库中获取JobExecution
,并且在执行过程中,通过将StepExecution
和JobExecution
实现传递到存储库中来实现它们。
批处理名称空间提供了对使用<job-repository>
标记配置JobRepository
实例的支持,如以下示例所示:
<job-repository id="jobRepository"/>
使用 Java 配置时,@EnableBatchProcessing
Comments 会提供JobRepository
作为开箱即用自动配置的组件之一。
1.5. JobLauncher
JobLauncher
代表一个简单的界面,用于使用给定的JobParameters
集启动Job
,如以下示例所示:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
期望实现从JobRepository
获得有效的JobExecution
并执行Job
。
1.6. itemReader
ItemReader
是一种抽象,表示一次检索Step
的 Importing。 ItemReader
耗尽了它可以提供的 Item 后,将通过返回null
来表明这一点。可以在Reader 和 Writer中找到有关ItemReader
接口及其各种实现的更多详细信息。
1.7. ItemWriter
ItemWriter
是一个抽象,表示一次Step
,一个批次或大块 Item 的输出。通常,ItemWriter
不知道接下来应该接收的 Importing,只知道当前调用中传递的 Item。可以在Reader 和 Writer中找到有关ItemWriter
接口及其各种实现的更多详细信息。
1.8. Item 处理器
ItemProcessor
是表示 Item 的业务处理的抽象。 ItemReader
读取一个 Item,而ItemWriter
写入一个 Item,而ItemProcessor
提供了一个转换或应用其他业务处理的访问点。如果在处理 Item 时确定该 Item 无效,则返回null
指示不应将该 Item 写出。有关ItemProcessor
界面的更多详细信息,请参见Reader 和 Writer。
1.9. 批处理命名空间
先前列出的许多域概念都需要在 Spring ApplicationContext
中进行配置。尽管上述接口的实现可以在标准 bean 定义中使用,但已提供了命名空间以简化配置,如以下示例所示:
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>
只要声明了批处理名称空间,就可以使用其任何元素。有关配置作业的更多信息,请参见配置和运行作业。可以在配置步骤中找到有关配置Step
的更多信息。