7. 缩放和并行处理

单线程,单流程作业可以解决许多批处理问题,因此在考虑更复杂的实现之前,适当地检查是否满足您的需求始终是一个好主意。测量实际工作的性能,并首先查 Watch 最简单的实现是否满足您的需求:即使使用标准硬件,您也可以在一分钟之内轻松读写数百兆的文件。

当您准备开始通过并行处理实现作业时,Spring Batch 提供了一系列选项,本章对此进行了介绍,尽管其他地方介绍了一些功能。在较高级别上,有两种并行处理模式:单进程,多线程;和多进程。这些也分为以下几类:

  • 多线程步骤(单进程)

  • 并行步骤(单个过程)

  • 步骤的远程分块(多进程)

  • 分区步骤(单个或多个进程)

接下来,我们首先查 Watch 单进程选项,然后再查 Watch 多进程选项。

7.1 多线程步骤

开始并行处理的最简单方法是将TaskExecutor添加到您的 Step 配置中,例如作为tasklet的属性:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此示例中,taskExecutor 是对另一个 Bean 定义的引用,实现了TaskExecutor接口。 TaskExecutor是标准的 Spring 界面,因此请查阅《 Spring 用户指南》以获取可用实现的详细信息。最简单的多线程TaskExecutorSimpleAsyncTaskExecutor

以上配置的结果是,该步骤通过在单独的执行线程中读取,处理和写入每个 Item 块(每个提交间隔)来执行。请注意,这意味着要处理的 Item 没有固定的 Sequences,与单线程情况相比,大块可能包含不连续的 Item。除了任务 Actuator 设置的任何限制(例如,如果它由线程池支持)之外,tasklet 配置中还有一个限制值,默认为 4.您可能需要增加此限制以确保线程池已满利用,例如

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

还请注意,您的步骤中使用的任何池化资源(例如DataSource)可能对并发设置了限制。确保在这些步骤中使这些资源中的池至少与所需的并发线程数一样大。

对于某些常见的批处理用例,使用多线程步骤存在一些实际限制。步骤中的许多参与者(例如,读取器和写入器)都是有状态的,并且如果状态不是按线程划分的,则这些组件不能在多线程步骤中使用。特别是,Spring Batch 的大多数现成的读写器都不是为多线程使用而设计的。但是,可以与 Stateless 或线程安全的读取器和写入器一起使用,并且 Spring Batchsamples 中有一个 samples(parallelJob),该 samples 显示了使用过程指示器(请参见第 6.12 节“防止状态持久性”)来跟踪已被删除的 Item。在数据库 Importing 表中处理。

Spring Batch 提供了ItemWriterItemReader的一些实现。通常,他们在 Javadocs 中说它们是否是线程安全的,或者为避免在并发环境中发生问题而必须采取的措施。如果 Javadocs 中没有信息,则可以检查实现以查 Watch 是否存在任何状态。如果 Reader 不是线程安全的,则在自己的同步委托器中使用它仍然可能是有效的。您可以将调用同步到read(),只要处理和写入是块中最昂贵的部分,您的步骤仍可能比单线程配置中完成速度快得多。

7.2Parallel 步骤

只要可以将需要并行化的应用程序逻辑划分为不同的职责,并分配给各个步骤,然后就可以在单个过程中对其进行并行化。并行步骤执行易于配置和使用,例如,要与step3并行执行步骤(step1,step2),可以配置如下流程:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的“任务 Actuator”属性用于指定应使用哪个 TaskExecutor 实现来执行各个流程。默认值为SyncTaskExecutor,但是需要异步 TaskExecutor 才能并行运行这些步骤。请注意,该作业将确保在汇总 Export 状态并进行过渡之前,拆分中的每个流均已完成。

有关更多详细信息,请参见第 5.3.5 节“拆分流”部分。

7.3 远程分块

在远程分块中,分步处理被划分为多个进程,并通过某种中间件相互通信。这是实际模式的图片:

主组件是单个进程,从组件是多个远程进程。显然,如果母版不是瓶颈,则此模式最有效,因此处理必须比读取 Item 更昂贵(在实践中通常是这种情况)。

Master 只是 Spring Batch Step的一种实现,ItemWriter 替换为一个通用版本,该通用版本知道如何将 Item 块作为消息发送到中间件。从站是正在使用的任何中间件的标准侦听器(例如,对于 JMS,它们将是MesssageListeners),它们的作用是通过ChunkProcessor接口使用标准的ItemWriterItemProcessorItemWriter处理 Item 块。使用此模式的优点之一是读取器,处理器和写入器组件都是现成的(与用于步骤的本地执行的组件相同)。这些 Item 是动态划分的,并且工作是通过中间件共享的,因此,如果侦听器都是渴望的使用者,那么负载平衡是自动的。

中间件必须是耐用的,并保证传递和每个消息的单一使用者。 JMS 很明显是候选者,但是网格计算和共享内存产品空间(例如 Java Spaces)中存在其他选择。

7.4 Partitioning

Spring Batch 还提供了一个 SPI,用于对 Step 执行进行分区并远程执行。在这种情况下,远程参与者只是 Step 实例,可以轻松地对其进行配置并将其用于本地处理。这是实际模式的图片:

