13. Spring 批处理集成

13.1. Spring Batch 集成介绍

Spring Batch 的许多用户可能会遇到超出 Spring Batch 范围的要求,但可以使用 Spring Integration 来高效,简洁地实现。相反,Spring Batch 用户可能会遇到 Spring Batch 要求,并需要一种有效地集成两个框架的方法。在这种情况下,出现了几种模式和用例,Spring Batch Integration 将解决这些需求。

Spring Batch 和 Spring Integration 之间的界线并不总是很清楚,但是有些准则可以遵循。原则上,这些是:考虑粒度,并应用通用模式。这些常见模式中的一些在本参考手册部分中进行了描述。

将消息添加到批处理过程中,可以实现自动化操作,还可以对关键问题进行分离和制定策略。例如,一条消息可能触发作业执行,然后可以通过多种方式公开消息的发送。或者,当作业完成或失败时,可能会触发消息发送,而这些消息的使用者可能会遇到与应用程序本身无关的操作问题。消息传递也可以嵌入到作业中,例如读取或写入要通过通道进行处理的 Item。远程分区和远程分块提供了在多个工作人员上分配工作负载的方法。

我们将介绍的一些关键概念是:

13.1.1. 命名空间支持

从 Spring Batch Integration 1.3 开始,添加了专用的 XML 命名空间支持,目的是提供更轻松的配置体验。为了激活名称空间,请将以下名称空间声明添加到您的 Spring XML Application Context 文件中:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    ...

</beans>

完整配置的用于 Spring Batch Integration 的 Spring XML Application Context 文件可能如下所示:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/batch
    http://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd">

    ...

</beans>

也允许将版本号附加到引用的 XSD 文件中,但是由于无版本声明将始终使用最新的架构,因此我们通常不建议将版本号附加到 XSD 名称中。例如,添加版本号可能会在更新 Spring Batch Integration 依赖项时产生问题,因为它们可能需要 XML 模式的最新版本。

13.1.2. 通过消息启动批处理作业

使用核心 Spring Batch API 启动批处理作业时,您基本上有 2 个选择:

  • 通过CommandLineJobRunner的命令行

  • 通过JobOperator.start()JobLauncher.run()编程。

例如,当您使用 Shell 脚本调用批处理作业时,可能要使用CommandLineJobRunner。或者,您可以直接使用JobOperator,例如,在将 Spring Batch 用作 Web 应用程序的一部分时。但是,如何处理更复杂的用例呢?也许您需要轮询远程(S)FTP 服务器以检索批处理作业的数据。或者您的应用程序必须同时支持多个不同的数据源。例如,您不仅可以通过 Web 接收数据文件,还可以通过 FTP 接收数据文件。也许在调用 Spring Batch 之前需要对 Importing 文件进行其他转换。

因此,使用 Spring Integration 及其众多适配器来执行批处理作业将更加强大。例如,您可以使用文件入站通道适配器监视文件系统中的目录,并在 Importing 文件到达后立即启动批处理作业。另外,您可以创建使用多个不同适配器的 Spring Integration 流,仅使用配置即可轻松地同时从多个源中获取批处理作业的数据。使用 Spring Integration 轻松实现所有这些场景,因为它允许事件驱动的JobLauncher解耦执行。

Spring Batch Integration 提供了JobLaunchingMessageHandler类,可用于启动批处理作业。 JobLaunchingMessageHandler的 Importing 由 Spring Integration 消息提供,其有效载荷类型为JobLaunchRequest。该类是需要启动的 Job 的包装,以及启动 Batch 作业所需的JobParameters

下图说明了用于启动批处理作业的典型 Spring Integration 消息流。 EIP(企业集成模式)网站提供了消息图标及其描述的完整概述。

将文件转换为 JobLaunchRequest

package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 响应

执行批处理作业时,将返回JobExecution实例。该实例可用于确定执行状态。如果能够成功创建JobExecution,则无论实际执行是否成功,都将始终返回它。

如何返回JobExecution实例的确切行为取决于所提供的TaskExecutor。如果使用synchronous(单线程)TaskExecutor实现,则仅在after作业完成时返回JobExecution响应。使用asynchronous TaskExecutor时,会立即返回JobExecution实例。然后,用户可以使用JobExecution实例(JobExecution.getJobId())的id并使用JobExplorerJobRepository查询作业的更新状态。有关更多信息,请参阅查询存储库上的Spring Batch参考文档。

