1. 缩放和并行处理

单线程,单流程作业可以解决许多批处理问题,因此在考虑更复杂的实现之前,适当地检查是否满足您的需求始终是一个好主意。测量实际工作的绩效,并查 Watch 最简单的实现是否首先满足您的需求。即使使用标准硬件,您也可以在一分钟之内读写数百兆的文件。

当您准备开始通过并行处理实现作业时,Spring Batch 提供了一系列选项,本章对此进行了介绍,尽管其他地方介绍了一些功能。从高层次上讲,有两种并行处理模式:

  • 单进程,多线程

  • Multi-process

这些也分为以下几类:

  • 多线程步骤(单进程)

  • 并行步骤(单个过程)

  • 步骤的远程分块(多进程)

  • 分区步骤(单个或多个进程)

首先,我们回顾一下单进程选项。然后,我们回顾多进程选项。

1.1.多线程步骤

开始并行处理的最简单方法是将TaskExecutor添加到您的 Step 配置中。

例如,您可以添加tasklet的属性,如以下示例所示:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

使用 Java 配置时,可以将TaskExecutor添加到步骤中,如以下示例所示:

Java Configuration

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .build();
}

在此示例中,taskExecutor是对另一个实现TaskExecutor接口的 bean 定义的引用。 TaskExecutor是标准的 Spring 界面,因此请查阅《 Spring 用户指南》以获取可用实现的详细信息。最简单的多线程TaskExecutorSimpleAsyncTaskExecutor

以上配置的结果是,Step通过在单独的执行线程中读取,处理和写入每个 Item 块(每个提交间隔)来执行。请注意,这意味着要处理的 Item 没有固定的 Sequences,与单线程情况相比,大块可能包含不连续的 Item。除了任务 Actuator 设置的任何限制(例如,是否由线程池支持)之外,tasklet 配置中还有一个限制值,该限制默认为 4.您可能需要增加此限制以确保线程池是充分利用。

例如,您可以增加油门极限,如以下示例所示:

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

使用 Java 配置时,构建器提供对油门限制的访问:

Java Configuration

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .throttleLimit(20)
                                .build();
}

还请注意,您的步骤中使用的任何池化资源(例如DataSource)可能对并发设置了限制。确保在这些步骤中使这些资源中的池至少与所需的并发线程数一样大。

对于一些常见的批处理用例,使用多线程Step实现存在一些实际限制。 Step中的许多参与者(例如 Reader 和 Writer)都是有状态的。如果状态不是按线程隔离的,则这些组件不能在多线程Step中使用。特别是,Spring Batch 的大多数现成的读取器和写入器都不是为多线程使用而设计的。但是,可以与 Stateless 或线程安全的读取器和写入器一起使用,并且Spring Batchsample中有一个示例(称为parallelJob),该示例显示了使用过程指示器(请参见防止国家持续存在)来跟踪已处理的 Item 在数据库 Importing 表中。

Spring Batch 提供了ItemWriterItemReader的一些实现。通常,他们在 Javadoc 中说出它们是否是线程安全的,或者为避免在并发环境中发生问题而必须采取的措施。如果 Javadoc 中没有信息,则可以检查实现以查 Watch 是否存在任何状态。如果 Reader 不是线程安全的,则可以用提供的SynchronizedItemStreamReader装饰它,或在您自己的同步委托器中使用它。您可以将调用同步到read(),并且只要处理和写入是块中最昂贵的部分,您的步骤仍可能比单线程配置中的完成速度快得多。

1.2.Parallel 步骤

只要可以将需要并行化的应用程序逻辑划分为不同的职责并分配给各个步骤,那么就可以在单个过程中对其进行并行化。并行步骤执行易于配置和使用。

例如,与step3并行执行步骤(step1,step2)很简单,如以下示例所示:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

使用 Java 配置时,与step3并行执行步骤(step1,step2)很简单,如以下示例所示:

Java Configuration

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

