On this page
151. Google Cloud Pub/Sub
Spring Cloud GCP 提供了一个抽象层,用于发布到 Google Cloud Pub/Sub 主题和从中订阅,以及创建,列出或删除 Google Cloud Pub/Sub 主题和订阅。
提供了一个 Spring Boot 启动器来自动配置各种必需的 Pub/Sub 组件。
使用 Spring Cloud GCP BOM 进行 Maven 坐标:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
Gradle coordinates:
dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-pubsub'
}
也可以从Spring Initializr到GCP Messaging条目使用此启动器。
151.1 发布/订阅操作和模板
PubSubOperations是一种抽象,它允许 Spring 用户使用 Google Cloud Pub/Sub,而无需依赖任何 Google Cloud Pub/Sub API 语义。它提供了与 Google Cloud Pub/Sub 交互所需的一组通用操作。 PubSubTemplate是PubSubOperations的默认实现,它使用适用于发布/订阅的 Google Cloud JavaClient 端与 Google Cloud Pub/Sub 进行交互。
PubSubTemplate取决于PublisherFactory和SubscriberFactory。 PublisherFactory为发布/订阅Publisher提供了 Google Cloud JavaClient 端。 SubscriberFactory提供Subscriber用于异步消息提取,以及SubscriberStub用于同步消息提取。用于 GCP Pub/Sub 的 Spring Boot 启动器使用默认设置自动配置PublisherFactory和SubscriberFactory,并使用 Spring Boot GCP 启动器自动配置的GcpProjectIdProvider和CredentialsProvider。
Spring Cloud GCP 发布/订阅DefaultPublisherFactory提供的PublisherFactory实现按主题名称缓存Publisher实例,以优化资源利用率。
PubSubOperations接口实际上是PubSubPublisherOperations和PubSubSubscriberOperations以及相应的PubSubPublisherTemplate和PubSubSubscriberTemplate实现的组合,可以单独使用或通过复合PubSubTemplate使用。本文档的其余部分引用PubSubTemplate,但是PubSubPublisherTemplate和PubSubSubscriberTemplate同样适用,这取决于我们是在谈论发布还是订阅。
151.1.1 发布到主题
PubSubTemplate提供了异步方法来将消息发布到 Google Cloud Pub/Sub 主题。 publish()方法采用主题名称以将消息发布到通用类型的有效负载,以及(可选)带有消息头的 Map。
以下是如何将消息发布到 Google Cloud Pub/Sub 主题的示例:
public void publishMessage() {
this.pubSubTemplate.publish("topic", "your message payload", ImmutableMap.of("key1", "val1"));
}
默认情况下,SimplePubSubMessageConverter用于将byte[],ByteString,ByteBuffer和String类型的有效载荷转换为 Pub/Sub 消息。
JSON support
要使用 Jackson JSON 对 POJO 进行序列化和反序列化,请配置一个JacksonPubSubMessageConverter bean,GCP Pub/Sub 的 Spring Boot 启动程序将自动将其连接到PubSubTemplate。
// Note: The ObjectMapper is used to convert Java POJOs to and from JSON.
// You will have to configure your own instance if you are unable to depend
// on the ObjectMapper provided by Spring Boot starters.
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
或者,您可以通过调用PubSubTemplate上的setMessageConverter()方法直接设置它。 PubSubMessageConverter的其他实现也可以相同的方式配置。
请参考我们的发布/订阅 JSON 有效负载示例应用作为使用此功能的参考。
151.1.2 订阅
Google Cloud Pub/Sub 允许将许多订阅关联到同一主题。 PubSubTemplate允许您通过subscribe()方法收听订阅。它依赖于SubscriberFactory对象,该对象的唯一任务是生成 Google Cloud Pub/Sub Subscriber对象。收听订阅时,将以一定间隔异步地从 Google Cloud Pub/Sub 中提取消息。
适用于 Google Cloud Pub/Sub 的 Spring Boot 启动器会自动配置SubscriberFactory。
如果需要进行发布/订阅邮件有效负载转换,则可以使用subscribeAndConvert()方法,该方法将使用模板中配置的转换器。
151.1.3 从订阅中提取消息
Google Cloud Pub/Sub 支持从订阅中同步提取消息。这与订阅是不同的,从某种意义上说,订阅是一个异步任务,它以设置的时间间隔轮询订阅。
pullNext()方法允许从订阅中提取一条消息并自动对其进行确认。 pull()方法可从订阅中提取大量消息,从而允许配置重试设置。 pull()收到的任何消息都不会自动确认。相反,由于它们属于AcknowledgeablePubsubMessage类型,因此您可以通过调用ack()方法来对其进行确认,或者通过调用nack()方法来对其进行否定确认。 pullAndAck()方法的作用与pull()方法相同,并且还确认所有接收到的消息。
pullAndConvert()方法与pull()方法相同,此外,使用模板中配置的转换器将 Pub/Sub 二进制有效负载转换为所需类型的对象。
要一次确认从pull()或pullAndConvert()收到的多条消息,可以使用PubSubTemplate.ack()方法。您也可以使用PubSubTemplate.nack()否定地确认消息。
使用这些方法来批量确认消息比单独确认消息更有效,但是它们“需要”来自同一项目的消息集合。
消息以及PubSubSubscriberTemplate上的所有ack(),nack()和modifyAckDeadline()方法都是异步实现的,返回ListenableFuture<Void>即可处理异步执行。
PubSubTemplate使用其SubscriberFactory生成的特殊订阅者来同步提取消息。
151.2 发布/订阅 Management
PubSubAdmin是 Spring Cloud GCP 提供的用于 ManagementGoogle Cloud Pub/Sub 资源的抽象。它允许创建,删除和列出主题和订阅。
PubSubAdmin取决于GcpProjectIdProvider以及CredentialsProvider或TopicAdminClient和SubscriptionAdminClient。如果给出CredentialsProvider,它将使用 Google Cloud Java 库的 Pub/Sub 默认设置创建TopicAdminClient和SubscriptionAdminClient。用于 GCP 发布/订阅的 Spring Boot 启动器使用 Spring Boot GCP Core 启动器自动配置的GcpProjectIdProvider和CredentialsProvider自动配置PubSubAdmin对象。
151.2.1 创建主题
PubSubAdmin实现了一种创建主题的方法:
public Topic createTopic(String topicName)
这是有关如何创建 Google Cloud Pub/Sub 主题的示例:
public void newTopic() {
pubSubAdmin.createTopic("topicName");
}
151.2.2 删除主题
PubSubAdmin实现了一种删除主题的方法:
public void deleteTopic(String topicName)
这是有关如何删除 Google Cloud Pub/Sub 主题的示例:
public void deleteTopic() {
pubSubAdmin.deleteTopic("topicName");
}
151.2.3 列出主题
PubSubAdmin实现了一种列出主题的方法:
public List<Topic> listTopics
这是一个如何列出项目中每个 Google Cloud Pub/Sub 主题名称的示例:
public List<String> listTopics() {
return pubSubAdmin
.listTopics()
.stream()
.map(Topic::getNameAsTopicName)
.map(TopicName::getTopic)
.collect(Collectors.toList());
}
151.2.4 创建订阅
PubSubAdmin实现了一种创建对现有主题的订阅的方法:
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline, String pushEndpoint)
以下是有关如何创建 Google Cloud Pub/Sub 订阅的示例:
public void newSubscription() {
pubSubAdmin.createSubscription("subscriptionName", "topicName", 10, "http://my.endpoint/push");
}
提供了具有默认设置的替代方法,以方便使用。 ackDeadline的默认值为 10 秒。如果未指定pushEndpoint,则订阅将使用消息提取。
public Subscription createSubscription(String subscriptionName, String topicName)
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline)
public Subscription createSubscription(String subscriptionName, String topicName, String pushEndpoint)
151.2.5 删除订阅
PubSubAdmin实现了一种删除订阅的方法:
public void deleteSubscription(String subscriptionName)
以下是有关如何删除 Google Cloud Pub/Sub 订阅的示例:
public void deleteSubscription() {
pubSubAdmin.deleteSubscription("subscriptionName");
}
151.2.6 列出订阅
PubSubAdmin实现了一种列出订阅的方法:
public List<Subscription> listSubscriptions()
这是一个如何列出项目中每个订阅名称的示例:
public List<String> listSubscriptions() {
return pubSubAdmin
.listSubscriptions()
.stream()
.map(Subscription::getNameAsSubscriptionName)
.map(SubscriptionName::getSubscription)
.collect(Collectors.toList());
}
151.3 Configuration
Google Cloud Pub/Sub 的 Spring Boot Starter程序提供以下配置选项:
| Name | Description | Required | Default value |
spring.cloud.gcp.pubsub.enabled |
启用或禁用发布/订阅自动配置 | No | true |
spring.cloud.gcp.pubsub.subscriber.executor-threads |
SubscriberFactory创建的Subscriber实例使用的线程数 |
No | 4 |
spring.cloud.gcp.pubsub.publisher.executor-threads |
PublisherFactory创建的Publisher实例使用的线程数 |
No | 4 |
spring.cloud.gcp.pubsub.project-id |
托管 Google Cloud Pub/Sub API 的 GCP 项目 ID(如果与Spring Cloud GCP 核心模块中的 ID 不同) | No | |
spring.cloud.gcp.pubsub.credentials.location |
用于与 Google Cloud Pub/Sub API 进行身份验证的 OAuth2 凭据(如果与Spring Cloud GCP 核心模块中的不同) | No | |
spring.cloud.gcp.pubsub.credentials.encoded-key |
OAuth2 帐户私钥的 Base64 编码内容,用于与 Google Cloud Pub/Sub API 进行身份验证(如果与Spring Cloud GCP 核心模块中的内容不同) | No | |
spring.cloud.gcp.pubsub.credentials.scopes |
OAuth2 scope用于 Spring Cloud GCP 发布/订阅凭据 | No | https://www.googleapis.com/auth/pubsub |
spring.cloud.gcp.pubsub.subscriber.parallel-pull-count |
拉 Worker 数 | No | 可用处理器数量 |
spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period |
消息确认截止期限的最长时间(以秒为单位) | No | 0 |
spring.cloud.gcp.pubsub.subscriber.pull-endpoint |
同步拉取消息的端点 | No | pubsub.googleapis.com:443 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.total-timeout-seconds |
TotalTimeout 具有最终控制权,该逻辑应将尝试 continue 远程调用直到完全放弃的时间。总超时时间越高,可以尝试的重试次数越多。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-retry-delay-second |
InitialRetryDelay 控制第一次重试之前的延迟。随后的重试将使用根据 RetryDelayMultiplier 调整的该值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.retry-delay-multiplier |
RetryDelayMultiplier 控制重试延迟的更改。将前一个调用的重试延迟与 RetryDelayMultiplier 相乘,以计算下一个调用的重试延迟。 | No | 1 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-retry-delay-seconds |
MaxRetryDelay 设置了重试延迟的值的限制,以便 RetryDelayMultiplier 不能将重试延迟增加到高于此数量的值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-attempts |
MaxAttempts 定义执行的最大尝试次数。如果此值大于 0,并且尝试次数达到此限制,则即使总重试时间仍小于 TotalTimeout,逻辑也会放弃重试。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.jittered |
抖动确定是否应将延迟时间随机化。 | No | true |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-rpc-timeout-seconds |
InitialRpcTimeout 控制初始 RPC 的超时。后续调用将使用根据 RpcTimeoutMultiplier 调整的该值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.rpc-timeout-multiplier |
RpcTimeoutMultiplier 控制 RPC 超时的更改。上一个调用的超时时间乘以 RpcTimeoutMultiplier 来计算下一个调用的超时时间。 | No | 1 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-rpc-timeout-seconds |
MaxRpcTimeout 对 RPC 超时值设置了限制,因此 RpcTimeoutMultiplier 不能将 RPC 超时增加到高于此值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-element-count |
在执行流控制之前要保留在内存中的未完成元素的最大数量。 | No | unlimited |
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-request-bytes |
强制执行流控制之前要保留在内存中的最大未完成字节数。 | No | unlimited |
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.limit-exceeded-behavior |
超过指定限制时的行为。 | No | Block |
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold |
用于批处理的元素计数阈值。 | No | 未设置(阈值不适用) |
spring.cloud.gcp.pubsub.publisher.batching.request-byte-threshold |
用于批处理的请求字节阈值。 | No | 未设置(阈值不适用) |
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds |
用于批处理的延迟阈值。经过这段时间后(从添加的第一个元素开始计数),这些元素将被分批包装并发送。 | No | 未设置(阈值不适用) |
spring.cloud.gcp.pubsub.publisher.batching.enabled |
Enables batching. | No | false |