以下配置将在提供的目录中创建文件inbound-channel-adapter来侦听 CSV 文件,将其交给我们的转换器(FileMessageToJobRequest),通过* Job Launching Gateway *启动作业,然后只需通过logging-channel-adapter记录JobExecution的输出。

Spring Batch 集成配置

<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

现在我们正在轮询文件并启动作业,我们需要配置例如 Spring Batch ItemReader以利用由作业参数“ input.file.name”表示的找到的文件:

ItemReader 配置示例

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

这里的主要兴趣点是将#{jobParameters['input.file.name']}的值注入为 Resource 属性值,并将 ItemReader bean 设置为* Step scope *以利用后期绑定支持的优势,该支持允许访问jobParameters变量。

Job-Launching 网关的可用属性
  • id标识基础 Spring bean 定义,该定义是以下之一的实例:

  • EventDrivenConsumer

    • PollingConsumer

确切的实现取决于组件的 Importing 通道是否为:

  • SubscribableChannel

    • PollableChannel
  • auto-startup布尔值标志,指示端点应在启动时自动启动。默认值为* true *。

  • request-channel此端点的 ImportingMessageChannel

  • 生成的JobExecution有效负载将发送到reply-channel Message Channel

  • reply-timeout允许您指定此网关在引发异常之前 awaitawait 成功将回复消息发送到回复通道的时间。仅当通道可能阻塞时(例如,当使用当前已满的有界队列通道时),此属性才适用。另外,请记住,发送到DirectChannel时,调用将在发送者的线程中进行。因此,发送操作的失败可能是由更下游的其他组件引起的。 reply-timeout属性 Map 到基础MessagingTemplate实例的sendTimeout属性。如果未指定,则该属性默认为* -1 *,这意味着默认情况下,网关将无限期 await。该值以毫秒为单位指定。

  • job-launcher传递自定义JobLauncher bean 引用。此属性是可选的。如果未指定,适配器将重新使用在 ID jobLauncher下注册的实例。如果不存在默认实例,则将引发异常。

  • order指定将此端点作为订户连接到SubscribableChannel时的调用 Sequences。

Sub-Elements

当此网关从PollableChannel接收消息时,您必须提供全局默认的 Poller 或为Job Launching Gateway提供 Poller 子元素:

<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>

13.1.3. 提供信息性消息的反馈

由于 Spring Batch 作业可以长期运行,因此提供进度信息至关重要。例如,如果批处理作业的某些或全部部分失败,则可能希望通知利益相关者。 Spring Batch 支持通过以下方式收集此信息:

  • 主动轮询或

  • 事件驱动,使用侦听器。

异步启动 Spring Batch 作业时,例如通过使用Job Launching Gateway,将返回JobExecution实例。因此,通过使用JobExplorerJobRepository检索JobExecution的更新实例,可以使用JobExecution.getJobId()连续轮询状态更新。但是,这被认为不是最佳选择,因此应首选事件驱动的方法。

因此,Spring Batch 提供了以下监听器:

  • StepListener

  • ChunkListener

  • JobExecutionListener

在以下示例中,使用StepExecutionListener配置了 Spring Batch 作业。因此,Spring Integration 将接收并处理步骤之前/之后的任何事件。例如,可以使用Router检查接收到的StepExecution。根据检查的结果,可能会发生各种事情,例如将邮件路由到邮件出站通道适配器,以便可以根据某种条件发送电子邮件通知。

以下是如何配置侦听器以针对StepExecution事件向Gateway发送消息并将其输出记录到logging-channel-adapter的示例:

首先创建通知集成 bean:

<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

然后修改您的工作以添加步骤级别的侦听器:

<job id="importPayments">
    <step id="step1">
        <tasklet ../>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

13.1.4. 异步处理器

异步处理器可帮助您扩展 Item 的处理。在异步处理器用例中,AsyncItemProcessor充当调度程序,对新线程上的 Item 执行ItemProcessor的逻辑。处理器完成后,Future将传递到AsynchItemWriter进行写入。

因此,您可以使用异步 Item 处理来提高性能,基本上可以实现* fork-join *方案。 AsyncItemWriter将收集结果并在所有结果可用后立即写回该块。

AsyncItemProcessorAsyncItemWriter的配置都很简单,首先是AsyncItemProcessor

<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