可配置任务 Actuator 用于指定应使用哪种TaskExecutor实现来执行各个流程。默认值为SyncTaskExecutor,但是需要异步TaskExecutor来并行运行这些步骤。请注意,该作业可确保拆分中的每个流在汇总 Export 状态和过渡之前完成。

有关更多详细信息,请参见Split Flows部分。

1.3.远程分块

在远程分块中,Step处理被划分为多个进程,并通过某种中间件相互通信。下图显示了该模式:

Remote Chunking

图 1.远程分块

主组件是单个进程,而从组件是多个远程进程。如果母版不是瓶颈,则此模式最有效,因此处理必须比读取 Item 更昂贵(在实践中通常如此)。

主版本是 Spring Batch Step的实现,其中ItemWriter被通用版本取代,该通用版本知道如何将 Item 块作为消息发送到中间件。从属设备是任何正在使用的中间件的标准侦听器(例如,对于 JMS,它们是MesssageListener实现),它们的作用是通过ChunkProcessor接口使用标准ItemWriterItemProcessorItemWriter处理 Item 块。使用此模式的优点之一是读取器,处理器和写入器组件都是现成的(与用于步骤的本地执行的组件相同)。这些 Item 是动态划分的,并且通过中间件共享工作,因此,如果侦听器都是渴望的使用者,那么负载平衡是自动的。

中间件必须是耐用的,并保证传递,每个消息由一个使用者使用。 JMS 是明显的候选者,但是网格计算和共享内存产品空间中还存在其他选项(例如 JavaSpaces)。

有关更多详细信息,请参见Split Flows部分。

1.4. Partitioning

Spring Batch 还提供了一个 SPI,用于划分Step执行并远程执行。在这种情况下,远程参与者是Step个实例,它们可以很容易地被配置并用于本地处理。下图显示了该模式:

Partitioning Overview

图 2.分区

Job作为一系列Step实例在左侧运行,并且Step实例之一被标记为主机。此图中的从属都是Step的相同实例,实际上可以代替 master,从而导致Job的结果相同。从站通常将成为远程服务,但也可能是本地执行线程。主机以这种方式发送给从机的消息不必是持久的或有保证的传递。 JobRepository中的 Spring Batch 元数据可确保每个从属对象执行一次,并且每次Job执行仅执行一次。

Spring Batch 中的 SPI 由Step的特殊实现(称为PartitionStep)和需要针对特定环境实现的两个策略接口组成。策略接口是PartitionHandlerStepExecutionSplitter,它们的作用在以下序列图中显示:

Partitioning SPI

图 3.对 SPI 进行分区

在这种情况下,右边的Step是“远程”从属,因此,可能有许多对象和/或进程在扮演该角色,并且显示了PartitionStep驱动执行。

以下示例显示了PartitionStep配置:

<step id="step1.master">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

以下示例显示了使用 Java 配置的PartitionStep配置:

Java Configuration

