30. Content Type Negotiation

数据转换是任何 message-driven 微服务架构的核心功能之一。鉴于此,在 Spring Cloud Stream 中,此类数据表示为 Spring Message,在到达目的地之前,可能必须将消息转换为所需的形状或大小。这有两个原因:

  • 要将传入消息的内容转换为匹配 application-provided 处理程序的签名。

  • 将传出消息的内容转换为有线格式。

有线格式通常是byte[](Kafka 和 Rabbit binders 的 true),但它由 binder implementation 控制。

在 Spring Cloud Stream 中,使用org.springframework.messaging.converter.MessageConverter完成消息转换。

作为要遵循的细节的补充,您可能还想阅读以下博客文章

30.1 Mechanics

为了更好地理解机制和 content-type negotiation 背后的必要性,我们通过使用以下消息处理程序作为 example 来查看一个非常简单的用例:

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Person person) {..}

为简单起见,我们假设这是 application 中唯一的处理程序(我们假设没有内部管道)。

前面的 example 中显示的处理程序需要Person object 作为参数,并生成String类型作为输出。在 framework 成功将传入的Message作为参数传递给此处理程序时,它必须以某种方式将Message类型的有效负载从有线格式转换为Person类型。换句话说,framework 必须找到并应用适当的MessageConverter。为此,framework 需要用户的一些指示。其中一条指令已由处理程序方法本身的签名提供(Person类型)。因此,从理论上讲,这应该(并且在某些情况下)应该足够了。但是,对于大多数用例,在 select 中选择适当的MessageConverter,framework 需要一条额外的信息。丢失的那块是contentType

Spring Cloud Stream 提供了三种机制来定义contentType(在优先顺序中):

  • HEADERcontentType可以通过 Message 本身进行通信。通过提供contentType标头,您可以声明 content type 以用于查找和应用相应的MessageConverter

  • BINDING:可以通过设置spring.cloud.stream.bindings.input.content-type property 为每个目标 binding 设置contentType

property name 中的input段对应于目标的实际 name(在我们的示例中为“input”)。这种方法允许您在 per-binding 基础上声明 content type 以用于定位和应用适当的MessageConverter

  • DEFAULT:如果Message标头或 binding 中不存在contentType,则默认application/json content type 用于查找并应用相应的MessageConverter

如前所述,前面的列表还演示了绑定情况下的优先顺序。对于 example,header-provided content type 优先于任何其他 content type。这同样适用于基于 per-binding 的 content type 设置,它基本上允许您覆盖默认的 content type。但是,它也提供合理的默认值(这是根据社区反馈确定的)。

使application/json成为默认值的另一个原因源于分布式微服务体系结构驱动的互操作性要求,其中 producer 和 consumer 不仅在不同的 JVM 中运行,而且还可以在不同的 non-JVM 平台上运行。

当 non-void 处理程序方法返回时,如果 return value 已经是Message,则Message成为有效负载。但是,当 return value 不是Message时,新的Message构造时 return value 作为有效负载,同时从输入Message继承 headers 减去由SpringIntegrationProperties.messageHandlerNotPropagatedHeaders定义或过滤的 headers。默认情况下,只有一个标头集:contentType。这意味着新的Message没有设置contentType标头,从而确保contentType可以进化。您始终可以选择不从处理程序方法返回Message,您可以在其中输入您希望的任何标题。

如果存在内部管道,则通过执行相同的转换过程将Message发送到下一个处理程序。但是,如果没有内部管道或者您已到达它的末尾,则Message将被发送回输出目标。

30.1.1 Content Type 与参数类型

如前所述,为了 framework 选择适当的MessageConverter,它需要参数类型和 content type 信息。选择适当的MessageConverter的逻辑驻留在参数解析器(HandlerMethodArgumentResolvers)中,该参数解析器在 user-defined 处理程序方法的调用之前触发(这是 framework 知道实际参数类型的时候)。如果参数类型不匹配当前有效负载的类型,则 framework 委托给 pre-configured MessageConverters的堆栈以查看它们中的任何一个是否可以转换有效负载。如您所见,MessageConverter 的Object fromMessage(Message<?> message, Class<?> targetClass);操作将targetClass作为其 arguments 之一。 framework 还确保提供的Message始终包含contentType标头。如果没有 contentType 标头,则会注入 per-binding contentType标头或默认的contentType标头。 contentType参数类型的组合是 framework 确定消息是否可以转换为目标类型的机制。如果找不到合适的MessageConverter,则抛出 exception,您可以通过添加自定义MessageConverter来处理(请参阅“第 30.3 节,“User-defined 消息转换器””)。

