1. 缩放和 Parallel 处理


XML

Java

使用单线程,单个 process 作业可以解决许多批处理问题,因此在考虑更复杂的 implementation 之前,正确检查是否满足您的需求始终是一个很好的选择。测量现实 job 的 performance,看看最简单的 implementation 是否满足您的需求。即使使用标准硬件,您也可以在一分钟内读取和写入几百兆字节的文件。

当您准备开始实现带有 parallel 处理的 job 时,Spring Batch 提供了一系列选项,本章将介绍这些选项,尽管其他地方也有一些 features。在高级别,有两种 parallel 处理模式:

  • 单 process,multi-threaded

  • Multi-process

这些 break 也分为几类,如下:

  • Multi-threaded Step(单个 process)

  • Parallel Steps(单个 process)

  • Remote Step 的分块(多 process)

  • 分区 Step(单个或多个 process)

首先,我们审查 single-process 选项。然后我们查看 multi-process 选项。

1.1. Multi-threaded Step

启动 parallel 处理的最简单方法是在 Step configuration 中添加TaskExecutor

对于 example,您可以添加tasklet的属性,如以下 example 所示:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

使用 java configuration 时,可以将TaskExecutor添加到 step,如下面的 example 所示:

Java Configuration

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .build();
}

在此 example 中,taskExecutor是对另一个实现TaskExecutor接口的 bean 定义的 reference。 TaskExecutor 接口是标准的 Spring 接口,因此请参阅 Spring 用户指南以获取可用_implement 的详细信息。最简单的 multi-threaded TaskExecutorSimpleAsyncTaskExecutor

上述 configuration 的结果是Step通过在单独的执行线程中读取,处理和写入每个块(每个提交间隔)来执行。请注意,这意味着要处理的项目没有固定的 order,并且块可能包含与 single-threaded case 相比的项目。除了任务执行程序提供的任何限制(例如它是否由线程池支持)之外,tasklet configuration 中还有一个限制,默认为 4.您可能需要增加此限制以确保线程池是充分利用。

对于 example,您可以增加 throttle-limit,如下面的 example 所示:

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

使用 java configuration 时,构建器提供对限制限制的访问:

Java Configuration

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .throttleLimit(20)
                                .build();
}

另请注意,step 中使用的任何池化资源可能会限制并发性,例如DataSource。确保使用这些资源中的池至少与 step 中所需的并发线程数一样大。

对于某些 common 批处理用例,使用 multi-threaded Step __mplement 有一些实际限制。 Step中的许多参与者(例如 readers 和 writers)都是有状态的。如果 state 没有被线程隔离,那么这些组件在 multi-threaded Step中不可用。特别是,Spring Batch 中的大多数 off-the-shelf readers 和_write 都不是为 multi-threaded 使用而设计的。但是,可以使用 stateless 或线程安全 readers 和 writers,并且Spring Batch Samples中有一个 sample(称为parallelJob),它显示使用 process 指示符(请参阅防止 State 持久化)来跟踪已处理的项目在数据库中输入 table。

Spring Batch 提供ItemWriterItemReader的一些 implementations。通常,他们在 Javadoc 中说他们是否是线程安全的,或者你需要做些什么来避免并发环境中的问题。如果 Javadoc 中没有信息,您可以检查 implementation 以查看是否有任何 state。如果 reader 不是线程安全的,您可以使用提供的SynchronizedItemStreamReader来装饰它,或者在您自己的同步委托器中使用它。您可以将调用同步到read()和 long 作为处理和写入是块中最昂贵的部分,您的 step 仍然可以比单线程 configuration 更快地完成。

1.2. Parallel Steps

作为 long,需要并行化的 application 逻辑可以拆分为不同的职责并分配给各个步骤,然后可以在单个 process 中并行化。 Parallel Step 执行易于配置和使用。

对于 example,在 parallel 中使用step3执行步骤(step1,step2)非常简单,如下面的示例所示:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

