33. 模式演进支持

Spring Cloud Stream 提供了对模式演化的支持,因此数据可以随着时间的推移而演化,并且仍然可以与较新或较旧的生产者和 Consumer 一起使用,反之亦然。大多数序列化模型,尤其是旨在跨不同平台和语言进行移植的模型,都依赖于一种描述如何在二进制有效负载中序列化数据的模式。为了序列化数据然后解释它,发送方和接收方都必须有权访问描述二进制格式的模式。在某些情况下,可以从序列化时的有效负载类型或反序列化时的目标类型推断模式。但是,许多应用程序都可以从访问描述二进制数据格式的显式架构中受益。通过模式注册表,您可以以文本格式(通常为 JSON)存储模式信息,并使该信息可用于需要它以二进制格式接收和发送数据的各种应用程序。模式可引用为一个 Tuples,该 Tuples 包括:

  • 主题,是架构的逻辑名称

  • 模式版本

  • 模式格式,描述数据的二进制格式

以下各节详细介绍了架构演变过程中涉及的各种组件。

33.1 Schema Registry Client

与Schema Registry表服务器进行交互的 Client 端抽象是SchemaRegistryClient接口,该接口具有以下结构:

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供了开箱即用的实现,可用于与其自己的模式服务器进行交互以及与 Confluent Schema Registry 进行交互。

可以使用@EnableSchemaRegistryClient来配置 Spring Cloud Stream 模式注册表的 Client 端,如下所示:

@EnableBinding(Sink.class)
  @SpringBootApplication
  @EnableSchemaRegistryClient
  public static class AvroSinkApplication {
    ...
  }

Note

默认转换器经过优化,不仅可以缓存来自远程服务器的架构,还可以缓存parse()toString()方法,这非常昂贵。因此,它使用不缓存响应的DefaultSchemaRegistryClient。如果要更改默认行为,则可以直接在代码上使用 Client 端,并将其覆盖为所需的结果。为此,您必须将属性spring.cloud.stream.schemaRegistryClient.cached=true添加到应用程序属性中。

33.1.1 Schema Registry Client属性

Schema Registry Client 支持以下属性:

  • spring.cloud.stream.schemaRegistryClient.endpoint

    • 模式服务器的位置。进行设置时,请使用完整的 URL,包括协议(httphttps),端口和上下文路径。
  • Default

    • http://localhost:8990/
  • spring.cloud.stream.schemaRegistryClient.cached

    • Client 端是否应缓存架构服务器响应。通常设置为false,因为缓存发生在消息转换器中。使用Schema Registry Client的 Client 端应将此设置为true
  • Default

    • true

33.2 Avro Schema RegistryClient 端消息转换器

对于已向应用程序上下文注册了 SchemaRegistryClient Bean 的应用程序,Spring Cloud Stream 会自动配置 Apache Avro 消息转换器以进行模式 Management。由于接收消息的应用程序可以轻松访问可以与自己的读取器模式进行协调的写入器模式,因此这简化了模式的演变。

