151. Google Cloud Pub/Sub

Spring Cloud GCP 提供了一个抽象层,用于发布到 Google Cloud Pub/Sub 主题和从中订阅,以及创建,列出或删除 Google Cloud Pub/Sub 主题和订阅。

提供了一个 Spring Boot 启动器来自动配置各种必需的 Pub/Sub 组件。

使用 Spring Cloud GCP BOM 进行 Maven 坐标:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>

Gradle coordinates:

dependencies {
    compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-pubsub'
}

也可以从Spring InitializrGCP Messaging条目使用此启动器。

151.1 发布/订阅操作和模板

PubSubOperations是一种抽象,它允许 Spring 用户使用 Google Cloud Pub/Sub,而无需依赖任何 Google Cloud Pub/Sub API 语义。它提供了与 Google Cloud Pub/Sub 交互所需的一组通用操作。 PubSubTemplatePubSubOperations的默认实现,它使用适用于发布/订阅的 Google Cloud JavaClient 端与 Google Cloud Pub/Sub 进行交互。

PubSubTemplate取决于PublisherFactorySubscriberFactoryPublisherFactory为发布/订阅Publisher提供了 Google Cloud JavaClient 端。 SubscriberFactory提供Subscriber用于异步消息提取,以及SubscriberStub用于同步消息提取。用于 GCP Pub/Sub 的 Spring Boot 启动器使用默认设置自动配置PublisherFactorySubscriberFactory,并使用 Spring Boot GCP 启动器自动配置的GcpProjectIdProviderCredentialsProvider

Spring Cloud GCP 发布/订阅DefaultPublisherFactory提供的PublisherFactory实现按主题名称缓存Publisher实例,以优化资源利用率。

PubSubOperations接口实际上是PubSubPublisherOperationsPubSubSubscriberOperations以及相应的PubSubPublisherTemplatePubSubSubscriberTemplate实现的组合,可以单独使用或通过复合PubSubTemplate使用。本文档的其余部分引用PubSubTemplate,但是PubSubPublisherTemplatePubSubSubscriberTemplate同样适用,这取决于我们是在谈论发布还是订阅。

151.1.1 发布到主题

PubSubTemplate提供了异步方法来将消息发布到 Google Cloud Pub/Sub 主题。 publish()方法采用主题名称以将消息发布到通用类型的有效负载,以及(可选)带有消息头的 Map。

以下是如何将消息发布到 Google Cloud Pub/Sub 主题的示例:

public void publishMessage() {
    this.pubSubTemplate.publish("topic", "your message payload", ImmutableMap.of("key1", "val1"));
}

默认情况下,SimplePubSubMessageConverter用于将byte[]ByteStringByteBufferString类型的有效载荷转换为 Pub/Sub 消息。

JSON support

要使用 Jackson JSON 对 POJO 进行序列化和反序列化,请配置一个JacksonPubSubMessageConverter bean,GCP Pub/Sub 的 Spring Boot 启动程序将自动将其连接到PubSubTemplate

// Note: The ObjectMapper is used to convert Java POJOs to and from JSON.
// You will have to configure your own instance if you are unable to depend
// on the ObjectMapper provided by Spring Boot starters.
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
    return new JacksonPubSubMessageConverter(objectMapper);
}

或者,您可以通过调用PubSubTemplate上的setMessageConverter()方法直接设置它。 PubSubMessageConverter的其他实现也可以相同的方式配置。

请参考我们的发布/订阅 JSON 有效负载示例应用作为使用此功能的参考。

151.1.2 订阅

Google Cloud Pub/Sub 允许将许多订阅关联到同一主题。 PubSubTemplate允许您通过subscribe()方法收听订阅。它依赖于SubscriberFactory对象,该对象的唯一任务是生成 Google Cloud Pub/Sub Subscriber对象。收听订阅时,将以一定间隔异步地从 Google Cloud Pub/Sub 中提取消息。

适用于 Google Cloud Pub/Sub 的 Spring Boot 启动器会自动配置SubscriberFactory

如果需要进行发布/订阅邮件有效负载转换,则可以使用subscribeAndConvert()方法,该方法将使用模板中配置的转换器。

151.1.3 从订阅中提取消息