使用 java configuration 时,在 parallel 中使用step3执行步骤(step1,step2)非常简单,如下面的示例所示:

Java Configuration

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

可配置任务执行程序用于指定应该使用哪个TaskExecutor implementation 来执行各个流。默认值为SyncTaskExecutor,但需要异步TaskExecutor来运行 parallel 中的步骤。请注意,job 确保在聚合退出状态和转换之前,拆分中的每个流都完成。

有关详细信息,请参阅分流部分。

1.3. Remote Chunking

在 remote chunking 中,Step处理分为多个进程,通过一些中间件相互通信。下图显示了 pattern:

图 1. Remote Chunking

master component 是一个 process,而 slave 是多个 remote 进程。如果 master 不是瓶颈,这种 pattern 效果最好,因此处理必须比读取项目更昂贵(在实践中通常是这种情况)。

master 是 Spring Batch Step的 implementation,ItemWriter被一般的 version 取代,后者知道如何将消息传递给中间件作为消息。对于任何正在使用的中间件,从属都是标准 listeners(例如,对于 JMS,它们将是MesssageListener implementations),它们的作用是使用标准ItemWriterItemProcessorItemWriter,通过ChunkProcessor接口来处理项目块。使用此 pattern 的一个优点是 reader,processor 和 writer 组件是 off-the-shelf(与用于本地执行 step 的组件相同)。这些项目是动态划分的,工作通过中间件共享,因此,如果 listeners 都是渴望消费者,那么负载平衡是自动的。

中间件必须经久耐用,保证交付,每个消息都有一个 consumer。 JMS 是显而易见的候选者,但其他选项(如 JavaSpaces)存在于网格计算和共享 memory 产品领域。

有关详细信息,请参阅Spring Batch Integration - Remote Chunking部分。

1.4. 分区

Spring Batch 还提供了一个 SPI,用于分区Step执行并远程执行。在这种情况下,remote 参与者是Step实例,可以很容易地配置并用于本地处理。下图显示了 pattern:

图 2.分区

Job在 left-hand 侧作为Step实例序列运行,其中一个Step实例标记为 master。这张照片中的奴隶都是Step的相同实例,实际上可以代替 master,导致Job的结果相同。从属服务器通常是 remote 服务,但也可以是本地执行线程。 master 发送给此 pattern 中的从属的消息不需要持久或保证传递。 JobRepository中的 Spring Batch 元数据确保每个从属执行一次,每次Job执行只执行一次。

Spring Batch 中的 SPI 由Step(称为PartitionStep)的特殊 implementation 和两个需要针对特定环境实现的策略接口组成。策略接口是PartitionHandlerStepExecutionSplitter,它们的作用如下面的序列图所示:

图 3.分区 SPI

在这种情况下,右侧的Step是“remote”从属,因此,可能有许多 objects 或者进程正在扮演这个角色,并且PartitionStep被显示为驱动执行。

以下 example 显示了PartitionStep configuration:

<step id="step1.master">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

以下 example 显示了使用 java configuration 的PartitionStep configuration:

Java Configuration

