On this page
151. Google Cloud Pub/Sub
Spring Cloud GCP 提供了一个抽象层,用于发布到 Google Cloud Pub/Sub 主题和从中订阅,以及创建,列出或删除 Google Cloud Pub/Sub 主题和订阅。
提供了一个 Spring Boot 启动器来自动配置各种必需的 Pub/Sub 组件。
使用 Spring Cloud GCP BOM 进行 Maven 坐标:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
Gradle coordinates:
dependencies {
compile group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-pubsub'
}
也可以从Spring Initializr到GCP Messaging
条目使用此启动器。
151.1 发布/订阅操作和模板
PubSubOperations
是一种抽象,它允许 Spring 用户使用 Google Cloud Pub/Sub,而无需依赖任何 Google Cloud Pub/Sub API 语义。它提供了与 Google Cloud Pub/Sub 交互所需的一组通用操作。 PubSubTemplate
是PubSubOperations
的默认实现,它使用适用于发布/订阅的 Google Cloud JavaClient 端与 Google Cloud Pub/Sub 进行交互。
PubSubTemplate
取决于PublisherFactory
和SubscriberFactory
。 PublisherFactory
为发布/订阅Publisher
提供了 Google Cloud JavaClient 端。 SubscriberFactory
提供Subscriber
用于异步消息提取,以及SubscriberStub
用于同步消息提取。用于 GCP Pub/Sub 的 Spring Boot 启动器使用默认设置自动配置PublisherFactory
和SubscriberFactory
,并使用 Spring Boot GCP 启动器自动配置的GcpProjectIdProvider
和CredentialsProvider
。
Spring Cloud GCP 发布/订阅DefaultPublisherFactory
提供的PublisherFactory
实现按主题名称缓存Publisher
实例,以优化资源利用率。
PubSubOperations
接口实际上是PubSubPublisherOperations
和PubSubSubscriberOperations
以及相应的PubSubPublisherTemplate
和PubSubSubscriberTemplate
实现的组合,可以单独使用或通过复合PubSubTemplate
使用。本文档的其余部分引用PubSubTemplate
,但是PubSubPublisherTemplate
和PubSubSubscriberTemplate
同样适用,这取决于我们是在谈论发布还是订阅。
151.1.1 发布到主题
PubSubTemplate
提供了异步方法来将消息发布到 Google Cloud Pub/Sub 主题。 publish()
方法采用主题名称以将消息发布到通用类型的有效负载,以及(可选)带有消息头的 Map。
以下是如何将消息发布到 Google Cloud Pub/Sub 主题的示例:
public void publishMessage() {
this.pubSubTemplate.publish("topic", "your message payload", ImmutableMap.of("key1", "val1"));
}
默认情况下,SimplePubSubMessageConverter
用于将byte[]
,ByteString
,ByteBuffer
和String
类型的有效载荷转换为 Pub/Sub 消息。
JSON support
要使用 Jackson JSON 对 POJO 进行序列化和反序列化,请配置一个JacksonPubSubMessageConverter
bean,GCP Pub/Sub 的 Spring Boot 启动程序将自动将其连接到PubSubTemplate
。
// Note: The ObjectMapper is used to convert Java POJOs to and from JSON.
// You will have to configure your own instance if you are unable to depend
// on the ObjectMapper provided by Spring Boot starters.
@Bean
public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
return new JacksonPubSubMessageConverter(objectMapper);
}
或者,您可以通过调用PubSubTemplate
上的setMessageConverter()
方法直接设置它。 PubSubMessageConverter
的其他实现也可以相同的方式配置。
请参考我们的发布/订阅 JSON 有效负载示例应用作为使用此功能的参考。
151.1.2 订阅
Google Cloud Pub/Sub 允许将许多订阅关联到同一主题。 PubSubTemplate
允许您通过subscribe()
方法收听订阅。它依赖于SubscriberFactory
对象,该对象的唯一任务是生成 Google Cloud Pub/Sub Subscriber
对象。收听订阅时,将以一定间隔异步地从 Google Cloud Pub/Sub 中提取消息。
适用于 Google Cloud Pub/Sub 的 Spring Boot 启动器会自动配置SubscriberFactory
。
如果需要进行发布/订阅邮件有效负载转换,则可以使用subscribeAndConvert()
方法,该方法将使用模板中配置的转换器。
151.1.3 从订阅中提取消息
Google Cloud Pub/Sub 支持从订阅中同步提取消息。这与订阅是不同的,从某种意义上说,订阅是一个异步任务,它以设置的时间间隔轮询订阅。
pullNext()
方法允许从订阅中提取一条消息并自动对其进行确认。 pull()
方法可从订阅中提取大量消息,从而允许配置重试设置。 pull()
收到的任何消息都不会自动确认。相反,由于它们属于AcknowledgeablePubsubMessage
类型,因此您可以通过调用ack()
方法来对其进行确认,或者通过调用nack()
方法来对其进行否定确认。 pullAndAck()
方法的作用与pull()
方法相同,并且还确认所有接收到的消息。
pullAndConvert()
方法与pull()
方法相同,此外,使用模板中配置的转换器将 Pub/Sub 二进制有效负载转换为所需类型的对象。
要一次确认从pull()
或pullAndConvert()
收到的多条消息,可以使用PubSubTemplate.ack()
方法。您也可以使用PubSubTemplate.nack()
否定地确认消息。
使用这些方法来批量确认消息比单独确认消息更有效,但是它们“需要”来自同一项目的消息集合。
消息以及PubSubSubscriberTemplate
上的所有ack()
,nack()
和modifyAckDeadline()
方法都是异步实现的,返回ListenableFuture<Void>
即可处理异步执行。
PubSubTemplate
使用其SubscriberFactory
生成的特殊订阅者来同步提取消息。
151.2 发布/订阅 Management
PubSubAdmin
是 Spring Cloud GCP 提供的用于 ManagementGoogle Cloud Pub/Sub 资源的抽象。它允许创建,删除和列出主题和订阅。
PubSubAdmin
取决于GcpProjectIdProvider
以及CredentialsProvider
或TopicAdminClient
和SubscriptionAdminClient
。如果给出CredentialsProvider
,它将使用 Google Cloud Java 库的 Pub/Sub 默认设置创建TopicAdminClient
和SubscriptionAdminClient
。用于 GCP 发布/订阅的 Spring Boot 启动器使用 Spring Boot GCP Core 启动器自动配置的GcpProjectIdProvider
和CredentialsProvider
自动配置PubSubAdmin
对象。
151.2.1 创建主题
PubSubAdmin
实现了一种创建主题的方法:
public Topic createTopic(String topicName)
这是有关如何创建 Google Cloud Pub/Sub 主题的示例:
public void newTopic() {
pubSubAdmin.createTopic("topicName");
}
151.2.2 删除主题
PubSubAdmin
实现了一种删除主题的方法:
public void deleteTopic(String topicName)
这是有关如何删除 Google Cloud Pub/Sub 主题的示例:
public void deleteTopic() {
pubSubAdmin.deleteTopic("topicName");
}
151.2.3 列出主题
PubSubAdmin
实现了一种列出主题的方法:
public List<Topic> listTopics
这是一个如何列出项目中每个 Google Cloud Pub/Sub 主题名称的示例:
public List<String> listTopics() {
return pubSubAdmin
.listTopics()
.stream()
.map(Topic::getNameAsTopicName)
.map(TopicName::getTopic)
.collect(Collectors.toList());
}
151.2.4 创建订阅
PubSubAdmin
实现了一种创建对现有主题的订阅的方法:
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline, String pushEndpoint)
以下是有关如何创建 Google Cloud Pub/Sub 订阅的示例:
public void newSubscription() {
pubSubAdmin.createSubscription("subscriptionName", "topicName", 10, "http://my.endpoint/push");
}
提供了具有默认设置的替代方法,以方便使用。 ackDeadline
的默认值为 10 秒。如果未指定pushEndpoint
,则订阅将使用消息提取。
public Subscription createSubscription(String subscriptionName, String topicName)
public Subscription createSubscription(String subscriptionName, String topicName, Integer ackDeadline)
public Subscription createSubscription(String subscriptionName, String topicName, String pushEndpoint)
151.2.5 删除订阅
PubSubAdmin
实现了一种删除订阅的方法:
public void deleteSubscription(String subscriptionName)
以下是有关如何删除 Google Cloud Pub/Sub 订阅的示例:
public void deleteSubscription() {
pubSubAdmin.deleteSubscription("subscriptionName");
}
151.2.6 列出订阅
PubSubAdmin
实现了一种列出订阅的方法:
public List<Subscription> listSubscriptions()
这是一个如何列出项目中每个订阅名称的示例:
public List<String> listSubscriptions() {
return pubSubAdmin
.listSubscriptions()
.stream()
.map(Subscription::getNameAsSubscriptionName)
.map(SubscriptionName::getSubscription)
.collect(Collectors.toList());
}
151.3 Configuration
Google Cloud Pub/Sub 的 Spring Boot Starter程序提供以下配置选项:
Name | Description | Required | Default value |
spring.cloud.gcp.pubsub.enabled |
启用或禁用发布/订阅自动配置 | No | true |
spring.cloud.gcp.pubsub.subscriber.executor-threads |
SubscriberFactory 创建的Subscriber 实例使用的线程数 |
No | 4 |
spring.cloud.gcp.pubsub.publisher.executor-threads |
PublisherFactory 创建的Publisher 实例使用的线程数 |
No | 4 |
spring.cloud.gcp.pubsub.project-id |
托管 Google Cloud Pub/Sub API 的 GCP 项目 ID(如果与Spring Cloud GCP 核心模块中的 ID 不同) | No | |
spring.cloud.gcp.pubsub.credentials.location |
用于与 Google Cloud Pub/Sub API 进行身份验证的 OAuth2 凭据(如果与Spring Cloud GCP 核心模块中的不同) | No | |
spring.cloud.gcp.pubsub.credentials.encoded-key |
OAuth2 帐户私钥的 Base64 编码内容,用于与 Google Cloud Pub/Sub API 进行身份验证(如果与Spring Cloud GCP 核心模块中的内容不同) | No | |
spring.cloud.gcp.pubsub.credentials.scopes |
OAuth2 scope用于 Spring Cloud GCP 发布/订阅凭据 | No | https://www.googleapis.com/auth/pubsub |
spring.cloud.gcp.pubsub.subscriber.parallel-pull-count |
拉 Worker 数 | No | 可用处理器数量 |
spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period |
消息确认截止期限的最长时间(以秒为单位) | No | 0 |
spring.cloud.gcp.pubsub.subscriber.pull-endpoint |
同步拉取消息的端点 | No | pubsub.googleapis.com:443 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.total-timeout-seconds |
TotalTimeout 具有最终控制权,该逻辑应将尝试 continue 远程调用直到完全放弃的时间。总超时时间越高,可以尝试的重试次数越多。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-retry-delay-second |
InitialRetryDelay 控制第一次重试之前的延迟。随后的重试将使用根据 RetryDelayMultiplier 调整的该值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.retry-delay-multiplier |
RetryDelayMultiplier 控制重试延迟的更改。将前一个调用的重试延迟与 RetryDelayMultiplier 相乘,以计算下一个调用的重试延迟。 | No | 1 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-retry-delay-seconds |
MaxRetryDelay 设置了重试延迟的值的限制,以便 RetryDelayMultiplier 不能将重试延迟增加到高于此数量的值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-attempts |
MaxAttempts 定义执行的最大尝试次数。如果此值大于 0,并且尝试次数达到此限制,则即使总重试时间仍小于 TotalTimeout,逻辑也会放弃重试。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.jittered |
抖动确定是否应将延迟时间随机化。 | No | true |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.initial-rpc-timeout-seconds |
InitialRpcTimeout 控制初始 RPC 的超时。后续调用将使用根据 RpcTimeoutMultiplier 调整的该值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.rpc-timeout-multiplier |
RpcTimeoutMultiplier 控制 RPC 超时的更改。上一个调用的超时时间乘以 RpcTimeoutMultiplier 来计算下一个调用的超时时间。 | No | 1 |
spring.cloud.gcp.pubsub.[subscriber,publisher].retry.max-rpc-timeout-seconds |
MaxRpcTimeout 对 RPC 超时值设置了限制,因此 RpcTimeoutMultiplier 不能将 RPC 超时增加到高于此值。 | No | 0 |
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-element-count |
在执行流控制之前要保留在内存中的未完成元素的最大数量。 | No | unlimited |
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.max-outstanding-request-bytes |
强制执行流控制之前要保留在内存中的最大未完成字节数。 | No | unlimited |
spring.cloud.gcp.pubsub.[subscriber,publisher.batching].flow-control.limit-exceeded-behavior |
超过指定限制时的行为。 | No | Block |
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold |
用于批处理的元素计数阈值。 | No | 未设置(阈值不适用) |
spring.cloud.gcp.pubsub.publisher.batching.request-byte-threshold |
用于批处理的请求字节阈值。 | No | 未设置(阈值不适用) |
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds |
用于批处理的延迟阈值。经过这段时间后(从添加的第一个元素开始计数),这些元素将被分批包装并发送。 | No | 未设置(阈值不适用) |
spring.cloud.gcp.pubsub.publisher.batching.enabled |
Enables batching. | No | false |