作业作为一系列步骤在左侧执行,并且其中一个步骤标记为“主要”。此图中的从属都是一个步骤的所有相同实例,实际上可以代替主从,从而为工作带来相同的结果。从站通常将是远程服务,但也可能是本地执行线程。主服务器以这种模式发送到从属服务器的消息不需要持久或不需要保证传递:JobRepository中的 Spring Batch 元数据将确保每个从属服务器执行一次,并且每次执行作业仅一次。

Spring Batch 中的 SPI 包含步骤(PartitionStep)的特殊实现,以及针对特定环境需要实现的两个策略接口。策略接口是PartitionHandlerStepExecutionSplitter,它们的作用在下面的序列图中显示:

在这种情况下,右侧的步骤是“远程”从设备,因此可能有许多对象和/或进程在扮演该角色,并且显示了 PartitionStep 驱动执行。 PartitionStep 配置如下所示:

<step id="step1.master">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

与多线程步骤的油门限制属性相似,grid-size 属性可防止任务执行程序被单个步骤的请求所饱和。

有一个简单的示例,可以在 Spring Batch Samples 的单元测试套件中进行复制和扩展(请参见*PartitionJob.xml配置)。

Spring Batch 为名为“ step1:partition0”等的分区创建了步骤执行,因此许多人更喜欢将主步骤称为“ step1:master”以保持一致性。在 Spring 3.0 中,您可以使用步骤别名(指定name属性而不是id)来实现。

7.4.1 PartitionHandler

PartitionHandler是了解远程或网格环境的结构的组件。它能够将StepExecution请求发送到以某些特定于结构的格式包装的远程步骤,例如 DTO。它不必知道如何分割 Importing 数据,或如何汇总多个 Step 执行的结果。一般而言,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些是结构的功能,并且无论如何,Spring Batch 始终提供独立于结构的可重新启动性:失败的 Job 可以始终重新启动,而只有失败的作业步骤将重新执行。

The PartitionHandler界面可以针对多种结构类型进行专门的实现:例如简单的 RMI 远程处理,EJB 远程处理,自定义 Web 服务,JMS,Java 空间,共享内存网格(例如 Terracotta 或 Coherence),网格执行结构(例如 GridGain)。 Spring Batch 不包含任何专有网格或远程结构的实现。

但是,Spring Batch 确实提供了PartitionHandler的有用实现,该实现使用 Spring 的TaskExecutor策略在单独的执行线程中本地执行步骤。该实现称为TaskExecutorPartitionHandler,它是使用上述 XML 名称空间配置的步骤的默认设置。也可以像这样显式配置:

<step id="step1.master">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize确定要创建的单独步骤执行的数量,因此可以将其与TaskExecutor中的线程池的大小匹配,或者可以将其设置为大于可用线程的数量,在这种情况下,工作量较小。

TaskExecutorPartitionHandler对于 IO 密集型步骤非常有用,例如复制大量文件或将文件系统复制到内容 Management 系统中。通过提供作为远程调用代理的 Step 实现(例如,使用 Spring Remoting),也可以将其用于远程执行。

7.4.2 Partitioner

分区程序的职责更简单:仅将生成上下文作为 Importing 参数生成新步执行(无需担心重新启动)。它只有一个方法:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值将每个步骤执行的唯一名称(String)与ExecutionContext形式的 Importing 参数相关联。这些名称稍后会在批处理元数据中显示为分区StepExecutions中的步骤名称。 ExecutionContext只是一袋名称/值对,因此它可能包含一系列主键,行号或 Importing 文件的位置。然后,远程Step通常使用#{...}占位符(步骤范围中的后期绑定)绑定到上下文 Importing,如下一节所述。

步骤执行的名称(由Partitioner返回的Map中的键)在 Job 的步骤执行中必须是唯一的,但没有其他特定要求。最简单的方法是使前缀对用户有意义,这是使用前缀后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在Job中是唯一的),而后缀只是一个计数器。使用此约定的框架中有一个SimplePartitioner

可选接口PartitioneNameProvider可用于与分区本身分开提供分区名称。如果Partitioner实现此接口,则在重新启动时仅查询名称。如果分区昂贵,这可能是有用的优化。显然,PartitioneNameProvider提供的名称必须与Partitioner提供的名称匹配。

7.4.3 将 Importing 数据绑定到步骤

由 PartitionHandler 执行的步骤具有相同的配置,并在运行时从 ExecutionContext 绑定它们的 Importing 参数,这非常有效。使用 Spring Batch 的 StepScope 功能很容易做到(在Late Binding的小节中有更详细的介绍)。例如,如果Partitioner使用属性键fileName创建ExecutionContext实例,并为每个步骤调用指向一个不同的文件(或目录),则Partitioner的输出可能如下所示:

表 7.1. 分区程序目标目录处理提供的示例执行上下文名称到执行上下文

步骤执行名称(键)ExecutionContext (value)
filecopy:partition0fileName=/home/data/one
filecopy:partition1fileName=/home/data/two
filecopy:partition2fileName=/home/data/three

然后,可以使用后期绑定到执行上下文将文件名绑定到步骤:

<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resource" value="#{stepExecutionContext[fileName]}/*"/>
</bean>