属性“ delegate”实际上是对ItemProcessor bean 的引用,而“ taskExecutor”属性是对所选TaskExecutor的引用。

然后,我们配置AsyncItemWriter

<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean id="itemWriter" class="your.ItemWriter"/>
  </property>
</bean>

同样,属性“ delegate”实际上是对ItemWriter bean 的引用。

13.1.5. 外化批处理执行

到目前为止讨论的集成方法建议用例,其中 Spring Integration 像 Shell 一样包装 Spring Batch。但是,Spring Batch 也可以在内部使用 Spring Integration。使用这种方法,Spring Batch 用户可以将 Item 甚至块的处理委派给外部流程。这使您可以卸载复杂的处理。 Spring Batch Integration 为以下方面提供了专门的支持:

  • Remote Chunking

  • Remote Partitioning

Remote Chunking

更进一步,人们还可以使用 Spring Batch Integration 提供的ChunkMessageChannelItemWriter来外部化块处理,它将发送 Item 并收集结果。发送后,Spring Batch 将 continue 读取和分组 Item,而无需 await 结果。而是由ChunkMessageChannelItemWriter负责收集结果并将其重新集成到 Spring Batch 流程中。

使用 Spring Integration,您可以完全控制进程的并发性,例如使用QueueChannel而不是DirectChannel。此外,通过依赖 Spring Integration 丰富的通道适配器集合(例如 JMS 或 AMQP),您可以将 Batch 作业的块分发到外部系统进行处理。

一个具有要远程分块的步骤的简单作业,其配置将类似于以下内容:

<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

ItemReader 参考将指向您要用于在主服务器上读取数据的 bean。如上所述,ItemWriter 引用指向特殊的 ItemWriter“ ChunkMessageChannelItemWriter”。处理器(如果有)保留在从属配置上,而不再是主控配置。以下配置提供了基本的主设置。建议在实现用例时检查所有其他组件属性,例如节流阀限制等。

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="requests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<bean id="chunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
  <property name="chunkWriter" ref="itemWriter"/>
  <property name="step" ref="step1"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

这种配置为我们提供了许多 bean。我们使用 Spring Integration 提供的 ActiveMQ 和入站/出站 JMS 适配器配置消息传递中间件。如图所示,我们的工作步骤引用的itemWriter bean 利用ChunkMessageChannelItemWriter在已配置的中间件上写入块。

现在让我们 continue 进行从属配置:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="jmsIn"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

这些配置项中的大多数应该从主配置中 Watch 起来很熟悉。从站不需要访问诸如 Spring Batch JobRepository之类的东西,也不需要访问实际的作业配置文件。感兴趣的主要 bean 是“ chunkProcessorChunkHandler”。 ChunkProcessorChunkHandlerchunkProcessor属性采用已配置的SimpleChunkProcessor,在该位置您将提供对ItemWriter以及可选的ItemProcessor的引用,该引用将在从属服务器接收到来自主服务器的块时在从属服务器上运行。

有关更多信息,请参阅 Spring Batch 手册,特别是关于Remote Chunking的章节。

Remote Partitioning

另一方面,当问题不是 Item 处理,而是相关的 I/O 成为瓶颈时,远程分区很有用。使用远程分区,可以将工作分配给执行完整 Spring Batch 步骤的从属服务器。因此,每个从站都有自己的ItemReaderItemProcessorItemWriter。为此,Spring Batch Integration 提供了MessageChannelPartitionHandler

PartitionHandler接口的此实现使用MessageChannel实例向远程工作程序发送指令并接收其响应。这提供了用于与远程工作人员通信的传输方式(例如 JMS 或 AMQP)的良好抽象。

参考手册第Remote Partitioning部分概述了配置远程分区所需的概念和组件,并显示了使用默认TaskExecutorPartitionHandler在单独的本地执行线程中进行分区的示例。为了对多个 JVM 进行远程分区,需要两个附加组件:

  • 远程处理结构或网格环境

  • 支持所需的远程结构或网格环境的 PartitionHandler 实现

与远程分块类似,JMS 可以用作“远程结构”,并且如上所述要使用的 PartitionHandler 实现是MessageChannelPartitionHandler。下面显示的示例假设现有分区作业,并着重于MessageChannelPartitionHandler和 JMS 配置:

<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

还要确保分区handler属性 Map 到partitionHandler bean:

<job id="personJob">
  <step id="step1.master">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>