1. Spring Batch Integration


XML

Java

1.1. Spring Batch Integration 简介

Spring Batch 的许多用户可能会遇到 Spring Batch 范围之外的要求,但可以使用 Spring Integration 有效且简洁地实现这些要求。相反,Spring Integration 用户可能会遇到 Spring Batch 要求,需要一种有效集成两个框架的方法。在这个 context 中,有几个模式和 use-cases emerge,而 Spring Batch Integration 解决了这些要求。

Spring Batch 和 Spring Integration 之间的 line 并不总是很清楚,但有两条建议可以帮助:考虑粒度,并应用 common 模式。其中一些 common 模式在本参考手册部分中有所描述。

将消息传递添加到批处理 process 可以实现操作的自动化以及 key 关注点的分离和策略。例如,消息可能触发 job 执行,然后可以通过各种方式公开消息的发送。或者,当 job 完成或失败时,该 event 可能会触发要发送的消息,并且这些消息的使用者可能具有与 application 本身无关的操作问题。消息传递也可以嵌入到 job 中(用于 example 读取或写入项目以便通过 channels 进行处理)。 Remote partitioning 和 remote chunking 提供了在多个 workers 上分配工作负载的方法。

本节介绍以下 key 概念:

  • 命名空间支持

  • 通过消息启动批处理作业

  • 提供信息性消息的反馈

  • 异步处理器

  • 外部化批处理 Process 执行

1.1.1. 命名空间支持

自 Spring Batch Integration 1.3 以来,添加了专用的 XML Namespace 支持,旨在提供更简单的 configuration 体验。在 order 中激活命名空间,将以下命名空间声明添加到 Spring XML Application Context 文件中:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    ...

</beans>

Spring Batch Integration 的完全配置的 Spring XML Application Context 文件可能如下所示:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    https://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/batch
    https://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd">

    ...

</beans>

将 version numbers 附加到引用的 XSD 文件也是允许的,但是,由于 version-less 声明始终使用最新的 schema,我们通常不建议将 version number 附加到 XSD name。添加 version number 可能会在更新 Spring Batch Integration 依赖项时产生问题,因为它们可能需要更新版本的 XML schema。

1.1.2. 通过消息启动批处理作业

使用核心 Spring Batch API 启动批处理作业时,基本上有 2 个选项:

  • 从命令 line,与CommandLineJobRunner

  • 以编程方式,使用JobOperator.start()JobLauncher.run()

对于 example,您可能希望在使用 shell 脚本调用批处理作业时使用CommandLineJobRunner。或者,您可以直接使用JobOperator(对于 example,当使用 Spring Batch 作为 web application 的一部分时)。但是,更复杂的用例呢?也许您需要轮询一个 remote(S)FTP 服务器来检索 Batch Job 的数据,或者您的 application 必须同时支持多个不同的数据源。例如,您可能不仅从 web 接收数据 files,而且还从 FTP 和其他在调用 Spring Batch 之前,可能需要对输入 files 进行额外的转换。

因此,使用 Spring Integration 及其众多适配器执行批处理 job 会更强大。例如,您可以使用文件入站 Channel 适配器来监视 file-system 中的目录,并在输入文件到达后立即启动 Batch Job。此外,您可以创建使用多个不同适配器的 Spring Integration 流,以便仅使用 configuration 同时从多个源轻松地接收批处理作业的数据。使用 Spring Integration 实现所有这些场景很简单,因为它允许JobLauncher的解耦,event-driven 执行。

Spring Batch Integration 提供了可用于启动批处理作业的JobLaunchingMessageHandler class。 JobLaunchingMessageHandler的输入由 Spring Integration 消息提供,该消息具有JobLaunchRequest类型的有效负载。这个 class 是一个围绕Job的 wrapper,它需要启动并围绕启动 Batch job 所需的JobParameters

