31. Schema Evolution 支持

Spring Cloud Stream 为 schema evolution 提供支持,以便数据可以在 time 内进化,并且仍然适用于较旧或较新的生产者和消费者,反之亦然。大多数序列化模型,特别是那些旨在跨不同平台和语言实现可移植性的模型,依赖于描述如何在二进制有效负载中序列化数据的 schema。在序列化数据然后解释数据的过程中,发送方和接收方都必须能够访问描述二进制格式的 schema。在某些情况下,schema 可以从序列化的有效负载类型或反序列化的目标类型推断出来。但是,许多 applications 受益于访问描述二进制数据格式的显式 schema。 schema 注册表允许您以文本格式(通常是 JSON)存储 schema 信息,并使该信息可供需要它以二进制格式接收和发送数据的各种应用程序访问。 schema 可作为元组引用,包括:

  • 作为 schema 的逻辑 name 的主题

  • schema version

  • schema 格式,描述数据的二进制格式

以下部分将详细介绍 schema evolution process 中涉及的各个组件。

31.1 Schema Registry Client

用于与 schema 注册服务器交互的 client-side 抽象是SchemaRegistryClient接口,它具有以下结构:

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供 out-of-the-box implementations,用于与自己的 schema 服务器进行交互,以及与 Confluent Schema Registry 进行交互。

可以使用@EnableSchemaRegistryClient配置 Spring Cloud Stream schema 注册表的 client,如下所示:

@EnableBinding(Sink.class)
  @SpringBootApplication
  @EnableSchemaRegistryClient
  public static class AvroSinkApplication {
    ...
  }

默认转换器经过优化,不仅可以缓存来自 remote 服务器的模式,还可以缓存非常昂贵的parse()toString()方法。因此,它使用不缓存响应的DefaultSchemaRegistryClient。如果您打算更改默认行为,可以直接在 code 上使用 client 并将其覆盖为所需的结果。为此,您必须将 property spring.cloud.stream.schemaRegistryClient.cached=true添加到 application properties 中。

31.1.1 Schema Registry Client Properties

Schema Registry Client 支持以下 properties:

  • spring.cloud.stream.schemaRegistryClient.endpoint

    • schema-server 的位置。设置此项时,请使用完整的 URL,包括协议(httphttps), port 和 context 路径。
  • 默认

    • http://localhost:8990/
  • spring.cloud.stream.schemaRegistryClient.cached

    • client 是否应该缓存 schema 服务器响应。通常设置为false,因为缓存发生在消息转换器中。 Clients 使用 schema 注册表 client 应将此设置为true
  • 默认

    • true

31.2 Avro Schema Registry Client 消息转换器

对于具有使用 application context 注册的 SchemaRegistryClient bean 的 applications,Spring Cloud Stream 自动为 schema management 配置 Apache Avro 消息转换器。这样可以简化 schema 进化,因为接收消息的应用程序可以轻松访问 writer schema,可以与自己的 reader schema 协调。

