154. Spring Integration

Spring Cloud GCP 提供了 Spring Integration 适配器,使您的应用程序可以使用由 Google Cloud Platform 服务备份的企业集成模式。

154.1 用于 Cloud Pub/Sub 的通道适配器

Google Cloud Pub/Sub 的通道适配器将您的 Spring MessageChannels连接到 Google Cloud Pub/Sub 主题和订阅。这样可以在由 Google Cloud Pub/Sub 备份的不同流程,应用程序或微服务之间进行消息传递。

spring-cloud-gcp-pubsub模块中包含用于 Google Cloud Pub/Sub 的 Spring Integration Channel Adapters。

使用 Spring Cloud GCP BOM 进行 Maven 坐标:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
</dependency>

Gradle coordinates:

dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-pubsub'
    compile group: 'org.springframework.integration', name: 'spring-integration-core'
}

154.1.1 入站通道适配器

PubSubInboundChannelAdapter是 GCP 发布/订阅的入站通道适配器,它侦听 GCP 发布/订阅的新消息。它将新消息转换为内部 Spring Message,然后将其发送到绑定的输出通道。

Google Pub/Sub 将消息有效负载视为字节数组。因此,默认情况下,入站通道适配器将使用byte[]作为有效负载来构造 Spring Message。但是,可以通过设置PubSubInboundChannelAdapterpayloadType属性来更改所需的有效负载类型。 PubSubInboundChannelAdapter将转换为所需的有效负载类型的委托给PubSubTemplate中配置的PubSubMessageConverter

要使用入站通道适配器,必须在用户应用程序端提供并配置PubSubInboundChannelAdapter

@Bean
public MessageChannel pubsubInputChannel() {
    return new PublishSubscribeChannel();
}

@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(
    @Qualifier("pubsubInputChannel") MessageChannel inputChannel,
    SubscriberFactory subscriberFactory) {
    PubSubInboundChannelAdapter adapter =
        new PubSubInboundChannelAdapter(subscriberFactory, "subscriptionName");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
}

在示例中,我们首先指定适配器将向其写入传入消息的MessageChannelMessageChannel实现在这里并不重要。根据您的用例,您可能希望使用MessageChannel而不是PublishSubscribeChannel

然后,我们声明一个PubSubInboundChannelAdapter bean。它需要我们刚刚创建的通道和一个SubscriberFactory,后者从用于发布/订阅的 Google Cloud JavaClient 端创建Subscriber对象。用于 GCP Pub/Sub 的 Spring Boot 启动器提供了已配置的SubscriberFactory

PubSubInboundChannelAdapter支持三种确认模式,其中AckMode.AUTO为默认值。

自动确认(AckMode.AUTO)

如果适配器将消息发送到通道,并且未引发任何异常,则消息将被 GCP 发布/订阅确认。如果在处理邮件时抛出RuntimeException,则该邮件将被否定。

自动确认 OK(AckMode.AUTO_ACK)

如果适配器将消息发送到通道,并且未引发任何异常,则消息将被 GCP 发布/订阅确认。如果在处理消息时抛出RuntimeException,则消息既不会被确认也不会被拒绝。

当使用订阅的确认截止时间超时作为重试传递回退机制时,此功能很有用。

手动确认(AckMode.MANUAL)

适配器将BasicAcknowledgeablePubsubMessage对象附加到MessageHeaders。用户可以使用GcpPubSubHeaders.ORIGINAL_MESSAGE键提取BasicAcknowledgeablePubsubMessage并将其用于(n)确认消息。

@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
    return message -> {
        LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
        BasicAcknowledgeablePubsubMessage originalMessage =
              message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
        originalMessage.ack();
    };
}

154.1.2 出站通道适配器

PubSubMessageHandler是 GCP 发布/订阅的出站通道适配器,它在 Spring MessageChannel上侦听新消息。它使用PubSubTemplate将其发布到 GCP 发布/订阅主题。

要构造消息的 Pub/Sub 表示,出站通道适配器需要将 Spring Message有效负载转换为 Pub/Sub 期望的字节数组表示。它将此转换委派给PubSubTemplate。要自定义转换,可以在PubSubTemplate中指定PubSubMessageConverter,该Object应当将Object有效负载和 Spring Message的 Headers 转换为PubsubMessage

要使用出站通道适配器,必须在用户应用程序端提供并配置PubSubMessageHandler bean。

@Bean
@ServiceActivator(inputChannel = "pubsubOutputChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
    return new PubSubMessageHandler(pubsubTemplate, "topicName");
}

提供的PubSubTemplate包含将消息发布到 GCP 发布/订阅主题的所有必要配置。

PubSubMessageHandler默认情况下异步发布消息。可以将发布超时配置为同步发布。如果未提供任何内容,则适配器将无限期 await 响应。

可以通过setPublishFutureCallback()方法为PubSubMessageHandler中的publish()调用设置用户定义的回调。如果成功,这些对于处理消息 ID 很有用,如果抛出错误,则对于处理消息 ID 很有用。

要覆盖默认目标,可以使用GcpPubSubHeaders.DESTINATIONHeaders。

@Autowired
private MessageChannel pubsubOutputChannel;