下图说明了 order 中典型的 Spring Integration 消息流以启动 Batch job。 EIP(Enterprise Integration Patterns)网站提供了消息传递图标及其描述的完整概述。

图 1.启动 Batch Job

将文件转换为 JobLaunchRequest
package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}
JobExecution 响应

当正在执行批处理 job 时,将返回JobExecution实例。此实例可用于确定执行的状态。如果能够成功创建JobExecution,则无论实际执行是否成功,都会始终返回JobExecution

返回JobExecution实例的确切行为取决于提供的TaskExecutor。如果使用了synchronous(single-threaded)TaskExecutor implementation,响应仅返回完成。使用asynchronous TaskExecutor时,会立即返回JobExecution实例。然后,用户可以使用id id实例(使用JobExecution.getJobId())并使用JobExplorer查询以获取 job 的更新状态。有关更多信息,请参阅查询 Repository上的 Spring Batch reference 文档。

Spring Batch Integration Configuration

以下 configuration 创建一个文件inbound-channel-adapter以在提供的目录中侦听 CSV files,将它们移交给我们的变换器(FileMessageToJobRequest),通过 Job 启动网关启动 job,然后使用logging-channel-adapter log 输出。

XML Configuration

<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

Java Configuration

@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    simpleJobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(simpleJobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}
Example ItemReader Configuration

现在我们正在轮询 files 并启动作业,我们需要配置 Spring Batch ItemReader(对于 example)以使用 job 参数“input.file.name”定义的位置找到的 files,如下面的 bean configuration 所示:

XML Configuration

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

Java Configuration

@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

前面的 example 中的主要兴趣点是将的 value 作为 Resource property value 注入,并将ItemReader bean 设置为具有 Step 范围。将 bean 设置为具有 Step 范围会利用后期 binding 支持,从而允许访问jobParameters变量。

1.2. Job-Launching 网关的可用属性

job-launching 网关具有以下属性,您可以设置它们来控制 job:

  • id:标识基础 Spring bean 定义,它是以下任一个的实例:

  • EventDrivenConsumer

  • PollingConsumer(确切的 implementation 取决于 component 的输入 channel 是SubscribableChannel还是PollableChannel .)

  • auto-startup:Boolean flag 表示端点应在启动时自动启动。默认值为 true。

  • request-channel:此端点的输入MessageChannel

  • 发送JobExecution有效负载的reply-channelMessageChannel

  • reply-timeout:允许您指定此网关在抛出 exception 之前等待将回复消息成功发送到回复 channel 的 long(以毫秒为单位)。此属性仅在 channel 可能阻止时应用(对于 example,当使用当前已满的有界队列 channel 时)。另外,请记住,当发送到DirectChannel时,调用发生在发送者的线程中。因此,发送操作的失败可能是由更下游的其他组件引起的。 reply-timeout属性 maps 到底层MessagingTemplate实例的sendTimeout property。如果未指定,则该属性默认为<emphasis> -1 </emphasis>,这意味着默认情况下Gateway无限期等待。

  • job-launcher:可选。接受自定义JobLauncher bean reference。如果未指定适配器 re-uses 在id id下注册的实例。如果不存在默认实例,则抛出 exception。

  • order:当此端点作为订户连接到SubscribableChannel时,指定调用的 order。

1.3. Sub-Elements

GatewayPollableChannel接收消息时,您必须提供 global 默认Poller或向Job Launching Gateway提供Poller sub-element,如下面的 example 所示:

XML Configuration

<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000">
</batch-int:job-launching-gateway>

Java Configuration

@Bean
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000"))
public JobLaunchingGateway sampleJobLaunchingGateway() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher());
    jobLaunchingGateway.setOutputChannel(replyChannel());
    return jobLaunchingGateway;
}

1.3.1. 提供信息性消息的反馈

由于 Spring Batch 作业可以运行 long,因此提供进度信息通常很关键。例如,如果批 job 的部分或全部部分失败,则可能希望通知 stake-holders。 Spring Batch 为通过以下方式收集的信息提供支持:

  • Active polling

  • Event-driven listeners

