31. Schema Evolution 支持
Spring Cloud Stream 为 schema evolution 提供支持,以便数据可以在 time 内进化,并且仍然适用于较旧或较新的生产者和消费者,反之亦然。大多数序列化模型,特别是那些旨在跨不同平台和语言实现可移植性的模型,依赖于描述如何在二进制有效负载中序列化数据的 schema。在序列化数据然后解释数据的过程中,发送方和接收方都必须能够访问描述二进制格式的 schema。在某些情况下,schema 可以从序列化的有效负载类型或反序列化的目标类型推断出来。但是,许多 applications 受益于访问描述二进制数据格式的显式 schema。 schema 注册表允许您以文本格式(通常是 JSON)存储 schema 信息,并使该信息可供需要它以二进制格式接收和发送数据的各种应用程序访问。 schema 可作为元组引用,包括:
-
作为 schema 的逻辑 name 的主题
-
schema version
-
schema 格式,描述数据的二进制格式
以下部分将详细介绍 schema evolution process 中涉及的各个组件。
31.1 Schema Registry Client
用于与 schema 注册服务器交互的 client-side 抽象是SchemaRegistryClient
接口,它具有以下结构:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 提供 out-of-the-box implementations,用于与自己的 schema 服务器进行交互,以及与 Confluent Schema Registry 进行交互。
可以使用@EnableSchemaRegistryClient
配置 Spring Cloud Stream schema 注册表的 client,如下所示:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public static class AvroSinkApplication {
...
}
默认转换器经过优化,不仅可以缓存来自 remote 服务器的模式,还可以缓存非常昂贵的
parse()
和toString()
方法。因此,它使用不缓存响应的DefaultSchemaRegistryClient
。如果您打算更改默认行为,可以直接在 code 上使用 client 并将其覆盖为所需的结果。为此,您必须将 propertyspring.cloud.stream.schemaRegistryClient.cached=true
添加到 application properties 中。
31.1.1 Schema Registry Client Properties
Schema Registry Client 支持以下 properties:
-
spring.cloud.stream.schemaRegistryClient.endpoint
- schema-server 的位置。设置此项时,请使用完整的 URL,包括协议(
http
或https
), port 和 context 路径。
- schema-server 的位置。设置此项时,请使用完整的 URL,包括协议(
-
默认
http://localhost:8990/
-
spring.cloud.stream.schemaRegistryClient.cached
- client 是否应该缓存 schema 服务器响应。通常设置为
false
,因为缓存发生在消息转换器中。 Clients 使用 schema 注册表 client 应将此设置为true
。
- client 是否应该缓存 schema 服务器响应。通常设置为
-
默认
true
31.2 Avro Schema Registry Client 消息转换器
对于具有使用 application context 注册的 SchemaRegistryClient bean 的 applications,Spring Cloud Stream 自动为 schema management 配置 Apache Avro 消息转换器。这样可以简化 schema 进化,因为接收消息的应用程序可以轻松访问 writer schema,可以与自己的 reader schema 协调。
对于出站消息,如果 channel 的 content type 设置为application/*+avro
,则激活MessageConverter
,如下面的示例所示:
spring.cloud.stream.bindings.output.contentType=application/*+avro
在出站转换期间,消息转换器尝试推断每个出站消息的 schema(基于其类型),并使用SchemaRegistryClient
将其注册到主题(基于有效内容类型)。如果已找到相同的 schema,则检索对它的 reference。如果没有,则注册 schema,并提供新的 version number。使用以下 scheme:application/[prefix].[subject].v[version]+avro
发送带有contentType
标头的消息,其中prefix
是可配置的,subject
是从有效负载类型推导出来的。
对于 example,类型User
的消息可能作为二进制有效负载发送,
接收消息时,转换器会从传入消息的标头中推断 schema reference 并尝试检索它。 schema 在反序列化 process 中用作 writer schema。
31.2.1 Avro Schema 注册表消息转换器 Properties
如果通过设置spring.cloud.stream.bindings.output.contentType=application/*+avro
启用了基于 Avro 的 schema 注册表 client,则可以通过设置以下 properties 来自定义注册行为。
-
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
- 如果希望转换器使用反射从 POJO 推断 Schema,则启用。
默认值:false
-
spring.cloud.stream.schema.avro.readerSchema
- Avro 通过查看 writer schema(原始有效负载)和 reader schema(您的 application 有效负载)来比较 schema 版本。有关详细信息,请参阅Avro 文档。如果设置,则覆盖 schema 服务器上的任何查找,并使用本地 schema 作为 reader schema。默认值:
null
- Avro 通过查看 writer schema(原始有效负载)和 reader schema(您的 application 有效负载)来比较 schema 版本。有关详细信息,请参阅Avro 文档。如果设置,则覆盖 schema 服务器上的任何查找,并使用本地 schema 作为 reader schema。默认值:
-
spring.cloud.stream.schema.avro.schemaLocations
- 使用 Schema Server 注册此 property 中列出的所有
.avsc
files。
- 使用 Schema Server 注册此 property 中列出的所有
默认值:empty
-
spring.cloud.stream.schema.avro.prefix
- 要在 Content-Type 标头上使用的前缀。
默认值:vnd
31.3 Apache Avro 消息转换器
Spring Cloud Stream 通过spring-cloud-stream-schema
模块为 schema-based 消息转换器提供支持。目前,支持 schema-based 消息转换器的唯一序列化格式是 Apache Avro,在未来版本中将添加更多格式。
spring-cloud-stream-schema
模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:
-
转换器使用序列化或反序列化 objects 的 class 信息或 schema,其位置在启动时已知。
-
使用 schema 注册表的转换器。他们在运行时定位模式,并在域 objects 进化时动态注册新模式。
具有 Schema 支持的 31.4 转换器
AvroSchemaMessageConverter
支持通过使用预定义的 schema 或使用 class 中可用的 schema 信息(反射或包含在SpecificRecord
中)来序列化和反序列化消息。如果您提供自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。以下 example 显示了一个自定义转换器:
要使用自定义转换器,只需将其添加到 application context,可以选择指定一个或多个与之关联的MimeTypes
。默认MimeType
是application/avro
。
如果转换的目标类型是GenericRecord
,则必须设置 schema。
以下 example 显示了如何通过在没有预定义 schema 的情况下注册 Apache Avro MessageConverter
来在 sink application 中配置转换器。在此 example 中,请注意 mime 类型 value 是avro/bytes
,而不是默认application/avro
。
@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
相反,以下 application 注册一个带有预定义 schema 的转换器(在 classpath 上找到):
@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
31.5 Schema Registry Server
Spring Cloud Stream 提供 schema 注册表服务器 implementation。要使用它,可以将spring-cloud-stream-schema-server
artifact 添加到项目中并使用@EnableSchemaRegistryServer
annotation,它将 schema 注册表服务器 REST 控制器添加到您的 application。此 annotation 旨在与 Spring Boot web applications 一起使用,并且服务器的监听 port 由server.port
property 控制。 spring.cloud.stream.schema.server.path
property 可用于控制 schema 服务器的根路径(特别是当它嵌入其他 applications 时)。 spring.cloud.stream.schema.server.allowSchemaDeletion
boolean property 可以删除 schema。默认情况下,禁用此功能。
schema 注册表服务器使用关系数据库来存储模式。默认情况下,它使用嵌入式数据库。您可以使用Spring Boot SQL 数据库和 JDBC configuration 选项自定义 schema 存储。
以下 example 显示了启用 schema 注册表的 Spring Boot application:
@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}
31.5.1 Schema Registry Server API
Schema Registry Server API 包含以下操作:
-
POST /
- 见“名为“注册新 Schema”的部分” -
'GET/{}/{}/{}' - 见““按主题,格式和 Version 检索现有 Schema”一节”
-
GET /{subject}/{format}
- 见““按主题和格式检索现有 Schema”一节” -
GET /schemas/{id}
- 见““按 ID 检索现有 Schema”一节” -
DELETE /{subject}/{format}/{version}
- 见““按主题,格式和 Version 删除 Schema”一节” -
DELETE /schemas/{id}
- 见““按 ID 删除 Schema”一节” -
DELETE /{subject}
- 见““按主题删除 Schema”一节”
注册新的 Schema
要注册新的 schema,请向/
端点发送POST
请求。
/
接受带有以下字段的 JSON 有效内容:
-
subject
:schema 主题 -
format
:schema 格式 -
definition
:schema 定义
它的响应是 JSON 中的 schema object,其中包含以下字段:
-
id
:schema ID -
subject
:schema 主题 -
format
:schema 格式 -
version
:schema version -
definition
:schema 定义
按主题,格式和 Version 检索现有 Schema
要按主题,格式和 version 检索现有 schema,请将GET
请求发送到/{subject}/{format}/{version}
端点。
它的响应是 JSON 中的 schema object,其中包含以下字段:
-
id
:schema ID -
subject
:schema 主题 -
format
:schema 格式 -
version
:schema version -
definition
:schema 定义
按主题和格式检索现有 Schema
要按主题和格式检索现有 schema,请将GET
请求发送到/subject/format
端点。
它的响应是 JSON 中每个 schema object 的模式列表,包含以下字段:
-
id
:schema ID -
subject
:schema 主题 -
format
:schema 格式 -
version
:schema version -
definition
:schema 定义
按 ID 检索现有 Schema
要通过其 ID 检索 schema,请将GET
请求发送到/schemas/{id}
端点。
它的响应是 JSON 中的 schema object,其中包含以下字段:
-
id
:schema ID -
subject
:schema 主题 -
format
:schema 格式 -
version
:schema version -
definition
:schema 定义
按主题,格式和 Version 删除 Schema
要删除由其 subject,format 和 version 标识的 schema,请向/{subject}/{format}/{version}
端点发送DELETE
请求。
按 ID 删除 Schema
要按其 ID 删除 schema,请向/schemas/{id}
端点发送DELETE
请求。
按主题删除 Schema
DELETE /{subject}
按主题删除现有架构。
本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。 Spring Cloud Stream 1.1.0.RELEASE 使用 table name,
schema
来存储Schema
objects。Schema
是许多数据库 implementations 中的关键字。为了避免将来出现任何冲突,从 1.1.1.RELEASE 开始,我们选择了存储 table 的 nameSCHEMA_REPOSITORY
。任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应该在升级之前将其现有架构迁移到新的 table。
31.5.2 使用 Confluent 的 Schema Registry
默认的 configuration 会创建DefaultSchemaRegistryClient
bean。如果要使用 Confluent schema 注册表,则需要创建类型的 bean,它取代 framework 默认配置的 bean。以下 example 显示了如何创建这样的 bean:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
ConfluentSchemaRegistryClient 针对 Confluent platform version 4.0.0 进行测试。
31.6 Schema 注册和解决方案
为了更好地理解 Spring Cloud Stream 如何注册和解析新模式及其对 Avro schema 比较 features 的使用,我们提供了两个单独的子部分:
31.6.1 Schema Registration Process(Serialization)
注册 process 的第一部分是从通过 channel 发送的有效负载中提取 schema。诸如SpecificRecord
或GenericRecord
之类的 Avro 类型已经包含 schema,可以立即从实例中检索。对于 POJO,如果spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
property 设置为true
(默认值),则会推断 schema。
图 1_.Schema Writer Resolution Process
获得 schema,转换器从 remote 服务器加载其元数据(version)。首先,它查询本地缓存。如果未找到结果,则将数据提交给服务器,服务器将回复版本信息。转换器始终缓存结果,以避免为每个需要序列化的新消息查询 Schema Server 的开销。
图 1_.Schema Registration Process
使用 schema version 信息,转换器设置消息的contentType
标头以携带 version 信息 - 对于 example:application/vnd.user.v1+avro
。
31.6.2 Schema Resolution Process(反序列化)
当读取包含 version 信息的消息(即带有 scheme 的contentType
标头,如“Section 31.6.1,“_ Scheche Registration Process(Serialization)””中描述的那样)时,转换器会查询 Schema 服务器以获取消息的 writer schema。一旦找到传入消息的正确 schema,它就会检索 reader schema,并通过使用 Avro 的 schema 解析支持将其读入 reader 定义(设置默认值和任何缺少的 properties)。
图 1_.Schema 阅读决议 Process
您应该了解 writer schema(写入消息的 application)和 reader schema(接收 application)之间的区别。我们建议使用 moment 来读取Avro 术语并理解 process。 Spring Cloud Stream 始终提取 writer schema 以确定如何阅读消息。如果您想让 Avro 的 schema evolution 支持工作,您需要确保为 application 正确设置
readerSchema
。