对于出站消息,如果通道的 Content Type 设置为application/*+avro,则激活MessageConverter,如以下示例所示:

spring.cloud.stream.bindings.output.contentType=application/*+avro

在出站转换期间,消息转换器尝试使用SchemaRegistryClient推断每个出站消息的模式(基于其类型),并将其注册到主题(基于有效负载类型)。如果已经找到相同的模式,则将检索对其的引用。如果不是,则注册架构,并提供新的版本号。通过使用以下方案,该消息以contentTypeHeaders 发送:application/[prefix].[subject].v[version]+avro,其中prefix是可配置的,而subject是从有效负载类型推论得到的。

例如,类型为User的消息可能作为二进制有效负载发送,Content Type 为application/vnd.user.v2+avro,其中user是主题,而2是版本号。

接收消息时,转换器从传入消息的头中推断出模式引用,并尝试检索它。该模式在反序列化过程中用作编写器模式。

33.2.1 Avro Schema Registry表消息转换器属性

如果通过设置spring.cloud.stream.bindings.output.contentType=application/*+avro启用了基于 Avro 的Schema Registry Client,则可以通过设置以下属性来自定义注册行为。

  • spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

    • 如果您希望转换器使用反射从 POJO 推断模式,则启用。

默认值:false

  • spring.cloud.stream.schema.avro.readerSchema

    • Avro 通过查看写入器模式(原始有效负载)和读取器模式(您的应用程序有效负载)来比较模式版本。有关更多信息,请参见Avro documentation。如果设置,则它将覆盖模式服务器上的所有查找,并将本地模式用作读取器模式。默认值:null
  • spring.cloud.stream.schema.avro.schemaLocations

    • 向 Schema Server 注册此属性中列出的任何.avsc文件。

默认值:empty

  • spring.cloud.stream.schema.avro.prefix

    • Content-TypeHeaders 上要使用的前缀。

默认值:vnd

33.3 Apache Avro 消息转换器

Spring Cloud Stream 通过其spring-cloud-stream-schema模块为基于模式的消息转换器提供支持。当前,基于模式的消息转换器开箱即用的唯一序列化格式是 Apache Avro,将来的版本中将添加更多格式。

spring-cloud-stream-schema模块包含两种可用于 Apache Avro 序列化的消息转换器:

  • 转换器使用序列化或反序列化对象的类信息或启动时具有已知位置的模式。

  • 使用Schema Registry表的转换器。它们在运行时定位架构,并随着域对象的 Developing 动态注册新架构。

33.4 具有模式支持的转换器

AvroSchemaMessageConverter支持通过使用 sched 义的架构或通过使用类中可用的架构信息(反射地或包含在SpecificRecord中)来对消息进行序列化和反序列化。如果提供自定义转换器,则不会创建默认的 AvroSchemaMessageConverter bean。以下示例显示了一个自定义转换器:

要使用自定义转换器,您只需将其添加到应用程序上下文中,就可以选择指定一个或多个与其关联的MimeTypes。默认MimeTypeapplication/avro

如果转换的目标类型为GenericRecord,则必须设置一个架构。

以下示例显示了如何通过在不使用 sched 义架构的情况下注册 Apache Avro MessageConverter来在接收器应用程序中配置转换器。在此示例中,请注意,MIME 类型值为avro/bytes,而不是默认值application/avro

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下应用程序使用 sched 义的架构(在 Classpath 上找到)注册一个转换器:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

33.5 Schema Registry表服务器

Spring Cloud Stream 提供了一个Schema Registry表服务器实现。要使用它,您可以将spring-cloud-stream-schema-server工件添加到您的项目中,并使用@EnableSchemaRegistryServer注解,这会将Schema Registry表服务器 REST 控制器添加到您的应用程序。该 Comments 旨在与 Spring Boot Web 应用程序一起使用,并且服务器的监听端口由server.port属性控制。 spring.cloud.stream.schema.server.path属性可用于控制模式服务器的根路径(尤其是当它嵌入在其他应用程序中时)。 spring.cloud.stream.schema.server.allowSchemaDeletion布尔属性允许删除架构。默认情况下,这是禁用的。

Schema Registry表服务器使用关系数据库来存储架构。默认情况下,它使用嵌入式数据库。您可以使用Spring Boot SQL 数据库和 JDBC 配置选项定制模式存储。

以下示例显示了启用Schema Registry表的 Spring Boot 应用程序:

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(SchemaRegistryServerApplication.class, args);
    }
}

33.5.1 Schema Registry表服务器 API

Schema Registry Server API 包含以下操作:

注册新架构

要注册新模式,请向/端点发送POST请求。

/接受具有以下字段的 JSON 有效负载:

  • subject:架构主题

  • format:模式格式

  • definition:模式定义

它的响应是 JSON 中的架构对象,具有以下字段:

  • id:模式 ID

  • subject:架构主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

按主题,格式和版本检索现有架构

要按主题,格式和版本检索现有模式,请向GET端点发送GET请求。

它的响应是 JSON 中的架构对象,具有以下字段:

  • id:模式 ID

  • subject:架构主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

按主题和格式检索现有架构

要按主题和格式检索现有模式,请向GET端点发送GET请求。

它的响应是 JSON 中每个模式对象的模式列表,其中包含以下字段:

  • id:模式 ID

  • subject:架构主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

通过 ID 检索现有架构

要通过其 ID 检索架构,请将GET请求发送到/schemas/{id}端点。

它的响应是 JSON 中的架构对象,具有以下字段:

  • id:模式 ID

  • subject:架构主题

  • format:模式格式

  • version:模式版本

  • definition:模式定义

按主题,格式和版本删除架构

要删除由其主题,格式和版本标识的架构,请向DELETE端点发送DELETE请求。

按 ID 删除架构

要通过其 ID 删除架构,请将DELETE请求发送到/schemas/{id}端点。

按主题删除架构

DELETE /{subject}

按主题删除现有架构。

Note

本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。 Spring Cloud Stream 1.1.0.RELEASE 使用表名schema来存储Schema个对象。 Schema是许多数据库实现中的关键字。为了避免将来发生任何冲突,从 1.1.1.RELEASE 开始,我们为存储表选择了名称SCHEMA_REPOSITORY。任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应在升级之前将其现有模式迁移到新表。

33.5.2 使用 Confluent 的Schema Registry表

默认配置将创建一个DefaultSchemaRegistryClient bean。如果要使用 Confluent 模式注册表,则需要创建ConfluentSchemaRegistryClient类型的 Bean,该 Bean 将取代框架默认配置的 Bean。以下示例显示了如何创建这样的 bean:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}

Note

ConfluentSchemaRegistryClient 已针对 Confluent 平台 4.0.0 版进行了测试。

33.6 模式注册和解析

为了更好地了解 Spring Cloud Stream 如何注册和解析新模式及其对 Avro 模式比较功能的使用,我们提供了两个单独的小节:

33.6.1 模式注册过程(序列化)

注册过程的第一部分是从通过通道发送的有效负载中提取模式。诸如SpecificRecordGenericRecord之类的 Avro 类型已经包含一个架构,可以从实例中立即检索该架构。对于 POJO,如果spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled属性设置为true(默认值),则将推断模式。

图 33.1. 模式编写器解析过程

schema resolution

获得一个模式,转换器从远程服务器加载其元数据(版本)。首先,它查询本地缓存。如果未找到结果,它将把数据提交给服务器,服务器将提供版本信息。转换器始终缓存结果,以避免为每个需要序列化的新消息查询 Schema Server 的开销。

图 33.2. 模式注册过程

registration

借助模式版本信息,转换器将消息的contentTypeHeaders 设置为携带版本信息,例如application/vnd.user.v1+avro

33.6.2 模式解析过程(反序列化)

当读取包含版本信息的消息(即具有类似“ 第 33.6.1 节“模式注册过程(序列化)””下所述方案的contentType头)时,转换器将查询 Schema 服务器以获取消息的编写者模式。一旦找到了传入消息的正确架构,它将检索阅读器架构,并使用 Avro 的架构解析支持将其读入阅读器定义(设置默认值和任何缺少的属性)。

图 33.3. 模式阅读解析过程

schema reading

Note

您应该了解写程序架构(编写消息的应用程序)和读程序架构(接收应用程序)之间的区别。我们建议花点时间阅读Avro 术语并理解该过程。 Spring Cloud Stream 始终会获取编写者架构来确定如何读取消息。如果要使 Avro 的模式演变支持正常工作,则需要确保为应用程序正确设置了readerSchema