@Bean
public Step step1Master() {
    return stepBuilderFactory.get("step1.master")
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与 multi-threaded step 的throttle-limit属性类似,grid-size属性可防止任务执行程序被单个 step 的请求所饱和。

有一个简单的 example 可以在单元测试套件中复制和扩展Spring Batch Samples(参见Partition*Job.xml configuration)。

Spring Batch 为名为“step1:partition0”的分区创建 step 执行,依此类推。许多人更喜欢将 master step“step1:master”称为一致性。您可以为 step 使用别名(通过指定name属性而不是id属性)。

1.4.1. PartitionHandler

PartitionHandler是知道远程或网格环境结构的 component。它能够向 remote Step实例发送StepExecution请求,以一些 fabric-specific 格式包装,就像 DTO 一样。它不必知道如何拆分输入数据或如何聚合多个Step执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下这些都是织物的特征。无论如何,Spring Batch 始终提供独立于结构的可重启性。失败的Job总是可以重新启动,只有失败的Steps是 re-executed。

PartitionHandler接口可以为各种结构类型提供专门的 implementation,包括简单的 RMI 远程处理,EJB 远程处理,自定义 web service,JMS,Java 空间,共享 memory 网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。 Spring Batch 不包含任何专有网格或远程结构的 implementations。

但是,Spring Batch 提供了一个有用的PartitionHandler实现,它使用 Spring 中的TaskExecutor策略在不同的执行线程中本地执行Step实例。 implementation 称为TaskExecutorPartitionHandler

TaskExecutorPartitionHandler是使用前面显示的 XML 命名空间配置的 step 的默认值。它也可以显式配置,如下面的示例所示:

<step id="step1.master">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

可以在 java configuration 中显式配置TaskExecutorPartitionHandler,如下面的示例所示:

Java Configuration

@Bean
public Step step1Master() {
    return stepBuilderFactory.get("step1.master")
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

gridSize属性确定要创建的单独 step 执行的数量,因此它可以与TaskExecutor中的线程池的大小匹配。或者,可以将其设置为大于可用线程数,这使得工作块更小。

TaskExecutorPartitionHandler对 IO-intensive Step实例很有用,例如复制__iles 文件的大数或将文件系统复制到内容管理系统中。它还可以用于 remote 执行,方法是提供Step implementation,它是 remote 调用的代理(例如使用 Spring Remoting)。

1.4.2. 分区

Partitioner有一个更简单的责任:生成执行上下文作为新 step 执行的输入参数(无需担心重启)。它有一个方法,如以下接口定义所示:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的 return value 将每个 step 执行(String)的唯一 name 与ExecutionContext形式的输入参数相关联。这些名称稍后在批处理元数据中显示为分区StepExecutions中的 step name。 ExecutionContext只是一对 name-value 对,所以它可能包含一系列主键, line numbers 或输入文件的位置。然后 remote Step通常使用#{…}占位符(step 范围内的_binding)绑定到 context 输入,如下一节所示。

step 执行的名称(由Partitioner返回的Map中的键)在Job的 step 执行中必须是唯一的,但没有任何其他特定要求。最简单的方法(并使名称对用户有意义)是使用前缀后缀命名约定,其中前缀是正在执行的 step 的 name(它本身在Job中是唯一的),以及后缀只是一个柜台。 framework 中有SimplePartitioner使用此约定。

可以使用名为PartitionNameProvider的可选接口与分区本身分开提供分区名称。如果Partitioner实现此接口,则在重新启动时,仅查询名称。如果分区很昂贵,这可能是一个有用的优化。 PartitionNameProvider提供的名称必须_匹配Partitioner提供的名称。

1.4.3. 将输入数据绑定到步骤

对于PartitionHandler执行的具有相同配置的步骤以及它们的输入参数在运行时从ExecutionContext绑定非常有效。使用 Spring Batch 的 StepScope feature 很容易做到(在晚 Binding部分有更详细的介绍)。例如,如果Partitioner使用名为fileName的属性 key 创建ExecutionContext实例,指向每个 step 调用的不同文件(或目录),则Partitioner输出可能类似于以下 table 的内容:

Step Execution Name(key)ExecutionContext(value)
filecopy:partition0fileName=/home/data/one
filecopy:partition1fileName=/home/data/two
filecopy:partition2fileName=/home/data/three

然后可以使用__bin 延迟将 name 文件绑定到 step,执行 context,如下面的 example 所示:

XML Configuration

<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>

Java Configuration

@Bean
public MultiResourceItemReader itemReader(
        @Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
        return new MultiResourceItemReaderBuilder<String>()
                        .delegate(fileReader())
                        .name("itemReader")
                        .resources(resources)
                        .build();
}