29. Content Type 和 Transformation

为了允许您传播有关生成的消息的 content type 的信息,Spring Cloud Stream 默认情况下将contentType标头附加到出站消息。对于不直接支持 headers 的中间件,Spring Cloud Stream 提供了自己的自动包装出站消息的机制。对于支持 headers 的中间件,Spring Cloud Stream applications 可能会收到来自 non-Spring Cloud Stream applications 的给定 content type 的消息。

Spring Cloud Stream 可以通过两种方式根据此信息处理消息:

  • 通过入站和出站 channels 的contentType设置

  • 通过为@StreamListener注释的方法执行的参数映射

Spring Cloud Stream 允许您使用 binding 的spring.cloud.stream.bindings.<channelName>.content-type property 以声明方式配置输入和输出的类型转换。请注意,通过在 application 中使用变换器,也可以轻松完成常规类型转换。目前,Spring Cloud Stream 本身支持流中常用的以下类型转换:

  • JSON to/from POJO

  • JSON to/from org.springframework.tuple.Tuple

  • Object to/from byte [7]:为 remote 传输序列化的原始字节,application 发出的字节,或者使用 Java serialization(requires object 为 Serializable 转换为字节的字节数)

  • String to/from byte [8]

  • 对象纯文本(调用 object 的 toString()方法)

其中 JSON 表示包含 JSON 的 byte array 或 String 有效内容。目前,Objects 可以从 JSON 字节 array 或 String 转换。转换为 JSON 始终生成 String。

如果在出站 channel 上没有设置content-type property,Spring Cloud Stream 将使用基于KRYO序列化 framework 的序列化程序序列化有效负载。在目的地反序列化消息需要有效负载 class 出现在接收者的 classpath 上。

29.1 MIME 类型

content-type值被解析为媒体类型,e.g. ,application/jsontext/plain;charset=UTF-8。 MIME 类型对于指示如何转换为 String 或 byte [15]内容特别有用。 Spring Cloud Stream 还使用 MIME 类型格式来表示 Java 类型,使用带有type参数的常规类型application/x-java-object。对于 example,application/x-java-object;type=java.util.Mapapplication/x-java-object;type=com.bar.Foo可以设置为输入 binding 的content-type property。此外,Spring Cloud Stream 提供自定义 MIME 类型,特别是application/x-spring-tuple来指定元组。

29.2 MIME 类型和 Java 类型

类型转换 Spring Cloud Stream 提供的开箱即用的概述如下 table:'Source Payload'表示转换前的有效负载,'Target Payload'表示转换后的'payload'。类型转换可以在'producer'侧(输出)或'consumer'侧(输入)进行。

来源有效载荷目标有效载荷content-type标题(来源信息)content-type标题(转换后)评论
POJOJSON String忽视application/json
元组JSON String忽视application/jsonJSON 专为 Tuple 量身定制
POJOString(toString())忽视text/plain,java.lang.String
POJObyte [24](java.io 序列化)忽视application/x-java-serialized-object
JSON 字节[25]或 StringPOJOapplication/json(或 none)application/x-java-object
byte [26]或 String序列化application/x-java-serialized-objectapplication/x-java-object
JSON 字节[27]或 String元组application/json(或 none)application/x-spring-tuple
字节[28]任何text/plain,java.lang.String将应用 content-type 标头中指定的任何 Charset
字节[29]任何application/octet-stream将应用 content-type 标头中指定的任何 Charset

转换适用于需要类型转换的有效负载。例如,如果 application 生成带有 outputType=application/json 的 XML string,则有效负载将不会从 XML 转换为 JSON。这是因为发送到出站 channel 的有效负载已经是 String,因此在运行时不会应用任何转换。同样重要的是要注意,在使用默认序列化机制时,有效负载 class 必须在发送和接收 application 之间共享,并与二进制内容兼容。当 application code 在两个 applications 中独立更改时,这可能会产生问题,因为二进制格式和 code 可能会变得不兼容。

虽然入站和出站 channel 都支持转换,但特别建议将其用于转发出站邮件。对于入站消息的转换,尤其是当目标是 POJO 时,@StreamListener支持将自动执行转换。

29.3 自定义邮件转换

除了它支持开箱即用的转换外,Spring Cloud Stream 还支持注册您自己的消息转换 implementations。这允许您以各种自定义格式(包括二进制)发送和接收数据,并将它们与特定的contentTypes相关联。 Spring Cloud Stream 将org.springframework.messaging.converter.MessageConverter类型的所有 beans 注册为自定义消息转换器以及开箱即用的消息转换器。

如果消息转换器需要使用特定的content-type和目标 class(对于输入和输出),则消息转换器需要扩展org.springframework.messaging.converter.AbstractMessageConverter。对于使用@StreamListener时的转换,实现org.springframework.messaging.converter.MessageConverter的消息转换器就足够了。

这是在 Spring Cloud Stream application 中创建消息转换器 bean(带 content-type application/bar)的示例:

@EnableBinding(Sink.class)
@SpringBootApplication
public static class SinkApplication {

  ...

  @Bean
  public MessageConverter customMessageConverter() {
    return new MyCustomMessageConverter();
  }
public class MyCustomMessageConverter extends AbstractMessageConverter {

	public MyCustomMessageConverter() {
		super(new MimeType("application", "bar"));
	}

	@Override
  protected boolean supports(Class<?> clazz) {
    return (Bar.class == clazz);
  }

	@Override
	protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
		Object payload = message.getPayload();
		return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
	}
}

Spring Cloud Stream 还支持 Avro-based 转换器和 schema evolution。有关详细信息,请参阅具体部分

29.4 @StreamListener 和消息转换

@StreamListener annotation 提供了一种转换传入消息的便捷方式,而无需指定输入 channel 的 content type。在将 process 分派给使用@StreamListener注释的方法期间,如果参数需要,将自动应用转换。

对于 example,让我们考虑一个带有 String 内容{"greeting":"Hello, world"}的消息,并在输入 channel 上接收application/jsoncontent-type头。让我们考虑以下接收它的应用程序:

public class GreetingMessage {

  String greeting;

  public String getGreeting() {
    return greeting;
  }

  public void setGreeting(String greeting) {
    this.greeting = greeting;
  }
}

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class GreetingSink {

		@StreamListener(Sink.INPUT)
		public void receive(Greeting greeting) {
			// handle Greeting
		}
	}

该方法的参数将自动填充包含 JSON String 的 unmarshalled 形式的 POJO。