Google Cloud Pub/Sub 支持从订阅中同步提取消息。这与订阅是不同的,从某种意义上说,订阅是一个异步任务,它以设置的时间间隔轮询订阅。

pullNext()方法允许从订阅中提取一条消息并自动对其进行确认。 pull()方法可从订阅中提取大量消息,从而允许配置重试设置。 pull()收到的任何消息都不会自动确认。相反,由于它们属于AcknowledgeablePubsubMessage类型,因此您可以通过调用ack()方法来对其进行确认,或者通过调用nack()方法来对其进行否定确认。 pullAndAck()方法的作用与pull()方法相同,并且还确认所有接收到的消息。

pullAndConvert()方法与pull()方法相同,此外,使用模板中配置的转换器将 Pub/Sub 二进制有效负载转换为所需类型的对象。

要一次确认从pull()pullAndConvert()收到的多条消息,可以使用PubSubTemplate.ack()方法。您也可以使用PubSubTemplate.nack()否定地确认消息。

使用这些方法来批量确认消息比单独确认消息更有效,但是它们“需要”来自同一项目的消息集合。

消息以及PubSubSubscriberTemplate上的所有ack()nack()modifyAckDeadline()方法都是异步实现的,返回ListenableFuture<Void>即可处理异步执行。

PubSubTemplate使用其SubscriberFactory生成的特殊订阅者来同步提取消息。

151.2 发布/订阅 Management

PubSubAdmin是 Spring Cloud GCP 提供的用于 ManagementGoogle Cloud Pub/Sub 资源的抽象。它允许创建,删除和列出主题和订阅。

PubSubAdmin取决于GcpProjectIdProvider以及CredentialsProviderTopicAdminClientSubscriptionAdminClient。如果给出CredentialsProvider,它将使用 Google Cloud Java 库的 Pub/Sub 默认设置创建TopicAdminClientSubscriptionAdminClient。用于 GCP 发布/订阅的 Spring Boot 启动器使用 Spring Boot GCP Core 启动器自动配置的GcpProjectIdProviderCredentialsProvider自动配置PubSubAdmin对象。

151.2.1 创建主题

PubSubAdmin实现了一种创建主题的方法:

public Topic createTopic(String topicName)

这是有关如何创建 Google Cloud Pub/Sub 主题的示例:

public void newTopic() {
    pubSubAdmin.createTopic("topicName");
}

151.2.2 删除主题

PubSubAdmin实现了一种删除主题的方法:

public void deleteTopic(String topicName)

这是有关如何删除 Google Cloud Pub/Sub 主题的示例:

public void deleteTopic() {
    pubSubAdmin.deleteTopic("topicName");
}

151.2.3 列出主题

PubSubAdmin实现了一种列出主题的方法:

public List<Topic> listTopics

这是一个如何列出项目中每个 Google Cloud Pub/Sub 主题名称的示例:

public List<String> listTopics() {
    return pubSubAdmin
        .listTopics()
        .stream()
        .map(Topic::getNameAsTopicName)
        .map(TopicName::getTopic)
        .collect(Collectors.toList());
}

151.2.4 创建订阅

PubSubAdmin实现了一种创建对现有主题的订阅的方法:

public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline, String pushEndpoint)

以下是有关如何创建 Google Cloud Pub/Sub 订阅的示例:

public void newSubscription() {
    pubSubAdmin.createSubscription("subscriptionName", "topicName", 10, "http://my.endpoint/push");
}

提供了具有默认设置的替代方法,以方便使用。 ackDeadline的默认值为 10 秒。如果未指定pushEndpoint,则订阅将使用消息提取。

public Subscription createSubscription(String subscriptionName, String topicName)
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline)
public Subscription createSubscription(String subscriptionName, String topicName, String pushEndpoint)

151.2.5 删除订阅

PubSubAdmin实现了一种删除订阅的方法:

public void deleteSubscription(String subscriptionName)

以下是有关如何删除 Google Cloud Pub/Sub 订阅的示例:

public void deleteSubscription() {
    pubSubAdmin.deleteSubscription("subscriptionName");
}

151.2.6 列出订阅

PubSubAdmin实现了一种列出订阅的方法:

public List<Subscription> listSubscriptions()

这是一个如何列出项目中每个订阅名称的示例:

public List<String> listSubscriptions() {
    return pubSubAdmin
        .listSubscriptions()
        .stream()
        .map(Subscription::getNameAsSubscriptionName)
        .map(SubscriptionName::getSubscription)
        .collect(Collectors.toList());
}

151.3 Configuration

Google Cloud Pub/Sub 的 Spring Boot Starter程序提供以下配置选项:

NameDescriptionRequiredDefault value
spring.cloud.gcp.pubsub.enabled启用或禁用发布/订阅自动配置Notrue
spring.cloud.gcp.pubsub.subscriber.executor-threadsSubscriberFactory创建的Subscriber实例使用的线程数No4
spring.cloud.gcp.pubsub.publisher.executor-threadsPublisherFactory创建的Publisher实例使用的线程数No4
spring.cloud.gcp.pubsub.project-id托管 Google Cloud Pub/Sub API 的 GCP 项目 ID(如果与Spring Cloud GCP 核心模块中的 ID 不同)No
spring.cloud.gcp.pubsub.credentials.location用于与 Google Cloud Pub/Sub API 进行身份验证的 OAuth2 凭据(如果与Spring Cloud GCP 核心模块中的不同)No
spring.cloud.gcp.pubsub.credentials.encoded-keyOAuth2 帐户私钥的 Base64 编码内容,用于与 Google Cloud Pub/Sub API 进行身份验证(如果与Spring Cloud GCP 核心模块中的内容不同)No
spring.cloud.gcp.pubsub.credentials.scopesOAuth2 scope用于 Spring Cloud GCP 发布/订阅凭据Nohttps://www.googleapis.com/auth/pubsub
spring.cloud.gcp.pubsub.subscriber.parallel-pull-count拉 Worker 数No可用处理器数量
spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period消息确认截止期限的最长时间(以秒为单位)No0
spring.cloud.gcp.pubsub.subscriber.pull-endpoint同步拉取消息的端点Nopubsub.googleapis.com:443
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.total-timeout-secondsTotalTimeout 具有最终控制权,该逻辑应将尝试 continue 远程调用直到完全放弃的时间。总超时时间越高,可以尝试的重试次数越多。No0
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-retry-delay-secondInitialRetryDelay 控制第一次重试之前的延迟。随后的重试将使用根据 RetryDelayMultiplier 调整的该值。No0
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.retry-delay-multiplierRetryDelayMultiplier 控制重试延迟的更改。将前一个调用的重试延迟与 RetryDelayMultiplier 相乘,以计算下一个调用的重试延迟。No1
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-retry-delay-secondsMaxRetryDelay 设置了重试延迟的值的限制,以便 RetryDelayMultiplier 不能将重试延迟增加到高于此数量的值。No0
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-attemptsMaxAttempts 定义执行的最大尝试次数。如果此值大于 0,并且尝试次数达到此限制,则即使总重试时间仍小于 TotalTimeout,逻辑也会放弃重试。No0
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.jittered抖动确定是否应将延迟时间随机化。Notrue
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-rpc-timeout-secondsInitialRpcTimeout 控制初始 RPC 的超时。后续调用将使用根据 RpcTimeoutMultiplier 调整的该值。No0
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.rpc-timeout-multiplierRpcTimeoutMultiplier 控制 RPC 超时的更改。上一个调用的超时时间乘以 RpcTimeoutMultiplier 来计算下一个调用的超时时间。No1
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-rpc-timeout-secondsMaxRpcTimeout 对 RPC 超时值设置了限制,因此 RpcTimeoutMultiplier 不能将 RPC 超时增加到高于此值。No0
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-element-count在执行流控制之前要保留在内存中的未完成元素的最大数量。Nounlimited
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-request-bytes强制执行流控制之前要保留在内存中的最大未完成字节数。Nounlimited
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.limit-exceeded-behavior超过指定限制时的行为。NoBlock
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold用于批处理的元素计数阈值。No未设置(阈值不适用)
spring.cloud.gcp.pubsub.publisher.batching.request-byte-threshold用于批处理的请求字节阈值。No未设置(阈值不适用)
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds用于批处理的延迟阈值。经过这段时间后(从添加的第一个元素开始计数),这些元素将被分批包装并发送。No未设置(阈值不适用)
spring.cloud.gcp.pubsub.publisher.batching.enabledEnables batching.Nofalse

151.4 Sample

sample application可用。