异步启动 Spring Batch job 时(对于 example,使用Job Launching Gateway),将返回JobExecution实例。因此,JobExecution.getJobId()可用于通过使用JobExplorerJobRepository检索JobExecution的更新实例来连续轮询状态更新。但是,这被认为是 sub-optimal,应该首选 event-driven 方法。

因此,Spring Batch 提供 listeners,包括三个最常用的 listeners:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在下图所示的 example 中,Spring Batch job 已配置StepExecutionListener。因此,Spring Integration 在 events 之前或之后接收并处理任何 step。例如,可以使用Router检查收到的StepExecution。根据检查结果,可能会发生各种事情(例如将消息路由到 Mail Outbound Channel Adapter),以便根据某些条件发送电子邮件通知。

图 2.处理信息性消息

以下 two-part example 显示了 listener 如何配置为Gateway发送消息StepExecution events 并 log 将其输出发送到logging-channel-adapter

首先,创建通知 integration beans:

XML Configuration

<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

Java Configuration

@Bean
@ServiceActivator(inputChannel = "stepExecutionsChannel")
public LoggingHandler loggingHandler() {
    LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN);
    adapter.setLoggerName("TEST_LOGGER");
    adapter.setLogExpressionString("headers.id + ': ' + payload");
    return adapter;
}

@MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel")
public interface NotificationExecutionListener extends StepExecutionListener {}

您需要将@IntegrationComponentScan annotation 添加到 configuration 中。

其次,修改 job 以添加 step-level listener:

XML Configuration

<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

Java Configuration

public Job importPaymentsJob() {
    return jobBuilderFactory.get("importPayments")
        .start(stepBuilderFactory.get("step1")
                .chunk(200)
                .listener(notificationExecutionsListener())
                ...
}

1.3.2. 异步处理器

异步处理器可帮助您扩展项目的处理。在异步处理器用例中,AsyncItemProcessor用作调度程序,在新线程上为 item 执行ItemProcessor的逻辑。 item 完成后,Future将传递给AsynchItemWriter进行写入。

因此,您可以通过使用异步 item 处理来增加 performance,基本上允许您实现 fork-join 场景。 AsyncItemWriter收集结果并在所有结果可用后立即写回块。

以下 example 显示了如何配置AsyncItemProcessor

XML Configuration

<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

Java Configuration

@Bean
public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) {
    AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor();
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    asyncItemProcessor.setDelegate(itemProcessor);
    return asyncItemProcessor;
}

delegate property 引用您的ItemProcessor bean,taskExecutor property 引用您选择的TaskExecutor

以下 example 显示了如何配置AsyncItemWriter

XML Configuration

<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>

Java Configuration

@Bean
public AsyncItemWriter writer(ItemWriter itemWriter) {
    AsyncItemWriter asyncItemWriter = new AsyncItemWriter();
    asyncItemWriter.setDelegate(itemWriter);
    return asyncItemWriter;
}

同样,delegate property 实际上是对ItemWriter bean 的 reference。

1.3.3. 外部化批处理 Process 执行

到目前为止讨论的 integration 方法建议使用 Spring Integration 将 Spring Batch 包装成 outer-shell 的用例。但是,Spring Batch 也可以在内部使用 Spring Integration。使用这种方法,Spring Batch 用户可以将项目甚至块的处理委托给外部进程。这允许您卸载复杂的处理。 Spring Batch Integration 为以下方面提供专门的支持:

  • Remote Chunking

  • Remote Partitioning

Remote Chunking

图 3. Remote Chunking

进一步使用 step,还可以使用ChunkMessageChannelItemWriter(由 Spring Batch Integration 提供)来外部化块处理,这将发送项目并收集结果。一旦发送,Spring Batch 将继续读取和分组项目的 process,而无需等待结果。相反,ChunkMessageChannelItemWriter负责收集结果并将它们集成回 Spring Batch process。

