32. Content Type 协商

数据转换是任何消息驱动的微服务体系结构的核心功能之一。假设在 Spring Cloud Stream 中,此类数据表示为 Spring Message,则在到达消息目的地之前,可能必须将消息转换为所需的形状或大小。这样做有两个原因:

  • 转换传入消息的内容以匹配应用程序提供的处理程序的签名。

  • 将外发邮件的内容转换为有线格式。

连线格式通常为byte[](对于 Kafka 和 RabbitBinder 而言,这是正确的),但它由 Binder 实现方式控制。

在 Spring Cloud Stream 中,消息转换是通过org.springframework.messaging.converter.MessageConverter完成的。

Note

作为后续细节的补充,您可能还需要阅读以下blog post

32.1 Mechanics

为了更好地理解 Content Type 协商的机制和必要性,我们以下面的消息处理程序为例,看一个非常简单的用例:

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Person person) {..}

Note

为简单起见,我们假设这是应用程序中唯一的处理程序(我们假设没有内部管道)。

前面示例中显示的处理程序期望Person对象作为参数,并产生String类型作为输出。为了使框架成功将传入的Message作为参数传递给此处理程序,它必须以某种方式将Message类型的有效负载从有线格式转换为Person类型。换句话说,框架必须找到并应用适当的MessageConverter。为此,该框架需要用户提供一些指导。处理程序方法本身的签名(Person类型)已经提供了这些指令之一。因此,从理论上讲,这应该是(并且在某些情况下是足够的)。但是,对于大多数用例,为了选择适当的MessageConverter,框架需要其他信息。那件遗失的是contentType

Spring Cloud Stream 提供了三种机制来定义contentType(按优先级 Sequences):

  • HEADERcontentType可以通过消息本身进行通信。通过提供contentTypeHeaders,您可以声明用于查找和应用适当的MessageConverter的 Content Type。

  • BINDING :可以通过设置spring.cloud.stream.bindings.input.content-type属性为每个目标绑定设置contentType

Note

属性名称中的input片段对应于目的地的实际名称(在我们的示例中为“Importing”)。通过这种方法,您可以按绑定声明用于查找和应用适当的MessageConverter的 Content Type。

  • DEFAULT :如果contentTypeMessageHeaders 或绑定中不存在,则使用默认的application/jsonContent Type 来查找和应用适当的MessageConverter

如前所述,前面的列表还演示了平局时的优先 Sequences。例如,Headers 提供的 Content Type 优先于任何其他 Content Type。对于按绑定设置的 Content Type 也是如此,这实际上使您可以覆盖默认 Content Type。但是,它也提供了明智的默认设置(由社区反馈确定)。

application/json设置为默认值的另一个原因是由分布式微服务体系结构驱动的互操作性要求,在该体系中,生产者和使用者不仅可以在不同的 JVM 中运行,而且还可以在不同的非 JVM 平台上运行。

当非无效处理程序方法返回时,如果返回值已经是Message,则该Message成为有效负载。但是,当返回值不是Message时,将使用返回值作为有效负载来构造新的Message,同时从 ImportingMessage继承 Headers 减去SpringIntegrationProperties.messageHandlerNotPropagatedHeaders定义或过滤的 Headers。默认情况下,仅设置一个 Headers:contentType。这意味着新的Message没有设置contentTypeHeaders,从而确保contentType可以演进。您始终可以选择不从处理程序方法中返回Message,您可以在其中注入所需的任何 Headers。

如果存在内部管道,则通过相同的转换过程将Message发送到下一个处理程序。但是,如果没有内部管道或您已经到达内部管道的末尾,则Message将发送回输出目标。

32.1.1Content Type 与参数类型

如前所述,要使框架选择适当的MessageConverter,它需要参数类型以及(可选)Content Type 信息。选择适当的MessageConverter的逻辑驻留在参数解析器(HandlerMethodArgumentResolvers)中,该解析器在调用用户定义的处理程序方法之前(即当框架知道实际的参数类型时)触发。如果参数类型与当前有效负载的类型不匹配,则框架将委派给预先配置的MessageConverters的堆栈,以查看其中是否有一个可以转换有效负载。如您所见,MessageConverter 的Object fromMessage(Message<?> message, Class<?> targetClass);操作将targetClass作为其参数之一。该框架还确保提供的Message始终包含contentTypeHeaders。如果没有 contentTypeHeaders,则它会插入按绑定的contentTypeHeaders 或默认的contentTypeHeaders。 contentType参数类型的组合是框架确定消息是否可以转换为目标类型的机制。如果找不到合适的MessageConverter,则会引发异常,您可以通过添加自定义MessageConverter(请参阅“ 第 32.3 节“用户定义的消息转换器””)来处理该异常。

