On this page
1. 配置和运行作业
在domain section中,使用下图作为指南讨论了总体体系结构设计:
图 1.批处理原型
尽管Job
对象似乎是简单的步骤容器,但开发人员必须知道许多配置选项。此外,对于Job
将如何运行以及在此运行期间如何存储其元数据有许多考虑因素。本章将解释Job
的各种配置选项和运行时问题。
1.1. 配置作业
Job接口有多种实现方式,但是构建器可以消除配置差异。
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
Job
(通常是其中的任何Step
)需要JobRepository
。 JobRepository
的配置通过BatchConfigurer处理。
上面的示例说明了由三个Step
实例组成的Job
。与作业相关的构建器还可以包含其他元素,以帮助并行化(Split
),声明性流控制(Decision
)和流定义的外部化(Flow
)。
Job接口有多种实现,但是,名称空间抽象了配置上的差异。它只有三个必需的依赖项:名称,JobRepository
和Step
实例列表。
<job id="footballJob">
<step id="playerload" parent="s1" next="gameLoad"/>
<step id="gameLoad" parent="s2" next="playerSummarization"/>
<step id="playerSummarization" parent="s3"/>
</job>
这里的示例使用父 bean 定义来创建步骤。有关内联声明特定步骤详细信息的更多选项,请参见step configuration部分。 XML 名称空间默认引用 ID 为'jobRepository'的存储库,这是明智的默认设置。但是,可以显式覆盖:
<job id="footballJob" job-repository="specialRepository">
<step id="playerload" parent="s1" next="gameLoad"/>
<step id="gameLoad" parent="s3" next="playerSummarization"/>
<step id="playerSummarization" parent="s3"/>
</job>
除了步骤之外,作业配置还可以包含其他有助于并行化(<split>
),声明性流控制(<decision>
)和流定义外部化(<flow/>
)的元素。
1.1.1. Restartability
执行批处理作业时的一个关键问题与Job
重新启动时的行为有关。如果特定JobInstance
已经存在JobExecution
,则启动Job
被认为是“重新启动”。理想情况下,所有作业都应该能够从中断的地方开始,但是在某些情况下这是不可能的。 *在这种情况下,完全由开发人员决定是否创建新的JobInstance
。但是,Spring Batch 确实提供了一些帮助。如果Job
绝不应该重新启动,而应始终作为新JobInstance
的一部分运行,则可重新启动属性可以设置为'false':
XML Configuration
<job id="footballJob" restartable="false">
...
</job>
Java Configuration
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.preventRestart()
...
.build();
}
换句话说,将 restartable 设置为 false 意味着“此Job
不支持再次启动”。重新启动无法重启的Job
会导致抛出JobRestartException
:
Job job = new SimpleJob();
job.setRestartable(false);
JobParameters jobParameters = new JobParameters();
JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);
try {
jobRepository.createJobExecution(job, jobParameters);
fail();
}
catch (JobRestartException e) {
// expected
}
这个 JUnit 代码片段显示了第一次尝试为不可重新启动的作业创建JobExecution
不会造成任何问题。但是,第二次尝试将抛出JobRestartException
。
1.1.2. 拦截作业执行
在执行作业的过程中,通知其生命周期中的各种事件可能很有用,以便可以执行自定义代码。 SimpleJob
通过在适当的时候调用JobListener
来实现此目的:
public interface JobExecutionListener {
void beforeJob(JobExecution jobExecution);
void afterJob(JobExecution jobExecution);
}
可以通过作业上的 listeners 元素将JobListeners
添加到SimpleJob
中:
XML Configuration
<job id="footballJob">
<step id="playerload" parent="s1" next="gameLoad"/>
<step id="gameLoad" parent="s2" next="playerSummarization"/>
<step id="playerSummarization" parent="s3"/>
<listeners>
<listener ref="sampleListener"/>
</listeners>
</job>
Java Configuration
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.listener(sampleListener())
...
.build();
}
应该注意的是,无论作业成功与否,afterJob
都会被调用。如果需要确定成功或失败,可以从JobExecution
获得:
public void afterJob(JobExecution jobExecution){
if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
//job success
}
else if(jobExecution.getStatus() == BatchStatus.FAILED){
//job failure
}
}
与此接口对应的 Comments 为:
@BeforeJob
@AfterJob
1.1.3. 从父项工作继承
如果一组 Jobs 共享相似但不相同的配置,则定义一个具体的 Jobs 可以继承其属性的“父” Job
可能会有所帮助。与 Java 中的类继承类似,“子” Job
将其元素和属性与父类结合在一起。
在下面的示例中,“ baseJob”是一个抽象的Job
定义,仅定义一个侦听器列表。 Job
“ job1”是一个具体的定义,它继承了“ baseJob”的侦听器列表,并将其与自己的侦听器列表合并,以产生带有两个侦听器和一个Step
的Job
,即“ step1”。
<job id="baseJob" abstract="true">
<listeners>
<listener ref="listenerOne"/>
<listeners>
</job>
<job id="job1" parent="baseJob">
<step id="step1" parent="standaloneStep"/>
<listeners merge="true">
<listener ref="listenerTwo"/>
<listeners>
</job>
有关更多详细信息,请参见从父步骤继承部分。
1.1.4. JobParametersValidator
在 XML 名称空间中声明的作业或使用AbstractJob
的任何子类可以在运行时声明作业参数的验证器。例如,当您需要 assert 一个作业使用其所有必填参数启动时,此功能很有用。有一个DefaultJobParametersValidator
可用于约束简单的强制性和可选参数的组合,对于更复杂的约束,您可以自己实现接口。
XML 名称空间通过作业的子元素来支持验证器的配置,例如:
<job id="job1" parent="baseJob3">
<step id="step1" parent="standaloneStep"/>
<validator ref="parametersValidator"/>
</job>
可以将验证器指定为引用(如上)或在 bean 名称空间中作为嵌套 bean 定义。
通过 Java 构建器支持验证器的配置,例如:
@Bean
public Job job1() {
return this.jobBuilderFactory.get("job1")
.validator(parametersValidator())
...
.build();
}
1.2. Java 配置
除了 XML,Spring 3 还提供了通过 Java 配置应用程序的功能。从 Spring Batch 2.2.0 开始,可以使用相同的 Java 配置来配置批处理作业。基于 Java 的配置有两个组件:@EnableBatchProcessing
Comments 和两个构建器。
@EnableBatchProcessing
的工作方式与 Spring 系列中的其他@ Enable *Comments 类似。在这种情况下,@EnableBatchProcessing
提供了用于构建批处理作业的基本配置。在此基本配置中,除了提供许多可自动装配的 bean 之外,还创建了StepScope
的实例:
JobRepository
-Bean 名称“ jobRepository”JobLauncher
-Bean 名称“ jobLauncher”JobRegistry
-Bean 名称“ jobRegistry”PlatformTransactionManager
-Bean 名称“ transactionManager”JobBuilderFactory
-Bean 名称为“ jobBuilders”StepBuilderFactory
-Bean 名称为“ stepBuilders”
此配置的核心接口是BatchConfigurer
。默认实现提供上面提到的 bean,并且在要提供的上下文中需要DataSource
作为 bean。 JobRepository 将使用此数据源。您可以通过创建BatchConfigurer
接口的自定义实现来自定义这些 bean 中的任何一个。通常,扩展DefaultBatchConfigurer
(如果未找到BatchConfigurer
时提供)并覆盖所需的吸气剂就足够了。但是,可能需要从头开始实现。以下示例显示了如何提供自定义事务 Management 器:
@Bean
public BatchConfigurer batchConfigurer() {
return new DefaultBatchConfigurer() {
@Override
public PlatformTransactionManager getTransactionManager() {
return new MyTransactionManager();
}
};
}
Note
只有一个配置类需要具有@EnableBatchProcessing
Comments。在为类加上 Comments 后,您将可以使用上述所有内容。
使用基本配置后,用户可以使用提供的构建器工厂来配置作业。以下是通过JobBuilderFactory
和StepBuilderFactory
配置的两步作业的示例。
@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
return jobs.get("myJob").start(step1).next(step2).build();
}
@Bean
protected Step step1(ItemReader<Person> reader,
ItemProcessor<Person, Person> processor,
ItemWriter<Person> writer) {
return steps.get("step1")
.<Person, Person> chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
protected Step step2(Tasklet tasklet) {
return steps.get("step2")
.tasklet(tasklet)
.build();
}
}
1.3. 配置 JobRepository
使用@EnableBatchProcessing
时,为您提供了一个JobRepository
。本节介绍配置您自己的内容。
如前所述,JobRepository用于 Spring Batch 中各种持久化域对象(例如JobExecution
和StepExecution
)的基本 CRUD 操作。许多主要框架功能都需要它,例如JobLauncher
,Job
和Step
。
批处理名称空间抽象了JobRepository
实现及其协 Writer 的许多实现细节。但是,仍然有一些可用的配置选项:
XML Configuration
<job-repository id="jobRepository"
data-source="dataSource"
transaction-manager="transactionManager"
isolation-level-for-create="SERIALIZABLE"
table-prefix="BATCH_"
max-varchar-length="1000"/>
除了 ID 以外,上面列出的所有配置选项都不是必需的。如果未设置,将使用上面显示的默认值。出于认知目的,它们在上面显示。 max-varchar-length
默认为 2500,这是samples 模式脚本中长VARCHAR
列的长度
使用 Java 配置时,会为您提供JobRepository
。如果提供了DataSource
,则立即提供基于 JDBC 的 JDBC,否则不提供基于Map
的 JDBC。但是,您可以通过BatchConfigurer
接口的实现来自定义JobRepository
的配置。
Java Configuration
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
factory.setTablePrefix("BATCH_");
factory.setMaxVarCharLength(1000);
return factory.getObject();
}
...
除了 dataSource 和 transactionManager 外,不需要上面列出的配置选项。如果未设置,将使用上面显示的默认值。出于认知目的,它们在上面显示。 varchar 的最大长度默认为 2500,即samples 模式脚本中的长VARCHAR
列的长度
1.3.1. JobRepository 的事务配置
如果使用名称空间或提供的FactoryBean
,则会在存储库周围自动创建事务建议。这是为了确保正确保留批处理元数据(包括故障后重新启动所需的状态)。如果存储库方法不是事务性的,则框架的行为无法很好地定义。分别指定create*
方法属性中的隔离级别,以确保启动作业时,如果两个进程试图同时启动同一作业,则只有一个成功。该方法的默认隔离级别为 SERIALIZABLE,这非常激进:READ_COMMITTED 也可以工作;如果两个进程不太可能以这种方式冲突,则 READ_UNCOMMITTED 会很好。但是,由于对create*
方法的调用非常短,因此只要数据库平台支持,SERIALIZED 不太可能引起问题。但是,可以重写:
XML Configuration
<job-repository id="jobRepository"
isolation-level-for-create="REPEATABLE_READ" />
Java Configuration
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
return factory.getObject();
}
如果不使用名称空间或工厂 Bean,那么使用 AOP 配置存储库的事务行为也很重要:
XML Configuration
<aop:config>
<aop:advisor
pointcut="execution(* org.springframework.batch.core..*Repository+.*(..))"/>
<advice-ref="txAdvice" />
</aop:config>
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="*" />
</tx:attributes>
</tx:advice>
该片段可以原样使用,几乎没有变化。还记得还要包括适当的名称空间声明,并确保 spring-tx 和 spring-aop(或整个 spring)在 Classpath 中。
Java Configuration
@Bean
public TransactionProxyFactoryBean baseProxy() {
TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
Properties transactionAttributes = new Properties();
transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
transactionProxyFactoryBean.setTarget(jobRepository());
transactionProxyFactoryBean.setTransactionManager(transactionManager());
return transactionProxyFactoryBean;
}
1.3.2. 更改表前缀
JobRepository
的另一个可修改属性是元数据表的表前缀。默认情况下,它们都以 BATCH 开头。 BATCH_JOB_EXECUTION 和 BATCH_STEP_EXECUTION 是两个示例。但是,存在修改此前缀的潜在原因。如果模式名称需要在表名称之前,或者在同一模式内需要一组以上的元数据表,那么表前缀将需要更改:
XML Configuration
<job-repository id="jobRepository"
table-prefix="SYSTEM.TEST_" />
Java Configuration
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setTablePrefix("SYSTEM.TEST_");
return factory.getObject();
}
鉴于上述更改,对元数据表的每个查询都将以“ SYSTEM.TEST_”为前缀。 BATCH_JOB_EXECUTION 将被称为 SYSTEM.TEST_JOB_EXECUTION。
Note
仅表前缀是可配置的。表名和列名不是。
1.3.3. 内存中的存储库
在某些情况下,您可能不想将域对象持久保存到数据库中。原因之一可能是速度。在每个提交点存储域对象会花费额外的时间。另一个原因可能是您不需要为特定工作保留状态。因此,Spring 批处理提供了作业存储库的内存 Map 版本:
XML Configuration
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager"/>
</bean>
Java Configuration
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setTransactionManager(transactionManager);
factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
return factory.getObject();
}
请注意,内存中的存储库是易失性的,因此不允许在 JVM 实例之间重新启动。它还不能保证同时启动两个具有相同参数的作业实例,并且不适合在多线程 Job 或本地分区Step
中使用。因此,在需要这些功能的地方,请使用存储库的数据库版本。
但是,确实需要定义事务 Management 器,因为存储库中存在回滚语义,并且因为业务逻辑可能仍是事务性的(例如 RDBMS 访问)。出于测试目的,许多人认为ResourcelessTransactionManager
有用。
1.3.4. 存储库中的非标准数据库类型
如果使用的数据库平台不在受支持的平台列表中,并且 SQL 变量足够接近,则可以使用一种受支持的类型。为此,您可以使用原始的JobRepositoryFactoryBean
代替名称空间快捷方式,并使用它来将数据库类型设置为最接近的匹配项:
XML Configuration
<bean id="jobRepository" class="org...JobRepositoryFactoryBean">
<property name="databaseType" value="db2"/>
<property name="dataSource" ref="dataSource"/>
</bean>
Java Configuration
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dataSource);
factory.setDatabaseType("db2");
factory.setTransactionManager(transactionManager);
return factory.getObject();
}
(如果未指定,JobRepositoryFactoryBean
会尝试从DataSource
自动检测数据库类型.)平台之间的主要差异主要是由增加主键的策略引起的,因此通常也有必要覆盖incrementerFactory
(使用 Spring 框架的标准实现之一)。
如果甚至不起作用,或者您没有使用 RDBMS,那么唯一的选择可能是实现SimpleJobRepository
所依赖的各种Dao
接口,并以正常的 Spring 方式手动将它们连接起来。
1.4. 配置 JobLauncher
使用@EnableBatchProcessing
时,为您提供了一个JobRepository
。本节介绍配置您自己的内容。
JobLauncher
接口的最基本实现是SimpleJobLauncher
。为了获得执行,它唯一需要的依赖项是JobRepository
:
XML Configuration
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
Java Configuration
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
...
一旦获得JobExecution,它将被传递给 Job 的 execute 方法,最终将JobExecution
返回给调用者:
图 2. Job Launcher 序列
该序列很简单,从调度程序启动时效果很好。但是,尝试从 HTTP 请求启动时会出现问题。在这种情况下,启动需要异步完成,以便SimpleJobLauncher
立即返回到其调用方。这是因为在长时间运行的进程(例如批处理)所需的时间内保持 HTTP 请求处于打开状态是一种不好的做法。下面是一个示例序列:
图 3.异步作业启动器序列
可以通过配置TaskExecutor
轻松地将SimpleJobLauncher
配置为允许这种情况:
XML Configuration
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>
</bean>
Java Configuration
@Bean
public JobLauncher jobLauncher() {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
spring TaskExecutor
接口的任何实现都可以用来控制作业的异步执行方式。
1.5. 运行工作
至少,启动批处理作业需要两件事:要启动的Job
和JobLauncher
。两者都可以包含在相同的上下文或不同的上下文中。例如,如果从命令行启动作业,则将为每个作业实例化一个新的 JVM,因此每个作业将具有自己的JobLauncher
。但是,如果从HttpRequest
范围内的 Web 容器中运行,通常将配置一个JobLauncher
用于异步作业启动,多个请求将被调用以启动其作业。
1.5.1. 从命令行运行作业
对于想要从企业计划程序运行其作业的用户,命令行是主要界面。这是因为大多数调度程序(除非是 Quartz,否则除非使用 NativeJob)都直接与 os 进程配合使用,而这些进程主要是从 Shell 脚本开始的。除了 Shell 脚本(例如 Perl,Ruby),甚至还有“构建工具”(例如 ant 或 maven)之外,还有许多启动 Java 进程的方法。但是,由于大多数人都熟悉 Shell 脚本,因此本示例将重点介绍它们。
The CommandLineJobRunner
因为启动作业的脚本必须启动 Java 虚拟机,所以需要一个具有 main 方法的类作为主要入口点。 Spring Batch 提供了一个实现此目的的实现:CommandLineJobRunner
。重要的是要注意,这只是引导应用程序的一种方法,但是有许多方法可以启动 Java 进程,并且绝对不应将此类视为 Authority。 CommandLineJobRunner
执行四个任务:
加载适当的
ApplicationContext
将命令行参数解析为
JobParameters
根据参数找到合适的工作
使用应用程序上下文中提供的
JobLauncher
启动作业。
所有这些任务仅使用传入的参数即可完成。以下是必填参数:
表 1. CommandLineJobRunner 参数
jobPath | 将用于创建ApplicationContext 的 XML 文件的位置。该文件应包含运行完整作业所需的所有内容 |
jobName | 要运行的作业的名称。 |
这些参数必须首先以路径和第二个名称传递。这些参数之后的所有参数均被视为JobParameters
,并且必须采用“名称=值”的格式:
<bash$ java CommandLineJobRunner endOfDayJob.xml endOfDay schedule.date(date)=2007/05/05
<bash$ java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2007/05/05
在大多数情况下,您可能想使用 Lists 在 jar 中声明您的主类,但为简单起见,直接使用了该类。此示例使用的是domainLanguageOfBatch中的“ EndOfDay”示例。第一个参数是“ endOfDayJob.xml”,它是包含 Job 的 Spring ApplicationContext
。第二个参数'endOfDay'表示作业名称。最后一个参数'schedule.date(date)= 2007/05/05'将转换为JobParameters
。 XML 配置示例如下:
<job id="endOfDay">
<step id="step1" parent="simpleStep" />
</job>
<!-- Launcher details removed for clarity -->
<beans:bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher" />
在大多数情况下,您可能想使用 Lists 在 jar 中声明您的主类,但为简单起见,直接使用了该类。此示例使用的是domainLanguageOfBatch中的“ EndOfDay”示例。第一个参数是“ io.spring.EndOfDayJobConfiguration”,它是包含 Job 的配置类的完全限定类名。第二个参数'endOfDay'表示作业名称。最后一个参数'schedule.date(date)= 2007/05/05'将转换为 JobParameters。以下是 Java 配置的示例:
@Configuration
@EnableBatchProcessing
public class EndOfDayJobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job endOfDay() {
return this.jobBuilderFactory.get("endOfDay")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> null)
.build();
}
}
此示例过于简单,因为通常在 Spring Batch 中运行批处理作业还有更多要求,但是它用于显示CommandLineJobRunner
的两个主要要求:Job
和JobLauncher
ExitCodes
从命令行启动批处理作业时,通常使用企业计划程序。大多数调度程序都非常笨,只能在流程级别上工作。这意味着他们只知道他们正在调用的某些 os 进程,例如 shell 脚本。在这种情况下,将作业成功或失败传达回调度程序的唯一方法是通过返回码。返回码是由进程返回到调度程序的数字,指示运行结果。在最简单的情况下:0 是成功,1 是失败。但是,可能存在更复杂的方案:如果作业 A 返回 4 个启动作业 B,并且如果作业 5 返回 5 个启动作业 C。这种类型的行为是在调度程序级别配置的,但是诸如 Spring Batch 提供了一种返回特定批处理作业的“退出代码”的数字表示的方法。在 Spring Batch 中,它封装在ExitStatus
内,在第 5 章中将进行更详细的介绍。出于讨论退出代码的目的,唯一重要的要知道的是ExitStatus
具有由框架设置的退出代码属性(或开发人员),并作为JobLauncher
返回的JobExecution
的一部分返回。 CommandLineJobRunner
使用ExitCodeMapper
接口将此字符串值转换为数字:
public interface ExitCodeMapper {
public int intValue(String exitCode);
}
ExitCodeMapper
的基本约定是,给定字符串退出代码,将返回数字表示形式。作业运行程序使用的默认实现是SimpleJvmExitCodeMapper
,它返回 0 表示完成,返回 1 表示一般错误,返回 2 表示任何作业运行程序错误,例如无法在提供的上下文中找到Job
。如果需要比上述 3 个值更复杂的东西,则必须提供ExitCodeMapper
接口的自定义实现。由于CommandLineJobRunner
是创建ApplicationContext
的类,因此无法“连接在一起”,因此必须自动连接需要覆盖的所有值。这意味着,如果在BeanFactory
中找到ExitCodeMapper
的实现,则将在创建上下文后将其注入运行器。提供自己的ExitCodeMapper
所需要做的就是将实现声明为根级 Bean,并确保它是运行程序加载的ApplicationContext
的一部分。
1.5.2. 从 Web 容器中运行作业
从历史上 Watch,如上所述,已从命令行启动了诸如批处理作业之类的脱机处理。但是,在许多情况下,从HttpRequest
启动是更好的选择。许多此类用例包括报告,临时作业运行和 Web 应用程序支持。因为按定义,批处理作业是长期运行的,所以最重要的问题是确保异步启动该作业:
图 4. Web 容器中的异步作业启动器序列
在这种情况下,该控制器是 Spring MVC 控制器。有关 Spring MVC 的更多信息,请参见https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#mvc。控制器使用已配置为启动asynchronously的JobLauncher
来启动Job
,它会立即返回JobExecution
。 Job
可能仍在运行,但是,这种非阻塞行为允许控制器立即返回,这在处理HttpRequest
时是必需的。下面是一个示例:
@Controller
public class JobLauncherController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job job;
@RequestMapping("/jobLauncher.html")
public void handle() throws Exception{
jobLauncher.run(job, new JobParameters());
}
}
1.6. 高级元数据使用
到目前为止,已经讨论了JobLauncher
和JobRepository
接口。它们一起代表了作业的简单启动和批处理域对象的基本 CRUD 操作:
图 5.作业存储库
JobLauncher
使用JobRepository
创建新的JobExecution
对象并运行它们。 Job
和Step
实现稍后在作业运行期间将相同的JobRepository
用于相同执行的基本更新。基本操作足以满足简单的场景,但是在具有成百上千个批处理作业和复杂的调度要求的大型批处理环境中,需要对元数据进行更高级的访问:
图 6.高级作业存储库访问
JobExplorer
和JobOperator
接口(将在下面进行讨论)添加了用于查询和控制元数据的其他功能。
1.6.1. 查询存储库
在使用任何高级功能之前,最基本的需求是能够查询存储库中现有的执行情况。 JobExplorer
界面提供了此功能:
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
从上面的方法签名可以明显 Watch 出,JobExplorer
是JobRepository
的只读版本,并且像JobRepository
一样,可以通过工厂 bean 轻松配置:
XML Configuration
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:dataSource-ref="dataSource" />
Java Configuration
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
本章前面,其中提到可以修改JobRepository
的表前缀以允许使用不同的版本或架构。由于JobExplorer
使用相同的表,因此它也需要设置前缀的能力:
XML Configuration
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:tablePrefix="SYSTEM."/>
Java Configuration
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
1.6.2. JobRegistry
JobRegistry
(及其父接口JobLocator
)不是强制性的,但如果要跟踪上下文中可用的作业,则它很有用。当作业在其他位置(例如在子上下文中)创建时,对于在应用程序上下文中集中收集作业也很有用。自定义JobRegistry
实现也可以用于操纵已注册作业的名称和其他属性。框架仅提供一种实现,该实现基于从作业名称到作业实例的简单 Map。
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
使用@EnableBatchProcessing
时,为您提供了一个JobRegistry
。如果要配置自己的:
...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the getter in the SimpleBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
有两种自动填充JobRegistry
的方法:使用 bean 后处理器和使用注册商生命周期组件。以下各节将介绍这两种机制。
JobRegistryBeanPostProcessor
这是一个 bean 后处理器,可以在创建所有作业时注册它们:
XML Configuration
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
Java Configuration
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry());
return postProcessor;
}
尽管不是严格必要的,但是示例中的后处理器已被赋予一个 ID,以便可以将其包含在子上下文中(例如作为父 bean 定义),并使在那里创建的所有作业也自动注册。
AutomaticJobRegistrar
这是一个生命周期组件,它创建子上下文并在创建这些上下文时注册这些上下文中的作业。这样做的优点之一是,尽管子上下文中的作业名称在注册表中仍必须是全局唯一的,但它们的依存关系可以具有“自然”名称。因此,例如,您可以创建一组 XML 配置文件,每个配置文件仅包含一个 Job,但都具有相同的 Bean 名称(例如_)的ItemReader
的不同定义。 “Reader”。如果将所有这些文件都导入到同一上下文中,那么读取器定义将相互冲突并覆盖,但是使用自动注册器可以避免这种情况。这使集成由应用程序的单独模块贡献的作业变得更加容易。
XML Configuration
<bean class="org.spr...AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.spr...DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
Java Configuration
@Bean
public AutomaticJobRegistrar registrar() {
AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;
}
注册服务商具有两个必填属性,一个是ApplicationContextFactory
的数组(此处是从方便的工厂 bean 创建的),另一个是JobLoader
。 JobLoader
负责 Management 子上下文的生命周期并在JobRegistry
中注册作业。
ApplicationContextFactory
负责创建子上下文,最常见的用法是如上使用ClassPathXmlApplicationContextFactory
。该工厂的功能之一是默认情况下将某些配置从父上下文复制到子级。因此,例如,如果子代中的PropertyPlaceholderConfigurer
或 AOP 配置应与父代相同,则不必重新定义它。
如果需要,可以将AutomaticJobRegistrar
与JobRegistryBeanPostProcessor
结合使用(只要也使用DefaultJobLoader
)。例如,如果在主要父级上下文以及子级位置中定义了作业,则可能需要这样做。
1.6.3. JobOperator
如前所述,JobRepository
提供对元数据的 CRUD 操作,而JobExplorer
提供对元数据的只读操作。但是,与批处理操作员通常一起使用时,这些操作一起执行常见的监视任务(如停止,重新启动或汇总作业)时最有用。 Spring Batch 通过JobOperator
接口提供了以下类型的操作:
public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;
Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
上面的操作表示来自许多不同接口的方法,例如JobLauncher
,JobRepository
,JobExplorer
和JobRegistry
。因此,提供的JobOperator
,SimpleJobOperator
的实现具有许多依赖性:
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
<property name="jobExplorer">
<bean class="org.spr...JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
/**
* All injected dependencies for this bean are provided by the @EnableBatchProcessing
* infrastructure out of the box.
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);
return jobOperator;
}
Note
如果您在作业存储库上设置了表前缀,请不要忘记也在作业资源 Management 器上进行设置。
1.6.4. JobParametersIncrementer
JobOperator
上的大多数方法都是不言自明的,而更详细的解释可以在接口的 javadoc上找到。但是,startNextInstance
方法值得注意。此方法将始终启动 Job 的新实例。如果JobExecution
中存在严重问题,并且作业需要从头开始重新开始,这将非常有用。但是与JobLauncher
不同,后者需要一个新的JobParameters
对象,如果参数与先前的任何参数集不同,该对象将触发新的JobInstance
,startNextInstance
方法将使用与Job
绑定的JobParametersIncrementer
来将Job
强制为新实例:
public interface JobParametersIncrementer {
JobParameters getNext(JobParameters parameters);
}
JobParametersIncrementer
的约定是,给定JobParameters对象,它将通过递增其可能包含的任何必要值来返回“下一个” JobParameters 对象。该策略很有用,因为该框架无法知道JobParameters
的哪些更改使其成为“下一个”实例。例如,如果JobParameters
中的唯一值是日期,并且应该创建下一个实例,那么该值应该增加一天吗?还是一周(例如,如果工作是每周一次)?对于有助于识别 Job 的任何数值,可以说相同,如下所示:
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
在此示例中,键为“ run.id”的值用于区分JobInstances
。如果传入的JobParameters
为空,则可以假定Job
从未运行过,因此可以返回其初始状态。但是,如果不是,则将获得旧值,将其增加一并返回。
可以通过名称空间中的'incrementer'属性将一个增量器与Job
关联:
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
可以通过构建器中提供的incrementer
方法将增量器与“作业”关联:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.incrementer(sampleIncrementer())
...
.build();
}
1.6.5. 停止工作
JobOperator
最常见的用例之一是正常停止作业:
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关闭不是立即关闭的,因为没有办法强制立即关闭,尤其是当执行当前在框架无法控制的开发人员代码中时,例如业务服务。但是,一旦控制权返回到框架,它将把当前StepExecution
的状态设置为BatchStatus.STOPPED
,保存它,然后对JobExecution
进行同样的操作,然后再完成操作。
1.6.6. 中止工作
可以重新启动FAILED
的作业执行(如果Job
可重新启动)。框架不会重新启动状态为ABANDONED
的作业执行。 ABANDONED
状态还用于步骤执行中,以在重新启动的作业执行中将其标记为可跳过:如果作业正在执行,并且在上一个失败的作业执行中遇到了标记为ABANDONED
的步骤,它将 continue 进行下一个步骤(由作业流程定义和步骤执行退出状态决定)。
如果进程终止("kill -9"
或服务器故障),则该作业当然不会运行,但是JobRepository
无法知道,因为在进程终止之前没有人告诉过它。您必须手动告诉它,您知道执行失败或应被视为中止(将其状态更改为FAILED
或ABANDONED
)-这是一项业务决策,无法自动执行。仅当状态不可重新启动或知道重新启动数据有效时,才将状态更改为FAILED
。 Spring Batch Admin JobService
中有一个 Util 可以中止作业执行。