32. Content Type 协商
数据转换是任何消息驱动的微服务体系结构的核心功能之一。假设在 Spring Cloud Stream 中,此类数据表示为 Spring Message
,则在到达消息目的地之前,可能必须将消息转换为所需的形状或大小。这样做有两个原因:
-
转换传入消息的内容以匹配应用程序提供的处理程序的签名。
-
将外发邮件的内容转换为有线格式。
连线格式通常为byte[]
(对于 Kafka 和 RabbitBinder 而言,这是正确的),但它由 Binder 实现方式控制。
在 Spring Cloud Stream 中,消息转换是通过org.springframework.messaging.converter.MessageConverter
完成的。
Note
作为后续细节的补充,您可能还需要阅读以下blog post。
32.1 Mechanics
为了更好地理解 Content Type 协商的机制和必要性,我们以下面的消息处理程序为例,看一个非常简单的用例:
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String handle(Person person) {..}
Note
为简单起见,我们假设这是应用程序中唯一的处理程序(我们假设没有内部管道)。
前面示例中显示的处理程序期望Person
对象作为参数,并产生String
类型作为输出。为了使框架成功将传入的Message
作为参数传递给此处理程序,它必须以某种方式将Message
类型的有效负载从有线格式转换为Person
类型。换句话说,框架必须找到并应用适当的MessageConverter
。为此,该框架需要用户提供一些指导。处理程序方法本身的签名(Person
类型)已经提供了这些指令之一。因此,从理论上讲,这应该是(并且在某些情况下是足够的)。但是,对于大多数用例,为了选择适当的MessageConverter
,框架需要其他信息。那件遗失的是contentType
。
Spring Cloud Stream 提供了三种机制来定义contentType
(按优先级 Sequences):
-
HEADER :
contentType
可以通过消息本身进行通信。通过提供contentType
Headers,您可以声明用于查找和应用适当的MessageConverter
的 Content Type。 -
BINDING :可以通过设置
spring.cloud.stream.bindings.input.content-type
属性为每个目标绑定设置contentType
。
Note
属性名称中的input
片段对应于目的地的实际名称(在我们的示例中为“Importing”)。通过这种方法,您可以按绑定声明用于查找和应用适当的MessageConverter
的 Content Type。
- DEFAULT :如果
contentType
在Message
Headers 或绑定中不存在,则使用默认的application/json
Content Type 来查找和应用适当的MessageConverter
。
如前所述,前面的列表还演示了平局时的优先 Sequences。例如,Headers 提供的 Content Type 优先于任何其他 Content Type。对于按绑定设置的 Content Type 也是如此,这实际上使您可以覆盖默认 Content Type。但是,它也提供了明智的默认设置(由社区反馈确定)。
将application/json
设置为默认值的另一个原因是由分布式微服务体系结构驱动的互操作性要求,在该体系中,生产者和使用者不仅可以在不同的 JVM 中运行,而且还可以在不同的非 JVM 平台上运行。
当非无效处理程序方法返回时,如果返回值已经是Message
,则该Message
成为有效负载。但是,当返回值不是Message
时,将使用返回值作为有效负载来构造新的Message
,同时从 ImportingMessage
继承 Headers 减去SpringIntegrationProperties.messageHandlerNotPropagatedHeaders
定义或过滤的 Headers。默认情况下,仅设置一个 Headers:contentType
。这意味着新的Message
没有设置contentType
Headers,从而确保contentType
可以演进。您始终可以选择不从处理程序方法中返回Message
,您可以在其中注入所需的任何 Headers。
如果存在内部管道,则通过相同的转换过程将Message
发送到下一个处理程序。但是,如果没有内部管道或您已经到达内部管道的末尾,则Message
将发送回输出目标。
32.1.1Content Type 与参数类型
如前所述,要使框架选择适当的MessageConverter
,它需要参数类型以及(可选)Content Type 信息。选择适当的MessageConverter
的逻辑驻留在参数解析器(HandlerMethodArgumentResolvers
)中,该解析器在调用用户定义的处理程序方法之前(即当框架知道实际的参数类型时)触发。如果参数类型与当前有效负载的类型不匹配,则框架将委派给预先配置的MessageConverters
的堆栈,以查看其中是否有一个可以转换有效负载。如您所见,MessageConverter 的Object fromMessage(Message<?> message, Class<?> targetClass);
操作将targetClass
作为其参数之一。该框架还确保提供的Message
始终包含contentType
Headers。如果没有 contentTypeHeaders,则它会插入按绑定的contentType
Headers 或默认的contentType
Headers。 contentType
参数类型的组合是框架确定消息是否可以转换为目标类型的机制。如果找不到合适的MessageConverter
,则会引发异常,您可以通过添加自定义MessageConverter
(请参阅“ 第 32.3 节“用户定义的消息转换器””)来处理该异常。
但是,如果有效负载类型与处理程序方法声明的目标类型匹配,该怎么办?在这种情况下,没有任何要转换的内容,并且有效负载未经修改地传递。尽管这听起来很简单且合乎逻辑,但请记住以Message<?>
或Object
作为参数的处理程序方法。通过将目标类型声明为Object
(在 Java 中是instanceof
的所有内容),实际上就放弃了转换过程。
Note
不要期望Message
仅基于contentType
转换为其他类型。请记住,contentType
是目标类型的补充。如果您愿意,可以提供一个提示,MessageConverter
可以考虑也可以不考虑。
32.1.2 消息转换器
MessageConverters
定义两种方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
了解这些方法的约定及其用法非常重要,尤其是在 Spring Cloud Stream 的上下文中。
fromMessage
方法将传入的Message
转换为参数类型。 Message
的有效负载可以是任何类型,并且MessageConverter
的实际实现取决于支持多种类型。例如,某些 JSON 转换器可能支持有效负载类型为byte[]
,String
等。当应用程序包含内部管道(即 Importing→handler1→handler2→..→→输出)并且上游处理程序的输出结果为Message
时(这可能不是初始连线格式),这一点很重要。
但是,toMessage
方法的 Contract 更为严格,必须始终将Message
转换为有线格式byte[]
。
因此,出于所有意图和目的(尤其是在实现自己的转换器时),您将这两种方法视为具有以下签名:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);
32.2 提供的 MessageConverters
如前所述,该框架已经提供了MessageConverters
的堆栈来处理最常见的用例。以下列表按优先级描述了提供的MessageConverters
(使用了第一个有效的MessageConverter
):
-
ApplicationJsonMessageMarshallingConverter
:org.springframework.messaging.converter.MappingJackson2MessageConverter
的变体。对于contentType
为application/json
(DEFAULT)的情况,支持Message
的有效负载到 POJO 的转换。 -
TupleJsonMessageConverter
: 已弃用 支持将Message
的有效负载转换为org.springframework.tuple.Tuple
或从org.springframework.tuple.Tuple
转换。 -
ByteArrayMessageConverter
:在contentType
为application/octet-stream
的情况下,支持将Message
的有效载荷从byte[]
转换为byte[]
。它本质上是一个传递,主要是为了向后兼容而存在。 -
ObjectStringMessageConverter
:当contentType
为text/plain
时,支持将任何类型转换为String
。它调用 Object 的toString()
方法,或者,如果有效载荷为byte[]
,则调用新的String(byte[])
。 -
JavaSerializationMessageConverter
: 已弃用 当contentType
为application/x-java-serialized-object
时,支持基于 Java 序列化的转换。 -
KryoMessageConverter
: 不推荐使用 当contentType
为application/x-java-object
时,支持基于 Kryo 序列化的转换。 -
JsonUnmarshallingConverter
:类似于ApplicationJsonMessageMarshallingConverter
。当contentType
为application/x-java-object
时,它支持任何类型的转换。它期望将实际类型信息作为属性(例如application/x-java-object;type=foo.bar.Cat
)嵌入到contentType
中。
当找不到合适的转换器时,框架将引发异常。发生这种情况时,应检查代码和配置,并确保您没有错过任何内容(即,确保使用绑定或 Headers 提供了contentType
)。但是,很可能您发现了一些不常见的情况(例如自定义contentType
),并且提供的MessageConverters
的当前堆栈不知道如何进行转换。在这种情况下,您可以添加自定义MessageConverter
。参见第 32.3 节“用户定义的消息转换器”。
32.3 用户定义的消息转换器
Spring Cloud Stream 公开了定义和注册其他MessageConverters
的机制。要使用它,请实现org.springframework.messaging.converter.MessageConverter
,将其配置为@Bean
,并用@StreamMessageConverter
进行 Comments。然后将其添加到 MessageConverter 的现有堆栈中。
Note
请务必将自定义MessageConverter
实现添加到现有堆栈的开头,这一点很重要。因此,自定义MessageConverter
实现优先于现有实现,您可以覆盖它们并添加到现有转换器中。
下面的示例演示如何创建消息转换器 bean 以支持称为application/bar
的新 Content Type:
@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {
...
@Bean
@StreamMessageConverter
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
Spring Cloud Stream 还为基于 Avro 的转换器和模式演变提供支持。有关详细信息,请参见“ 第 33 章,模式演化支持”。