使用 Spring Integration,您可以完全控制进程的并发性(例如,使用QueueChannel而不是DirectChannel)。此外,通过依赖 Spring Integration 丰富的 Channel 适配器集合(例如 JMS 和 AMQP),您可以将 Batch job 的块分发到外部系统进行处理。

一个带有 step 远程分块的简单 job 可能具有类似于以下的 configuration:

XML Configuration

<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

Java Configuration

public Job chunkJob() {
     return jobBuilderFactory.get("personJob")
             .start(stepBuilderFactory.get("step1")
                     .<Person, Person>chunk(200)
                     .reader(itemReader())
                     .writer(itemWriter())
                     .build())
             .build();
 }

ItemReader reference 指向要用于读取 master 数据的 bean。 ItemWriter reference 指向一个特殊的ItemWriter(称为ChunkMessageChannelItemWriter),如上所述。处理器(如果有)不在 master configuration 中,因为它是在 worker 上配置的。以下 configuration 提供了基本的 master 设置。在实现用例时,应检查任何其他 component properties,例如限制限制等。

XML Configuration

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="jmsRequests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

Java Configuration

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(requests())
            .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
            .get();
}

/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter
 */
@Bean
public ItemWriter<Integer> itemWriter() {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter
            = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(replies());
    return chunkMessageChannelItemWriter;
}

前面的 configuration 为我们提供了许多 beans。我们使用 ActiveMQ 和 Spring Integration 提供的 inbound/outbound JMS 适配器配置我们的消息传递中间件。如图所示,11 bean 由我们的 job step 引用,使用ChunkMessageChannelItemWriter在已配置的中间件上写入块。

现在我们可以转到 worker configuration,如下面的示例所示:

XML Configuration

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="incomingRequests"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

Java Configuration

@Bean
public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    return factory;
}

/*
 * Configure inbound flow (requests coming from the master)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
            .channel(requests())
            .get();
}

/*
 * Configure outbound flow (replies going to the master)
 */
@Bean
public DirectChannel replies() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(replies())
            .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
            .get();
}

/*
 * Configure the ChunkProcessorChunkHandler
 */
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
    ChunkProcessor<Integer> chunkProcessor
            = new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
    ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler
            = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

master configuration 中的大多数 configuration 项应该看起来很熟悉。 Workers 不需要访问 Spring Batch JobRepository也不需要访问实际的 job configuration 文件。感兴趣的主要 bean 是chunkProcessorChunkHandlerChunkProcessorChunkHandlerchunkProcessor property 采用一个已配置的SimpleChunkProcessor,您可以在(以及可选的,ItemProcessor)11(_)中为_工程提供一个 reference,

有关更多信息,请参阅Remote Chunking上“可伸缩性”一章的部分。

从 version 4.1 开始,Spring Batch Integration 引入了@EnableBatchIntegration annotation,可用于简化 remote 分块设置。这个 annotation 提供了两个 beans,可以在 application context 中自动装配:

  • RemoteChunkingMasterStepBuilderFactory:用于配置 master step

  • RemoteChunkingWorkerBuilder:用于配置 remote worker integration 流程

这些 API 负责配置许多组件,如下图所示:

图 4. Remote Chunking Configuration

在 master 端,RemoteChunkingMasterStepBuilderFactory允许您通过声明来配置 master step:

  • item reader 读取项目并将它们发送给 workers

  • 输出 channel(“传出请求”)向 workers 发送请求

  • 输入 channel(“传入回复”)以接收来自 workers 的回复

不需要显式配置ChunkMessageChannelItemWriterMessagingTemplate(如果需要,仍然可以显式配置它们)。

在 worker 方面,RemoteChunkingWorkerBuilder允许您将 worker 配置为:

  • 收听 master 在输入 channel 上发送的请求(“传入请求”)

  • 使用已配置的ItemProcessorItemWriter为每个请求调用ChunkProcessorChunkHandlerhandleChunk方法

  • 将输出 channel(“传出回复”)的回复发送给 master