但是,如果有效负载类型与处理程序方法声明的目标类型匹配,该怎么办?在这种情况下,没有任何要转换的内容,并且有效负载未经修改地传递。尽管这听起来很简单且合乎逻辑,但请记住以Message<?>Object作为参数的处理程序方法。通过将目标类型声明为Object(在 Java 中是instanceof的所有内容),实际上就放弃了转换过程。

Note

不要期望Message仅基于contentType转换为其他类型。请记住,contentType是目标类型的补充。如果您愿意,可以提供一个提示,MessageConverter可以考虑也可以不考虑。

32.1.2 消息转换器

MessageConverters定义两种方法:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

了解这些方法的约定及其用法非常重要,尤其是在 Spring Cloud Stream 的上下文中。

fromMessage方法将传入的Message转换为参数类型。 Message的有效负载可以是任何类型,并且MessageConverter的实际实现取决于支持多种类型。例如,某些 JSON 转换器可能支持有效负载类型为byte[]String等。当应用程序包含内部管道(即 Importing→handler1→handler2→..→→输出)并且上游处理程序的输出结果为Message时(这可能不是初始连线格式),这一点很重要。

但是,toMessage方法的 Contract 更为严格,必须始终将Message转换为有线格式byte[]

因此,出于所有意图和目的(尤其是在实现自己的转换器时),您将这两种方法视为具有以下签名:

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);

32.2 提供的 MessageConverters

如前所述,该框架已经提供了MessageConverters的堆栈来处理最常见的用例。以下列表按优先级描述了提供的MessageConverters(使用了第一个有效的MessageConverter):

  • ApplicationJsonMessageMarshallingConverterorg.springframework.messaging.converter.MappingJackson2MessageConverter的变体。对于contentTypeapplication/json(DEFAULT)的情况,支持Message的有效负载到 POJO 的转换。

  • TupleJsonMessageConverter已弃用 支持将Message的有效负载转换为org.springframework.tuple.Tuple或从org.springframework.tuple.Tuple转换。

  • ByteArrayMessageConverter:在contentTypeapplication/octet-stream的情况下,支持将Message的有效载荷从byte[]转换为byte[]。它本质上是一个传递,主要是为了向后兼容而存在。

  • ObjectStringMessageConverter:当contentTypetext/plain时,支持将任何类型转换为String。它调用 Object 的toString()方法,或者,如果有效载荷为byte[],则调用新的String(byte[])

  • JavaSerializationMessageConverter已弃用contentTypeapplication/x-java-serialized-object时,支持基于 Java 序列化的转换。

  • KryoMessageConverter不推荐使用contentTypeapplication/x-java-object时,支持基于 Kryo 序列化的转换。

  • JsonUnmarshallingConverter:类似于ApplicationJsonMessageMarshallingConverter。当contentTypeapplication/x-java-object时,它支持任何类型的转换。它期望将实际类型信息作为属性(例如application/x-java-object;type=foo.bar.Cat)嵌入到contentType中。

当找不到合适的转换器时,框架将引发异常。发生这种情况时,应检查代码和配置,并确保您没有错过任何内容(即,确保使用绑定或 Headers 提供了contentType)。但是,很可能您发现了一些不常见的情况(例如自定义contentType),并且提供的MessageConverters的当前堆栈不知道如何进行转换。在这种情况下,您可以添加自定义MessageConverter。参见第 32.3 节“用户定义的消息转换器”

32.3 用户定义的消息转换器

Spring Cloud Stream 公开了定义和注册其他MessageConverters的机制。要使用它,请实现org.springframework.messaging.converter.MessageConverter,将其配置为@Bean,并用@StreamMessageConverter进行 Comments。然后将其添加到 MessageConverter 的现有堆栈中。

Note

请务必将自定义MessageConverter实现添加到现有堆栈的开头,这一点很重要。因此,自定义MessageConverter实现优先于现有实现,您可以覆盖它们并添加到现有转换器中。

下面的示例演示如何创建消息转换器 bean 以支持称为application/bar的新 Content Type:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    @StreamMessageConverter
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

Spring Cloud Stream 还为基于 Avro 的转换器和模式演变提供支持。有关详细信息,请参见“ 第 33 章,模式演化支持”。