56. Span 数据作为消息

您可以通过将spring-cloud-sleuth-stream jar 包含为依赖项来累积和发送 span 数据,并添加 Channel Binder implementation(e.g. spring-cloud-starter-stream-rabbit用于 RabbitMQ 或spring-cloud-starter-stream-kafka用于 Kafka)。这将自动将您的应用程序转换为有效负载类型为Spans的消息的 producer。 spans 将被发送到的 channel name 称为sleuth

56.1 Zipkin Consumer

spring-cloud-sleuth-zipkin-stream已弃用,不应再使用。请使用 OpenZipkin 的 Zipkin 服务器并将环境变量设置为这里是 rabbit(Zipkin 2.4.6)这里是 kafka(Zipkin 2.4.6)

有一个特殊的便利注释,用于为 Span 数据设置消息 consumer 并将其推送到 Zipkin SpanStore。这个应用程序

@SpringBootApplication
@EnableZipkinStreamServer
public class Consumer {
	public static void main(String[] args) {
		SpringApplication.run(Consumer.class, args);
	}
}

将通过 Spring Cloud Stream Binder(e.g. 包含spring-cloud-starter-stream-rabbit用于 RabbitMQ,并且类似的 starter 存在于 Redis 和 Kafka 中)监听您提供的任何传输的 Span 数据。如果添加以下 UI 依赖项

<groupId>io.zipkin.java</groupId>
<artifactId>zipkin-autoconfigure-ui</artifactId>

然后你将拥有一个Zipkin 服务器,它在 port 9411 上托管 UI 和 api。

默认SpanStore是 in-memory(适用于演示和快速入门)。对于更强大的解决方案,您可以将 MySQL 和spring-boot-starter-jdbc添加到 classpath 并通过 configuration,e.g 启用 JDBC SpanStore。:

spring:
  rabbitmq:
    host: ${RABBIT_HOST:localhost}
  datasource:
    schema: classpath:/mysql.sql
    url: jdbc:mysql://${MYSQL_HOST:localhost}/test
    username: root
    password: root
# Switch this on to create the schema on startup:
    initialize: true
    continueOnError: true
  sleuth:
    enabled: false
zipkin:
  storage:
    type: mysql

@EnableZipkinStreamServer也用@EnableZipkinServer注释,因此 process 还将公开标准的 Zipkin 服务器 endpoints,用于通过 HTTP 收集 spans,以及用于在 Zipkin Web UI 中查询。

56.2 自定义 Consumer

使用spring-cloud-sleuth-stream和 binding 到SleuthSink也可以轻松实现自定义 consumer。 例:

@EnableBinding(SleuthSink.class)
@SpringBootApplication(exclude = SleuthStreamAutoConfiguration.class)
@MessageEndpoint
public class Consumer {

    @ServiceActivator(inputChannel = SleuthSink.INPUT)
    public void sink(Spans input) throws Exception {
        // ... process spans
    }
}

上面的 sample consumer application 明确排除了SleuthStreamAutoConfiguration,因此它不会向自己发送消息,但这是可选的(您可能实际上想要将请求跟踪到 consumer 应用程序中)。

在 order 中自定义轮询机制,您可以使用 name 等于StreamSpanReporter.POLLER创建PollerMetadata类型的 bean。在这里,您可以找到这种 configuration 的 example。

@Configuration
public static class CustomPollerConfiguration {

	@Bean(name = StreamSpanReporter.POLLER)
	PollerMetadata customPoller() {
		PollerMetadata poller = new PollerMetadata();
		poller.setMaxMessagesPerPoll(500);
		poller.setTrigger(new PeriodicTrigger(5000L));
		return poller;
	}
}