无需显式配置SimpleChunkProcessorChunkProcessorChunkHandler(如果需要,可以显式配置它们)。

以下 example 显示了如何使用这些 API:

@EnableBatchIntegration
@EnableBatchProcessing
public class RemoteChunkingJobConfiguration {

    @Configuration
    public static class MasterConfiguration {

        @Autowired
        private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;

        @Bean
        public TaskletStep masterStep() {
            return this.masterStepBuilderFactory.get("masterStep")
                       .chunk(100)
                       .reader(itemReader())
                       .outputChannel(requests()) // requests sent to workers
                       .inputChannel(replies())   // replies received from workers
                       .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemoteChunkingWorkerBuilder workerBuilder;

        @Bean
        public IntegrationFlow workerFlow() {
            return this.workerBuilder
                       .itemProcessor(itemProcessor())
                       .itemWriter(itemWriter())
                       .inputChannel(requests()) // requests received from the master
                       .outputChannel(replies()) // replies sent to the master
                       .build();
        }

        // Middleware beans setup omitted

    }

}

你可以找到 remote chunking job 这里的完整 example。

Remote Partitioning

图 5. Remote 分区

另一方面,Remote Partitioning 在不是项目处理而是导致瓶颈的相关 I/O 时非常有用。使用 Remote Partitioning,可以将工作分配给执行完整 Spring Batch 步骤的 workers。因此,每个 worker 都有自己的ItemReaderItemProcessorItemWriter。为此,Spring Batch Integration 提供MessageChannelPartitionHandler

这个PartitionHandler接口的实现使用MessageChannel实例向 remote workers 发送指令并接收它们的响应。这提供了用于与 remote workers 通信的传输(例如 JMS 和 AMQP)的良好抽象。

解决remote 分区的“可伸缩性”一章的部分概述了配置 remote 分区所需的概念和组件,并显示了在单独的本地执行线程中使用默认TaskExecutorPartitionHandler分区的 example。对于 remote 分区到多个 JVM,需要两个额外的组件:

  • 远程织物或网格环境

  • 一个PartitionHandler implementation,支持所需的远程处理结构或网格环境

与 remote chunking 类似,JMS 可以用作“远程处理结构”。在这种情况下,使用MessageChannelPartitionHandler实例作为PartitionHandler implementation,如上所述。以下 example 假定现有的分区 job 并关注MessageChannelPartitionHandler和 JMS configuration:

XML Configuration

<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

Java Configuration

/*
 * Configuration of the master side
 */
@Bean
public PartitionHandler partitionHandler() {
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("step1");
    partitionHandler.setGridSize(3);
    partitionHandler.setReplyChannel(outboundReplies());
    MessagingTemplate template = new MessagingTemplate();
    template.setDefaultChannel(outboundRequests());
    template.setReceiveTimeout(100000);
    partitionHandler.setMessagingOperations(template);
    return partitionHandler;
}

@Bean
public QueueChannel outboundReplies() {
    return new QueueChannel();
}

@Bean
public DirectChannel outboundRequests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsRequests() {
    return IntegrationFlows.from("outboundRequests")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("requestsQueue"))
            .get();
}

@Bean
@ServiceActivator(inputChannel = "inboundStaging")
public AggregatorFactoryBean partitioningMessageHandler() throws Exception {
    AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    aggregatorFactoryBean.setProcessorBean(partitionHandler());
    aggregatorFactoryBean.setOutputChannel(outboundReplies());
    // configure other propeties of the aggregatorFactoryBean
    return aggregatorFactoryBean;
}

@Bean
public DirectChannel inboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundJmsStaging() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("stagingQueue"))
            .channel(inboundStaging())
            .get();
}

/*
 * Configuration of the worker side
 */
