13. Spring Batch Integration

13.1. Spring Batch Integration 简介

Spring Batch 的许多用户可能会遇到 Spring Batch 范围之外的需求,但可以使用 Spring Integration 有效且简洁地实现。相反,Spring Batch 用户可能会遇到 Spring Batch 要求,需要一种有效集成两个框架的方法。在这个 context 中,有几个模式和 use-cases emerge 以及 Spring Batch Integration 将满足这些要求。

Spring Batch 和 Spring Integration 之间的 line 并不总是很清楚,但是可以遵循一些指导原则。原则上,它们是:考虑粒度,并应用 common 模式。其中一些 common 模式在本参考手册部分中有所描述。

将消息传递添加到批处理 process 可以实现操作的自动化,以及 key 关注点的分离和策略。对于 example,消息可能会触发 job 执行,然后可以通过多种方式显示消息的发送。或者当 job 完成或失败时可能触发要发送的消息,并且这些消息的使用者可能具有与 application 本身无关的操作问题。消息传递也可以嵌入到 job 中,用于 example 读取或写入项目以通过 channels 进行处理。 Remote partitioning 和 remote chunking 提供了在多个 workers 上分配工作负载的方法。

我们将介绍的一些 key 概念是:

13.1.1. 命名空间支持

自 Spring Batch Integration 1.3 以来,添加了专用的 XML Namespace 支持,旨在提供更简单的 configuration 体验。在 order 中激活命名空间,将以下命名空间声明添加到 Spring XML Application Context 文件中:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    ...

</beans>

Spring Batch Integration 的完全配置的 Spring XML Application Context 文件可能如下所示:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/batch
    http://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd">

    ...

</beans>

将 version numbers 附加到引用的 XSD 文件也是允许的,但由于 version-less 声明将始终使用最新的 schema,因此我们通常不建议将 version number 附加到 XSD name。例如,添加 version number 会在更新 Spring Batch Integration 依赖项时产生问题,因为它们可能需要更新版本的 XML schema。

13.1.2. 通过消息启动批处理作业

使用核心 Spring Batch API 启动批处理作业时,您基本上有两个选项:

  • 命令 line 通过CommandLineJobRunner

  • 以编程方式通过JobOperator.start()JobLauncher.run()

对于 example,您可能希望在使用 shell 脚本调用批处理作业时使用CommandLineJobRunner。或者,当使用 Spring Batch 作为 web application 的一部分时,您可以直接使用JobOperator作为 example。但是,更复杂的 use-cases 呢?也许您需要轮询一个 remote(S)FTP 服务器来检索 Batch Job 的数据。或者您的 application 必须同时支持多个不同的数据源。例如,您可能不仅通过 web 而且还通过 FTP 等接收数据 files。在调用 Spring Batch 之前,可能需要对输入 files 进行额外的转换。

因此,使用 Spring Integration 及其众多适配器执行批处理 job 会更强大。例如,您可以使用文件入站 Channel 适配器来监视 file-system 中的目录,并在输入文件到达后立即启动 Batch Job。此外,您可以创建使用多个不同适配器的 Spring Integration 流,以便仅使用 configuration 同时从多个源轻松接收批处理作业的数据。使用 Spring Integration 实现所有这些场景很容易,因为它允许JobLauncher的解耦 event-driven 执行。

Spring Batch Integration 提供了可用于启动批处理作业的JobLaunchingMessageHandler class。 JobLaunchingMessageHandler的输入由 Spring Integration 消息提供,该有效负载的类型为JobLaunchRequest。这个 class 是 Job 周围需要启动的 wrapper 以及启动 Batch job 所需的JobParameters

下图说明了 order 中典型的 Spring Integration 消息流以启动 Batch job。 EIP(Enterprise IntegrationPatterns)网站提供了消息传递图标及其描述的完整概述。

将文件转换为 JobLaunchRequest

package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 响应

当正在执行 Batch Job 时,将返回JobExecution实例。此实例可用于确定执行的状态。如果能够成功创建JobExecution,则无论实际执行是否成功,都将始终返回JobExecution

返回JobExecution实例的确切行为取决于提供的TaskExecutor。如果使用synchronous(single-threaded)TaskExecutor implementation,响应仅在 job 完成时返回after。使用asynchronous TaskExecutor时,会立即返回JobExecution实例。然后,用户可以使用id id实例(JobExecution.getJobId())并使用JobExplorer查询以获取 job 的更新状态。有关更多信息,请参阅查询 Repository上的Spring Batch reference 文档。

