On this page
1. 配置步骤
如领域章节所讨论,Step
是一个域对象,该域对象封装了批处理作业的独立 Sequences 阶段,并包含定义和控制实际批处理所需的所有信息。这是一个模糊的描述,因为任何给定Step
的内容都由开发人员自行编写Job
来决定。 Step
可以像开发人员期望的那样简单或复杂。简单的Step
可能会将文件中的数据加载到数据库中,而只需要很少或不需要任何代码(取决于所使用的实现)。较复杂的Step
可能具有复杂的业务规则,这些业务规则将在处理过程中应用,如下图所示:
图 1.步骤
1.1. 块处理
Spring Batch 在最常见的实现中使用“面向块的”处理风格。面向块的处理是指一次读取一个数据并创建在事务边界内写出的“块”。从ItemReader
读取一项,将其交给ItemProcessor
并进行汇总。一旦读取的 Item 数等于提交间隔,整个块就由ItemWriter
写入,然后提交事务。下图显示了该过程:
图 2.块处理
以下代码显示了相同的概念:
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read()
Object processedItem = itemProcessor.process(item);
items.add(processedItem);
}
itemWriter.write(items);
1.1.1. 配置步骤
尽管Step
所需的依赖项列表相对较短,但它是一个极其复杂的类,可能包含许多协 Writer。
为了简化配置,可以使用 Spring Batch 名称空间,如以下示例所示:
XML Configuration
<job id="sampleJob" job-repository="jobRepository">
<step id="step1">
<tasklet transaction-manager="transactionManager">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
使用 Java 配置时,可以使用 Spring Batch 构建器,如以下示例所示:
Java Configuration
/**
* Note the JobRepository is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return this.jobBuilderFactory.get("sampleJob")
.repository(jobRepository)
.start(sampleStep)
.build();
}
/**
* Note the TransactionManager is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
return this.stepBuilderFactory.get("sampleStep")
.transactionManager(transactionManager)
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
上面的配置包括创建面向 Item 的步骤所需的唯一依赖项:
reader
:提供处理 Item 的ItemReader
。writer
:处理ItemReader
提供的 Item 的ItemWriter
。transaction-manager
:Spring 的PlatformTransactionManager
,在处理期间开始并提交事务。transaction-manager
:Spring 的PlatformTransactionManager
,在处理期间开始并提交事务。job-repository
:在处理期间(即将提交之前)定期存储StepExecution
和ExecutionContext
的JobRepository
。对于嵌入式(在中定义的一个),它是元素上的一个属性。对于独立步骤,它被定义为的属性。repository
:在处理期间(即将提交之前)定期存储StepExecution
和ExecutionContext
的JobRepository
。commit-interval
:提交事务之前要处理的 Item 数。chunk
:表示这是基于物料的步骤,是提交事务之前要处理的物料数。
应该注意的是,job-repository
默认为jobRepository
,transaction-manager
默认为transactionManger
。另外,ItemProcessor
是可选的,因为该 Item 可以直接从 Reader 传递给编写器。
应该注意的是,repository
默认为jobRepository
,transactionManager
默认为transactionManger
(全部通过@EnableBatchProcessing
通过基础结构提供)。同样,ItemProcessor
是可选的,因为该 Item 可以直接从 Reader 传递给编写器。
1.1.2. 从父步骤继承
如果一组Steps
共享相似的配置,则定义一个“父” Step
可能会有所帮助,具体的Steps
可以从中继承属性。与 Java 中的类继承类似,“子” Step
将其元素和属性与父类结合在一起。子代也将覆盖父代的Steps
。
在下面的示例中,Step
“ concreteStep1”继承自“ parentStep”。它使用'itemReader','itemProcessor','itemWriter',startLimit=5
和allowStartIfComplete=true
实例化。此外,commitInterval
为'5',因为它被“ concreteStep1” Step
覆盖,如以下示例所示:
<step id="parentStep">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep1" parent="parentStep">
<tasklet start-limit="5">
<chunk processor="itemProcessor" commit-interval="5"/>
</tasklet>
</step>
作业元素内的步骤上仍需要id
属性。这有两个原因:
保留
StepExecution
时,将id
用作步骤名称。如果在一个作业中的多个步骤中引用了同一独立步骤,则会发生错误。如本章稍后所述,在创建作业流程时,
next
属性应引用流程中的步骤,而不是独立步骤。
Abstract Step
有时,可能有必要定义不是完整的Step
配置的父级Step
。例如,如果reader
,writer
和tasklet
属性未包含在Step
配置中,则初始化将失败。如果必须定义没有这些属性的父对象,则应使用abstract
属性。 abstract
Step
仅被扩展,从不实例化。
在下面的示例中,如果Step
abstractParentStep
未声明为抽象,则不会实例化。 Step
(“ concreteStep2”)具有“ itemReader”,“ itemWriter”和 commit-interval = 10.
<step id="abstractParentStep" abstract="true">
<tasklet>
<chunk commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep2" parent="abstractParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"/>
</tasklet>
</step>
Merging Lists
Steps
上的一些可配置元素是列表,例如<listeners/>
元素。如果父级和子级Steps
都声明了<listeners/>
元素,则子级的列表将覆盖父级的列表。为了允许孩子将其他侦听器添加到 parent 定义的列表中,每个列表元素都具有merge
属性。如果该元素指定merge="true"
,则子级列表与父级列表合并,而不是覆盖它。
在以下示例中,使用两个侦听器创建了Step
“ concreteStep3”:listenerOne
和listenerTwo
:
<step id="listenersParentStep" abstract="true">
<listeners>
<listener ref="listenerOne"/>
<listeners>
</step>
<step id="concreteStep3" parent="listenersParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
</tasklet>
<listeners merge="true">
<listener ref="listenerTwo"/>
<listeners>
</step>
1.1.3. 提交间隔
如前所述,一个步骤将读取和写入 Item,并使用提供的PlatformTransactionManager
定期提交。 commit-interval
为 1 时,它将在写入每个单独的 Item 后提交。在许多情况下,这并不理想,因为开始和提交事务非常昂贵。理想情况下,最好在每个事务中处理尽可能多的 Item,这完全取决于要处理的数据类型和与之交互的资源。因此,可以配置在提交中处理的 Item 数。以下示例显示了一个step
,其tasklet
的commit-interval
值为 10.
XML Configuration
<job id="sampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
Java Configuration
@Bean
public Job sampleJob() {
return this.jobBuilderFactory.get("sampleJob")
.start(step1())
.end()
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.build();
}
在前面的示例中,每个事务处理 10 个 Item。在处理开始时,开始事务。同样,每次在ItemReader
上调用read
时,计数器都会递增。当达到 10 时,聚合项的列表将传递到ItemWriter
,并提交事务。
1.1.4. 配置重启步骤
在“ 配置和运行作业”部分中,讨论了重新启动Job
的问题。重新启动对步骤有很多影响,因此可能需要一些特定的配置。
设置开始限制
在许多情况下,您可能希望控制Step
的启动次数。例如,可能需要配置特定的Step
使其仅运行一次,因为它会使某些必须手动修复的资源无效,然后才能再次运行它。这可以在步骤级别上配置,因为不同的步骤可能有不同的要求。只能执行一次的Step
与可以无限运行的Step
作为同一Job
的一部分存在。以下代码片段显示了启动限制配置的示例:
XML Configuration
<step id="step1">
<tasklet start-limit="1">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}
上面的步骤只能运行一次。再次尝试运行它会引发StartLimitExceededException
。请注意,开始限制的默认值为Integer.MAX_VALUE
。
重新开始完成的步骤
对于可重新启动的作业,可能有一个或多个步骤应始终运行,无论它们是否第一次成功。一个示例可能是验证步骤或Step
,它们在处理之前清理资源。在正常处理重新启动的作业期间,将跳过状态为“已完成”(表示已成功完成)的任何步骤。将allow-start-if-complete
设置为“ true”将覆盖此设置,以便该步骤始终运行,如以下示例所示:
XML Configuration
<step id="step1">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
步骤重新启动配置示例
以下示例显示如何配置作业以使其具有可以重新启动的步骤:
XML Configuration
<job id="footballJob" restartable="true">
<step id="playerload" next="gameLoad">
<tasklet>
<chunk reader="playerFileItemReader" writer="playerWriter"
commit-interval="10" />
</tasklet>
</step>
<step id="gameLoad" next="playerSummarization">
<tasklet allow-start-if-complete="true">
<chunk reader="gameFileItemReader" writer="gameWriter"
commit-interval="10"/>
</tasklet>
</step>
<step id="playerSummarization">
<tasklet start-limit="2">
<chunk reader="playerSummarizationSource" writer="summaryWriter"
commit-interval="10"/>
</tasklet>
</step>
</job>
Java Configuration
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
@Bean
public Step playerLoad() {
return this.stepBuilderFactory.get("playerLoad")
.<String, String>chunk(10)
.reader(playerFileItemReader())
.writer(playerWriter())
.build();
}
@Bean
public Step gameLoad() {
return this.stepBuilderFactory.get("gameLoad")
.allowStartIfComplete(true)
.<String, String>chunk(10)
.reader(gameFileItemReader())
.writer(gameWriter())
.build();
}
@Bean
public Step playerSummarization() {
return this.stepBuilderFactor.get("playerSummarization")
.startLimit(2)
.<String, String>chunk(10)
.reader(playerSummarizationSource())
.writer(summaryWriter())
.build();
}
前面的示例配置用于一项作业,该作业加载有关足球 match 的信息并进行总结。它包含三个步骤:playerLoad
,gameLoad
和playerSummarization
。 playerLoad
步骤从平面文件加载玩家信息,而gameLoad
步骤对游戏执行相同的操作。然后,最后的步骤playerSummarization
根据所提供的游戏总结每个玩家的统计信息。假定playerLoad
加载的文件仅必须加载一次,但是gameLoad
可以加载在特定目录中找到的任何游戏,并将它们成功加载到数据库后将其删除。结果,playerLoad
步骤不包含其他配置。它可以启动任意次,如果完成,将被跳过。但是,如果自上次运行以来已添加了额外的文件,则每次都需要运行gameLoad
步骤。为了始终启动,将“ allow-start-if-complete”设置为“ true”。 (假设装入的数据库表游戏具有过程指示器,以确保可以通过汇总步骤正确找到新游戏)。汇总步骤是作业中最重要的步骤,配置为起始限制为 2.这很有用,因为如果该步骤连续失败,则会将新的退出代码返回给控制作业执行的操作员,并且可以在进行手动干预之前,请勿重新开始。
Note
这项工作提供了此文档的示例,并且与示例 Item 中的footballJob
不同。
本节的其余部分描述了footballJob
示例的三个运行中的每个运行情况。
Run 1:
playerLoad
运行并成功完成,将 400 位玩家添加到“玩家”表中。gameLoad
运行并处理 11 个文件的游戏数据,并将其内容加载到“ GAMES”表中。playerSummarization
开始处理,并在 5 分钟后失败。
Run 2:
playerLoad
无法运行,因为它已经成功完成,并且allow-start-if-complete
为'false'(默认)。gameLoad
再次运行并处理另外 2 个文件,并将它们的内容也加载到“ GAMES”表中(带有进程指示符,指示它们尚未处理)。playerSummarization
开始处理所有剩余的游戏数据(使用进程指示器进行过滤),并在 30 分钟后再次失败。
Run 3:
playerLoad
无法运行,因为它已经成功完成,并且allow-start-if-complete
为'false'(默认)。gameLoad
再次运行并处理另外 2 个文件,并将它们的内容也加载到“ GAMES”表中(带有进程指示符,指示它们尚未处理)。playerSummarization
没有启动,并且作业被立即终止,因为这是playerSummarization
的第三次执行,并且其限制仅为 2.要么必须提高限制,要么必须将Job
作为新的JobInstance
执行。
1.1.5. 配置跳过逻辑
在许多情况下,处理时遇到的错误不应导致Step
失败,而应跳过这些错误。通常这是必须由了解数据本身及其含义的人做出的决定。例如,财务数据可能无法跳过,因为它会导致资金被转移,这需要完全准确。另一方面,加载供应商列表可能会导致跳过。如果由于格式错误或缺少必要的信息而未加载供应商,则可能没有问题。通常,这些不良记录也会被记录下来,稍后在讨论侦听器时将予以介绍。
以下示例显示了使用跳过限制的示例:
XML Configuration
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch.item.file.FlatFileParseException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
在前面的示例中,使用了FlatFileItemReader
。如果在任何时候抛出FlatFileParseException
,则将跳过该 Item 并计入总跳过限制 10.在块处理(读取,处理,写入)的任何阶段都可能引发声明的异常(及其子类)。 ),但单独的计数是在步骤执行过程中对读取,处理和写入的跳跃进行计数,但该限制适用于所有跳跃。一旦达到跳过限制,找到的下一个异常将导致该步骤失败。换句话说,第十一跳会触发异常,而不是第十。
前面的示例的一个问题是,除了FlatFileParseException
之外,任何其他异常都会导致Job
失败。在某些情况下,这可能是正确的行为。但是,在其他情况下,可能更容易确定哪些异常应导致失败并跳过其他所有内容,如以下示例所示:
XML Configuration
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="java.lang.Exception"/>
<exclude class="java.io.FileNotFoundException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
通过将java.lang.Exception
标识为可跳过的异常类,该配置指示所有Exceptions
都是可跳过的。但是,通过“排除” java.io.FileNotFoundException
,配置将可跳过的异常类的列表优化为全部Exceptions
* except * FileNotFoundException
。如果遇到任何排除的异常类都是致命的(也就是说,它们不会被跳过)。
对于遇到的任何异常,可跳过性由类层次结构中最接近的超类确定。任何未分类的异常均被视为“致命”异常。
<include/>
和<exclude/>
元素的 Sequences 无关紧要。
skip
和noSkip
调用的 Sequences 无关紧要。
1.1.6. 配置重试逻辑
在大多数情况下,您希望异常导致跳过或Step
失败。但是,并非所有 exception 都是确定性的。如果在读取时遇到FlatFileParseException
,则始终将其抛出该记录。重置ItemReader
没有帮助。但是,对于其他异常,例如DeadlockLoserDataAccessException
,它指示当前进程已尝试更新另一个进程已锁定的记录,await 并重试可能会导致成功。在这种情况下,重试应配置如下:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"
commit-interval="2" retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
Step
允许限制单个 Item 的重试次数以及“可重试”的 exception 列表。有关重试工作方式的更多详细信息,请参见retry。
1.1.7. 控制回滚
默认情况下,无论重试还是跳过,从ItemWriter
引发的任何异常都会导致由Step
控制的事务回滚。如果如前所述配置了 skip,则从ItemReader
引发的异常不会引起回滚。但是,在许多情况下,从ItemWriter
引发的异常不应导致回滚,因为没有采取任何行动来使事务无效。因此,可以为Step
配置一系列不应引起回滚的异常,如以下示例所示:
XML Configuration
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<no-rollback-exception-classes>
<include class="org.springframework.batch.item.validator.ValidationException"/>
</no-rollback-exception-classes>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class)
.build();
}
Transactional Readers
ItemReader
的基本 Contract 是仅向前。该步骤缓冲读取器的 Importing,因此在回滚的情况下,不需要从读取器中重新读取 Item。但是,在某些情况下,Reader 是构建在诸如 JMS 队列之类的事务资源之上的。在这种情况下,由于队列与回滚的事务相关联,因此将从队列中拉出的消息放回原处。因此,可以将步骤配置为不缓冲 Item,如以下示例所示:
XML Configuration
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"
is-reader-transactional-queue="true"/>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue()
.build();
}
1.1.8. 事务属性
事务属性可用于控制isolation
,propagation
和timeout
设置。可以在Spring 核心文档中找到有关设置事务属性的更多信息。以下示例设置isolation
,propagation
和timeout
事务属性:
XML Configuration
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<transaction-attributes isolation="DEFAULT"
propagation="REQUIRED"
timeout="30"/>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute)
.build();
}
1.1.9. 使用步骤注册 ItemStream
该步骤必须在其生命周期中的必要时点处理ItemStream
回调(有关ItemStream
接口的更多信息,请参见ItemStream)。如果步骤失败并且可能需要重新启动,这是至关重要的,因为ItemStream
接口是步骤获取执行之间有关持久状态所需的信息的位置。
如果ItemReader
,ItemProcessor
或ItemWriter
本身实现ItemStream
接口,则将自动注册这些接口。任何其他流都需要单独注册。这通常是将间接依赖项(例如委托)注入到读取器和写入器中的情况。可以通过'streams'元素在Step
上注册流,如以下示例所示:
XML Configuration
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
<streams>
<stream ref="fileItemWriter1"/>
<stream ref="fileItemWriter2"/>
</streams>
</chunk>
</tasklet>
</step>
<beans:bean id="compositeWriter"
class="org.springframework.batch.item.support.CompositeItemWriter">
<beans:property name="delegates">
<beans:list>
<beans:ref bean="fileItemWriter1" />
<beans:ref bean="fileItemWriter2" />
</beans:list>
</beans:property>
</beans:bean>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1())
.stream(fileItemWriter2())
.build();
}
/**
* In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
* necessary, but used for an example.
*/
@Bean
public CompositeItemWriter compositeItemWriter() {
List<ItemWriter> writers = new ArrayList<>(2);
writers.add(fileItemWriter1());
writers.add(fileItemWriter2());
CompositeItemWriter itemWriter = new CompositeItemWriter();
itemWriter.setDelegates(writers);
return itemWriter;
}
在上面的示例中,CompositeItemWriter
不是ItemStream
,但是它的两个委托都是。因此,必须将两个委托 Writer 都明确注册为流,以便框架正确处理它们。 ItemReader
不需要显式注册为流,因为它是Step
的直接属性。现在该步骤可以重新启动,并且在发生故障时,读取器和写入器的状态可以正确保留。
1.1.10. 拦截步骤执行
就像Job
一样,在执行Step
期间会发生许多事件,用户可能需要执行某些功能。例如,为了写出需要页脚的平面文件,必须在Step
完成时通知ItemWriter
,以便可以写入页脚。这可以通过许多Step
作用域侦听器之一来完成。
实现StepListener
extensions 之一的任何类(但由于接口本身为空,因此不能实现该接口本身)可以应用于通过listeners
元素的步骤。 listeners
元素在步骤,tasklet 或块声明中有效。建议您在应用其功能的级别声明侦听器,或者,如果它具有多种功能(例如StepExecutionListener
和ItemReadListener
),则在其应用的最精细级别声明它。以下示例显示了在块级别应用的侦听器:
XML Configuration
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
Java Configuration
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(10)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
如果使用名称空间<step>
元素或*StepFactoryBean
工厂之一,则本身实现StepListener
接口之一的ItemReader
,ItemWriter
或ItemProcessor
会自动向Step
注册。这仅适用于直接注入Step
的组件。如果侦听器嵌套在另一个组件中,则需要对其进行显式注册(如先前在使用步骤注册 ItemStream下所述)。
除了StepListener
接口之外,还提供 Comments 来解决相同的问题。普通的旧 Java 对象可以使用带有这些注解的方法,然后将其转换为相应的StepListener
类型。Comments 块组件(例如ItemReader
或ItemWriter
或Tasklet
)的自定义实现也是很常见的。 XML 解析器会针对<listener/>
元素对 Comments 进行分析,并在构建器中使用listener
方法对其进行注册,因此,您所需要做的就是使用 XML 名称空间或构建器来通过步骤注册侦听器。
StepExecutionListener
StepExecutionListener
代表Step
执行的最通用的侦听器。它允许在启动Step
之前和结束Step
之后发出通知,无论它是正常结束还是失败,如以下示例所示:
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
ExitStatus
是afterStep
的返回类型,以使侦听器有机会修改在Step
完成时返回的退出代码。
与此接口对应的 Comments 为:
@BeforeStep
@AfterStep
ChunkListener
块定义为在事务范围内处理的 Item。在每个提交间隔提交事务都会提交一个“块”。 ChunkListener
可用于在块开始处理之前或块成功完成之后执行逻辑,如以下接口定义所示:
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
在事务开始之后但在ItemReader
上调用 read 之前,调用 beforeChunk 方法。相反,在提交块后调用afterChunk
(如果发生回滚则根本不调用)。
与此接口对应的 Comments 为:
@BeforeChunk
@AfterChunk
@AfterChunkError
没有块声明时可以应用ChunkListener
。 TaskletStep
负责调用ChunkListener
,因此它也适用于非面向 Item 的 Tasklet(在 Tasklet 之前和之后调用)。
ItemReadListener
在先前讨论跳过逻辑时,曾提到记录跳过的记录可能会有所帮助,以便稍后进行处理。如果发生读取错误,可以使用ItemReaderListener
完成,如以下接口定义所示:
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
在每次调用ItemReader
之前先调用beforeRead
方法。每次成功调用 read 之后,都会调用afterRead
方法,并将所读取的 Item 传递给afterRead
方法。如果读取时发生错误,则调用onReadError
方法。提供了遇到的异常,以便可以将其记录下来。
与此接口对应的 Comments 为:
@BeforeRead
@AfterRead
@OnReadError
ItemProcessListener
就像ItemReadListener
一样,可以“监听”Item 的处理,如以下接口定义所示:
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
在ItemProcessor
上的process
之前调用beforeProcess
方法,并将该方法交给要处理的 Item。成功处理 Item 后,将调用afterProcess
方法。如果处理时出错,则调用onProcessError
方法。提供遇到的异常以及尝试处理的 Item,以便可以记录它们。
与此接口对应的 Comments 为:
@BeforeProcess
@AfterProcess
@OnProcessError
ItemWriteListener
可以使用ItemWriteListener
来“侦听”Item 的编写,如以下接口定义所示:
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
beforeWrite
方法在ItemWriter
上的write
之前被调用,并被传递到写入的 Item 列表中。成功写入 Item 后,将调用afterWrite
方法。如果写入时发生错误,则调用onWriteError
方法。提供遇到的异常以及尝试写入的 Item,以便可以记录它们。
与此接口对应的 Comments 为:
@BeforeWrite
@AfterWrite
@OnWriteError
SkipListener
ItemReadListener
,ItemProcessListener
和ItemWriteListener
都提供了用于通知错误的机制,但没有一个通知您实际上已跳过记录。例如,即使重试并成功执行onWriteError
也会被调用。因此,有一个单独的界面来跟踪跳过的 Item,如以下界面定义所示:
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
每次在阅读时跳过某项都会调用onSkipInRead
。应当注意,回滚可能导致同一项被注册为多次跳过。写入时跳过某项时会调用onSkipInWrite
。由于已成功读取(而不是跳过)该 Item,因此还提供了该 Item 本身作为参数。
与此接口对应的 Comments 为:
@OnSkipInRead
@OnSkipInWrite
@OnSkipInProcess
跳过侦听器和事务
SkipListener
最常见的用例之一是注销跳过的 Item,以便可以使用另一个批处理甚至人工流程来评估和解决导致跳过的问题。由于在很多情况下原始事务可能会被回滚,因此 Spring Batch 提供了两个保证:
每个 Item 仅调用一次适当的跳过方法(取决于错误发生的时间)。
SkipListener
总是在提交事务之前被调用。这是为了确保ItemWriter
内的故障不会使侦听器调用的任何事务资源回滚。
1.2. TaskletStep
Chunk-oriented processing不是处理Step
的唯一方法。如果Step
必须包含一个简单的存储过程调用怎么办?您可以将调用实现为ItemReader
,并在过程完成后返回 null。但是,这样做有点不自然,因为将需要没有操作ItemWriter
。 Spring Batch 为此场景提供了TaskletStep
。
Tasklet
是一个简单的接口,具有一个方法execute
,该方法被TaskletStep
重复调用,直到它返回RepeatStatus.FINISHED
或引发异常以指示失败。对Tasklet
的每次调用都包装在一个事务中。 Tasklet
实现者可以调用存储过程,脚本或简单的 SQL 更新语句。
要创建TaskletStep
,元素的'ref'属性应引用定义Tasklet
对象的 bean。 中不应使用任何元素。以下示例显示了一个简单的任务集:
<step id="step1">
<tasklet ref="myTasklet"/>
</step>
要创建TaskletStep
,传递给构建器的tasklet
方法的 bean 应该实现Tasklet
接口。构建TaskletStep
时,不应调用chunk
。以下示例显示了一个简单的任务集:
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.tasklet(myTasklet())
.build();
}
Note
TaskletStep
如果实现了StepListener
接口,则会自动将小任务注册为StepListener
。
1.2.1. TaskletAdapter
与ItemReader
和ItemWriter
接口的其他适配器一样,Tasklet
接口包含一个实现,可以使其自身适应任何现有的类:TaskletAdapter
。一个可能有用的示例是现有的 DAO,用于更新一组记录上的标志。可以使用TaskletAdapter
来调用此类,而不必为Tasklet
接口编写适配器,如以下示例所示:
XML Configuration
<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
<property name="targetObject">
<bean class="org.mycompany.FooDao"/>
</property>
<property name="targetMethod" value="updateFoo" />
</bean>
Java Configuration
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(fooDao());
adapter.setTargetMethod("updateFoo");
return adapter;
}
1.2.2. 示例 Tasklet 实现
许多批处理作业包含必须在主处理开始之前执行的步骤,以设置各种资源,或者在处理完成后清理这些资源。对于需要大量处理文件的工作,通常需要在成功将文件上载到其他位置后在本地删除某些文件。以下示例(取自Spring Batch 示例 Item)是具有这种职责的Tasklet
实现:
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory());
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(directory, "directory must be set");
}
}
前面的Tasklet
实现删除指定目录中的所有文件。应该注意的是,execute
方法仅被调用一次。剩下的就是从Step
引用Tasklet
:
XML Configuration
<job id="taskletJob">
<step id="deleteFilesInDir">
<tasklet ref="fileDeletingTasklet"/>
</step>
</job>
<beans:bean id="fileDeletingTasklet"
class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
<beans:property name="directoryResource">
<beans:bean id="directory"
class="org.springframework.core.io.FileSystemResource">
<beans:constructor-arg value="target/test-outputs/test-dir" />
</beans:bean>
</beans:property>
</beans:bean>
Java Configuration
@Bean
public Job taskletJob() {
return this.jobBuilderFactory.get("taskletJob")
.start(deleteFilesInDir())
.build();
}
@Bean
public Step deleteFilesInDir() {
return this.stepBuilderFactory.get("deleteFilesInDir")
.tasklet(fileDeletingTasklet())
.build();
}
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
FileDeletingTasklet tasklet = new FileDeletingTasklet();
tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));
return tasklet;
}
1.3. 控制步骤流
由于能够将一个拥有的作业中的各个步骤组合在一起,因此需要能够控制该作业如何从一个步骤“流”到另一个步骤。 Step
的失败并不一定意味着Job
应该失败。此外,可能有不止一种类型的“成功”确定接下来应执行哪个Step
。根据一组Steps
的配置方式,某些步骤甚至可能根本不执行。
1.3.1. Sequences 流
最简单的流程场景是一项工作,其中所有步骤都按 Sequences 执行,如下图所示:
图 3.Sequences 流
可以通过使用 step 元素的'next'属性来实现,如以下示例所示:
XML Configuration
<job id="job">
<step id="stepA" parent="s1" next="stepB" />
<step id="stepB" parent="s2" next="stepC"/>
<step id="stepC" parent="s3" />
</job>
Java Configuration
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA())
.next(stepB())
.next(stepC())
.build();
}
在上述情况下,“步骤 A”首先运行,因为它是第一个列出的Step
。如果“步骤 A”正常完成,则“步骤 B”运行,依此类推。但是,如果“步骤 A”失败,则整个Job
失败,并且“步骤 B”不执行。
Note
使用 Spring Batch 名称空间,配置中列出的第一步总是 Job
运行的第一步。其他步骤元素的 Sequences 无关紧要,但是第一步必须始终首先出现在 xml 中。
1.3.2. 条件流
在上面的示例中,只有两种可能性:
Step
成功,应执行下一个Step
。Step
失败,因此Job
应该失败。
在许多情况下,这可能就足够了。但是,如果Step
的故障应触发一个不同的Step
而不是引起故障的情况呢?下图显示了这样的流程:
图 4.条件流
为了处理更复杂的场景,Spring Batch 名称空间允许在 step 元素内定义过渡元素。 next
元素就是这样一种过渡。像next
属性一样,next
元素告诉Job
接下来要执行哪个Step
。但是,与该属性不同,给定的Step
允许任何数量的next
元素,并且在失败的情况下没有默认行为。这意味着,如果使用过渡元素,则必须明确定义Step
过渡的所有行为。还要注意,单个步骤不能同时具有next
属性和transition
元素。
next
元素指定要匹配的模式以及下一步要执行的步骤,如以下示例所示:
XML Configuration
<job id="job">
<step id="stepA" parent="s1">
<next on="*" to="stepB" />
<next on="FAILED" to="stepC" />
</step>
<step id="stepB" parent="s2" next="stepC" />
<step id="stepC" parent="s3" />
</job>
Java Configuration
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(stepA())
.on("*").to(stepB())
.from(stepA()).on("FAILED").to(stepC())
.end()
.build();
}
使用 XML 配置时,过渡元素的on
属性使用简单的模式匹配方案来匹配Step
的执行所产生的ExitStatus
。
使用 Java 配置时,on
方法使用简单的模式匹配方案来匹配Step
的执行所产生的ExitStatus
。
模式中仅允许使用两个特殊字符:
“ *”匹配零个或多个字符
“?”完全匹配一个字符
例如,“ c * t”匹配“ cat”和“ count”,而“ c?t”匹配“ cat”但不匹配“ count”。
尽管Step
上的过渡元素数量没有限制,但是如果Step
执行导致ExitStatus
未被元素覆盖,则框架将引发异常,并且Job
失败。框架自动排序从最具体到最不具体的过渡。这意味着,即使在上面的示例中将订单交换为“ stepA”,“ _ FAILED”的ExitStatus
仍将进入“ stepC”。
批处理状态与退出状态
为条件流配置Job
时,了解BatchStatus
和ExitStatus
之间的区别很重要。 BatchStatus
是同时为JobExecution
和StepExecution
的属性的枚举,框架使用它来记录Job
或Step
的状态。它可以是以下值之一:COMPLETED
,STARTING
,STARTED
,STOPPING
,STOPPED
,FAILED
,ABANDONED
或UNKNOWN
。其中大多数是不言自明的:COMPLETED
是步骤或作业成功完成时设置的状态,FAILED
是失败时设置的状态,依此类推。
以下示例在使用 XML 配置时包含'next'元素:
<next on="FAILED" to="stepB" />
以下示例在使用 Java 配置时包含'on'元素:
...
.from(stepA()).on("FAILED").to(stepB())
...
乍一 Watch,似乎'on'引用了它所属的Step
的BatchStatus
。但是,它实际上引用了Step
的ExitStatus
。顾名思义,ExitStatus
代表Step
完成执行后的状态。
更具体地说,当使用 XML 配置时,前面的 XML 配置示例中显示的'next'元素引用了ExitStatus
的退出代码。
使用 Java 配置时,前面的 Java 配置示例中显示的'on'方法引用了ExitStatus
的退出代码。
用英语说:“如果退出代码为FAILED
,则转到 stepB”。默认情况下,退出代码始终与Step
的BatchStatus
相同,这就是上面的 Importing 起作用的原因。但是,如果退出代码需要不同怎么办?一个很好的例子来自 samples Item 中的 skip sample 作业:
XML Configuration
<step id="step1" parent="s1">
<end on="FAILED" />
<next on="COMPLETED WITH SKIPS" to="errorPrint1" />
<next on="*" to="step2" />
</step>
Java Configuration
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()).on("FAILED").end()
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
.from(step1()).on("*").to(step2())
.end()
.build();
}
step1
具有三种可能性:
Step
失败,在这种情况下作业应该失败。Step
成功完成。Step
已成功完成,但退出代码为“ COMPLETED WITH SKIPS”。在这种情况下,应运行不同的步骤来处理错误。
以上配置有效。但是,需要根据跳过了记录的执行条件来更改退出代码,如以下示例所示:
public class SkipCheckingListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
}
else {
return null;
}
}
}
上面的代码是StepExecutionListener
,首先检查以确保Step
成功,然后检查StepExecution
上的跳过计数是否大于 0.如果同时满足这两个条件,则返回退出代码为COMPLETED WITH SKIPS
的新ExitStatus
。
1.3.3. 配置停止
在讨论BatchStatus 和 ExitStatus之后,您可能会想知道如何为Job
确定BatchStatus
和ExitStatus
。通过已执行的代码为Step
确定这些状态时,基于配置确定Job
的状态。
到目前为止,讨论的所有作业配置都至少具有一个没有过渡的最终Step
。例如,执行以下步骤后,Job
结束,如以下示例所示:
<step id="stepC" parent="s3"/>
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.build();
}
如果未为Step
定义过渡,则Job
的状态定义如下:
如果
Step
以ExitStatus
FAILED 结尾,则Job
的BatchStatus
和ExitStatus
均为FAILED
。否则,
Job
的BatchStatus
和ExitStatus
均为COMPLETED
。
尽管这种终止批处理作业的方法对于某些批处理作业(例如简单的 Sequences 步骤作业)已足够,但是可能需要自定义定义的作业停止方案。为此,Spring Batch 提供了三个过渡元素来停止Job
(除了我们之前讨论的next element之外)。这些停止元素中的每一个都会停止具有特定BatchStatus
的Job
。重要的是要注意,停止过渡元素对Job
中的任何Steps
的BatchStatus
或ExitStatus
都没有影响。这些元素仅影响Job
的最终状态。例如,作业中的每个步骤都可能具有FAILED
的状态,但作业中可能具有COMPLETED
的状态。
一步一步结束
配置步进结束会指示Job
以COMPLETED
的BatchStatus
停止。状态为COMPLETED
的Job
无法重新启动(框架抛出JobInstanceAlreadyCompleteException
)。
使用 XML 配置时,“ end”元素用于此任务。 end
元素还允许使用可选的“退出代码”属性,该属性可用于自定义Job
的ExitStatus
。如果未提供“退出代码”属性,则ExitStatus
默认为COMPLETED
,以匹配BatchStatus
。
使用 Java 配置时,“ end”方法用于此任务。 end
方法还允许使用可选的'exitStatus'参数,该参数可用于自定义Job
的ExitStatus
。如果未提供“ exitStatus”值,则ExitStatus
默认为COMPLETED
,以匹配BatchStatus
。
在以下情况下,如果step2
失败,则Job
以BatchStatus
COMPLETED
停止并且ExitStatus
COMPLETED
停止并且step3
不运行。否则,执行移至step3
。请注意,如果step2
失败,则Job
无法重新启动(因为状态为COMPLETED
)。
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<end on="FAILED"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(step2())
.on("FAILED").end()
.from(step2()).on("*").to(step3())
.end()
.build();
}
未能通过
将步骤配置为在给定点失败将指示Job
以FAILED
的BatchStatus
停止。与 end 不同,Job
的故障不会阻止Job
重新启动。
使用 XML 配置时,'fail'元素还允许使用可选的'exit-code'属性,该属性可用于自定义Job
的ExitStatus
。如果未提供“退出代码”属性,则ExitStatus
默认为FAILED
,以匹配BatchStatus
。
在以下情况下,如果step2
失败,则Job
以BatchStatus
FAILED
停止并且ExitStatus
EARLY TERMINATION
停止并且step3
不执行。否则,执行移至step3
。此外,如果step2
失败并且Job
重新启动,则从step2
重新开始执行。
XML Configuration
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<fail on="FAILED" exit-code="EARLY TERMINATION"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
Java Configuration
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(step2()).on("FAILED").fail()
.from(step2()).on("*").to(step3())
.end()
.build();
}
按给定步骤停止作业
将作业配置为在特定步骤停止将指示Job
以STOPPED
的BatchStatus
停止。停止Job
可以暂时中断处理,以便操作员可以在重新启动Job
之前采取一些措施。
使用 XML 配置时,“停止”元素需要一个“重新启动”属性,该属性指定“重新启动作业”时应执行的步骤。
使用 Java 配置时,stopAndRestart
方法需要一个'restart'属性,该属性指定“重新启动作业”时应执行的步骤。
在以下情况下,如果step1
以COMPLETE
结尾,则作业将停止。重新启动后,执行将从step2
开始。
<step id="step1" parent="s1">
<stop on="COMPLETED" restart="step2"/>
</step>
<step id="step2" parent="s2"/>
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1()).on("COMPLETED").stopAndRestart(step2())
.end()
.build();
}
1.3.4. 程序流程决策
在某些情况下,可能需要比ExitStatus
更多的信息来决定下一步执行哪个步骤。在这种情况下,可以使用JobExecutionDecider
来辅助决策,如以下示例所示:
public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
在以下示例作业配置中,decision
指定要使用的决策程序以及所有转换:
XML Configuration
<job id="job">
<step id="step1" parent="s1" next="decision" />
<decision id="decision" decider="decider">
<next on="FAILED" to="step2" />
<next on="COMPLETED" to="step3" />
</decision>
<step id="step2" parent="s2" next="step3"/>
<step id="step3" parent="s3" />
</job>
<beans:bean id="decider" class="com.MyDecider"/>
在以下示例中,使用 Java 配置时,将实现JobExecutionDecider
的 bean 直接传递给next
调用。
Java Configuration
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
1.3.5. 分流
到目前为止,所描述的每种情况都涉及一个Job
,它一次线性地执行其步骤。除了这种典型的样式外,Spring Batch 还允许使用并行流来配置作业。
XML 名称空间允许您使用'split'元素。如下例所示,“ split”元素包含一个或多个“ flow”元素,可以在其中定义整个单独的流。 “拆分”元素还可以包含任何先前讨论的过渡元素,例如“下一个”属性或“下一个”,“结束”或“失败”元素。
<split id="split1" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
基于 Java 的配置使您可以通过提供的构建器配置拆分。如下例所示,“ split”元素包含一个或多个“ flow”元素,可以在其中定义整个单独的流。 “拆分”元素还可以包含任何先前讨论的过渡元素,例如“下一个”属性或“下一个”,“结束”或“失败”元素。
@Bean
public Job job() {
Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4())
.end()
.build();
}
1.3.6. 外部化流程定义和作业之间的依赖关系
可以将作业中的部分流程外部化为单独的 Bean 定义,然后重新使用。有两种方法可以这样做。首先是简单地将流声明为对其他地方定义的流的引用,如以下示例所示:
XML Configuration
<job id="job">
<flow id="job1.flow1" parent="flow1" next="step3"/>
<step id="step3" parent="s3"/>
</job>
<flow id="flow1">
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
Java Configuration
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(flow1())
.next(step3())
.end()
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
如前面的示例所示,定义外部流程的效果是将外部流程中的步骤插入作业中,就像它们已被内联声明一样。这样,许多作业可以引用相同的模板流,并将这些模板组成不同的逻辑流。这也是分离单个流的集成测试的好方法。
外部化流程的另一种形式是使用JobStep
。 JobStep
与FlowStep
类似,但实际上为指定流程中的步骤创建并启动了单独的作业执行。
以下 XML 代码段显示了JobStep
的示例:
XML Configuration
<job id="jobStepJob" restartable="true">
<step id="jobStepJob.step1">
<job ref="job" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor"/>
</step>
</job>
<job id="job" restartable="true">...</job>
<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
<property name="keys" value="input.file"/>
</bean>
以下 Java 代码段显示了JobStep
的示例:
Java Configuration
@Bean
public Job jobStepJob() {
return this.jobBuilderFactory.get("jobStepJob")
.start(jobStepJobStep1(null))
.build();
}
@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
return this.stepBuilderFactory.get("jobStepJobStep1")
.job(job())
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor())
.build();
}
@Bean
public Job job() {
return this.jobBuilderFactory.get("job")
.start(step1())
.build();
}
@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[]{"input.file"});
return extractor;
}
作业参数提取器是一种策略,用于确定Step
的ExecutionContext
如何转换为Job
的JobParameters
。 JobStep
在您想要一些更精细的选项来监视和报告作业和步骤时很有用。使用JobStep
通常也可以很好地回答以下问题:“如何在作业之间创建依赖关系?”这是将大型系统分解为较小的模块并控制作业流程的好方法。
1.4. 作业和步骤属性的后期绑定
前面显示的 XML 和平面文件示例都使用 Spring Resource
抽象来获取文件。之所以有效,是因为Resource
具有getFile
方法,该方法返回java.io.File
。可以使用标准 Spring 构造来配置 XML 和平面文件资源,如以下示例所示:
XML Configuration
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource"
value="file://outputs/file.txt" />
</bean>
Java Configuration
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource("file://outputs/file.txt"))
...
}
前面的Resource
从指定的文件系统位置加载文件。请注意,绝对位置必须以双斜杠(//
)开头。在大多数 Spring 应用程序中,此解决方案足够好,因为这些资源的名称在编译时就已知。但是,在批处理方案中,可能需要在运行时将文件名确定为作业的参数。可以使用“ -D”参数读取系统属性来解决。
以下 XML 代码段显示了如何从属性读取文件名:
XML Configuration
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="${input.file.name}" />
</bean>
以下 Java 代码段显示了如何从属性读取文件名:
Java Configuration
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
要使该解决方案起作用,所需要做的只是一个系统参数(例如-Dinput.file.name="file://outputs/file.txt"
)。
Note
尽管此处可以使用PropertyPlaceholderConfigurer
,但始终设置系统属性不是必需的,因为 Spring 中的ResourceEditor
已经过滤并在系统属性上进行了占位符替换。
通常,在批处理设置中,最好对作业的JobParameters
中的文件名进行参数化设置,而不要通过系统属性来进行设置,然后以这种方式进行访问。为此,Spring Batch 允许后期绑定各种Job
和Step
属性,如以下代码片段所示:
XML Configuration
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>
Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
可以以相同的方式访问JobExecution
和StepExecution
级别ExecutionContext
,如以下示例所示:
XML Configuration
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
XML Configuration
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>
Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Note
任何使用后期绑定的 bean 必须使用 scope =“ step”声明。有关更多信息,请参见Step Scope。
1.4.1. 步骤范围
上面的所有后期绑定示例在 bean 定义上都声明了“ step”的作用域,如以下示例所示:
XML Configuration
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>
Java Configuration
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
为了使用后期绑定,需要使用Step
范围,因为在Step
启动之前,bean 才能被实例化,这样才能找到属性。因为默认情况下它不是 Spring 容器的一部分,所以必须通过使用batch
名称空间或通过显式包含StepScope
的 bean 定义或使用@EnableBatchProcessing
Comments 来显式添加范围。仅使用其中一种方法。以下示例使用batch
命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
以下示例明确包含了 bean 定义:
<bean class="org.springframework.batch.core.scope.StepScope" />
1.4.2. 工作范围
在 Spring Batch 3.0 中引入的Job
范围在配置上类似于Step
范围,但是是Job
上下文的范围,因此每个正在运行的作业只有一个这样的 bean 实例。此外,还提供了对使用#{..}
占位符从JobContext
访问的引用进行后期绑定的支持。使用此功能,可以从作业或作业执行上下文以及作业参数中提取 Bean 属性,如以下示例所示:
XML Configuration
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobParameters[input]}" />
</bean>
XML Configuration
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>
Java Configuration
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
Java Configuration
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
由于默认情况下它不是 Spring 容器的一部分,因此必须通过使用batch
名称空间,通过为 JobScope 显式包括 bean 定义或使用@EnableBatchProcessing
Comments(但不是全部)来显式添加范围。以下示例使用batch
命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
以下示例包括一个显式定义JobScope
的 bean:
<bean class="org.springframework.batch.core.scope.JobScope" />