29. Content Type 和 Transformation
为了允许您传播有关生成的消息的 content type 的信息,Spring Cloud Stream 默认情况下将contentType
标头附加到出站消息。对于不直接支持 headers 的中间件,Spring Cloud Stream 提供了自己的自动包装出站消息的机制。对于支持 headers 的中间件,Spring Cloud Stream applications 可能会收到来自 non-Spring Cloud Stream applications 的给定 content type 的消息。
Spring Cloud Stream 可以通过两种方式根据此信息处理消息:
-
通过入站和出站 channels 的
contentType
设置 -
通过为
@StreamListener
注释的方法执行的参数映射
Spring Cloud Stream 允许您使用 binding 的spring.cloud.stream.bindings.<channelName>.content-type
property 以声明方式配置输入和输出的类型转换。请注意,通过在 application 中使用变换器,也可以轻松完成常规类型转换。目前,Spring Cloud Stream 本身支持流中常用的以下类型转换:
-
JSON to/from POJO
-
JSON to/from org.springframework.tuple.Tuple
-
Object to/from byte [7]:为 remote 传输序列化的原始字节,application 发出的字节,或者使用 Java serialization(requires object 为 Serializable 转换为字节的字节数)
-
String to/from byte [8]
-
对象到纯文本(调用 object 的 toString()方法)
其中 JSON 表示包含 JSON 的 byte array 或 String 有效内容。目前,Objects 可以从 JSON 字节 array 或 String 转换。转换为 JSON 始终生成 String。
如果在出站 channel 上没有设置content-type
property,Spring Cloud Stream 将使用基于KRYO序列化 framework 的序列化程序序列化有效负载。在目的地反序列化消息需要有效负载 class 出现在接收者的 classpath 上。
29.1 MIME 类型
content-type
值被解析为媒体类型,e.g. ,application/json
或text/plain;charset=UTF-8
。 MIME 类型对于指示如何转换为 String 或 byte [15]内容特别有用。 Spring Cloud Stream 还使用 MIME 类型格式来表示 Java 类型,使用带有type
参数的常规类型application/x-java-object
。对于 example,application/x-java-object;type=java.util.Map
或application/x-java-object;type=com.bar.Foo
可以设置为输入 binding 的content-type
property。此外,Spring Cloud Stream 提供自定义 MIME 类型,特别是application/x-spring-tuple
来指定元组。
29.2 MIME 类型和 Java 类型
类型转换 Spring Cloud Stream 提供的开箱即用的概述如下 table:'Source Payload'表示转换前的有效负载,'Target Payload'表示转换后的'payload'。类型转换可以在'producer'侧(输出)或'consumer'侧(输入)进行。
来源有效载荷 | 目标有效载荷 | content-type 标题(来源信息) | content-type 标题(转换后) | 评论 |
---|---|---|---|---|
POJO | JSON String | 忽视 | application/json | |
元组 | JSON String | 忽视 | application/json | JSON 专为 Tuple 量身定制 |
POJO | String(toString()) | 忽视 | text/plain,java.lang.String | |
POJO | byte [24](java.io 序列化) | 忽视 | application/x-java-serialized-object | |
JSON 字节[25]或 String | POJO | application/json(或 none) | application/x-java-object | |
byte [26]或 String | 序列化 | application/x-java-serialized-object | application/x-java-object | |
JSON 字节[27]或 String | 元组 | application/json(或 none) | application/x-spring-tuple | |
字节[28] | 串 | 任何 | text/plain,java.lang.String | 将应用 content-type 标头中指定的任何 Charset |
串 | 字节[29] | 任何 | application/octet-stream | 将应用 content-type 标头中指定的任何 Charset |
转换适用于需要类型转换的有效负载。例如,如果 application 生成带有 outputType=application/json 的 XML string,则有效负载将不会从 XML 转换为 JSON。这是因为发送到出站 channel 的有效负载已经是 String,因此在运行时不会应用任何转换。同样重要的是要注意,在使用默认序列化机制时,有效负载 class 必须在发送和接收 application 之间共享,并与二进制内容兼容。当 application code 在两个 applications 中独立更改时,这可能会产生问题,因为二进制格式和 code 可能会变得不兼容。
虽然入站和出站 channel 都支持转换,但特别建议将其用于转发出站邮件。对于入站消息的转换,尤其是当目标是 POJO 时,
@StreamListener
支持将自动执行转换。
29.3 自定义邮件转换
除了它支持开箱即用的转换外,Spring Cloud Stream 还支持注册您自己的消息转换 implementations。这允许您以各种自定义格式(包括二进制)发送和接收数据,并将它们与特定的contentTypes
相关联。 Spring Cloud Stream 将org.springframework.messaging.converter.MessageConverter
类型的所有 beans 注册为自定义消息转换器以及开箱即用的消息转换器。
如果消息转换器需要使用特定的content-type
和目标 class(对于输入和输出),则消息转换器需要扩展org.springframework.messaging.converter.AbstractMessageConverter
。对于使用@StreamListener
时的转换,实现org.springframework.messaging.converter.MessageConverter
的消息转换器就足够了。
这是在 Spring Cloud Stream application 中创建消息转换器 bean(带 content-type application/bar
)的示例:
@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class == clazz);
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
Spring Cloud Stream 还支持 Avro-based 转换器和 schema evolution。有关详细信息,请参阅具体部分。
29.4 @StreamListener 和消息转换
@StreamListener
annotation 提供了一种转换传入消息的便捷方式,而无需指定输入 channel 的 content type。在将 process 分派给使用@StreamListener
注释的方法期间,如果参数需要,将自动应用转换。
对于 example,让我们考虑一个带有 String 内容{"greeting":"Hello, world"}
的消息,并在输入 channel 上接收application/json
的content-type
头。让我们考虑以下接收它的应用程序:
public class GreetingMessage {
String greeting;
public String getGreeting() {
return greeting;
}
public void setGreeting(String greeting) {
this.greeting = greeting;
}
}
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class GreetingSink {
@StreamListener(Sink.INPUT)
public void receive(Greeting greeting) {
// handle Greeting
}
}
该方法的参数将自动填充包含 JSON String 的 unmarshalled 形式的 POJO。