@Bean
public Step step1Master() {
    return stepBuilderFactory.get("step1.master")
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

类似于多线程步骤的throttle-limit属性,grid-size属性可防止任务执行程序被单个步骤的请求所饱和。

有一个简单的示例可以在Spring Batchsample的单元测试套件中复制和扩展(请参见Partition*Job.xml配置)。

Spring Batch 为名为“ step1:partition0”的分区创建步骤执行,依此类推。为了保持一致性,许多人喜欢将主步骤称为“ step1:master”。您可以为该步骤使用别名(通过指定name属性而不是id属性)。

1.4.1. PartitionHandler

PartitionHandler是了解远程或网格环境的结构的组件。它能够将StepExecution请求发送到远程Step实例,并以某些特定于结构的格式包装,例如 DTO。它不必知道如何拆分 Importing 数据或如何汇总多个Step执行的结果。一般而言,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些是结构的功能。无论如何,Spring Batch 始终提供独立于架构的可重启性。发生故障的Job始终可以重新启动,并且仅重新执行发生故障的Steps

PartitionHandler接口可以具有适用于多种结构类型的专用实现,包括简单的 RMI 远程处理,EJB 远程处理,自定义 Web 服务,JMS,Java 空间,共享内存网格(例如 Terracotta 或 Coherence)和网格执行结构(例如 GridGain)。 Spring Batch 不包含任何专有网格或远程结构的实现。

但是,Spring Batch 确实提供了PartitionHandler的有用实现,该实现使用 Spring 的TaskExecutor策略在单独的执行线程中本地执行Step个实例。该实现称为TaskExecutorPartitionHandler

TaskExecutorPartitionHandler是使用前面显示的 XML 名称空间配置的默认步骤。也可以显式配置它,如以下示例所示:

<step id="step1.master">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

可以在 java 配置中显式配置TaskExecutorPartitionHandler,如以下示例所示:

Java Configuration

@Bean
public Step step1Master() {
    return stepBuilderFactory.get("step1.master")
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

gridSize属性确定要创建的单独步骤执行的次数,因此可以与TaskExecutor中的线程池大小匹配。或者,可以将其设置为大于可用线程数,从而使工作块较小。

TaskExecutorPartitionHandler对于 IO 密集型Step实例很有用,例如复制大量文件或将文件系统复制到内容 Management 系统中。通过提供Step实现(也可以作为远程调用的代理)(例如,使用 Spring Remoting),也可以将其用于远程执行。

1.4.2. Partitioner

Partitioner的责任更简单:仅将生成上下文作为 Importing 参数生成,仅用于新步骤执行(无需担心重新启动)。它具有一个方法,如以下接口定义所示:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值将每个步骤执行的唯一名称(String)与ExecutionContext形式的 Importing 参数相关联。这些名称稍后将在 Batch 元数据中显示为分区StepExecutions中的步骤名称。 ExecutionContext只是一袋名称/值对,因此它可能包含一系列主键,行号或 Importing 文件的位置。然后,远程Step通常使用#{…}占位符(步骤范围中的后期绑定)绑定到上下文 Importing,如下一节所述。

Job的步骤执行中,步骤执行的名称(由Partitioner返回的Map中的键)必须是唯一的,但没有其他特定要求。最简单的方法(并使名称对用户有意义)是使用前缀后缀命名约定,其中前缀是正在执行的步骤的名称(其本身在Job中是唯一的),并且后缀只是一个计数器。使用此约定的框架中有一个SimplePartitioner

可以使用名为PartitionNameProvider的可选接口来与分区本身分开提供分区名称。如果Partitioner实现此接口,则在重新启动时仅查询名称。如果分区昂贵,这可能是有用的优化。 PartitionNameProvider提供的名称必须与Partitioner提供的名称匹配。

1.4.3.将 Importing 数据绑定到步骤

对于由PartitionHandler执行的步骤具有相同的配置,并在运行时从ExecutionContext绑定其 Importing 参数,这非常有效。使用 Spring Batch 的 StepScope 功能很容易做到(在Late Binding的小节中有更详细的介绍)。例如,如果Partitioner使用名为fileName的属性键创建ExecutionContext实例,并为每个步骤调用指向一个不同的文件(或目录),则Partitioner的输出可能类似于下表的内容:

表 1. Partitioner定位目录处理提供的执行上下文的示例步骤执行名称

步骤执行名称(键)ExecutionContext (value)
filecopy:partition0fileName=/home/data/one
filecopy:partition1fileName=/home/data/two
filecopy:partition2fileName=/home/data/three

然后,可以使用后期绑定到执行上下文将文件名绑定到步骤,如以下示例所示:

XML Configuration

<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>

Java Configuration

@Bean
public MultiResourceItemReader itemReader(
        @Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
        return new MultiResourceItemReaderBuilder<String>()
                        .delegate(fileReader())
                        .name("itemReader")
                        .resources(resources)
                        .build();
}