对于出站消息,如果 channel 的 content type 设置为application/*+avro,则激活MessageConverter,如下面的示例所示:

spring.cloud.stream.bindings.output.contentType=application/*+avro

在出站转换期间,消息转换器尝试推断每个出站消息的 schema(基于其类型),并使用SchemaRegistryClient将其注册到主题(基于有效内容类型)。如果已找到相同的 schema,则检索对它的 reference。如果没有,则注册 schema,并提供新的 version number。使用以下 scheme:application/[prefix].[subject].v[version]+avro发送带有contentType标头的消息,其中prefix是可配置的,subject是从有效负载类型推导出来的。

对于 example,类型User的消息可能作为二进制有效负载发送,

接收消息时,转换器会从传入消息的标头中推断 schema reference 并尝试检索它。 schema 在反序列化 process 中用作 writer schema。

31.2.1 Avro Schema 注册表消息转换器 Properties

如果通过设置spring.cloud.stream.bindings.output.contentType=application/*+avro启用了基于 Avro 的 schema 注册表 client,则可以通过设置以下 properties 来自定义注册行为。

  • spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

    • 如果希望转换器使用反射从 POJO 推断 Schema,则启用。

默认值:false

  • spring.cloud.stream.schema.avro.readerSchema

    • Avro 通过查看 writer schema(原始有效负载)和 reader schema(您的 application 有效负载)来比较 schema 版本。有关详细信息,请参阅Avro 文档。如果设置,则覆盖 schema 服务器上的任何查找,并使用本地 schema 作为 reader schema。默认值:null
  • spring.cloud.stream.schema.avro.schemaLocations

    • 使用 Schema Server 注册此 property 中列出的所有.avsc files。

默认值:empty

  • spring.cloud.stream.schema.avro.prefix

    • 要在 Content-Type 标头上使用的前缀。

默认值:vnd

31.3 Apache Avro 消息转换器

Spring Cloud Stream 通过spring-cloud-stream-schema模块为 schema-based 消息转换器提供支持。目前,支持 schema-based 消息转换器的唯一序列化格式是 Apache Avro,在未来版本中将添加更多格式。

spring-cloud-stream-schema模块包含两种类型的消息转换器,可用于 Apache Avro 序列化:

  • 转换器使用序列化或反序列化 objects 的 class 信息或 schema,其位置在启动时已知。

  • 使用 schema 注册表的转换器。他们在运行时定位模式,并在域 objects 进化时动态注册新模式。

具有 Schema 支持的 31.4 转换器

AvroSchemaMessageConverter支持通过使用预定义的 schema 或使用 class 中可用的 schema 信息(反射或包含在SpecificRecord中)来序列化和反序列化消息。如果您提供自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。以下 example 显示了一个自定义转换器:

要使用自定义转换器,只需将其添加到 application context,可以选择指定一个或多个与之关联的MimeTypes。默认MimeTypeapplication/avro

如果转换的目标类型是GenericRecord,则必须设置 schema。

以下 example 显示了如何通过在没有预定义 schema 的情况下注册 Apache Avro MessageConverter来在 sink application 中配置转换器。在此 example 中,请注意 mime 类型 value 是avro/bytes,而不是默认application/avro

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下 application 注册一个带有预定义 schema 的转换器(在 classpath 上找到):

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

31.5 Schema Registry Server

Spring Cloud Stream 提供 schema 注册表服务器 implementation。要使用它,可以将spring-cloud-stream-schema-server artifact 添加到项目中并使用@EnableSchemaRegistryServer annotation,它将 schema 注册表服务器 REST 控制器添加到您的 application。此 annotation 旨在与 Spring Boot web applications 一起使用,并且服务器的监听 port 由server.port property 控制。 spring.cloud.stream.schema.server.path property 可用于控制 schema 服务器的根路径(特别是当它嵌入其他 applications 时)。 spring.cloud.stream.schema.server.allowSchemaDeletion boolean property 可以删除 schema。默认情况下,禁用此功能。

schema 注册表服务器使用关系数据库来存储模式。默认情况下,它使用嵌入式数据库。您可以使用Spring Boot SQL 数据库和 JDBC configuration 选项自定义 schema 存储。

以下 example 显示了启用 schema 注册表的 Spring Boot application:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SchemaRegistryServerApplication.class, args);
    }
}

31.5.1 Schema Registry Server API

Schema Registry Server API 包含以下操作:

注册新的 Schema

要注册新的 schema,请向/端点发送POST请求。

/接受带有以下字段的 JSON 有效内容:

  • subject:schema 主题

  • format:schema 格式

  • definition:schema 定义

它的响应是 JSON 中的 schema object,其中包含以下字段:

  • id:schema ID

  • subject:schema 主题

  • format:schema 格式

  • version:schema version

  • definition:schema 定义

按主题,格式和 Version 检索现有 Schema

要按主题,格式和 version 检索现有 schema,请将GET请求发送到/{subject}/{format}/{version}端点。

它的响应是 JSON 中的 schema object,其中包含以下字段:

  • id:schema ID

  • subject:schema 主题

  • format:schema 格式

  • version:schema version

  • definition:schema 定义

按主题和格式检索现有 Schema

要按主题和格式检索现有 schema,请将GET请求发送到/subject/format端点。

它的响应是 JSON 中每个 schema object 的模式列表,包含以下字段:

  • id:schema ID

  • subject:schema 主题

  • format:schema 格式

  • version:schema version

  • definition:schema 定义

按 ID 检索现有 Schema

要通过其 ID 检索 schema,请将GET请求发送到/schemas/{id}端点。

它的响应是 JSON 中的 schema object,其中包含以下字段:

  • id:schema ID

  • subject:schema 主题

  • format:schema 格式

  • version:schema version

  • definition:schema 定义

按主题,格式和 Version 删除 Schema

要删除由其 subject,format 和 version 标识的 schema,请向/{subject}/{format}/{version}端点发送DELETE请求。

按 ID 删除 Schema

要按其 ID 删除 schema,请向/schemas/{id}端点发送DELETE请求。

按主题删除 Schema

DELETE /{subject}

按主题删除现有架构。

本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。 Spring Cloud Stream 1.1.0.RELEASE 使用 table name,schema来存储Schema objects。 Schema是许多数据库 implementations 中的关键字。为了避免将来出现任何冲突,从 1.1.1.RELEASE 开始,我们选择了存储 table 的 name SCHEMA_REPOSITORY。任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应该在升级之前将其现有架构迁移到新的 table。

31.5.2 使用 Confluent 的 Schema Registry

默认的 configuration 会创建DefaultSchemaRegistryClient bean。如果要使用 Confluent schema 注册表,则需要创建类型的 bean,它取代 framework 默认配置的 bean。以下 example 显示了如何创建这样的 bean:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}

ConfluentSchemaRegistryClient 针对 Confluent platform version 4.0.0 进行测试。

31.6 Schema 注册和解决方案

为了更好地理解 Spring Cloud Stream 如何注册和解析新模式及其对 Avro schema 比较 features 的使用,我们提供了两个单独的子部分:

31.6.1 Schema Registration Process(Serialization)

注册 process 的第一部分是从通过 channel 发送的有效负载中提取 schema。诸如SpecificRecordGenericRecord之类的 Avro 类型已经包含 schema,可以立即从实例中检索。对于 POJO,如果spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled property 设置为true(默认值),则会推断 schema。

图 1_.Schema Writer Resolution Process

schema 决议

获得 schema,转换器从 remote 服务器加载其元数据(version)。首先,它查询本地缓存。如果未找到结果,则将数据提交给服务器,服务器将回复版本信息。转换器始终缓存结果,以避免为每个需要序列化的新消息查询 Schema Server 的开销。

图 1_.Schema Registration Process

注册

使用 schema version 信息,转换器设置消息的contentType标头以携带 version 信息 - 对于 example:application/vnd.user.v1+avro

31.6.2 Schema Resolution Process(反序列化)

当读取包含 version 信息的消息(即带有 scheme 的contentType标头,如“Section 31.6.1,“_ Scheche Registration Process(Serialization)””中描述的那样)时,转换器会查询 Schema 服务器以获取消息的 writer schema。一旦找到传入消息的正确 schema,它就会检索 reader schema,并通过使用 Avro 的 schema 解析支持将其读入 reader 定义(设置默认值和任何缺少的 properties)。

图 1_.Schema 阅读决议 Process

schema 阅读

您应该了解 writer schema(写入消息的 application)和 reader schema(接收 application)之间的区别。我们建议使用 moment 来读取Avro 术语并理解 process。 Spring Cloud Stream 始终提取 writer schema 以确定如何阅读消息。如果您想让 Avro 的 schema evolution 支持工作,您需要确保为 application 正确设置readerSchema