但是如果有效负载类型与处理程序方法声明的目标类型匹配怎么办?在这种情况下,没有任何东西可以转换,并且有效载荷是未经修改的。虽然这听起来非常简单和合乎逻辑,但请记住以Message<?>Object作为参数的处理程序方法。通过将目标类型声明为Object(这是 Java 中的instanceof所有内容),您基本上会丧失转换 process。

不要指望Message仅基于contentType转换为其他类型。请记住,contentType是目标类型的补充。如果您愿意,可以提供一个提示,可能会或可能不会考虑。

30.1.2 消息转换器

MessageConverters定义两种方法:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

了解这些方法及其用法的 contract 非常重要,特别是在 Spring Cloud Stream 的 context 中。

fromMessage方法将传入的Message转换为参数类型。 Message的有效负载可以是任何类型,并且由MessageConverter的实际 implementation 来支持多种类型。例如,某些 JSON 转换器可能支持有效负载类型为byte[]String等。当 application 包含内部管道(即输入→处理程序 1→处理程序 2→...→输出)时,这很重要,并且上游处理程序的输出会产生Message,这可能不是初始有线格式。

但是,toMessage方法具有更严格的 contract,并且必须始终将Message转换为有线格式:byte[]

因此,对于所有意图和目的(尤其是在实现自己的转换器时),您认为这两种方法具有以下签名:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);

30.2 提供了 MessageConverters

如前所述,framework 已经提供了一堆MessageConverters来处理大多数 common 用例。以下列表描述了提供的MessageConverters,在优先顺序中(使用的第一个MessageConverter):

  • ApplicationJsonMessageMarshallingConverterorg.springframework.messaging.converter.MappingJackson2MessageConverter的变化。当contentTypeapplication/json(DEFAULT)时,支持转换Message to/from POJO 的有效负载。

  • TupleJsonMessageConverterDEPRECATED支持转换Message to/from org.springframework.tuple.Tuple的有效负载。

  • ByteArrayMessageConverter:当contentTypeapplication/octet-stream时,支持将Message的有效负载从byte[]转换为byte[]。它本质上是一种传递,主要用于向后兼容。

  • ObjectStringMessageConverter:当contentTypetext/plain时,支持将任何类型转换为String。它调用 Object 的toString()方法,或者如果有效负载为byte[],则调用新的String(byte[])

  • JavaSerializationMessageConverterDEPRECATEDcontentTypeapplication/x-java-serialized-object时,支持基于 java 序列化的转换。

  • KryoMessageConverterDEPRECATEDcontentTypeapplication/x-java-object时,支持基于 Kryo 序列化的转换。

  • JsonUnmarshallingConverter:类似于ApplicationJsonMessageMarshallingConverter。当contentTypeapplication/x-java-object时,它支持任何类型的转换。它希望将实际类型信息作为属性嵌入contentType(对于 example,application/x-java-object;type=foo.bar.Cat)。

如果找不到合适的转换器,framework 将抛出 exception。当发生这种情况时,您应该检查 code 和 configuration 并确保您没有遗漏任何内容(即,确保您使用 binding 或标头提供了contentType)。但是,最有可能的是,您发现了一些不常见的情况(例如自定义contentType),并且当前提供的MessageConverters堆栈不知道如何转换。如果是这种情况,您可以添加自定义MessageConverter。见第 30.3 节,“User-defined 消息转换器”

30.3 User-defined 消息转换器

Spring Cloud Stream 公开了一种定义和注册附加MessageConverters的机制。要使用它,请实现org.springframework.messaging.converter.MessageConverter,将其配置为@Bean,并使用@StreamMessageConverter进行注释。然后它被添加到现有的MessageConverter s 堆栈中。

重要的是要了解自定义MessageConverter __mplement 将添加到现有堆栈的头部。因此,自定义MessageConverter implementations 优先于现有的_i_semplementations,这使您可以覆盖以及添加到现有转换器。

以下 example 显示了如何创建消息转换器 bean 以支持名为application/bar的新 content type:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    @StreamMessageConverter
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

Spring Cloud Stream 还支持 Avro-based 转换器和 schema evolution。有关详细信息,请参阅“第 31 章,Schema Evolution 支持”。