以下 configuration 将创建一个文件inbound-channel-adapter来监听提供的目录中的 CSV files,将它们交给我们的变换器(FileMessageToJobRequest),通过 Job 启动网关启动 job 然后只需通过logging-channel-adapter log 输出。

Spring Batch Integration Configuration

<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

现在我们正在轮询 files 并启动作业,我们需要为 example 配置 Spring Batch ItemReader以利用由 job 参数“input.file.name”表示的找到的文件:

Example ItemReader Configuration

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

这里主要关注的是注入的 value 作为 Resource property value,并将 ItemReader bean 设置为 Step 范围,以利用后期 binding 支持,允许访问jobParameters变量。

Job-Launching 网关的可用属性
  • id标识基础 Spring bean 定义,它是以下任一个的实例:

  • EventDrivenConsumer

  • PollingConsumer

确切的 implementation 取决于 component 的输入 channel 是否为:

  • SubscribableChannel

  • PollableChannel

  • auto-startup Boolean flag 表示端点应在启动时自动启动。默认的 istrue。

  • request-channel此端点的输入MessageChannel

  • reply-channel Message Channel将生成JobExecution有效负载。

  • reply-timeout允许您指定在抛出 exception 之前,此网关如何等待将回复消息成功发送到回复 channel。此属性仅在 channel 可能阻止时应用,对于 example,当使用当前已满的有界队列 channel 时。另外,请记住,当发送到DirectChannel时,调用将发生在发送者的线程中。因此,发送操作的失败可能是由更下游的其他组件引起的。 reply-timeout属性 maps 到底层MessagingTemplate实例的sendTimeout property。如果未指定,该属性将默认为 to-1,这意味着默认情况下,网关将无限期地等待。 value 以毫秒为单位指定。

  • job-launcher传入自定义JobLauncher bean reference。此属性是可选的。如果未指定,则适配器将 re-use 在 id jobLauncher下注册的实例。如果不存在默认实例,则抛出 exception。

  • order当此端点作为订户连接到SubscribableChannel时,指定用于调用的 order。

Sub-Elements

当此网关从PollableChannel接收消息时,您必须提供 global 默认轮询器或向Job Launching Gateway提供 Poller sub-element:

<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>

13.1.3. 提供信息性消息的反馈

由于 Spring Batch 作业可以运行 long,因此提供进度信息将至关重要。例如,如果 Batch Job 的部分或全部部分失败,可能希望通知 stake-holders。 Spring Batch 为通过以下方式收集的信息提供支持:

  • Active polling or

  • Event-driven,使用 listeners。

异步启动 Spring Batch job 时,e.g. 通过使用Job Launching Gateway,返回JobExecution实例。因此,JobExecution.getJobId()可以用于通过使用JobExplorerJobRepository检索JobExecution的更新实例来连续轮询状态更新。但是,这被认为是 sub-optimal,应该首选 event-driven 方法。

因此,Spring Batch 提供 listeners,例如:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在下面的示例中,Spring Batch job 配置了StepExecutionListener。因此,Spring Integration 将接收并处理任何 step before/after step events。例如,可以使用Router检查收到的StepExecution。根据检查结果,example 将消息路由到 Mail Outbound Channel Adapter 可能会出现各种问题,因此可以根据某些条件发送电子邮件通知。

下面是 listener 如何配置为Gateway发送消息StepExecution events 并 log 将其输出发送到logging-channel-adapter的示例:

首先创建通知 integration beans:

<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

然后修改 job 以添加 step level listener:

<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

13.1.4. 异步处理器

异步处理器可帮助您扩展项目的处理。在异步处理器 use-case 中,AsyncItemProcessor充当调度程序,在新线程上为 item 执行ItemProcessor的逻辑。处理器完成后,Future将传递给AsynchItemWriter以进行写入。

因此,您可以通过使用异步 item 处理来增加 performance,基本上允许您实现 fork-join 场景。只要所有结果都可用,AsyncItemWriter将收集结果并回写块。

__1 和AsyncItemWriter的配置都很简单,首先是AsyncItemProcessor

<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

property“delegate”实际上是对ItemProcessor bean 的 reference,“taskExecutor” property 是你自己选择的TaskExecutor