@Bean
public StepExecutionRequestHandler stepExecutionRequestHandler() {
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
    stepExecutionRequestHandler.setJobExplorer(jobExplorer);
    stepExecutionRequestHandler.setStepLocator(stepLocator());
    return stepExecutionRequestHandler;
}

@Bean
@ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
public StepExecutionRequestHandler serviceActivator() throws Exception {
    return stepExecutionRequestHandler();
}

@Bean
public DirectChannel inboundRequests() {
    return new DirectChannel();
}

public IntegrationFlow inboundJmsRequests() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory())
                    .configureListenerContainer(c -> c.subscriptionDurable(false))
                    .destination("requestsQueue"))
            .channel(inboundRequests())
            .get();
}

@Bean
public DirectChannel outboundStaging() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundJmsStaging() {
    return IntegrationFlows.from("outboundStaging")
            .handle(Jms.outboundGateway(connectionFactory())
                    .requestDestination("stagingQueue"))
            .get();
}

您还必须确保分区handler属性 maps 到partitionHandler bean,如下面的 example 所示:

XML Configuration

<job id="personJob">
  <step id="step1.master">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>

Java Configuration

public Job personJob() {
                return jobBuilderFactory.get("personJob")
                                .start(stepBuilderFactory.get("step1.master")
                                                .partitioner("step1.worker", partitioner())
                                                .partitionHandler(partitionHandler())
                                                .build())
                                .build();
        }

您可以找到 remote 分区 job 这里的完整示例。

@EnableBatchIntegration annotation 可用于简化 remote 分区设置。这个 annotation 提供了两个 beans 对 remote 分区有用:

  • RemotePartitioningMasterStepBuilderFactory:用于配置 master step

  • RemotePartitioningWorkerStepBuilderFactory:用于配置 worker step

这些 API 负责配置许多组件,如下图所示:

图 6. Remote Partitioning Configuration(使用 job repository 轮询)

图 7. Remote Partitioning Configuration(带有回复聚合)

在 master 端,RemotePartitioningMasterStepBuilderFactory允许您通过声明来配置 master step:

  • Partitioner用于分区数据

  • 输出 channel(“传出请求”)向 workers 发送请求

  • 输入 channel(“传入回复”)以接收 workers 的回复(配置回复聚合时)

  • 轮询间隔和超时参数(配置 job repository 轮询时)

不需要显式配置MessageChannelPartitionHandlerMessagingTemplate(如果需要,仍然可以显式配置它们)。

在 worker 方面,RemotePartitioningWorkerStepBuilderFactory允许您将 worker 配置为:

  • 收听 master 在输入 channel 上发送的请求(“传入请求”)

  • 为每个请求调用StepExecutionRequestHandlerhandle方法

  • 将输出 channel(“传出回复”)的回复发送给 master

无需显式配置StepExecutionRequestHandler(如果需要,可以显式配置)。

以下 example 显示了如何使用这些 API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningJobConfiguration {

    @Configuration
    public static class MasterConfiguration {

        @Autowired
        private RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;

        @Bean
        public Step masterStep() {
                 return this.masterStepBuilderFactory
                    .get("masterStep")
                    .partitioner("workerStep", partitioner())
                    .gridSize(10)
                    .outputChannel(outgoingRequestsToWorkers())
                    .inputChannel(incomingRepliesFromWorkers())
                    .build();
        }

        // Middleware beans setup omitted

    }

    @Configuration
    public static class WorkerConfiguration {

        @Autowired
        private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

        @Bean
        public Step workerStep() {
                 return this.workerStepBuilderFactory
                    .get("workerStep")
                    .inputChannel(incomingRequestsFromMaster())
                    .outputChannel(outgoingRepliesToMaster())
                    .chunk(100)
                    .reader(itemReader())
                    .processor(itemProcessor())
                    .writer(itemWriter())
                    .build();
        }

        // Middleware beans setup omitted

    }

}