public void handleMessage(Message<?> msg) throws MessagingException {
    final Message<?> message = MessageBuilder
        .withPayload(msg.getPayload())
        .setHeader(GcpPubSubHeaders.TOPIC, "customTopic").build();
    pubsubOutputChannel.send(message);
}

也可以使用setTopicExpression()setTopicExpressionString()方法为主题设置 SpEL 表达式。

154.1.3HeadersMap

这些通道适配器包含 HeadersMap 器,使您可以将 Headers 从 SpringMap 或过滤出到 Google Cloud Pub/Sub 消息,反之亦然。默认情况下,入站通道适配器将 Google Cloud Pub/Sub 消息上的每个 HeadersMap 到适配器产生的 Spring 消息。出站通道适配器将 Spring 消息中的每个 HeadersMap 到 Google Cloud Pub/Sub 的 Headers(由 Spring 添加的 Headers 除外),例如带有键"id""timestamp""gcp_pubsub_acknowledgement"的 Headers。在此过程中,出站 Map 器还将 Headers 的值转换为字符串。

每个适配器都声明一个setHeaderMapper()方法,以使您可以进一步自定义要从 SpringMap 到 Google Cloud Pub/Sub 的标题,反之亦然。

例如,要过滤出头"foo""bar"以及所有以前缀“ prefix_”开头的头,可以将setHeaderMapper()与此模块提供的PubSubHeaderMapper实现一起使用。

PubSubMessageHandler adapter = ...
...
PubSubHeaderMapper headerMapper = new PubSubHeaderMapper();
headerMapper.setOutboundHeaderPatterns("!foo", "!bar", "!prefix_*", "*");
adapter.setHeaderMapper(headerMapper);

Note

PubSubHeaderMapper.setOutboundHeaderPatterns()PubSubHeaderMapper.setInboundHeaderPatterns()中声明模式的 Sequences 很重要。第一种模式优先于以下模式。

在前面的示例中,"*"模式表示每个 Headers 都已 Map。但是,由于它在列表先前的模式优先中排在最后。

154.2 Sample

Available examples:

154.3 Google 云端存储的通道适配器

Google Cloud Storage 的通道适配器允许您通过MessageChannels读写文件到 Google Cloud Storage。

Spring Cloud GCP 提供了两个入站适配器GcsInboundFileSynchronizingMessageSourceGcsStreamingMessageSource,以及一个出站适配器GcsMessageHandler

spring-cloud-gcp-storage模块中包含用于 Google Cloud Storage 的 Spring Integration Channel 适配器。

要使用 Spring Integration for Spring Cloud GCP 的“存储”部分,还必须提供spring-integration-file依赖项,因为它不会传递。

使用 Spring Cloud GCP BOM 进行 Maven 坐标:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-storage</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
</dependency>

Gradle coordinates:

dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-storage'
    compile group: 'org.springframework.integration', name: 'spring-integration-file'
}

154.3.1 入站通道适配器

Google Cloud Storage 入站通道适配器会轮询 Google Cloud Storage 存储桶以查找新文件,并将每个文件以Message有效负载的形式发送到@InboundChannelAdapter注解中指定的MessageChannel。这些文件临时存储在本地文件系统的文件夹中。

这是有关如何配置 Google Cloud Storage 入站通道适配器的示例。

@Bean
@InboundChannelAdapter(channel = "new-file-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> synchronizerAdapter(Storage gcs) {
  GcsInboundFileSynchronizer synchronizer = new GcsInboundFileSynchronizer(gcs);
  synchronizer.setRemoteDirectory("your-gcs-bucket");

  GcsInboundFileSynchronizingMessageSource synchAdapter =
          new GcsInboundFileSynchronizingMessageSource(synchronizer);
  synchAdapter.setLocalDirectory(new File("local-directory"));

  return synchAdapter;
}

154.3.2 入站流媒体通道适配器

入站流传输通道适配器与普通的入站通道适配器相似,不同之处在于它不需要将文件存储在文件系统中。

这是有关如何配置 Google Cloud Storage 入站流媒体通道适配器的示例。

@Bean
@InboundChannelAdapter(channel = "streaming-channel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<InputStream> streamingAdapter(Storage gcs) {
  GcsStreamingMessageSource adapter =
          new GcsStreamingMessageSource(new GcsRemoteFileTemplate(new GcsSessionFactory(gcs)));
  adapter.setRemoteDirectory("your-gcs-bucket");
  return adapter;
}

154.3.3 出站通道适配器

出站通道适配器允许将文件写入 Google Cloud Storage。当它收到包含类型File的有效负载的Message时,会将文件写入适配器中指定的 Google Cloud Storage 存储桶。

这是有关如何配置 Google Cloud Storage 出站通道适配器的示例。

@Bean
@ServiceActivator(inputChannel = "writeFiles")
public MessageHandler outboundChannelAdapter(Storage gcs) {
  GcsMessageHandler outboundChannelAdapter = new GcsMessageHandler(new GcsSessionFactory(gcs));
  outboundChannelAdapter.setRemoteDirectoryExpression(new ValueExpression<>("your-gcs-bucket"));

  return outboundChannelAdapter;
}

154.4 Sample

sample application可用。