然后我们配置AsyncItemWriter

<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>

同样,property“delegate”实际上是对ItemWriter bean 的 reference。

13.1.5. 外部化批处理 Process 执行

到目前为止讨论的 integration 方法建议 use-cases 其中 Spring Integration 包装 Spring Batch 像 outer-shell。但是,Spring Batch 也可以在内部使用 Spring Integration。使用这种方法,Spring Batch 用户可以将项目甚至块的处理委托给外部进程。这允许您卸载复杂的处理。 Spring Batch Integration 为以下方面提供专门的支持:

  • Remote Chunking

  • Remote Partitioning

Remote Chunking

进一步采用一个步骤,也可以使用_Sp_ _Batch Integration 提供的ChunkMessageChannelItemWriter来外部化块处理,这将发送项目并收集结果。一旦发送,Spring Batch 将继续读取和分组项目的 process,而不等待结果。相反,ChunkMessageChannelItemWriter负责收集结果并将它们集成回 Spring Batch process。

使用 Spring Integration,您可以完全控制进程的并发性,例如使用QueueChannel而不是DirectChannel。此外,通过依赖 Spring Integration 丰富的 Channel 适配器集合(E.g.JMS 或 AMQP),您可以将 Batch job 的块分发到外部系统进行处理。

一个带有 step 远程分块的简单 job 将具有类似于以下的 configuration:

<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

ItemReader reference 将指向您想要用于读取 master 上的数据的 bean。 ItemWriter reference 指向一个特殊的 ItemWriter“ChunkMessageChannelItemWriter”,如上所述。处理器(如果有)不在 master configuration 中,因为它在从站上配置。以下 configuration 提供了基本的 master 设置。建议在实现用例时检查任何其他 component properties,例如节流限制等。

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="requests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<bean id="chunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
  <property name="chunkWriter" ref="itemWriter"/>
  <property name="step" ref="step1"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

这个 configuration 为我们提供了许多 beans。我们使用 Spring Integration 提供的 ActiveMQ 和 inbound/outbound JMS 适配器配置我们的消息传递中间件。如图所示,我们的 job step 引用的itemWriter bean 利用ChunkMessageChannelItemWriter在已配置的中间件上编写块。

现在让我们转到 slave configuration:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="jmsIn"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

master configuration 中的大多数 configuration 项应该看起来很熟悉。 Slaves 不需要访问 Spring Batch JobRepository之类的东西,也不需要访问实际的 job configuration 文件。感兴趣的主要豆是“chunkProcessorChunkHandler”。 ChunkProcessorChunkHandlerchunkProcessor property 采用一个已配置的SimpleChunkProcessor,你可以在ItemWriter中提供 reference,也可以选择你的ItemProcessor,它会在从 master 收到块时在从站上运行。

有关详细信息,请参阅 Spring Batch 手册,特别是Remote Chunking章节。

Remote Partitioning

另一方面,当问题不是项目的处理时,Remote Partitioning 很有用,但关联的 I/O 代表了瓶颈。使用 Remote Partitioning,可以将工作分配给执行完整 Spring Batch 步骤的从属。因此,每个从站都有自己的ItemReaderItemProcessorItemWriter。为此,Spring Batch Integration 提供MessageChannelPartitionHandler

这个PartitionHandler接口的实现使用MessageChannel实例向 remote workers 发送指令并接收它们的响应。这为用于与 remote workers 通信的传输(E.g.JMS 或 AMQP)提供了很好的抽象。

reference 手册部分Remote Partitioning概述了配置 Remote Partitioning 所需的概念和组件,并显示了在单独的本地执行线程中使用默认TaskExecutorPartitionHandler分区的 example。对于 Remote 分区到多个 JVM,需要两个额外的组件:

  • 远程织物或网格环境

  • PartitionHandler implementation,支持所需的远程处理结构或网格环境

与 Remote Chunking 类似,JMS 可以用作“远程处理结构”,如上所述使用的 PartitionHandler implementation 是MessageChannelPartitionHandler。下面显示的 example 假定一个现有的分区 job,并专注于MessageChannelPartitionHandler和 JMS configuration:

<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

还要确保分区handler属性 maps 到partitionHandler bean:

<job id="personJob">
  <step id="step1.master">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>
Updated at: 9 months ago
12.10. 测试Table of contentA. ItemReaders 和 ItemWriters 的列表