Flume 1.9.0 用户指南

Introduction

Overview

Apache Flume 是一个分布式,可靠且可用的系统,用于有效地收集,聚集大量日志数据并将其从许多不同的源移动到集中式数据存储中。

Apache Flume 的使用不仅限于日志数据聚合。由于数据源是可定制的,因此 Flume 可用于传输大量事件数据,包括但不限于网络流量数据,社交媒体生成的数据,电子邮件消息以及几乎所有可能的数据源。

Apache Flume 是 Apache Software Foundation 的顶级项目。

System Requirements

  • Java 运行时环境-Java 1.8 或更高版本

  • 内存-足够的内存用于源,通道或接收器使用的配置

  • 磁盘空间-足够的磁盘空间用于通道或接收器使用的配置

  • 目录权限-代理程序使用的目录的读/写权限

Architecture

数据流模型

Flume 事件定义为具有字节有效负载和可选的字符串属性集的数据流单位。 Flume 代理是一个(JVM)进程,承载了组件,事件通过这些组件从外部源流到下一个目标(跳)。

代理组件图

Flume 源使用由外部源(如 Web 服务器)传递给它的事件。外部源以目标 Flume 源可以识别的格式将事件发送到 Flume。例如,Avro Flume 源可用于从流中从 Avro 接收器发送事件的流中的 AvroClient 端或其他 Flume 代理接收 Avro 事件。可以使用 Thrift Flume 源定义类似的流程,以接收来自 Thrift Sink 或 Flume Thrift RpcClient 端或以 Flume Thrift 协议生成的任何语言编写的 ThriftClient 端的事件.Flume 源接收到事件后,会将其存储到一个或多个 Channel。通道是一个被动存储,用于保留事件,直到被 Flume Sink 占用为止。文件通道是一个示例–由本地文件系统支持。接收器从通道中删除事件,并将其放入 HDFS 之类的外部存储库(通过 Flume HDFS 接收器),或将其转发到流中下一个 Flume 代理(下一跳)的 Flume 源。给定代理中的源和接收器与通道中上演的事件异步运行。

Complex flows

Flume 允许用户构建多跳流程,其中事件在到达最终目的地之前会通过多个代理传播。它还允许扇入和扇出流,上下文路由和备用路由(故障转移)。

Reliability

事件在每个代理上的一个通道中上演。然后将事件传递到流中的下一个代理或终端存储库(如 HDFS)。仅将事件存储在下一个代理程序的信道或终端存储库中之后,才将其从信道中删除。这就是 Flume 中单跳消息传递语义如何提供流的端到端可靠性的方式。

Flume 使用事务处理方法来确保事件的可靠传递。源和接收器分别在事务中封装存储在通道中或由通道提供的事务中提供的事件的存储/检索。这确保了事件集在流中从点到点可靠地传递。在多跳流的情况下,来自上一跳的接收器和来自下一跳的源均运行其事务,以确保将数据安全地存储在下一跳的通道中。

Recoverability

事件在通道中上演,该通道 Management 从故障中恢复。 Flume 支持持久的文件通道,该通道由本地文件系统支持。还有一个内存通道,它仅将事件存储在内存队列中,速度更快,但是当代理进程死亡时,仍保留在内存通道中的任何事件都无法恢复。

Setup

设置代理

Flume 代理配置存储在本地配置文件中。这是遵循 Java 属性文件格式的文本文件。可以在同一配置文件中指定一个或多个代理的配置。配置文件包括代理中每个源,接收器和通道的属性,以及它们如何连接在一起以形成数据流。

配置单个组件

流中的每个组件(源,接收器或通道)都有一个名称,类型和特定于该类型和实例化的属性集。例如,一个 Avro 源需要一个主机名(或 IP 地址)和一个端口号来接收数据。一个内存通道可以具有最大队列大小(“容量”),并且 HDFS 接收器需要知道文件系统 URI,创建文件的路径,文件旋转的频率(“ hdfs.rollInterval”)等。组件的所有此类属性需要在托管 Flume 代理的属性文件中进行设置。

将各部分连接在一起

代理需要知道要加载哪些单独的组件以及如何进行连接才能构成流程。通过列出代理中每个源,接收器和通道的名称,然后为每个接收器和源指定连接通道来完成此操作。例如,代理通过称为文件通道的文件通道将事件从名为 avroWeb 的 Avro 源流到 HDFS 接收器 hdfs-cluster1.配置文件将包含这些组件的名称和文件通道,作为 avroWeb 源和 hdfs-cluster1 接收器的共享通道。

启动代理

使用称为 flume-ng 的 Shell 程序脚本启动代理,该脚本位于 Flume 发行版的 bin 目录中。您需要在命令行上指定代理名称,配置目录和配置文件:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

现在,代理将开始运行在给定属性文件中配置的源和接收器。

一个简单的例子

在这里,我们提供了一个示例配置文件,描述了单节点 Flume 部署。通过此配置,用户可以生成事件,然后将其记录到控制台。

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

此配置定义名为 a1 的单个代理。 a1 具有侦听端口 44444 上的数据的源,在内存中缓冲事件数据的通道以及将事件数据记录到控制台的接收器。配置文件为各个组件命名,然后描述它们的类型和配置参数。给定的配置文件可能会定义几个命名的代理。当启动给定的 Flume 进程时,会传递一个标志,告诉它要显示哪个命名的代理。

有了这个配置文件,我们可以如下启动 Flume:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

请注意,在完整部署中,我们通常会另外包含一个选项:\-\-conf=\<conf\-dir\>\<conf\-dir\>目录将包含一个 shell 脚本* flume-env.sh *和一个 log4j 属性文件。在此示例中,我们传递了一个 Java 选项来强制 Flume 登录到控制台,并且没有自定义环境脚本。

然后,我们可以从另一个终端通过 telnet 端口 44444 发送 Flume 事件:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

原始的 Flume 终端将在日志消息中输出事件。

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

恭喜,您已经成功配置和部署了 Flume 代理!后续部分将更详细地介绍代理配置。

在配置文件中使用环境变量

Flume 可以替换配置中的环境变量。例如:

a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${NC_PORT}
a1.sources.r1.channels = c1

注意:目前仅适用于值,不适用于键。 (即,仅在配置行=标记的“右侧”.)

可以通过设置属性 Implementation = org.apache.flume.node.EnvVarResolverProperties 在代理调用上通过 Java 系统属性来启用此功能。

  • For example::
$ NC_PORT=44444 bin/flume-ng agent –conf conf –conf-file example.conf –name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

注意,以上只是示例,可以用其他方式配置环境变量,包括在 conf/flume-env.sh 中设置。

记录原始数据

在许多生产环境中,记录流经摄取管道的原始数据流不是理想的行为,因为这可能会导致敏感数据或与安全相关的配置(例如密钥)泄漏到 Flume 日志文件。默认情况下,Flume 不会记录此类信息。另一方面,如果数据管道断开,Flume 将尝试提供调试问题的线索。

调试事件管道问题的一种方法是设置一个额外的Memory Channel连接到Logger Sink,它将所有事件数据输出到 Flume 日志。但是,在某些情况下,这种方法是不够的。

为了启用与事件和配置有关的数据的日志记录,除了 log4j 属性之外,还必须设置一些 Java 系统属性。

要启用与配置有关的日志记录,请设置 Java 系统属性\-Dorg\.apache\.flume\.log\.printconfig=true。这可以在命令行中传递,也可以在* flume-env.sh *的JAVA_OPTS变量中进行设置。

要启用数据记录,请按照上述相同的方式设置 Java 系统属性\-Dorg\.apache\.flume\.log\.rawdata=true。对于大多数组件,还必须将 log4j 日志记录级别设置为 DEBUG 或 TRACE,以使特定于事件的日志记录出现在 Flume 日志中。

这是一个同时启用配置日志记录和原始数据日志记录,同时还将 Log4j 日志级别设置为 DEBUG 以进行控制台输出的示例:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

基于 Zookeeper 的配置

Flume 通过 Zookeeper 支持代理配置。 *这是一项实验性功能.*配置文件需要以可配置的前缀上传到 Zookeeper 中。配置文件存储在 Zookeeper 节点数据中。以下是代理 a1 和 a2 的 Zookeeper 节点树的外观

- /flume
 |- /a1 [Agent config file]
 |- /a2 [Agent config file]

上传配置文件后,使用以下选项启动代理

Note

$ bin/flume-ng 代理–conf conf -z zkhost:2181,zkhost1:2181 -p/flume –name a1 -Dflume.root.logger = INFO,控制台

Argument NameDefaultDescription
zZookeeper 连接字符串。主机名:端口的逗号分隔列表
p/flumeZookeeper 中用于存储代理配置的基本路径

安装第三方插件

Flume 具有完全基于插件的体系结构。尽管 Flume 附带了许多现成的源,通道,接收器,序列化器等,但存在许多与 Flume 分开提供的实现。

尽管始终可以通过将自定义 Flume 组件的 jar 添加到 flume-env.sh 文件中的 FLUME_CLASSPATH 变量中来包括它们,但 Flume 现在支持一个名为plugins\.d的特殊目录,该目录会自动选择以特定格式打包的插件。这样可以更轻松地 Management 插件打包问题,还可以简化几类问题的调试和故障排除,尤其是库依赖冲突。

plugins.d 目录

plugins\.d目录位于$FLUME_HOME/plugins\.d。在启动时,flume\-ng启动脚本在plugins\.d目录中查找符合以下格式的插件,并在启动java时将其包含在正确的路径中。

插件的目录布局

plugins\.d中的每个插件(子目录)最多可以具有三个子目录:

  • lib-插件的 jar

  • libext-插件的依赖项

  • 本机-任何必需的本机库,例如\.so文件

plugins.d 目录中的两个插件的示例:

plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so

Data ingestion

Flume 支持多种机制来从外部源获取数据。

RPC

Flume 发行版中包含的 AvroClient 端可以使用 avro RPC 机制将给定文件发送到 Flume Avro 源:

$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

上面的命令会将/usr/logs/log.10 的内容发送到在该端口上监听的 Flume 源。

Executing commands

有一个 exec 源,它执行给定的命令并使用输出。单个“行”输出,即 Literals 后跟回车符(' r')或换行符(' n')或两者一起。

Network streams

Flume 支持以下机制从流行的日志流类型读取数据,例如:

  • Avro

  • Thrift

  • Syslog

  • Netcat

设置多主体流程

两个代理通过 Avro RPC 进行通信

为了使数据跨多个代理或跃点流动,前一个代理的接收器和当前跃点的源必须为 avro 类型,接收器指向源的主机名(或 IP 地址)和端口。

Consolidation

日志收集中的一种非常常见的情况是,大量的日志生成 Client 端将数据发送到连接到存储子系统的几个使用方代理。例如,从数百台 Web 服务器收集的日志发送到许多写入 HDFS 群集的代理。

使用 Avro RPC 在一个地方合并事件的扇入流

这可以在 Flume 中实现,方法是为多个第一层代理配置一个 avro 接收器,它们均指向单个代理的 avro 源(同样,在这种情况下,您可以使用节俭的源/接收器/Client 端)。第二层代理上的此源将接收到的事件合并到一个通道中,该通道由接收器消耗到其最终目的地。

多路复用

Flume 支持将事件流复用到一个或多个目的地。这是通过定义一种流多路复用器来实现的,该流多路复用器可以将事件复制或选择性地路由到一个或多个通道。

使用(复用)通道 selectors 的扇出流

上面的示例显示了来自代理“ foo”的源,将流扩展到三个不同的通道。此扇出可以复制或多路复用。在复制流的情况下,每个事件都发送到所有三个通道。对于多路复用情况,当事件的属性与预配置的值匹配时,事件将传递到可用通道的子集。例如,如果将名为“ txnType”的事件属性设置为“Client”,则应将其转到 channel1 和 channel3,如果将其为“ vendor”,则应将其转到 channel2,否则将其转到 channel3.可以在代理的配置文件中设置 Map。

Configuration

如前一节所述,从类似于 Java 属性文件格式和分层属性设置的文件中读取 Flume 代理配置。

定义流程

要定义单个代理中的流,您需要通过通道链接源和接收器。您需要列出给定代理的源,接收器和通道,然后将源和接收器指向一个通道。一个源实例可以指定多个通道,但是一个接收器实例只能指定一个通道。格式如下:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

例如,名为 agent_foo 的代理正在从外部 avroClient 端读取数据,并通过内存通道将其发送到 HDFS。配置文件 weblog.config 如下所示:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

这将使事件从 avro-AppSrv 源通过内存通道 mem-channel-1 流到 hdfs-Cluster1-sink。当代理使用 weblog.config 作为其配置文件启动时,它将实例化该流。

配置单个组件

定义流之后,您需要设置每个源,接收器和通道的属性。这以相同的分层名称空间方式完成,在该方式中,您可以为每个组件特定的属性设置组件类型和其他值:

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>

需要为 Flume 的每个组件设置属性“类型”,以了解它需要成为哪种对象。每个源,接收器和通道类型都有其自己的属性集,以使其按预期运行。所有这些都需要根据需要进行设置。在前面的示例中,我们有一个从 avro-AppSrv-source 通过内存通道 mem-channel-1 到 hdfs-Cluster1-sink 的流。这是一个示例,显示每个组件的配置:

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

#...

在代理中添加多个流

一个 Flume 代理可以包含多个独立的流。您可以在配置中列出多个源,接收器和通道。这些组件可以链接形成多个流程:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

然后,您可以将源和接收器链接到通道(对于接收器)的相应通道(对于源),以设置两个不同的流。例如,如果您需要在一个代理中设置两个流,一个从外部 avroClient 端流到外部 HDFS,另一个从尾部输出流到 avro 接收器,那么这里是一个配置:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

配置多代理流程

要设置多层流,您需要使第一跳的 avro /节俭接收器指向下一跳的 avro /节俭来源。这将导致第一个 Flume 代理将事件转发到下一个 Flume 代理。例如,如果您要使用 avroClient 端定期将文件(每个事件 1 个文件)发送到本地 Flume 代理,则此本地代理可以将其转发到已挂载用于存储的另一个代理。

Weblog 代理配置:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

# configure other pieces
#...

HDFS 代理配置:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

在这里,我们将来自 Weblog 代理的 avro-forward-sink 链接到 hdfs 代理的 avro-collection-source。这将导致来自外部应用服务器源的事件最终被存储在 HDFS 中。

扇出流量

如上一节所述,Flume 支持将流从一个源散发到多个通道。扇出有两种模式,即复制和多路复用。在复制流程中,事件被发送到所有已配置的通道。在多路复用的情况下,事件仅发送到合格信道的子集。要散发流,需要为源指定一个通道列表以及散发它的策略。这是通过添加可以复制或多路复用的通道“selectors”来完成的。如果是多路复用器,则进一步指定选择规则。如果未指定 selectors,则默认情况下它是复制的:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

多路复用选择具有另一组属性,以使流分叉。这需要指定事件属性到通道集合的 Map。selectors 检查事件 Headers 中的每个已配置属性。如果它与指定值匹配,那么该事件将发送到 Map 到该值的所有通道。如果没有匹配项,则将事件发送到配置为默认值的一组通道:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

Map 允许每个值的通道重叠。

以下示例具有一个多路复用到两条路径的流。名为 agent_foo 的代理具有单个 avro 源和链接到两个接收器的两个通道:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

selectors 检查名为“状态”的 Headers。如果值为“ CA”,则将其发送到 mem-channel-1;如果其值为“ AZ”,则将其发送到文件通道 2;如果值为“ NY”,则两者都发送。如果未设置“ State”Headers,或者与三个 Headers 都不匹配,则转到标为“默认”的 mem-channel-1.

selectors 还支持可选通道。要为 Headers 指定可选通道,可通过以下方式使用 config 参数“ optional”:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

selectors 将首先尝试写入所需的通道,并且即使这些通道中的一个通道也无法使用事件,事务也会失败。该 Transaction 会在所有 Channel 上重新尝试。一旦所有必需的通道都消耗了事件,selectors 将尝试写入可选通道。任何可选通道未能消耗事件都将被忽略,不会重试。

如果可选通道和特定 Headers 的必需通道之间存在重叠,则认为该通道是必需的,并且通道中的故障将导致重试整个必需通道集。例如,在上面的示例中,即使 Headers“ CA”将 mem-channel-1 标记为必需和可选,也将其视为必需通道,并且写入此通道失败将导致该事件在为 selectors 配置的“所有”通道上重试。

请注意,如果 Headers 没有任何必需的通道,则事件将被写入默认通道,并尝试将其写入该 Headers 的可选通道。如果未指定必需的通道,则指定可选通道仍将导致事件写入默认通道。如果没有将任何通道指定为默认通道,并且没有要求,selectors 将尝试将事件写入可选通道。在这种情况下,任何故障都将被忽略。

SSL/TLS support

一些 Flume 组件支持 SSL/TLS 协议,以便与其他系统安全地通信。

ComponentSSL 服务器或 Client 端
Avro Sourceserver
Avro Sinkclient
Thrift Sourceserver
Thrift Sinkclient
Kafka Sourceclient
Kafka Channelclient
Kafka Sinkclient
HTTP Sourceserver
JMS Sourceclient
Syslog TCP 源server
多端口 Syslog TCP 源server

与 SSL 兼容的组件具有多个用于设置 SSL 的配置参数,例如启用 SSL 标志,密钥库/信任库参数(位置,密码,类型)和其他 SSL 参数(例如,禁用的协议)。

始终在代理程序配置文件中的组件级别上指定为组件启用 SSL。因此,某些组件可以配置为使用 SSL,而其他组件则不使用(即使具有相同的组件类型)。

可以在组件级别或全局指定密钥库/信任库设置。

在设置组件级别的情况下,通过特定于组件的参数在代理配置文件中配置密钥库/信任库。此方法的优点是组件可以使用不同的密钥库(如果需要)。缺点是必须为代理程序配置文件中的每个组件复制密钥库参数。组件级设置是可选的,但如果定义了,则其优先级高于全局参数。

使用全局设置,一次定义密钥库/信任库参数并为所有组件使用相同的设置就足够了,这意味着越来越少的集中式配置。

可以通过系统属性或环境变量来配置全局设置。

System propertyEnvironment variableDescription
javax.net.ssl.keyStoreFLUME_SSL_KEYSTORE_PATHKeystore location
javax.net.ssl.keyStorePasswordFLUME_SSL_KEYSTORE_PASSWORDKeystore password
javax.net.ssl.keyStoreTypeFLUME_SSL_KEYSTORE_TYPE密钥库类型(默认为 JKS)
javax.net.ssl.trustStoreFLUME_SSL_TRUSTSTORE_PATHTruststore location
javax.net.ssl.trustStorePasswordFLUME_SSL_TRUSTSTORE_PASSWORDTruststore password
javax.net.ssl.trustStoreTypeFLUME_SSL_TRUSTSTORE_TYPE信任库类型(默认为 JKS)
flume.ssl.include.protocolsFLUME_SSL_INCLUDE_PROTOCOLS计算启用的协议时要包括的协议。以逗号(,)分隔的列表。如果提供的话,排除的协议将从此列表中排除。
flume.ssl.exclude.protocolsFLUME_SSL_EXCLUDE_PROTOCOLS计算启用的协议时要排除的协议。以逗号(,)分隔的列表。
flume.ssl.include.cipherSuitesFLUME_SSL_INCLUDE_CIPHERSUITES计算启用的密码套件时要包括的密码套件。以逗号(,)分隔的列表。如果提供,则排除的密码套件将从此列表中排除。
flume.ssl.exclude.cipherSuitesFLUME_SSL_EXCLUDE_CIPHERSUITES计算启用的密码套件时要排除的密码套件。以逗号(,)分隔的列表。

SSL 系统属性可以在命令行中传递,也可以通过在* conf/flume-env.sh *中设置JAVA_OPTS环境变量来传递。 (尽管不建议使用命令行,因为包含密码的命令将保存到命令历史 Logging.)

export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"

Flume 使用 JSSE(Java 安全套接字扩展)中定义的系统属性,因此这是设置 SSL 的标准方法。另一方面,在系统属性中指定密码意味着可以在进程列表中看到密码。对于不可接受的情况,也可以在环境变量中定义参数。在这种情况下,Flume 会从内部的相应环境变量中初始化 JSSE 系统属性。

SSL 环境变量可以在启动 Flume 之前在 Shell 环境中设置,也可以在* conf/flume-env.sh *中设置。 (尽管不建议使用命令行,因为包含密码的命令将保存到命令历史 Logging.)

export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password

Please note:

  • 必须在组件级别启用 SSL。仅指定全局 SSL 参数不会有任何效果。

  • 如果在多个级别上指定了全局 SSL 参数,则优先级如下(从高到低):

  • 代理配置中的组件参数

    • system properties

    • environment variables

  • 如果为组件启用了 SSL,但未以上述任何方式指定 SSL 参数,则

  • 如果是密钥库:配置错误

    • 如果是信任库:将使用默认的信任库(在 Oracle JDK 中为jssecacerts/cacerts)
  • trustore 密码在所有情况下都是可选的。如果未指定,那么当 JDK 打开信任库时,将不对信任库执行完整性检查。

源和汇的批次大小和 ChannelTransaction 能力

源和接收器可以具有一个批处理大小参数,该参数确定它们在一个批处理中处理的最大事件数。这在具有上限(称为事务容量)的通道事务内发生。批次大小必须小于 Channel 的 Transaction 能力。进行了明确检查以防止设置不兼容。只要读取配置,就会进行此检查。

Flume Sources

Avro Source

侦听 Avro 端口并从外部 AvroClient 端流接收事件。与另一个(以前的跃点)Flume 代理上的内置 Avro Sink 配对时,它可以创建分层的集合拓扑。必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为avro
bind要监听的主机名或 IP 地址
port要绑定的端口号
threads产生的最大工作线程数
selector.type
selector.*
interceptors以空格分隔的拦截器列表
interceptors.*
compression-typenone这可以是“无”或“放气”。压缩类型必须与匹配的 AvroSource 的压缩类型匹配
sslfalse将此设置为 true 可启用 SSL 加密。如果启用了 SSL,则还必须通过组件级别参数(请参见下文)或作为全局 SSL 参数(请参见SSL/TLS support部分)指定“密钥库”和“密钥库密码”。
keystore这是 Java 密钥库文件的路径。如果在此未指定,则将使用全局密钥库(如果已定义,否则配置错误)。
keystore-passwordJava 密钥库的密码。如果此处未指定,则将使用全局密钥库密码(如果已定义,否则配置错误)。
keystore-typeJKSJava 密钥库的类型。可以是“ JKS”或“ PKCS12”。如果此处未指定,则将使用全局密钥库类型(如果已定义,否则默认为 JKS)。
exclude-protocolsSSLv3要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议之外,还将始终排除 SSLv3.
include-protocols要包括的 SSL/TLS 协议的空格分隔列表。启用的协议将是包含的协议,而没有排除的协议。如果 included-protocols 为空,则包含所有受支持的协议。
exclude-cipher-suites以空格分隔的要排除的密码套件列表。
include-cipher-suites要包括的密码套件的空格分隔列表。启用的密码套件将是包含的密码套件,而没有排除的密码套件。如果 included-cipher-suites 为空,则包括所有受支持的密码套件。
ipFilterfalse将此设置为 true 可启用 netty 的 ipFiltering
ipFilterRules使用此配置定义 N 个 netty ipFilter 模式规则。

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

ipFilterRules 的示例

ipFilterRules 定义 N 个净值 ipFilter,以逗号分隔,模式规则必须采用这种格式。

\ <'allow' or deny>:\ <'ip' or 'name' for computer name>:\ 或 allow/deny:ip/name:pattern

example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

请注意,第一个匹配的规则将适用,如以下示例从 localhost 上的 Client 端显示的那样

这将允许 localhost 上的 Client 端拒绝来自任何其他 IP 的 Client 端“ allow:name:localhost,deny:ip:”这将拒绝 localhost 上的 Client 端允许来自任何其他 IP 的 Client 端“ deny:name:localhost,allow: ip:

Thrift Source

侦听 Thrift 端口并从外部 ThriftClient 端流接收事件。与另一个(以前的跃点)Flume 代理上的内置 ThriftSink 配对时,它可以创建分层的集合拓扑。可通过启用 kerberos 身份验证将节俭源配置为以安全模式启动。 agent-principal 和 agent-keytab 是 Thrift 源用来验证 kerberos KDC 的属性。必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为thrift
bind要监听的主机名或 IP 地址
port要绑定的端口号
threads产生的最大工作线程数
selector.type
selector.*
interceptors空格分隔的拦截器列表
interceptors.*
sslfalse将此设置为 true 可启用 SSL 加密。如果启用了 SSL,则还必须通过组件级别参数(请参见下文)或作为全局 SSL 参数(请参见SSL/TLS support部分)指定“密钥库”和“密钥库密码”
keystore这是 Java 密钥库文件的路径。如果在此未指定,则将使用全局密钥库(如果已定义,否则配置错误)。
keystore-passwordJava 密钥库的密码。如果此处未指定,则将使用全局密钥库密码(如果已定义,否则配置错误)。
keystore-typeJKSJava 密钥库的类型。可以是“ JKS”或“ PKCS12”。如果此处未指定,则将使用全局密钥库类型(如果已定义,否则默认为 JKS)。
exclude-protocolsSSLv3要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议之外,还将始终排除 SSLv3.
include-protocols要包括的 SSL/TLS 协议的空格分隔列表。启用的协议将是包含的协议,而没有排除的协议。如果 included-protocols 为空,则包含所有受支持的协议。
exclude-cipher-suites以空格分隔的要排除的密码套件列表。
include-cipher-suites要包括的密码套件的空格分隔列表。启用的密码套件将是包含的密码套件,而没有排除的密码套件。
kerberosfalse设置为 true 以启用 kerberos 身份验证。在 kerberos 模式下,成功进行身份验证需要 agent-principal 和 agent-keytab。安全模式下的 Thrift 源将仅接受来自启用了 kerberos 并已成功验证到 kerberos KDC 的 ThriftClient 端的连接。
agent-principalThrift Source 使用的 kerberos 主体对 kerberos KDC 进行身份验证。
agent-keytab—-Thrift Source 结合代理主体使用的 keytab 位置,用于对 kerberos KDC 进行身份验证。

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Exec Source

Exec 源代码在启动时运行给定的 Unix 命令,并期望该过程在标准输出上连续产生数据(除非将属性 logStdErr 设置为 true,否则将直接丢弃 stderr)。如果该过程由于某种原因而退出,则源也将退出,并且将不再产生任何数据。这意味着诸如cat \[named pipe\]tail \-F \[file\]之类的配置将产生期望的结果,而date则可能不会-前两个命令产生数据流,而后者则产生单个事件并退出。

必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为exec
command要执行的命令
shell用于运行命令的 Shell 程序调用。例如/ bin/sh -c。仅对于依赖于 Shell 功能(如通配符,反斜杠,管道等)的命令才需要。
restartThrottle10000尝试重新启动之前 await 的时间(以毫秒为单位)
restartfalse如果执行的 cmd 死了,是否应该重新启动
logStdErrfalse是否应记录命令的 stderr
batchSize20一次读取并发送到通道的最大行数
batchTimeout3000如果未达到缓冲区大小,则 await 时间(以毫秒为单位),然后将数据推送到下游
selector.typereplicating复制或多路复用
selector.* 取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

Warning

ExecSource 和其他异步源的问题在于,该源不能保证如果将事件放入 Channel 失败,则 Client 端会知道该事件。在这种情况下,数据将丢失。例如,最常见的功能之一就是类似tail \-F \[file\]的用例,其中应用程序将写到磁盘上的日志文件,Flume 拖尾该文件,将每一行作为事件发送。尽管这是可能的,但是存在一个明显的问题;如果 Channels 已满并且 Flume 无法发送事件怎么办? Flume 无法以某种原因向编写日志文件的应用程序指示需要保留日志或未发送事件。如果这没有任何意义,则您只需要了解以下内容:当使用诸如 ExecSource 之类的单向异步接口时,您的应用程序将无法保证已接收到数据!作为此警告的扩展-且要完全清除-使用此来源时,绝对不会对事件的传递提供零保证。为了获得更高的可靠性保证,请考虑假脱机目录源,尾目录源或通过 SDK 与 Flume 直接集成。

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

“Shell”配置用于通过命令 Shell(例如 Bash 或 Powershell)调用“命令”。 “命令”作为参数传递给“ shell”以执行。这允许“命令”使用 Shell 程序中的功能,例如通配符,反斜杠,管道,循环,条件等。在没有“Shell 程序”配置的情况下,将直接调用“命令”。 'shell'的常用值:'/ bin/sh -c','/ bin/ksh -c','cmd/c','powershell -Command'等。

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

JMS Source

JMS 源从 JMS 目标(例如队列或主题)读取消息。作为 JMS 应用程序,它可以与任何 JMS 提供程序一起使用,但仅经过 ActiveMQ 的测试。 JMS 源提供可配置的批处理大小,消息 selectors,用户/传递以及消息到 Sink 事件转换器。请注意,应使用 plugins.d 目录(首选),命令行上的–classpath 或通过 flume-env.sh 中的 FLUME_CLASSPATH 变量,将供应商提供的 JMS jar 包含在 FlumeClasspath 中。

必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为jms
initialContextFactory初始上下文工厂,例如:org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory连接工厂的 JNDI 名称应显示为
providerURLJMS 提供者 URL
destinationNameDestination name
destinationType目标类型(队列或主题)
messageSelector创建使用者时要使用的消息 selectors
userName目的地/提供者的用户名
passwordFile包含目的地/提供者密码的文件
batchSize100一批消耗的消息数
converter.typeDEFAULT用于将消息转换为 Sink 事件的类。见下文。
converter.*Converter properties.
converter.charsetUTF-8仅默认转换器。将 JMS TextMessages 转换为字节数组时使用的字符集。
createDurableSubscriptionfalse是否创建持久订阅。持久预订只能与 destinationType 主题一起使用。如果为 true,则必须指定“ clientId”和“ durableSubscriptionName”。
clientId创建连接后立即在 Connection 上设置的 JMSClient 端标识符。持久订阅必需。
durableSubscriptionName用于标识持久订阅的名称。持久订阅必需。
JMS 消息转换器

JMS 源允许可插拔转换器,尽管默认转换器可能会在大多数情况下工作。默认的转换器能够将字节,文本和对象消息转换为 FlumeEvents。在所有情况下,消息中的属性都作为 Headers 添加到 FlumeEvent。

  • BytesMessage:
Bytes of message are copied to body of the FlumeEvent. Cannot convert more than 2GB of data per message.
  • TextMessage:
Text of message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default but this is configurable.
  • ObjectMessage:
Object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream and the resulting array is copied to the body of the FlumeEvent.

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
SSL 和 JMS 来源

JMSClient 端实现通常支持通过 JSSE(Java 安全套接字扩展)定义的某些 Java 系统属性来配置 SSL/TLS。为 Flume 的 JVM 指定这些系统属性后,JMS Source(或更确切地说是 JMS Source 使用的 JMSClient 端实现)可以通过 SSL 连接到 JMS 服务器(当然,只有在 JMS 服务器也已设置为使用 SSL 的情况下)。它应该与任何 JMS 提供程序一起使用,并且已经过 ActiveMQ,IBM MQ 和 Oracle WebLogic 的测试。

以下各节仅介绍 Flume 端所需的 SSL 配置步骤。您可以在 Flume Wiki 上找到有关不同 JMS 提供程序的服务器端设置的更详细说明,以及完整的工作示例。

SSL 传输/服务器身份验证:

如果 JMS 服务器使用自签名证书,或者其证书是由不受信任的 CA(例如,公司自己的 CA)签名的,则需要设置信任库(包含正确的证书)并将其传递给 Flume。可以通过全局 SSL 参数来完成。有关全局 SSL 设置的更多详细信息,请参见SSL/TLS support部分。

一些 JMS 提供程序在使用 SSL 时需要 SSL 特定的 JNDI 初始上下文工厂和/或提供程序 URL 设置(例如 ActiveMQ 使用 ssl:// URL 前缀而不是 tcp://)。在这种情况下,必须在代理配置文件中调整源属性(initialContextFactory和/或providerURL)。

Client 端证书认证(双向 SSL):

JMS Source 可以通过 Client 端证书身份验证而不是通常的用户名/密码登录(使用 SSL 并将 JMS 服务器配置为接受这种身份验证)通过 JMS 服务器进行身份验证。

需要再次通过全局 SSL 参数配置包含用于身份验证的 Flume 密钥的密钥库。有关全局 SSL 设置的更多详细信息,请参见SSL/TLS support部分。

密钥库应仅包含一个密钥(如果存在多个密钥,则将使用第一个密钥)。密钥密码必须与密钥库密码相同。

如果进行 Client 端证书认证,则无需在 Flume 代理配置文件中为 JMS 源指定userName/passwordFile属性。

Please note:

与其他组件不同,JMS Source 没有组件级配置参数。也没有启用 SSL 标志。 SSL 设置由 JNDI /提供程序 URL 设置(最终是 JMS 服务器设置)以及是否存在信任库/密钥库控制。

假脱机目录源

此源使您可以通过将要摄取的文件放入磁盘上的“假脱机”目录来摄取数据。该源将监视指定目录中的新文件,并从出现的新文件中解析事件。事件解析逻辑是可插入的。在将给定文件完全读入通道后,默认情况下将通过重命名文件来指示完成,或者可以将其删除或使用 trackerDir 跟踪已处理文件。

与 Exec 源不同,此源是可靠的,即使 Flume 重新启动或终止,它也不会丢失数据。为了获得这种可靠性,只能将不可变的唯一命名的文件拖放到假脱机目录中。 Flume 尝试检测这些问题情况,如果被违反,将大声失败:

  • 如果将文件放入假脱机目录后写入文件,Flume 将在其日志文件中打印错误并停止处理。

  • 如果以后再使用文件名,Flume 将在其日志文件中打印错误并停止处理。

为避免上述问题,将唯一的标识符(例如时间戳)添加到日志文件名称(当它们移至假脱机目录中时)可能会很有用。

尽管有此来源的可靠性保证,但在某些情况下,如果发生某些下游故障,则事件可能会重复。这与 Flume 其他组件提供的保证是一致的。

Property NameDefaultDescription
channels
type组件类型名称必须为spooldir
spoolDir从中读取文件的目录。
fileSuffix.COMPLETED后缀以附加到完全摄取的文件
deletePolicynever何时删除完成的文件:neverimmediate
fileHeaderfalse是否添加存储绝对路径文件名的 Headers。
fileHeaderKeyfile将绝对路径文件名附加到事件 Headers 时使用的 Headers 键。
basenameHeaderfalse是否添加存储文件基本名称的头。
basenameHeaderKeybasename将文件的基本名称附加到事件 Headers 时使用的 Headers 密钥。
includePattern^.*$正则表达式,指定要包含的文件。它可以与ignorePattern一起使用。如果文件同时匹配ignorePatternincludePattern正则表达式,则忽略该文件。
ignorePattern^$指定要忽略(跳过)的文件的正则表达式。它可以与includePattern一起使用。如果文件同时匹配ignorePatternincludePattern正则表达式,则忽略该文件。
trackerDir.flumespool用于存储与文件处理有关的元数据的目录。如果此路径不是绝对路径,则将其解释为相对于 spoolDir 的相对路径。
trackingPolicyrename跟踪策略定义了如何跟踪文件处理。它可以是“重命名”或“ tracker_dir”。仅当 deletePolicy 为“从不”时,此参数才有效。 “重命名”-处理文件后,将根据 fileSuffix 参数对其重命名。 “ tracker_dir”-文件未重命名,但在 trackerDir 中创建了一个新的空文件。新的跟踪器文件名是从摄取的文件名加上 fileSuffix 派生的。
consumeOrderoldest假脱机目录中的文件将按oldestyoungestrandom的 Sequences 使用。在oldestyoungest的情况下,文件的最后修改时间将用于比较文件。如果是平局,则将首先使用词典 Sequences 最小的文件。如果是random,则将随机选择任何文件。当使用oldestyoungest时,将扫描整个目录以选择最旧/最旧的文件,如果有大量文件,这可能会很慢;而使用random则可能导致旧文件被消耗得很晚(如果新文件不断进入)假脱机目录。
pollDelay500轮询新文件时使用的延迟(以毫秒为单位)。
recursiveDirectorySearchfalse是否监视子目录以读取新文件。
maxBackoff4000如果通道已满,则在两次连续尝试写入通道之间 await 的最长时间(以毫秒为单位)。源将从低退避开始,并在每次通道抛出 ChannelException 时以指数方式增加,直到此参数指定的值。
batchSize100批量转移到 Channel 的粒度
inputCharsetUTF-8解串器使用的字符集,将 Importing 文件视为文本。
decodeErrorPolicyFAIL当我们在 Importing 文件中看到不可解码的字符时该怎么办。 FAIL:引发异常而无法解析文件。 REPLACE:将不可解析的字符替换为“替换字符” char,通常为 Unicode U FFFD。 IGNORE:删除不可解析的字符序列。
deserializerLINE指定用于将文件解析为事件的解串器。默认情况下将每行解析为一个事件。指定的类必须实现EventDeserializer\.Builder
deserializer.* 因事件解串器而异。
bufferMaxLines(取消)此选项现在被忽略。
bufferMaxLineLength5000(不建议使用)提交缓冲区中一行的最大长度。请改用 deserializer.maxLineLength。
selector.typereplicating复制或多路复用
selector.* 取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

名为 agent-1 的代理的示例:

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
Event Deserializers

Flume 随附以下事件反序列化器。

LINE

该反序列化器每行文本 Importing 生成一个事件。

Property NameDefaultDescription
deserializer.maxLineLength2048单个事件中包含的最大字符数。如果一行超出此长度,则会被截断,并且该行中的其余字符将出现在后续事件中。
deserializer.outputCharsetUTF-8字符集,用于编码放入通道的事件。
AVRO

该解串器能够读取 Avro 容器文件,并且该文件中的每个 Avro 记录都会生成一个事件。每个事件都有一个 Headers,用于指示所使用的架构。事件的主体是二进制 Avro 记录数据,不包括架构或其余的容器文件元素。

请注意,如果假脱机目录源必须重试将这些事件之一放到通道上(例如,因为通道已满),则它将重置并从最新的 Avro 容器文件同步点重试。为了减少在这种故障情况下潜在的事件重复,请在您的 AvroImporting 文件中更频繁地写入同步标记。

Property NameDefaultDescription
deserializer.schemaTypeHASH模式的表示方式。默认情况下,或者指定值HASH时,将对 Avro 模式进行哈希处理,并将哈希存储在事件 Headers“ flume.avro.schema.hash”中的每个事件中。如果指定LITERAL,则 JSON 编码的架构本身将存储在事件 Headers“ flume.avro.schema.literal”中的每个事件中。与HASH模式相比,使用LITERAL模式效率相对较低。
BlobDeserializer

该解串器每个事件读取一个二进制大对象(BLOB),通常每个文件读取一个 BLOB。例如 PDF 或 JPG 文件。请注意,此方法不适用于非常大的对象,因为整个 BLOB 都缓存在 RAM 中。

Property NameDefaultDescription
deserializer此类的 FQCN:org\.apache\.flume\.sink\.solr\.morphline\.BlobDeserializer$Builder
deserializer.maxBlobLength100000000给定请求读取和缓冲的最大字节数

Taildir Source

Note

此来源是作为预览功能提供的.在 Windows 上不起作用.

观察指定的文件,并在检测到新行添加到每个文件后,几乎实时地尾随它们。如果正在写入新行,则此源将重试读取它们,以 await 写入完成。

此源是可靠的,即使拖尾文件旋转也不会丢失数据。它定期以 JSON 格式将每个文件的最后读取位置写入给定位置文件。如果 Flume 由于某种原因停止或停机,它可以从写入现有位置文件中的位置重新开始拖尾。

在其他用例中,此源也可以使用给定位置文件从每个文件的任意位置开始拖尾。当指定路径上没有位置文件时,默认情况下它将从每个文件的第一行开始拖尾。

文件将按照其修改时间 Sequences 被消耗。修改时间最早的文件将首先被使用。

该来源不会重命名或删除或对尾文件进行任何修改。当前,此源不支持尾随二进制文件。它逐行读取文本文件。

Property NameDefaultDescription
channels
type组件类型名称必须为TAILDIR
filegroups以空格分隔的文件组列表。每个文件组指示一组要尾部的文件。
filegroups.<filegroupName>文件组的绝对路径。正则表达式(而非文件系统模式)只能用于文件名。
positionFile~/.flume/taildir_position.jsonJSON 格式的文件,用于记录每个拖尾文件的索引节点,绝对路径和最后位置。
headers.<filegroupName>.<headerKey>Headers 值是使用 Headers 键设置的值。可以为一个文件组指定多个标题。
byteOffsetHeaderfalse是否将尾线的字节偏移量添加到称为“ byteoffset”的 Headers 中。
skipToEndfalse如果文件未写入位置文件,是否跳过位置至 EOF。
idleTimeout120000关闭不活动文件的时间(毫秒)。如果关闭的文件后附加新行,此源将自动重新打开它。
writePosInterval3000将每个文件的最后位置写入位置文件的间隔时间(毫秒)。
batchSize100一次读取并发送到通道的最大行数。通常使用默认值就可以了。
maxBatchCountLong.MAX_VALUE控制从同一文件连续读取的批次数。如果源文件拖尾多个文件,并且其中一个文件的写入速度很快,则它会阻止其他文件被处理,因为繁忙文件将被无休止地读取。在这种情况下,请降低此值。
backoffSleepIncrement1000当最后一次尝试未找到任何新数据时,重新尝试轮询新数据之前的时间延迟增量。
maxBackoffSleep5000当最后一次尝试未找到任何新数据时,每次重新尝试轮询新数据之间的最大时间延迟。
cachePatternMatchingtrue对于包含数千个文件的目录,列出目录并应用文件名 regex 模式可能会很耗时。缓存匹配文件列表可以提高性能。文件的使用 Sequences 也将被缓存。要求文件系统以至少 1 秒的粒度跟踪修改时间。
fileHeaderfalse是否添加存储绝对路径文件名的 Headers。
fileHeaderKeyfile将绝对路径文件名附加到事件 Headers 时使用的 Headers 键。

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000

Twitter 1%的水喉源(实验)

Warning

此来源具有很高的实验性,在次要版本的 Flume 之间可能会有所变化。使用风险自负。

实验源通过 Streaming API 连接到 1%的 samplestwitter firehose,连续下载推文,将其转换为 Avro 格式,并将 Avro 事件发送到下游 Flume 接收器。需要使用者并访问 Twitter 开发人员帐户的令牌和机密。必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为org\.apache\.flume\.source\.twitter\.TwitterSource
consumerKeyOAuth 使用者金钥
consumerSecretOAuthConsumerSecret
accessTokenOAuth 访问令牌
accessTokenSecretOAuth 令牌机密
maxBatchSize1000批量放入 Twitter 消息的最大数量
maxBatchDurationMillis1000关闭批次之前要 await 的最大毫秒数

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200

Kafka Source

Kafka Source 是 Apache Kafka 使用者,可从 Kafka 主题读取消息。如果您有多个运行的 Kafka 源,则可以为它们配置相同的 Consumer Group,以便每个源都可以读取主题的唯一分区集。当前支持 Kafka 服务器版本 0.10.1.0 或更高。测试已完成至 2.0.1,这是发行时的最高可用版本。

Property NameDefaultDescription
channels
type组件类型名称,必须为org\.apache\.flume\.source\.kafka\.KafkaSource
kafka.bootstrap.servers来源使用的 Kafka 集群中的 broker 列表
kafka.consumer.group.idflume唯一识别的消费群体。在多个来源或代理中设置相同的 ID 表示它们属于同一使用者组
kafka.topicsKafka 使用者将从中读取消息的主题的逗号分隔列表。
kafka.topics.regex正则表达式,用于定义订阅源的主题集。此属性的优先级高于kafka\.topics,并且优先级kafka\.topics(如果存在)。
batchSize1000批量写入通道的最大消息数
batchDurationMillis1000将批处理写入通道之前的最长时间(以毫秒为单位)每当达到大小和时间的第一个时,就会写入批处理。
backoffSleepIncrement1000当 Kafka 主题为空时触发的初始和增量 await 时间。await 期将减少对空的 Kafka 主题的主动 ping 操作。一秒钟是摄取用例的理想选择,但对于使用拦截器的低延迟操作,可能需要较低的值。
maxBackoffSleep5000当 Kafka 主题为空时触发的最大 await 时间。五秒钟是摄取用例的理想选择,但对于使用拦截器的低延迟操作,可能需要较低的值。
useFlumeEventFormatfalse默认情况下,事件以字节为单位从 Kafka 主题直接进入事件主体。设置为 true 以读取 Flume Avro 二进制格式的事件。与 KafkaSink 上的相同属性或 Kafka Channel 上的 parseAsFlumeEvent 属性结合使用,将保留在生产端发送的所有 FlumeHeaders。
setTopicHeadertrue设置为 true 时,将检索到的消息的主题存储到由topicHeader属性定义的 Headers 中。
topicHeadertopic如果setTopicHeader属性设置为true,则定义标题的名称,在该标题中存储接收消息的主题的名称。如果与 Kafka Sink topicHeader属性结合使用,则应格外小心,以免将消息循环发送回同一主题。
kafka.consumer.security.protocolPLAINTEXT如果使用某种安全级别写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。有关安全设置的其他信息,请参见下文。
更多 Consumer 保障道具 如果使用 SASL_PLAINTEXT,则 SASL_SSL 或 SSL 请参考Kafka security以获取需要在使用者上设置的其他属性。
其他 Kafka 消费 Property这些属性用于配置 Kafka Consumer。可以使用 Kafka 支持的任何 Consumer 财产。唯一的要求是在属性名称前添加前缀kafka\.consumer。例如:kafka\.consumer\.auto\.offset\.reset

Note

Kafka 来源会覆盖两个 Kafka 使用者参数:来源将 auto.commit.enable 设置为“ false”,并且每个批次都已提交。 Kafka 源保证至少一次消息检索策略。源启动时可以显示重复项。 Kafka 源代码还提供了 key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和 value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。不建议修改这些参数。

Deprecated Properties

Property NameDefaultDescription
topicUse kafka.topics
groupIdflumeUse kafka.consumer.group.id
zookeeperConnect从 0.9.x 开始,kafkaConsumerClient 端不再支持该版本。使用 kafka.bootstrap.servers 与 kafka 集群构建连接
migrateZookeeperOffsetstrue如果找不到 Kafka 存储的偏移量,请在 Zookeeper 中查找偏移量并将其提交给 Kafka。为支持从旧版 Flume 进行无缝 KafkaClient 端迁移,这应该是正确的。迁移后,可以将其设置为 false,尽管通常不需要这样做。如果未找到 Zookeeper 偏移,则 Kafka 配置 kafka.consumer.auto.offset.reset 定义如何处理偏移。检查Kafka documentation以获取详细信息

通过逗号分隔主题列表进行主题订阅的示例。

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

正则表达式订阅主题的示例

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

安全性和 Kafka 来源:

Flume 和 Kafka 之间的通信通道支持安全身份验证以及数据加密。对于安全身份验证,可以从 Kafka 0.9.0 版开始使用 SASL/GSSAPI(Kerberos V5)或 SSL(即使该参数名为 SSL,但实际协议是 TLS 实现)。

到目前为止,数据加密仅由 SSL/TLS 提供。

kafka\.consumer\.security\.protocol设置为以下任意值表示:

  • SASL_PLAINTEXT -Kerberos 或纯文本身份验证,无数据加密

  • SASL_SSL -带有数据加密的 Kerberos 或纯文本身份验证

  • SSL -基于 TLS 的加密,带有可选身份验证。

Warning

启用 SSL 时,性能会降低,幅度取决于 CPU 类型和 JVM 实现。参考:Kafka 安全概述和用于跟踪此问题的吉拉:KAFKA-2561

TLS 和 Kafka 资料来源:

请阅读配置 KafkaClient 端 SSL中描述的步骤,以了解用于微调的其他配置设置,例如以下任意项:安全提供程序,密码套件,已启用的协议,信任库或密钥库类型。

服务器端身份验证和数据加密的示例配置。

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

在此指定信任库是可选的,可以改为使用全局信任库。有关全局 SSL 设置的更多详细信息,请参见SSL/TLS support部分。

注意:默认情况下,未定义属性ssl\.endpoint\.identification\.algorithm,因此不执行主机名验证。为了启用主机名验证,请设置以下属性

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

启用后,Client 端将根据以下两个字段之一验证服务器的标准域名(FQDN):

如果还需要 Client 端身份验证,则还需要在 Flume 代理配置中添加以下内容,或者可以使用全局 SSL 设置(请参阅SSL/TLS support部分)。每个 Flume 代理必须拥有其 Client 证书,该证书必须由 Kafkabroker 单独或通过其签名链来信任。常见的示例是通过单个根 CA 对每个 Client 端证书进行签名,而根 CA 又会受到 Kafkabroker 的信任。

# optional, the global keystore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则ssl\.key\.password属性将为两个使用者密钥库提供所需的其他机密:

a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>

Kerberos 和 Kafka 来源:

要将 Kafka 源与受 Kerberos 保护的 Kafka 群集一起使用,请为 Consumer 设置上述consumer\.security\.protocol属性。在 JAAS 文件的“ KafkaClient”部分中指定了与 Kafka 代理一起使用的 Kerberos 密钥表和主体。如果需要,“Client 端”部分介绍了 Zookeeper 连接。有关 JAAS 文件内容的信息,请参见Kafka doc。可以通过 flume-env.sh 中的 JAVA_OPTS 来指定此 JAAS 文件的位置以及系统范围内的 kerberos 配置:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的示例安全配置:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

samplesJAAS 文件。有关其内容的参考,请参阅SASL configuration的 Kafka 文档中所需身份验证机制(GSSAPI/PLAIN)的 Client 端配置部分。由于 Kafka Source 也可以连接到 Zookeeper 进行偏移量迁移,因此在此示例中还添加了“Client 端”部分。除非需要偏移量迁移,或者其他安全组件需要此部分,否则将不需要这样做。另外,请确保 Flume 进程的 os 用户具有 jaas 和 keytab 文件的读取特权。

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

NetCat TCP 源

类似于 netcat 的源,它在给定的端口上侦听并将文本的每一行转换为一个事件。行为类似于nc \-k \-l \[host\] \[port\]。换句话说,它打开指定的端口并侦听数据。期望提供的数据是换行符分隔的文本。每一行文本都变成 Flume 事件,并通过连接的通道发送。

必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为netcat
bind要绑定的主机名或 IP 地址
port要绑定的端口号
max-line-length512每个事件正文的最大行长(以字节为单位)
ack-every-eventtrue对收到的每个事件做出“ OK”响应
selector.typereplicating复制或多路复用
selector.* 取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

NetCat UDP 源

按照原始的 Netcat(TCP)源,此源在给定的端口上侦听并将文本的每一行转换为一个事件,并通过连接的通道发送。行为类似于nc \-u \-k \-l \[host\] \[port\]

必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为netcatudp
bind要绑定的主机名或 IP 地址
port要绑定的端口号
remoteAddressHeader
selector.typereplicating复制或多路复用
selector.* 取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

序列生成器源

一个简单的序列生成器,它使用从 0 开始,递增 1 并在 totalEvents 处停止的计数器连续生成事件。在无法将事件发送到 Channels 时重试。主要用于测试。重试期间,它使重试邮件的正文与以前相同,因此,在目标重复数据删除之后,唯一事件的数量预计应等于指定的totalEvents。必填属性以**表示。

Property NameDefaultDescription
channels
type组件类型名称,必须为seq
selector.type 复制或多路复用
selector.*replicating取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*
batchSize1每个请求循环尝试处理的事件数。
totalEventsLong.MAX_VALUE源发送的唯一事件数。

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1

Syslog Sources

读取系统日志数据并生成 Flume 事件。 UDP 源将整个消息视为单个事件。 TCP 源为每个由换行符('n')分隔的字符串创建一个新事件。

必填属性以**表示。

Syslog TCP 源

原始且经过验证的 syslog TCP 源。

Property NameDefaultDescription
channels
type组件类型名称,必须为syslogtcp
host要绑定的主机名或 IP 地址
port要绑定的端口号
eventSize2500单个事件行的最大大小,以字节为单位
keepFieldsnone将此设置为“ all”将在事件正文中保留 Priority,Timestamp 和 Hostname。也允许使用空格分隔的要包含的字段列表。当前,可以包括以下字段:优先级,版本,时间戳,主机名。不赞成使用值“ true”和“ false”,而赞成“ all”和“ none”。
clientIPHeader如果指定,则 Client 端的 IP 地址将使用此处指定的 Headers 名称存储在每个事件的 Headers 中。这允许拦截器和通道 selectors 根据 Client 端的 IP 地址自定义路由逻辑。不要在此处使用标准的 SyslogHeaders 名称(例如_host_),因为在这种情况下,事件 Headers 将被覆盖。
clientHostnameHeader如果指定,Client 端的主机名将使用此处指定的 Headers 名称存储在每个事件的 Headers 中。这允许拦截器和通道 selectors 根据 Client 端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在此处使用标准的 SyslogHeaders 名称(例如_host_),因为在这种情况下,事件 Headers 将被覆盖。
selector.type 复制或多路复用
selector.*replicating取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*
sslfalse将此设置为 true 可启用 SSL 加密。如果启用了 SSL,则还必须通过组件级别参数(请参见下文)或作为全局 SSL 参数(请参见SSL/TLS support部分)指定“密钥库”和“密钥库密码”。
keystore这是 Java 密钥库文件的路径。如果在此未指定,则将使用全局密钥库(如果已定义,否则配置错误)。
keystore-passwordJava 密钥库的密码。如果此处未指定,则将使用全局密钥库密码(如果已定义,否则配置错误)。
keystore-typeJKSJava 密钥库的类型。可以是“ JKS”或“ PKCS12”。如果此处未指定,则将使用全局密钥库类型(如果已定义,否则默认为 JKS)。
exclude-protocolsSSLv3要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议之外,还将始终排除 SSLv3.
include-protocols要包括的 SSL/TLS 协议的空格分隔列表。启用的协议将是包含的协议,而没有排除的协议。如果 included-protocols 为空,则包含所有受支持的协议。
exclude-cipher-suites以空格分隔的要排除的密码套件列表。
include-cipher-suites要包括的密码套件的空格分隔列表。启用的密码套件将是包含的密码套件,而没有排除的密码套件。如果 included-cipher-suites 为空,则包括所有受支持的密码套件。

例如,名为 a1 的代理的 syslog TCP 源:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
多端口 Syslog TCP 源

这是 Syslog TCP 源的更新,更快,具有多端口功能的版本。请注意,ports配置设置已替换port。多端口功能意味着它可以高效地一次侦听多个端口。此源使用 Apache Mina 库来执行此操作。提供对 RFC-3164 和许多常见 RFC-5424 格式的消息的支持。还提供了配置每个端口使用的字符集的功能。

Property NameDefaultDescription
channels
type组件类型名称,必须为multiport_syslogtcp
host要绑定的主机名或 IP 地址。
ports要绑定的端口的空格分隔列表(一个或多个)。
eventSize2500单个事件行的最大大小,以字节为单位。
keepFieldsnone将此设置为“ all”将在事件正文中保留 Priority,Timestamp 和 Hostname。也允许使用空格分隔的要包含的字段列表。当前,可以包括以下字段:优先级,版本,时间戳,主机名。不赞成使用值“ true”和“ false”,而赞成“ all”和“ none”。
portHeader如果指定,端口号将使用此处指定的标题名称存储在每个事件的标题中。这允许拦截器和通道 selectors 根据传入端口自定义路由逻辑。
clientIPHeader如果指定,则 Client 端的 IP 地址将使用此处指定的 Headers 名称存储在每个事件的 Headers 中。这允许拦截器和通道 selectors 根据 Client 端的 IP 地址自定义路由逻辑。不要在此处使用标准的 SyslogHeaders 名称(例如_host_),因为在这种情况下,事件 Headers 将被覆盖。
clientHostnameHeader如果指定,Client 端的主机名将使用此处指定的 Headers 名称存储在每个事件的 Headers 中。这允许拦截器和通道 selectors 根据 Client 端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在此处使用标准的 SyslogHeaders 名称(例如_host_),因为在这种情况下,事件 Headers 将被覆盖。
charset.defaultUTF-8将系统日志事件解析为字符串时使用的默认字符集。
charset.port.<port>字符集可基于每个端口进行配置。
batchSize100每个请求循环尝试处理的最大事件数。通常使用默认值就可以了。
readBufferSize1024内部 Mina 读取缓冲区的大小。提供性能调整。通常使用默认值就可以了。
numProcessors(auto-detected)系统上可供处理消息时使用的处理器数量。默认是使用 Java Runtime API 自动检测 CPU 数量。 Mina 将为每个检测到的 CPU 产生 2 个请求处理线程,这通常是合理的。
selector.typereplicating复制,多路复用或自定义
selector.*取决于selector\.type
interceptors以空格分隔的拦截器列表。
interceptors.*
sslfalse将此设置为 true 可启用 SSL 加密。如果启用了 SSL,则还必须通过组件级别参数(请参见下文)或作为全局 SSL 参数(请参见SSL/TLS support部分)指定“密钥库”和“密钥库密码”。
keystore这是 Java 密钥库文件的路径。如果在此未指定,则将使用全局密钥库(如果已定义,否则配置错误)。
keystore-passwordJava 密钥库的密码。如果此处未指定,则将使用全局密钥库密码(如果已定义,否则配置错误)。
keystore-typeJKSJava 密钥库的类型。可以是“ JKS”或“ PKCS12”。如果此处未指定,则将使用全局密钥库类型(如果已定义,否则默认为 JKS)。
exclude-protocolsSSLv3要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议之外,还将始终排除 SSLv3.
include-protocols要包括的 SSL/TLS 协议的空格分隔列表。启用的协议将是包含的协议,而没有排除的协议。如果 included-protocols 为空,则包含所有受支持的协议。
exclude-cipher-suites以空格分隔的要排除的密码套件列表。
include-cipher-suites要包括的密码套件的空格分隔列表。启用的密码套件将是包含的密码套件,而没有排除的密码套件。如果 included-cipher-suites 为空,则包括所有受支持的密码套件。

例如,名为 a1 的代理的多端口 syslog TCP 源:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
Syslog UDP 源
Property NameDefaultDescription
channels
type组件类型名称,必须为syslogudp
host要绑定的主机名或 IP 地址
port要绑定的端口号
keepFieldsfalse将此设置为 true 将在事件正文中保留 Priority,Timestamp 和 Hostname。
clientIPHeader如果指定,则 Client 端的 IP 地址将使用此处指定的 Headers 名称存储在每个事件的 Headers 中。这允许拦截器和通道 selectors 根据 Client 端的 IP 地址自定义路由逻辑。不要在此处使用标准的 SyslogHeaders 名称(例如_host_),因为在这种情况下,事件 Headers 将被覆盖。
clientHostnameHeader如果指定,Client 端的主机名将使用此处指定的 Headers 名称存储在每个事件的 Headers 中。这允许拦截器和通道 selectors 根据 Client 端的主机名自定义路由逻辑。检索主机名可能涉及名称服务反向查找,这可能会影响性能。不要在此处使用标准的 SyslogHeaders 名称(例如_host_),因为在这种情况下,事件 Headers 将被覆盖。
selector.type 复制或多路复用
selector.*replicating取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

例如,名为 a1 的代理的系统日志 UDP 源:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

HTTP Source

通过 HTTP POST 和 GET 接受 Flume Events 的源。 GET 应该仅用于实验。可插拔的“处理程序”将 HTTP 请求转换为 Sink 事件,该处理程序必须实现 HTTPSourceHandler 接口。该处理程序接受 HttpServletRequest 并返回 Sink 事件列表。从一个 Http 请求处理的所有事件都通过一次事务提交给通道,从而提高了文件通道等通道的效率。如果处理程序引发异常,则此源将返回 HTTP 状态 400.如果通道已满,或者该源无法将事件追加到该通道,则该源将返回 HTTP 503-临时不可用状态。

在一个发布请求中发送的所有事件均被视为一批,并在一项事务中插入到通道中。

此源基于 Jetty 9.4,并提供了设置其他特定于 Jetty 的参数的功能,这些参数将直接传递给 Jetty 组件。

Property NameDefaultDescription
type 组件类型名称,必须为http
port源应绑定到的端口。
bind0.0.0.0要监听的主机名或 IP 地址
handlerorg\.apache\.flume\.source\.http\.JSONHandler处理程序类的 FQCN。
handler.*处理程序的配置参数
selector.typereplicating复制或多路复用
selector.* 取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*
sslfalse将属性设置为 true,以启用 SSL。 * HTTP 源不支持 SSLv3.*
exclude-protocolsSSLv3要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议之外,还将始终排除 SSLv3.
include-protocols要包括的 SSL/TLS 协议的空格分隔列表。启用的协议将是包含的协议,而没有排除的协议。如果 included-protocols 为空,则包含所有受支持的协议。
exclude-cipher-suites以空格分隔的要排除的密码套件列表。
include-cipher-suites要包括的密码套件的空格分隔列表。启用的密码套件将是包含的密码套件,而没有排除的密码套件。
keystore 密钥库的位置,包括密钥库文件名。如果启用了 SSL 但未在此处指定密钥库,则将使用全局密钥库(如果已定义,否则配置错误)。
keystore-password 密钥库密码。如果启用了 SSL,但未在此处指定密钥库密码,则将使用全局密钥库密码(如果已定义,否则配置错误)。
keystore-typeJKS密钥库类型。可以是“ JKS”或“ PKCS12”。
QueuedThreadPool.* 要在 org.eclipse.jetty.util.thread.QueuedThreadPool 上设置的 Jetty 特定设置。 N.B.仅当设置了此类的至少一个属性时,才使用 QueuedThreadPool。
HttpConfiguration.* 在 org.eclipse.jetty.server.HttpConfiguration 上设置的 Jetty 特定设置
SslContextFactory.* 要在 org.eclipse.jetty.util.ssl.SslContextFactory 上设置的 Jetty 特定设置(仅在* ssl *设置为 true 时适用)。
ServerConnector.* 在 org.eclipse.jetty.server.ServerConnector 上设置的 Jetty 特定设置

Deprecated Properties

Property NameDefaultDescription
keystorePassword使用* keystore-password *。不推荐使用的值将被新的值覆盖。
excludeProtocolsSSLv3使用* exclude-protocols *。不推荐使用的值将被新的值覆盖。
enableSSLfalse使用* ssl *。不推荐使用的值将被新的值覆盖。

N.B.使用上面列出的对象上的设置方法来设置特定于 Jetty 的设置。有关完整的详细信息,请参见这些类(QueuedThreadPoolHttpConfigurationSslContextFactoryServerConnector)的 Javadoc。

当使用特定于 Jetty 的设置时,上述命名的属性将具有优先权(例如,excludeProtocols 将优先于 SslContextFactory.ExcludeProtocols)。所有属性均以小写开头。

名为 a1 的代理的示例 http 源:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
JSONHandler

开箱即用的处理程序可以处理以 JSON 格式表示的事件,并支持 UTF-8,UTF-16 和 UTF-32 字符集。处理程序接受事件数组(即使只有一个事件,该事件也必须在数组中发送),并根据请求中指定的编码将其转换为 Flume 事件。如果未指定编码,则假定为 UTF-8. JSON 处理程序支持 UTF-8,UTF-16 和 UTF-32.事件表示如下。

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

要设置字符集,请求必须具有指定为application/json; charset=UTF\-8的 Content Type(根据需要将 UTF-8 替换为 UTF-16 或 UTF-32)。

以该处理程序期望的格式创建事件的一种方法是使用 Flume SDK 中提供的 JSONEvent,然后使用 Google Gson 使用 Gson#fromJson(Object,Type)方法创建 JSON 字符串。可以通过以下方法创建作为事件列表的此方法的第二个参数传递的类型令牌:

Type type = new TypeToken<List<JSONEvent>>() {}.getType();
BlobHandler

默认情况下,HTTPSource 将 JSONImporting 拆分为 Flume 事件。作为替代方案,BlobHandler 是 HTTPSource 的处理程序,该处理程序返回一个事件,该事件包含请求参数以及与此请求一起上载的二进制大对象(BLOB)。例如 PDF 或 JPG 文件。请注意,此方法不适用于非常大的对象,因为它会缓冲 RAM 中的整个 BLOB。

Property NameDefaultDescription
handler此类的 FQCN:org\.apache\.flume\.sink\.solr\.morphline\.BlobHandler
handler.maxBlobLength100000000给定请求读取和缓冲的最大字节数

Stress Source

StressSource 是内部负载生成源实现,对于压力测试非常有用。它允许用户使用空标题配置事件有效负载的大小。用户可以配置要发送的事件总数以及要发送的成功事件的最大数目。

必填属性以**表示。

Property NameDefaultDescription
type组件类型名称,必须为org\.apache\.flume\.source\.StressSource
size500每个事件的有效负载大小。单位: 字节
maxTotalEvents-1发送的最大事件数
maxSuccessfulEvents-1成功发送事件的最大数量
batchSize1一批发送的事件数
maxEventsPerSecond0设置为大于零的整数时,对源强制实施速率限制器。

名为 a1 的代理的示例:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1

Legacy Sources

旧资源允许 Flume 1.x 代理从 Flume 0.9.4 代理接收事件。它接受 Flume 0.9.4 格式的事件,将其转换为 Flume 1.0 格式,并将其存储在连接的通道中。 0.9.4 事件属性(例如时间戳,pri,host,nano 等)将转换为 1.x 事件 Headers 属性。旧版源同时支持 Avro 和 Thrift RPC 连接。要在两个 Flume 版本之间使用此 bridge,您需要使用 avroLegacy 或 thriftLegacy 源启动 Flume 1.x 代理。 0.9.4 代理应使代理接收器指向 1.x 代理的主机/端口。

Note

Flume 1.x 的可靠性语义与 Flume 0.9.x 的语义不同。旧版源不支持 Flume 0.9.x 代理的 E2E 或 DFO 模式。唯一受支持的 0.9.x 模式是尽力而为,尽管 1.x 流的可靠性设置将在事件被旧版源保存到 Flume 1.x 通道中后适用。

必填属性以**表示。

Avro 旧版来源
Property NameDefaultDescription
channels
type组件类型名称,必须为org\.apache\.flume\.source\.avroLegacy\.AvroLegacySource
host要绑定的主机名或 IP 地址
port要监听的端口号
selector.type 复制或多路复用
selector.*replicating取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
节俭旧版来源
Property NameDefaultDescription
channels
type组件类型名称,必须为org\.apache\.flume\.source\.thriftLegacy\.ThriftLegacySource
host要绑定的主机名或 IP 地址
port要监听的端口号
selector.type 复制或多路复用
selector.*replicating取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1

Custom Source

定制源是您自己对 Source 接口的实现。启动 Flume 代理时,自定义源的类及其依赖项必须包含在代理的 Classpath 中。自定义来源的类型为其 FQCN。

Property NameDefaultDescription
channels
type组件类型名称,必须是您的 FQCN
selector.type replicatingmultiplexing
selector.*replicating取决于 selector.type 值
interceptors以空格分隔的拦截器列表
interceptors.*

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

Scribe Source

抄写员是另一种摄取系统。要采用现有的 Scribe 摄取系统,Flume 应该使用基于 Thrift 且兼容传输协议的 ScribeSource。对于 Scribe 的部署,请遵循 Facebook 的指南。必填属性以**表示。

Property NameDefaultDescription
type组件类型名称,必须为org\.apache\.flume\.source\.scribe\.ScribeSource
port1499Scribe 应该连接的端口
maxReadBufferBytes16384000节俭的默认 FrameBuffer 大小
workerThreads5在 Thrift 中处理线程数
selector.type
selector.*

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.workerThreads = 5
a1.sources.r1.channels = c1

Flume Sinks

HDFS Sink

该接收器将事件写入 Hadoop 分布式文件系统(HDFS)。当前,它支持创建文本和序列文件。它支持两种文件类型的压缩。可以根据经过的时间或数据大小或事件数定期滚动文件(关闭当前文件并创建一个新文件)。它还按时间戳或事件发生的机器之类的属性对数据进行存储/分区。 HDFS 目录路径可能包含格式化转义序列,这些序列将被 HDFS 接收器取代,以生成用于存储事件的目录/文件名。使用此接收器需要安装 hadoop,以便 Flume 可以使用 Hadoop jar 与 HDFS 群集进行通信。请注意,需要支持 sync()调用的 Hadoop 版本。

以下是支持的转义序列:

AliasDescription
%{host}替换名为“ host”的事件 Headers 的值。支持任意标题名称。
%tUnix 时间(以毫秒为单位)
%a语言环境的简短工作日名称(星期一,星期二...)
%A语言环境的完整工作日名称(星期一,星期二,...)
%b语言环境的短月份名称(1 月,2 月,...)
%B语言环境的长月份名称(一月,二月,...)
%c语言环境的日期和时间(2005 年 3 月 3 日,星期四,23:05:25)
%d每月的一天(01)
%e一个月中没有填充的日期(1)
%D日期;与%m /%d /%y 相同
%Hhour (00..23)
%Ihour (01..12)
%j一年中的一天(001..366)
%k小时(0..23)
%mmonth (01..12)
%n一个月不填充(1..12)
%Mminute (00..59)
%p相当于上午或下午的语言环境
%s自 1970-01-01 00:00:00 UTC 以来的秒数
%Ssecond (00..59)
%y年份的后两位(00..99)
%Yyear (2010)
%zhhmm 数字时区(例如-0400)
%[localhost]替换运行代理程序的主机的主机名
%[IP]替换运行代理的主机的 IP 地址
%[FQDN]替换运行代理的主机的规范主机名

注意:转义字符串%[localhost],%[IP]和%[FQDN]都依赖 Java 获取主机名的能力,这在某些网络环境中可能会失败。

正在使用的文件的名称将经过修饰,以末尾包含“ .tmp”。关闭文件后,将删除此 extensions。这样可以排除目录中部分完整的文件。必填属性以**表示。

Note

对于所有与时间相关的转义序列,事件的 Headers 中必须存在带有关键字“时间戳”的 Headers(除非将hdfs\.useLocalTimeStamp设置为true)。一种自动添加此方法的方法是使用 TimestampInterceptor。

NameDefaultDescription
channel
type组件类型名称,必须为hdfs
hdfs.pathHDFS 目录路径(例如 hdfs:// namenode/flume/webdata /)
hdfs.filePrefixFlumeData由 Flume 在 hdfs 目录中创建的文件加上前缀的名称
hdfs.fileSuffix添加到文件的后缀(例如\.avro-注意:不会自动添加句点)
hdfs.inUsePrefix用于将 Sink 主动写入的临时文件的前缀
hdfs.inUseSuffix\.tmp后缀用于临时写入 Sink 的临时文件
hdfs.emptyInUseSuffixfalse如果在写入输出时使用falsehdfs\.inUseSuffix。关闭后,输出hdfs\.inUseSuffix从输出文件名中删除。如果true忽略hdfs\.inUseSuffix参数,则使用空字符串代替。
hdfs.rollInterval30滚动当前文件之前要 await 的秒数(0 =根据时间间隔从不滚动)
hdfs.rollSize1024触发滚动的文件大小,以字节为单位(0:从不基于文件大小滚动)
hdfs.rollCount10滚动之前写入文件的事件数(0 =根据事件数从不滚动)
hdfs.idleTimeout0超时,不活动的文件将被关闭(0 =禁用自动关闭空闲文件)
hdfs.batchSize100刷新到 HDFS 之前写入文件的事件数
hdfs.codeC压缩编解码器。以下之一:gzip,bzip2,lzo,lzop,snappy
hdfs.fileTypeSequenceFileFiles 格式:目前SequenceFileDataStreamCompressedStream(1)DataStream 不会压缩输出 Files,请不要设定 codeC(2)CompressedStream 需要设定 hdfs.codeC 和可用的 codeC
hdfs.maxOpenFiles5000仅允许此数量的打开文件。如果超过此数目,则关闭最旧的文件。
hdfs.minBlockReplicas指定每个 HDFS 块的最小副本数。如果未指定,则它来自 Classpath 中的默认 Hadoop 配置。
hdfs.writeFormatWritable序列文件记录的格式。 TextWritable之一。在使用 Flume 创建数据文件之前将其设置为Text,否则 Apache Impala(正在孵化)或 Apache Hive 无法读取这些文件。
hdfs.threadsPoolSize10每个 HDFS 接收器用于 HDFS IO 操作(打开,写入等)的线程数
hdfs.rollTimerPoolSize1每个 HDFS 接收器用于调度定时文件滚动的线程数
hdfs.kerberosPrincipalKerberos 用户主体,用于访问安全的 HDFS
hdfs.kerberosKeytab用于访问安全 HDFS 的 Kerberos 密钥表
hdfs.proxyUser
hdfs.roundfalse时间戳是否应四舍五入(如果为 true,则将影响所有基于时间的转义序列,但%t 除外)
hdfs.roundValue1四舍五入到此最高倍数(使用hdfs\.roundUnit配置的单位),小于当前时间。
hdfs.roundUnitsecond下舍入值的单位-secondminutehour
hdfs.timeZoneLocal Time用于解析目录路径的时区名称,例如美国/洛杉矶。
hdfs.useLocalTimeStampfalse替换转义序列时,请使用本地时间(而不是事件 Headers 中的时间戳)。
hdfs.closeTries0启动关闭尝试后,接收器必须尝试重命名文件的次数。如果设置为 1,则该接收器将不会重试失败的重命名(例如由于 NameNode 或 DataNode 故障),并且可能使文件处于打开状态,extensions 为.tmp。如果设置为 0,接收器将尝试重命名文件,直到文件最终被重命名为止(尝试的次数没有限制)。如果关闭调用失败,则文件可能仍保持打开状态,但数据将保持不变,在这种情况下,仅在 Flume 重新启动后,文件才会关闭。
hdfs.retryInterval180两次连续尝试关闭文件之间的时间(以秒为单位)。每个 close 调用都会花费多个 RPC 往返到 Namenode,因此将其设置得太低会导致 name 节点上的大量负载。如果设置为 0 或小于 0,则在第一次尝试失败时,接收器将不会尝试关闭文件,并且可能会使文件保持打开状态或 extensions 为“ .tmp”。
serializerTEXT其他可能的选项包括avro_eventEventSerializer\.Builder接口实现的完全限定的类名。
serializer.*

Deprecated Properties

名称默认说明============================================= ================================================== ======= hdfs.callTimeout 30000 HDFS 操作(例如打开,写入,刷新,关闭)允许的毫秒数。如果发生许多 HDFS 超时操作,则应增加此数字。 ================================================== ================================================== ====

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

上面的配置会将时间戳四舍五入到最后 10 分钟。例如,某个时间戳为 2012 年 6 月 12 日上午 11:54:34 的事件将导致 hdfs 路径变为/flume/events/2012\-06\-12/1150/00

Hive Sink

此接收器将包含定界文本或 JSON 数据的事件直接流到 Hive 表或分区中。使用 Hive 事务编写事件。一旦将一组事件提交给 Hive,它们便立即对 Hive 查询可见。可以预创建将要发送到 Sink 的分区,或者如果缺少,则可以选择由 Flume 创建。来自传入事件数据的字段被 Map 到 Hive 表中的相应列。

NameDefaultDescription
channel
type组件类型名称,必须为hive
hive.metastoreHive Metastore URI(例如 thrift://a.b.com:9083)
hive.database配置单元数据库名称
hive.table蜂房表名称
hive.partition逗号分隔的分区值列表,用于标识要写入的分区。可能包含转义序列。例如:如果表格按(大陆:字符串,国家/地区:字符串,时间:字符串)划分,那么“亚洲,印度,2014-02-26-01-21”将指示大洲=亚洲,国家/地区=印度,时间= 2014 -02-26-01-21
hive.txnsPerBatchAsk100Hive 向流媒体 Client 端(例如 Flume)授予Transaction 批,而不是单个 Transaction。此设置配置每个事务批处理所需的事务数。单个批次中来自所有事务的数据最终存储在单个文件中。 Flume 将在批处理中的每个事务中最多写入一个 batchSize 事件。此设置与 batchSize 结合使用,可以控制每个文件的大小。请注意,最终,Hive 将透明地将这些文件压缩为更大的文件。
heartBeatInterval240(以秒为单位)发送给 Hive 的连续心跳之间的间隔,以防止未使用的事务过期。将此值设置为 0 以禁用心跳。
autoCreatePartitionstrueFlume 将自动创建必要的 Hive 分区以流式传输到
batchSize15000单个 Hive 事务中写入 Hive 的最大事件数
maxOpenConnections500仅允许此数量的打开连接。如果超过此数量,则关闭最近最少使用的连接。
callTimeout10000(以毫秒为单位)Hive 和 HDFS I/O 操作的超时,例如 openTxn,写入,提交,中止。
serializer 序列化器负责从事件中解析字段并将其 Map 到配置单元表中的列。串行器的选择取决于事件中数据的格式。支持的序列化器:DELIMITED 和 JSON
roundUnitminute下舍入值的单位-secondminutehour
roundValue1四舍五入到此值的最高倍数(使用 hive.roundUnit 配置的单位),小于当前时间
timeZoneLocal Time解析分区中的转义序列时应使用的时区名称,例如美国/洛杉矶。
useLocalTimeStampfalse替换转义序列时,请使用本地时间(而不是事件 Headers 中的时间戳)。

Hive 下沉提供了以下序列化器:

JSON :处理 UTF8 编码的 Json(严格语法)事件,不需要进行配置。 JSON 中的对象名称直接 Map 到 Hive 表中具有相同名称的列。内部使用 org.apache.hive.hcatalog.data.JsonSerDe,但独立于 Hive 表的 Serde。该序列化程序需要安装 HCatalog。

DELIMITED :处理简单的带分隔符的文本事件。内部使用 LazySimpleSerde,但独立于 Hive 表的 Serde。

NameDefaultDescription
serializer.delimiter,(类型:字符串)传入数据中的字段定界符。要使用特殊字符,请用双引号将它们括起来,例如“\t”
serializer.fieldnames从 Importing 字段到配置单元表中的列的 Map。指定为 Hive 表列名称的逗号分隔列表(不包含空格),以其出现的 Sequences 标识 Importing 字段。要跳过字段,请保留列名未指定。例如。 'time ,, ip,message'指示 ImportingMap 中的第一,第三和第四字段到配置单元表中的 time,ip 和 message 列。
serializer.serdeSeparatorCtrl-A(类型:字符)自定义基础 Serde 使用的分隔符。如果 serializer.fieldnames 中的字段与表列的 Sequences 相同,serializer.delimiter 与 serializer.serdeSeparator 相同,并且 serializer.fieldnames 中的字段数小于或等于表数,则可以提高效率。列,因为传入事件主体中的字段不需要重新排序以匹配表列的 Sequences。对特殊字符(如“t”)使用单引号。确保 Importing 字段不包含此字符。注意:如果 serializer.delimiter 是单个字符,则最好将其设置为相同字符

以下是支持的转义序列:

AliasDescription
%{host}替换名为“ host”的事件 Headers 的值。支持任意标题名称。
%tUnix 时间(以毫秒为单位)
%a语言环境的简短工作日名称(星期一,星期二...)
%A语言环境的完整工作日名称(星期一,星期二,...)
%b语言环境的短月份名称(1 月,2 月,...)
%B语言环境的长月份名称(一月,二月,...)
%c语言环境的日期和时间(2005 年 3 月 3 日,星期四,23:05:25)
%d每月的一天(01)
%D日期;与%m /%d /%y 相同
%Hhour (00..23)
%Ihour (01..12)
%j一年中的一天(001..366)
%k小时(0..23)
%mmonth (01..12)
%Mminute (00..59)
%p相当于上午或下午的语言环境
%s自 1970-01-01 00:00:00 UTC 以来的秒数
%Ssecond (00..59)
%y年份的后两位(00..99)
%Yyear (2010)
%zhhmm 数字时区(例如-0400)

Note

对于所有与时间相关的转义序列,事件的 Headers 中必须存在带有关键字“时间戳”的 Headers(除非将useLocalTimeStamp设置为true)。一种自动添加此方法的方法是使用 TimestampInterceptor。

Hive 表示例:

create table weblogs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

名为 a1 的代理的示例:

a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg

上面的配置会将时间戳四舍五入到最后 10 分钟。例如,某个时间戳记 Headers 设置为 2012 年 6 月 12 日上午 11:54:34,而“国家”Headers 设置为“印度”的事件将评估为该分区(大陆=“ asia”,国家=“印度”,时间='2012-06-12-11-50'。序列化器配置为接受包含三个字段的制表符分隔的 Importing,并跳过第二个字段。

Logger Sink

在 INFO 级别记录事件。通常用于测试/调试目的。必填属性以**表示。该接收器是唯一的 exception,它不需要记录原始数据部分中说明的额外配置。

Property NameDefaultDescription
channel
type组件类型名称,必须为logger
maxBytesToLog16要记录的事件正文的最大字节数

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1

Avro Sink

该接收器构成了 Flume 分层收集支持的一半。发送到该接收器的 Flume 事件将转换为 Avro 事件,并发送到已配置的主机名/端口对。这些事件是从已配置的通道中以已配置的批次大小批量获取的。必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称必须为avro
hostname要绑定的主机名或 IP 地址。
port要监听的端口号。
batch-size100要分批发送的事件数。
connect-timeout20000允许第一个(握手)请求的时间(毫秒)。
request-timeout20000在第一个请求之后允许请求的时间(毫秒)。
reset-connection-intervalnone重置与下一跳的连接之前的时间。这将迫使 Avro 接收器重新连接到下一跳。当添加新闻主机时,这将使接收器可以连接到硬件负载平衡器后面的主机,而不必重新启动代理。
compression-typenone这可以是“无”或“放气”。压缩类型必须与匹配的 AvroSource 的压缩类型匹配
compression-level6压缩事件的压缩级别。 0 =无压缩,1-9 为压缩。数字越高,压缩越多
sslfalse设置为 true 可以为此 AvroSink 启用 SSL。配置 SSL 时,可以选择设置“信任库”,“信任库密码”,“信任库类型”,并指定是否为“全部信任证书”。
trust-all-certsfalse如果将其设置为 true,将不检查远程服务器(Avro 源)的 SSL 服务器证书。这不应在 Producing 使用,因为它使攻击者更容易执行中间人攻击并“侦听”加密的连接。
truststore定制 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应信任远程 Avro Source 的 SSL 身份验证凭据。如果未指定,则将使用全局密钥库。如果也未指定全局密钥库,那么将使用默认的 Java JSSE 证书颁发机构文件(在 Oracle JRE 中通常为“ jssecacerts”或“ cacerts”)。
truststore-password信任库的密码。如果未指定,则将使用全局密钥库密码(如果已定义)。
truststore-typeJKSJava 信任库的类型。这可以是“ JKS”或其他受支持的 Java 信任库类型。如果未指定,则将使用全局密钥库类型(如果已定义,则默认值为 JKS)。
exclude-protocolsSSLv3要排除的 SSL/TLS 协议的空格分隔列表。除了指定的协议之外,还将始终排除 SSLv3.
maxIoWorkers2 *机器中可用处理器的数量I/O 工作线程的最大数量。这是在 NettyAvroRpcClient NioClientSocketChannelFactory 上配置的。

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

Thrift Sink

该接收器构成了 Flume 分层收集支持的一半。发送到该接收器的 Flume 事件将转换为 Thrift 事件,并发送到已配置的主机名/端口对。这些事件是从已配置的通道中以已配置的批次大小批量获取的。

通过启用 kerberos 身份验证,可以将节流接收器配置为以安全模式启动。为了与以安全模式启动的 Thrift 源进行通信,Thrift 接收器也应以安全模式运行。 client-principal 和 client-keytab 是 Thrift 接收器用来验证 kerberos KDC 的属性。服务器主体表示此接收器配置为以安全模式连接到的 Thrift 源的主体。必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称必须为thrift
hostname要绑定的主机名或 IP 地址。
port要监听的端口号。
batch-size100要分批发送的事件数。
connect-timeout20000允许第一个(握手)请求的时间(毫秒)。
request-timeout20000在第一个请求之后允许请求的时间(毫秒)。
connection-reset-intervalnone重置与下一跳的连接之前的时间。这将迫使节流槽重新连接到下一跳。当添加新闻主机时,这将使接收器可以连接到硬件负载平衡器后面的主机,而不必重新启动代理。
sslfalse设置为 true 可以为此 ThriftSink 启用 SSL。配置 SSL 时,您可以选择设置“信任库”,“信任库密码”和“信任库类型”
truststore定制 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应信任远程 Thrift Source 的 SSL 身份验证凭据。如果未指定,则将使用全局密钥库。如果也未指定全局密钥库,那么将使用默认的 Java JSSE 证书颁发机构文件(在 Oracle JRE 中通常为“ jssecacerts”或“ cacerts”)。
truststore-password信任库的密码。如果未指定,则将使用全局密钥库密码(如果已定义)。
truststore-typeJKSJava 信任库的类型。这可以是“ JKS”或其他受支持的 Java 信任库类型。如果未指定,则将使用全局密钥库类型(如果已定义,则默认值为 JKS)。
exclude-protocolsSSLv3SSL/TLS 协议用空格分隔的列表,以排除
kerberosfalse设置为 true 以启用 kerberos 身份验证。在 kerberos 模式下,成功进行身份验证和与启用了 kerberos 的 Thrift Source 进行通信需要使用以 Client 端为主体,以 client-keytab 和 server-principal 为主体。
client-principal—-Thrift Sink 使用的 kerberos 主体对 kerberos KDC 进行身份验证。
client-keytab—-Thrift Sink 与 Client 端主体结合使用的 keytab 位置,用于对 kerberos KDC 进行身份验证。
server-principalThrift Sink 配置为连接到的 Thrift Source 的 kerberos 主体。

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545

IRC Sink

IRC 接收器从连接的通道获取消息,并将消息中继到已配置的 IRC 目的地。必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称,必须为irc
hostname要连接的主机名或 IP 地址
port6667要连接的远程主机的端口号
nickNick name
userUser name
passwordUser password
chanchannel
name
splitlines(boolean)
splitcharsn行分隔符(如果要在配置文件中 Importing 默认值,则需要转义反斜杠,例如:“n”)

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume

文件卷槽

将事件存储在本地文件系统上。必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称必须为file_roll
sink.directory文件存储目录
sink.pathManagerDEFAULT要使用的 PathManager 实现。
sink.pathManager.extension如果使用默认的 PathManager,则为文件 extensions。
sink.pathManager.prefix如果使用默认的 PathManager,则添加到文件名开头的字符串
sink.rollInterval30每 30 秒滚动一次文件。指定 0 将禁用滚动,并导致将所有事件写入单个文件。
sink.serializerTEXT其他可能的选项包括avro_event或 EventSerializer.Builder 接口的实现的 FQCN。
sink.batchSize100

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

Null Sink

舍弃它从通道收到的所有事件。必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称必须为null
batchSize100

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

HBaseSinks

HBaseSink

该接收器将数据写入 HBase。 Hbase 配置是从 Classpath 中遇到的第一个 hbase-site.xml 中提取的。由配置指定的实现 HbaseEventSerializer 的类用于将事件转换为 HBase 放置和/或增量。然后将这些推和增量写入 HBase。该接收器提供与 HBase 相同的一致性保证,HBase 当前是按行原子性。如果 Hbase 无法写入某些事件,则接收器将重播该事务中的所有事件。

HBaseSink 支持将数据写入安全的 HBase。要写入安全的 HBase,正在运行代理的用户必须对接收器配置为写入的表具有写入权限。可以在配置中指定用于对 KDC 进行身份验证的主体和密钥表。 Flume 代理的 Classpath 中的 hbase-site.xml 必须将身份验证设置为kerberos(有关如何执行此操作的详细信息,请参阅 HBase 文档)。

为了方便起见,Flume 随附了两个串行器。 SimpleHbaseEventSerializer(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)将事件主体原样写入 HBase,并可选地增加 Hbase 中的列。这主要是示例实现。 RegexHbaseEventSerializer(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)根据给定的 regex 分解事件主体,并将每个部分写入不同的列。

类型是 FQCN:org.apache.flume.sink.hbase.HBaseSink。

必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称,必须为hbase
tableHbase 中要写入的表的名称。
columnFamilyHbase 中要写入的列族。
zookeeperQuorum法定规格。这是 hbase-site.xml 中属性hbase\.zookeeper\.quorum的值
znodeParent/hbase-ROOT-区域的 znode 的基本路径。 hbase-site.xml 中的zookeeper\.znode\.parent
batchSize100每个 txn 要写入的事件数。
coalesceIncrementsfalse如果接收器合并,则每批将多个增量合并到一个单元中。如果对有限数量的单元有多个增量,这可能会提供更好的性能。
serializerorg.apache.flume.sink.hbase.SimpleHbaseEventSerializer默认增量列=“ iCol”,有效负载列=“ pCol”。
serializer.*要传递给序列化程序的属性。
kerberosPrincipalKerberos 用户主体,用于访问安全的 HBase
kerberosKeytab用于访问安全 HBase 的 Kerberos 密钥表

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
HBase2Sink

HBase2Sink 等效于 HBase 版本 2 的 HBaseSink。所提供的功能和配置参数与 HBaseSink 相同(除了接收器类型和程序包/类名称中的 hbase2 标记)。

类型是 FQCN:org.apache.flume.sink.hbase2.HBase2Sink。

必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称,必须为hbase2
tableHBase 中要写入的表的名称。
columnFamilyHBase 中要写入的列族。
zookeeperQuorum法定规格。这是 hbase-site.xml 中属性hbase\.zookeeper\.quorum的值
znodeParent/hbase-ROOT-区域的 znode 的基本路径。 hbase-site.xml 中的zookeeper\.znode\.parent
batchSize100每个 txn 要写入的事件数。
coalesceIncrementsfalse如果接收器合并,则每批将多个增量合并到一个单元中。如果对有限数量的单元有多个增量,这可能会提供更好的性能。
serializerorg.apache.flume.sink.hbase2.SimpleHBase2EventSerializer默认增量列=“ iCol”,有效负载列=“ pCol”。
serializer.*要传递给序列化程序的属性。
kerberosPrincipalKerberos 用户主体,用于访问安全的 HBase
kerberosKeytab用于访问安全 HBase 的 Kerberos 密钥表

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase2
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.channel = c1
AsyncHBaseSink

该接收器使用异步模型将数据写入 HBase。由配置指定的实现 AsyncHbaseEventSerializer 的类用于将事件转换为 HBase 放置和/或增量。然后将这些推和增量写入 HBase。该接收器使用Asynchbase API写入 HBase。该接收器提供与 HBase 相同的一致性保证,HBase 当前是按行原子性。如果 Hbase 无法写入某些事件,则接收器将重播该事务中的所有事件。 AsyncHBaseSink 只能与 HBase 1.x 一起使用。 AsyncHBaseSink 使用的异步 Client 端库不适用于 HBase2.类型为 FQCN:org.apache.flume.sink.hbase.AsyncHBaseSink。必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称,必须为asynchbase
tableHbase 中要写入的表的名称。
zookeeperQuorum法定规格。这是 hbase-site.xml 中属性hbase\.zookeeper\.quorum的值
znodeParent/hbase-ROOT-区域的 znode 的基本路径。 hbase-site.xml 中的zookeeper\.znode\.parent
columnFamilyHbase 中要写入的列族。
batchSize100每个 txn 要写入的事件数。
coalesceIncrementsfalse如果接收器合并,则每批将多个增量合并到一个单元中。如果对有限数量的单元有多个增量,这可能会提供更好的性能。
timeout60000接收器 awaithbase 对事务中的所有事件进行确认的时间(以毫秒为单位)。
serializerorg.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
serializer.*要传递给序列化程序的属性。
async.*传递给 asyncHbase 库的属性。这些属性优先于旧的zookeeperQuorumznodeParent值。您可以在AsyncHBase 的文档页面找到可用属性的列表。

请注意,此接收器将在配置中获取 Zookeeper Quorum 和父级 znode 信息。 Zookeeper 仲裁和父节点配置可以在 Sink 配置文件中指定。或者,这些配置值取自 Classpath 中的第一个 hbase-site.xml 文件。

如果配置中未提供这些信息,则接收器将从 Classpath 中的第一个 hbase-site.xml 文件读取此信息。

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = asynchbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
a1.sinks.k1.channel = c1

MorphlineSolrSink

该接收器从 Flume 事件中提取数据,对其进行转换,并将其几乎实时地加载到 Apache Solr 服务器中,后者再为最终用户或搜索应用程序提供查询。

该接收器非常适合将原始数据流传输到 HDFS(通过 HdfsSink)并同时提取,转换和加载相同数据(通过 MorphlineSolrSink)并将其加载到 Solr 的用例。特别是,该接收器可以处理来自不同数据源的任意异构原始数据,并将其转换为对 Search 应用程序有用的数据模型。

可使用morphline 配置文件定制 ETL 功能,该morphline 配置文件定义了一系列转换命令,这些事件将事件记录从一个命令传递到另一个命令。

Morphlines 可以看作是 Unix 管道的演进,其中数据模型被通用化以与通用记录流(包括任意二进制有效载荷)一起工作。 morphline 命令有点像 Flume Interceptor。 Morphline 可以嵌入到 Flume 等 Hadoop 组件中。

开箱即用地提供了用于解析和转换一组标准数据格式的命令,例如日志文件,Avro,CSV,文本,HTML,XML,PDF,Word,Excel 等,以及用于其他数据的其他自定义命令和解析器格式可以添加为 morphline 插件。可以为任何类型的数据格式构建索引,并且可以为任何类型的 Solr 模式生成任何 Solr 文档,并且可以注册和执行任何自定义 ETL 逻辑。

变形线操纵连续的记录流。数据模型可以描述如下:记录是一组命名字段,其中每个字段都有一个或多个值的有序列表。值可以是任何 Java 对象。即,一条记录本质上是一个哈希表,其中每个哈希表条目都包含一个 String 键和一个 Java Objects 列表作为值。 (该实现使用 Guava 的ArrayListMultimap,即ListMultimap)。请注意,一个字段可以具有多个值,并且任何两个记录都不必使用公共字段名。

该接收器将 Flume 事件的主体填充到 morphline 记录的_attachment_body字段中,并将 Flume 事件的标题复制到同名的记录字段中。然后,命令可以对该数据起作用。

支持路由到 SolrCloud 集群以提高可伸缩性。索引负载可以分布在大量 MorphlineSolrSink 上,以提高可伸缩性。索引负载可以跨多个 MorphlineSolrSink 复制,以实现高可用性,例如,使用 Flume 功能(例如负载平衡接收器处理器)。 MorphlineInterceptor 还可以帮助实现对多个 Solr 集合的动态路由(例如,用于多租户)。

您的环境所需的 morphline 和 solr jar 必须放置在 Apache Flume 安装的 lib 目录中。

类型是 FQCN:org.apache.flume.sink.solr.morphline.MorphlineSolrSink

必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称,必须为org\.apache\.flume\.sink\.solr\.morphline\.MorphlineSolrSink
morphlineFile本地文件系统上到吗啉配置文件的相对或绝对路径。示例:/etc/flume\-ng/conf/morphline\.conf
morphlineIdnull如果在 morphline 配置文件中有多个 morphline,则用于标识 morphline 的可选名称
batchSize1000每个 Sink 事务处理要采取的最大事件数。
batchDurationMillis1000每个 Sink 事务的最大持续时间(毫秒)。在此持续时间之后或超过 batchSize 时(以先到者为准),事务将提交。
handlerClassorg.apache.flume.sink.solr.morphline.MorphlineHandlerImpl实现 org.apache.flume.sink.solr.morphline.MorphlineHandler 的类的 FQCN
isProductionModefalse应该为关键任务的大型在线生产系统启用此标志,当发生不可恢复的异常时,这些系统需要在不停机的情况下取得进展。解析器 Importing 数据损坏或格式错误,解析器错误以及与未知 Solr 架构字段相关的错误,将导致无法恢复的异常。
recoverableExceptionClassesorg.apache.solr.client.solrj.SolrServerException以逗号分隔的可恢复异常列表,这些列表往往是暂时的,在这种情况下,可以重试相应的任务。示例包括网络连接错误,超时等。当生产模式标志设置为 true 时,将不忽略使用此参数配置的可恢复异常,因此将导致重试。
isIgnoringRecoverableExceptionsfalse如果不可恢复的异常被意外误分类为可恢复,则应启用此标志。这使接收器能够取得进展并避免永远重试事件。

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000

ElasticSearchSink

该接收器将数据写入 Elasticsearch 集群。默认情况下,将写入事件,以便Kibana图形界面可以显示它们-就像logstash编写它们一样。

您的环境所需的 elasticsearch 和 lucene-core jar 必须放置在 Apache Flume 安装的 lib 目录中。 Elasticsearch 要求 Client 端 JAR 的主要版本与服务器的主要版本匹配,并且两者都运行相同的 JVM 次要版本。如果不正确,则会出现 SerializationExceptions。要选择所需的版本,首先确定 elasticsearch 的版本和目标集群正在运行的 JVM 版本。然后选择一个与主要版本匹配的 elasticsearchClient 库。 0.19.xClient 端可以与 0.19.x 群集通信; 0.20.x 可以与 0.20.x 通话,而 0.90.x 可以与 0.90.x 通话。确定了 Elasticsearch 版本后,请阅读 pom.xml 文件,以确定要使用的正确的 lucene-core JAR 版本。运行 ElasticSearchSink 的 Flume 代理还应该与目标集群降级到次要版本的 JVM 相匹配。

活动将每天写入新索引。名称将为\ -yyyy-MM-dd,其中\ 是 indexName 参数。接收器将在 UTC 午夜开始写入新索引。

默认情况下,事件由 ElasticSearchLogStashEventSerializer 序列化以进行 Elasticsearch。可以使用 serializer 参数覆盖此行为。此参数接受 org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer 或 org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory 的实现。不推荐使用 ElasticSearchEventSerializer,而建议使用功能更强大的 ElasticSearchIndexRequestBuilderFactory。

类型是 FQCN:org.apache.flume.sink.elasticsearch.ElasticSearchSink

必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称,必须为org\.apache\.flume\.sink\.elasticsearch\.ElasticSearchSink
hostNames主机名:端口的逗号分隔列表,如果端口不存在,则将使用默认端口“ 9300”
indexNameflume日期将附加到索引的名称。示例'flume'->'flume-yyyy-MM-dd'支持任意 Headers 替换,例如。 %\ {}替换为命名事件 Headers 的值
indexTypelogs用于将文档编入索引的类型,默认为'log'支持任意 Headers 替换,例如。 %\ {}替换为命名事件 Headers 的值
clusterNameelasticsearch要连接的 ElasticSearch 集群的名称
batchSize100每个 txn 要写入的事件数。
ttlTTL(以天为单位)在设置后将导致过期文件自动被删除,如果未设置,则永远不会被自动删除。 TTL 仅以较早的整数形式被接受,例如 a1.sinks.k1.ttl = 5,还带有限定符 ms(毫秒),s(秒),m(分钟),h(小时),d(天)和 w(周)。示例 a1.sinks.k1.ttl = 5d 将 TTL 设置为 5 天。遵循http://www.elasticsearch.org/guide/reference/mapping/ttl-field/以获得更多信息。
serializerorg.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer要使用的 ElasticSearchIndexRequestBuilderFactory 或 ElasticSearchEventSerializer。可以接受这两个类的实现,但首选 ElasticSearchIndexRequestBuilderFactory。
serializer.*要传递给序列化程序的属性。

Note

头替换很方便,可以使用事件头的值动态确定存储事件时要使用的 indexName 和 indexType。使用此功能时应谨慎,因为事件提交者现在可以控制 indexName 和 indexType。此外,如果使用 elasticsearch RESTClient 端,则事件提交者可以控制使用的 URL 路径。

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

Kite 数据集接收器

实验接收器,将事件写入Kite Dataset。该接收器将反序列化每个传入事件的主体,并将结果记录存储在 Kite 数据集中。它通过按 URI 加载数据集来确定目标数据集。

唯一受支持的序列化是 avro,并且记录架构必须在事件 Headers 中传递,使用 JSON 模式表示形式的flume\.avro\.schema\.literal或可以找到该架构的 URL flume\.avro\.schema\.url(支持hdfs:/\.\.\. URI)。这与 Log4jAppender flumeClient 端以及使用deserializer\.schemaType = LITERAL的假脱机目录源的 Avro 反序列化程序兼容。

注意 1: 不支持flume\.avro\.schema\.hash 。注意 2:在某些情况下,超过滚动间隔后,文件滚动可能会稍微发生。但是,此延迟不会超过 5 秒。在大多数情况下,延迟是可以忽略的。

Property NameDefaultDescription
channel
type必须是 org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri要打开的数据集的 URI
kite.repo.uri要打开的存储库的 URI(不建议使用;改用 kite.dataset.uri)
kite.dataset.namespace记录将写入的数据集的名称空间(不建议使用;改用 kite.dataset.uri)
kite.dataset.name记录将写入的数据集的名称(不建议使用;改用 kite.dataset.uri)
kite.batchSize100每批要处理的记录数
kite.rollInterval30释放数据文件之前的最大 await 时间(秒)
kite.flushable.commitOnBatchtrue如果true,将提交 Flume 事务,并在每批kite\.batchSize记录上刷新写入器。此设置仅适用于可刷新数据集。 true时,可能会将带有已提交数据的临时文件保留在数据集目录中。需要手动恢复这些文件,以使数据对 DatasetReaders 可见。
kite.syncable.syncOnBatchtrue控制接收器在提交事务时是否还将同步数据。此设置仅适用于可同步的数据集。同步确保数据将被写入远程系统上稳定存储的信息,同时仅刷新数据已离开 FlumeClient 端缓冲区的信息。当kite\.flushable\.commitOnBatch属性设置为false时,此属性也必须设置为false
kite.entityParseravro将 Flume Events转换为 Kite 实体的解析器。有效值为avroEntityParser\.Builder接口实现的完全限定的类名。
kite.failurePolicyretry处理不可恢复错误的策略,例如EventHeaders 中缺少Schema。默认值retry将使当前批次失败,然后重试与旧行为匹配的行为。其他有效值是save,它将原始Event写入kite\.error\.dataset\.uri数据集,以及FailurePolicy\.Builder接口实现的完全限定的类名。
kite.error.dataset.urikite\.failurePolicy设置为save时,保存失败事件的数据集的 URI。 kite\.failurePolicy设置为save时是必填
auth.kerberosPrincipalKerberos 用户主体,用于对 HDFS 的安全身份验证
auth.kerberosKeytab主体的 Kerberos 密钥表位置(本地 FS)
auth.proxyUserHDFS 操作的有效用户(如果不同于 kerberos 主体)

Kafka Sink

这是 Flume Sink 的实现,可以将数据发布到Kafka主题。目标之一是将 Flume 与 Kafka 集成在一起,以使基于拉的处理系统可以处理来自各种 Flume 来源的数据。

当前支持 Kafka 服务器版本 0.10.1.0 或更高。测试已完成至 2.0.1,这是发行时的最高可用版本。

必需的属性以粗体标记。

Property NameDefaultDescription
type必须设置为org\.apache\.flume\.sink\.kafka\.KafkaSink
kafka.bootstrap.serversKafka-Sink 将连接到的代理列表,以获取主题分区列表。这可以是部分代理列表,但是对于 HA,我们建议至少两个。格式是用逗号分隔的主机名:端口列表
kafka.topicdefault-flume-topicKafka 中将发布消息的主题。如果配置了此参数,则消息将发布到该主题。如果事件标题包含“主题”字段,则事件将发布到该主题,并覆盖此处配置的主题。支持任意头替换,例如。 %\ {}被名为“ header”的事件标题的值替换。 (如果使用替换,建议将 Kafka 代理的“ auto.create.topics.enable”属性设置为 true.)
flumeBatchSize100一批中要处理多少条消息。较大的批次可提高吞吐量,同时增加延迟。
kafka.producer.acks1在成功考虑一条消息之前,有多少个副本必须确认一条消息。接受的值为 0(永远不 await 确认),1(仅 await 领导者),-1(await 所有副本)将其设置为-1,以避免在某些领导者失败的情况下丢失数据。
useFlumeEventFormatfalse默认情况下,事件直接从事件主体以字节形式放入 Kafka 主题。设置为 true 可将事件存储为 Flume Avro 二进制格式。与 KafkaSource 上的相同属性或 Kafka Channel 上的 parseAsFlumeEvent 属性结合使用,将为生产方保留任何 FlumeHeaders。
defaultPartitionId指定此通道中要发送到的所有事件的 Kafka 分区 ID(整数),除非被partitionIdHeader覆盖。默认情况下,如果未设置此属性,则事件将由 Kafka Producer 的分区程序进行分配-包括通过key(如果指定)(或由kafka\.partitioner\.class指定的分区)进行分配。
partitionIdHeader设置后,接收器将从事件头中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。如果该值表示无效的分区,则将引发 EventDeliveryException。如果标题值存在,则此设置将覆盖defaultPartitionId
allowTopicOverridetrue设置后,接收器将允许将消息生成到由topicHeader属性(如果提供)指定的主题中。
topicHeadertopicallowTopicOverride结合设置时,将在此属性的值命名的 Headers 值中生成一条消息。与 Kafka Source topicHeader属性一起使用时应注意避免产生环回。
kafka.producer.security.protocolPLAINTEXT如果使用某种安全级别写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。有关安全设置的其他信息,请参见下文。
更多制作人安全道具 如果使用 SASL_PLAINTEXT,则 SASL_SSL 或 SSL 请参考Kafka security以获取需要在生产者上设置的其他属性。
Kafka Producer 的其他属性这些属性用于配置 Kafka Producer。可以使用 Kafka 支持的任何生产者属性。唯一的要求是在属性名称前添加前缀kafka\.producer。例如:kafka.producer.linger.ms

Note

Kafka Sink 使用 FlumeEventHeaders 中的topickey属性将事件发送到 Kafka。如果标题中存在topic,则事件将发送到该特定主题,从而覆盖为接收器配置的主题。如果标题中存在key,Kafka 将使用该键在主题分区之间对数据进行分区。具有相同键的事件将被发送到同一分区。如果键为 null,则事件将发送到随机分区。

Kafka 接收器还提供了 key.serializer(org.apache.kafka.common.serialization.StringSerializer)和 value.serializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默认值。不建议修改这些参数。

Deprecated Properties

Property NameDefaultDescription
brokerListUse kafka.bootstrap.servers
topicdefault-flume-topicUse kafka.topic
batchSize100Use kafka.flumeBatchSize
requiredAcks1Use kafka.producer.acks

Kafka 接收器的示例配置如下。以前缀kafka\.producer Kafka 生产者开头的属性。创建 Kafka 生产者时传递的属性不限于本示例中给出的属性。也可以在此处包括您的自定义属性,并通过作为方法参数传入的 Flume Context 对象在预处理器中访问它们。

a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

安全性和 Kafka 接收器:

Flume 和 Kafka 之间的通信通道支持安全身份验证以及数据加密。对于安全身份验证,可以从 Kafka 0.9.0 版开始使用 SASL/GSSAPI(Kerberos V5)或 SSL(即使该参数名为 SSL,但实际协议是 TLS 实现)。

到目前为止,数据加密仅由 SSL/TLS 提供。

kafka\.producer\.security\.protocol设置为以下任意值表示:

  • SASL_PLAINTEXT -Kerberos 或纯文本身份验证,无数据加密

  • SASL_SSL -带有数据加密的 Kerberos 或纯文本身份验证

  • SSL -基于 TLS 的加密,带有可选身份验证。

Warning

启用 SSL 时,性能会降低,幅度取决于 CPU 类型和 JVM 实现。参考:Kafka 安全概述和用于跟踪此问题的吉拉:KAFKA-2561

TLS 和 Kafka 接收器:

请阅读配置 KafkaClient 端 SSL中描述的步骤,以了解用于微调的其他配置设置,例如以下任意项:安全提供程序,密码套件,已启用的协议,信任库或密钥库类型。

服务器端身份验证和数据加密的示例配置。

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

在此指定信任库是可选的,可以改为使用全局信任库。有关全局 SSL 设置的更多详细信息,请参见SSL/TLS support部分。

注意:默认情况下,未定义属性ssl\.endpoint\.identification\.algorithm,因此不执行主机名验证。为了启用主机名验证,请设置以下属性

a1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS

启用后,Client 端将根据以下两个字段之一验证服务器的标准域名(FQDN):

如果还需要 Client 端身份验证,则还需要在 Flume 代理配置中添加以下内容,或者可以使用全局 SSL 设置(请参阅SSL/TLS support部分)。每个 Flume 代理必须拥有其 Client 证书,该证书必须由 Kafkabroker 单独或通过其签名链来信任。常见的示例是通过单个根 CA 对每个 Client 端证书进行签名,而根 CA 又会受到 Kafkabroker 的信任。

# optional, the global keystore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.sinks.sink1.kafka.producer.ssl.keystore.password = <password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则ssl\.key\.password属性将为生产者密钥库提供所需的其他 Secret:

a1.sinks.sink1.kafka.producer.ssl.key.password = <password to access the key>

Kerberos 和 Kafka 接收器:

要将 Kafka 接收器与受 Kerberos 保护的 Kafka 群集一起使用,请为生产者设置上面提到的producer\.security\.protocol属性。在 JAAS 文件的“ KafkaClient”部分中指定了与 Kafka 代理一起使用的 Kerberos 密钥表和主体。如果需要,“Client 端”部分介绍了 Zookeeper 连接。有关 JAAS 文件内容的信息,请参见Kafka doc。可以通过 flume-env.sh 中的 JAVA_OPTS 来指定此 JAAS 文件的位置以及系统范围内的 kerberos 配置:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的示例安全配置:

a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sinks.sink1.kafka.topic = mytopic
a1.sinks.sink1.kafka.producer.security.protocol = SASL_SSL
a1.sinks.sink1.kafka.producer.sasl.mechanism = GSSAPI
a1.sinks.sink1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sinks.sink1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.sinks.sink1.kafka.producer.ssl.truststore.password = <password to access the truststore>

samplesJAAS 文件。有关其内容的参考,请参阅SASL configuration的 Kafka 文档中所需身份验证机制(GSSAPI/PLAIN)的 Client 端配置部分。与 Kafka 来源或 KafkaChannels 不同,除非其他连接组件需要“Client 端”部分,否则不需要。另外,请确保 Flume 进程的 os 用户具有 jaas 和 keytab 文件的读取特权。

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

HTTP Sink

该接收器的行为是它将接收来自通道的事件,并使用 HTTP POST 请求将这些事件发送到远程服务。事件内容作为 POST 正文发送。

此接收器的错误处理行为取决于目标服务器返回的 HTTP 响应。接收器退回/就绪状态是可配置的,事务提交/回滚结果以及事件是否有助于成功的事件消耗计数也是可配置的。

服务器返回的任何格式错误的 HTTP 响应(状态代码不可读)都将导致退避 signal,并且不会从通道中消耗事件。

必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称必须为http
endpointPOST 的标准 URL 端点
connectTimeout5000套接字连接超时(以毫秒为单位)
requestTimeout5000最大请求处理时间(以毫秒为单位)
contentTypeHeadertext/plainHTTP Content-TypeHeaders
acceptHeadertext/plainHTTP AcceptHeaders 值
defaultBackofftrue默认情况下是否在收到所有 HTTP 状态代码时退避
defaultRollbacktrue默认情况下是否在收到所有 HTTP 状态代码时回滚
defaultIncrementMetricsfalse默认情况下是否在接收所有 HTTP 状态代码时增加 Metrics
backoff.CODE为单个(即 200)代码或组(即 2XX)代码配置特定的退避
rollback.CODE为单个(即 200)代码或组(即 2XX)代码配置特定的回滚
incrementMetrics.CODE为单个(即 200)代码或组(即 2XX)代码配置特定的 Metrics 增量

请注意,最具体的 HTTP 状态代码匹配项用于 backoff,rollback 和 crementMetrics 配置选项。如果同时具有 2XX 和 200 状态代码的配置值,则 200 个 HTTP 代码将使用 200 值,而 201-299 范围内的所有其他 HTTP 代码将使用 2XX 值。

在不向 HTTP 端点发出任何请求的情况下,将使用任何空或空事件。

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

Custom Sink

自定义接收器是您自己的接收器接口的实现。启动 Flume 代理时,自定义接收器的类及其依赖项必须包含在代理的 Classpath 中。定制接收器的类型是其 FQCN。必填属性以**表示。

Property NameDefaultDescription
channel
type组件类型名称,必须是您的 FQCN

名为 a1 的代理的示例:

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1

Flume Channels

通道是事件在代理上进行的存储库。源添加事件,接收器将其删除。

Memory Channel

事件存储在内存队列中,该队列具有可配置的最大大小。对于需要更高吞吐量并准备在代理发生故障时丢失分段数据的流而言,它是理想的选择。必填属性以**表示。

Property NameDefaultDescription
type组件类型名称,必须为memory
capacity100通道中存储的最大事件数
transactionCapacity100每次 Transaction 通道从源或接收器接收的最大事件数
keep-alive3添加或删除事件的超时时间(以秒为单位)
byteCapacityBufferPercentage20定义 byteCapacity 和通道中所有事件的估计总大小之间的缓冲区百分比,以说明报头中的数据。见下文。
byteCapacitysee description此通道中所有事件的总和所允许的最大总“字节”内存。该实现仅计算事件body,这也是提供byteCapacityBufferPercentage配置参数的原因。默认值为计算值等于 JVM 可用最大内存的 80%(即,命令行中传递的-Xmx 值的 80%)。请注意,如果您在单个 JVM 上有多个内存通道,并且它们恰好持有相同的物理事件(即,如果您使用的是来自单个源的复制通道 selectors),那么出于通道字节容量的目的,可以对这些事件大小进行重复计算。将此值设置为0将导致该值回落到大约 200 GB 的内部硬限制。

名为 a1 的代理的示例:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

JDBC Channel

事件存储在数据库支持的持久性存储中。 JDBC 通道当前支持嵌入式 Derby。这是一个持久通道,非常适合可恢复性很重要的流程。必填属性以**表示。

Property NameDefaultDescription
type组件类型名称,必须为jdbc
db.typeDERBY数据库供应商,必须是 DERBY。
driver.classorg.apache.derby.jdbc.EmbeddedDriver供应商的 JDBC 驱动程序的类
driver.url(由其他属性构成)JDBC 连接 URL
db.username"sa"数据库连接的用户标识
db.password数据库连接密码
connection.properties.fileJDBC 连接属性文件路径
create.schematrue如果为 true,则创建数据库模式(如果不存在)
create.indextrue创建索引以加快查找速度
create.foreignkeytrue
transaction.isolation"READ_COMMITTED"数据库会话 READ_UNCOMMITTED,READ_COMMITTED,SERIALIZABLE,REPEATABLE_READ 的隔离级别
maximum.connections10与数据库的最大连接数
maximum.capacity0 (unlimited)Channels 中的最大事件数
sysprop.* 数据库供应商特定的属性
sysprop.user.home 存储嵌入式 Derby 数据库的主路径

名为 a1 的代理的示例:

a1.channels = c1
a1.channels.c1.type = jdbc

Kafka Channel

事件存储在 Kafka 集群中(必须单独安装)。 Kafka 提供高可用性和复制功能,因此,如果代理或 kafka 代理崩溃,则事件可立即用于其他接收器

KafkaChannels 可用于多种情况:

  • 借助 Flume 源和接收器-它为事件提供了可靠且高度可用的 Channel

  • 带有 Flume 源代码和拦截器,但没有接收器-允许将 Flume 事件写入 Kafka 主题,以供其他应用程序使用

  • 使用 Flume 接收器但没有源-这是一种低延迟,容错方式,可将事件从 Kafka 发送到 Flume 接收器,例如 HDFS,HBase 或 Solr

当前支持 Kafka 服务器版本 0.10.1.0 或更高。测试已完成至 2.0.1,这是发行时的最高可用版本。

配置参数的组织方式如下:

  • 通常,与通道相关的配置值应用于通道配置级别,例如:a1.channel.k1.type =

  • 与 Kafka 或 Channel 的操作相关的配置值以“ kafka。”为前缀(与 CommonClient Configs 相似),例如:a1.channels.k1.kafka.topic 和 a1.channels.k1.kafka.bootstrap.servers。这与 hdfs 接收器的操作方式并无不同

  • 生产者/Consumer 特定的属性以 kafka.producer 或 kafka.consumer 为前缀

  • 在可能的情况下,使用 Kafka 参数名称,例如:bootstrap.servers 和 acks

此版本的 Sink 与以前的版本向后兼容,但是下表中指示了已弃用的属性,并且当配置文件中存在它们时,将在启动时记录警告消息。

必填属性以**表示。

Property NameDefaultDescription
type组件类型名称,必须为org\.apache\.flume\.channel\.kafka\.KafkaChannel
kafka.bootstrap.servers该通道使用的 Kafka 集群中的代理列表。这可以是部分代理列表,但是对于 HA,我们建议至少两个。格式是用逗号分隔的主机名:端口列表
kafka.topicflume-channel该 Channels 将使用的 Kafka 主题
kafka.consumer.group.idflumeChannel 用于向 Kafka 注册的 Consumer 组 ID。多个 Channel 必须使用相同的主题和组,以确保当一个代理失败时,另一个代理可以获取数据。请注意,使用具有相同 ID 的非 Channel 使用者可以导致数据丢失。
parseAsFlumeEventtrue期望通道中具有 FlumeEvent 模式的 Avro 基准面。如果 Flume 源正在将内容写入通道,则应该为 true;如果其他生产者正在写入该通道使用的主题,则为 false。可以使用 flume-ng-sdk 工件提供的 org.apache.flume.source.avro.AvroFlumeEvent 在 Flume 外部解析发送到 Kafka 的 Flume 源消息。
pollTimeout500使用者的“ poll()”调用中 await 的时间(以毫秒为单位)。 https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long
defaultPartitionId指定此通道中要发送到的所有事件的 Kafka 分区 ID(整数),除非被partitionIdHeader覆盖。默认情况下,如果未设置此属性,则事件将由 Kafka Producer 的分区程序进行分配-包括通过key(如果指定)(或由kafka\.partitioner\.class指定的分区)进行分配。
partitionIdHeader设置后,生产者将从事件头中获取使用该属性的值命名的字段的值,并将消息发送到主题的指定分区。如果该值表示无效分区,则该事件将不被接收到通道中。如果标题值存在,则此设置将覆盖defaultPartitionId
kafka.consumer.auto.offset.resetlatest当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除),该怎么办:最早:自动将偏移量重置为最早的偏移量最新:自动重置偏移量到最新偏移量无:如果未找到使用者组的先前偏移量,则向 Consumer 引发异常;其他:向 Consumer 引发异常。
kafka.producer.security.protocolPLAINTEXT如果使用某种安全级别写入 Kafka,则设置为 SASL_PLAINTEXT,SASL_SSL 或 SSL。有关安全设置的其他信息,请参见下文。
kafka.consumer.security.protocolPLAINTEXT与 kafka.producer.security.protocol 相同,但可从 Kafka 读取/使用。
更多生产者/Consumer 安全道具 如果使用 SASL_PLAINTEXT,则 SASL_SSL 或 SSL 请参考Kafka security,以了解需要在生产者/Consumer 上设置的其他属性。

Deprecated Properties

Property NameDefaultDescription
brokerList该通道使用的 Kafka 集群中的代理列表。这可以是部分代理列表,但是对于 HA,我们建议至少两个。格式是用逗号分隔的主机名:端口列表
topicflume-channelUse kafka.topic
groupIdflumeUse kafka.consumer.group.id
readSmallestOffsetfalseUse kafka.consumer.auto.offset.reset
migrateZookeeperOffsetstrue如果找不到 Kafka 存储的偏移量,请在 Zookeeper 中查找偏移量并将其提交给 Kafka。为支持从旧版 Flume 进行无缝 KafkaClient 端迁移,这应该是正确的。迁移后,可以将其设置为 false,尽管通常不需要这样做。如果未找到 Zookeeper 偏移,则 kafka.consumer.auto.offset.reset 配置定义如何处理偏移。

Note

由于通道是负载均衡的方式,因此在代理首次启动时可能会出现重复的事件

名为 a1 的代理的示例:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer

安全和 KafkaChannels:

Flume 和 Kafka 之间的通信通道支持安全身份验证以及数据加密。对于安全身份验证,可以从 Kafka 0.9.0 版开始使用 SASL/GSSAPI(Kerberos V5)或 SSL(即使该参数名为 SSL,但实际协议是 TLS 实现)。

到目前为止,数据加密仅由 SSL/TLS 提供。

kafka\.producer\|consumer\.security\.protocol设置为以下任意值表示:

  • SASL_PLAINTEXT -Kerberos 或纯文本身份验证,无数据加密

  • SASL_SSL -带有数据加密的 Kerberos 或纯文本身份验证

  • SSL -基于 TLS 的加密,带有可选身份验证。

Warning

启用 SSL 时,性能会降低,幅度取决于 CPU 类型和 JVM 实现。参考:Kafka 安全概述和用于跟踪此问题的吉拉:KAFKA-2561

TLS 和 KafkaChannels:

请阅读配置 KafkaClient 端 SSL中描述的步骤,以了解用于微调的其他配置设置,例如以下任意项:安全提供程序,密码套件,已启用的协议,信任库或密钥库类型。

服务器端身份验证和数据加密的示例配置。

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

在此指定信任库是可选的,可以改为使用全局信任库。有关全局 SSL 设置的更多详细信息,请参见SSL/TLS support部分。

注意:默认情况下,未定义属性ssl\.endpoint\.identification\.algorithm,因此不执行主机名验证。为了启用主机名验证,请设置以下属性

a1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
a1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS

启用后,Client 端将根据以下两个字段之一验证服务器的标准域名(FQDN):

如果还需要 Client 端身份验证,则还需要在 Flume 代理配置中添加以下内容,或者可以使用全局 SSL 设置(请参阅SSL/TLS support部分)。每个 Flume 代理必须拥有其 Client 证书,该证书必须由 Kafkabroker 单独或通过其签名链来信任。常见的示例是通过单个根 CA 对每个 Client 端证书进行签名,而根 CA 又会受到 Kafkabroker 的信任。

# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.producer.ssl.keystore.password = <password to access the keystore>
# optional, the global keystore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.keystore.location = /path/to/client.keystore.jks
a1.channels.channel1.kafka.consumer.ssl.keystore.password = <password to access the keystore>

如果密钥库和密钥使用不同的密码保护,则ssl\.key\.password属性将为使用者和生产者密钥库提供所需的其他 Secret:

a1.channels.channel1.kafka.producer.ssl.key.password = <password to access the key>
a1.channels.channel1.kafka.consumer.ssl.key.password = <password to access the key>

Kerberos 和 KafkaChannels:

要将 Kafka 通道与受 Kerberos 保护的 Kafka 群集一起使用,请为生产者和/或使用者设置上面提到的producer/consumer\.security\.protocol属性。在 JAAS 文件的“ KafkaClient”部分中指定了与 Kafka 代理一起使用的 Kerberos 密钥表和主体。如果需要,“Client 端”部分介绍了 Zookeeper 连接。有关 JAAS 文件内容的信息,请参见Kafka doc。可以通过 flume-env.sh 中的 JAVA_OPTS 来指定此 JAAS 文件的位置以及系统范围内的 kerberos 配置:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

使用 SASL_PLAINTEXT 的示例安全配置:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
a1.channels.channel1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka

使用 SASL_SSL 的示例安全配置:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
a1.channels.channel1.kafka.producer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.producer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.producer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.producer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.producer.ssl.truststore.password = <password to access the truststore>
a1.channels.channel1.kafka.consumer.security.protocol = SASL_SSL
a1.channels.channel1.kafka.consumer.sasl.mechanism = GSSAPI
a1.channels.channel1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.channels.channel1.kafka.consumer.ssl.truststore.location = /path/to/truststore.jks
a1.channels.channel1.kafka.consumer.ssl.truststore.password = <password to access the truststore>

samplesJAAS 文件。有关其内容的参考,请参阅SASL configuration的 Kafka 文档中所需身份验证机制(GSSAPI/PLAIN)的 Client 端配置部分。由于 Kafka Source 也可以连接到 Zookeeper 进行偏移量迁移,因此在此示例中还添加了“Client 端”部分。除非需要偏移量迁移,或者其他安全组件需要此部分,否则将不需要这样做。另外,请确保 Flume 进程的 os 用户具有 jaas 和 keytab 文件的读取特权。

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

File Channel

必填属性以**表示。

属性名称默认Description
type组件类型名称必须为file
checkpointDir~/.flume/file-channel/checkpoint将存储检查点文件的目录
useDualCheckpointsfalse备份检查点。如果将其设置为true,则必须设置backupCheckpointDir **
backupCheckpointDir备份检查点的目录。该目录 必须 与数据目录或检查点目录相同
dataDirs~/.flume/file-channel/data用逗号分隔的目录列表,用于存储日志文件。在单独的磁盘上使用多个目录可以提高文件通道性能
transactionCapacity10000Channel 支持的最大 Transaction 规模
checkpointInterval30000检查点之间的时间(以毫秒为单位)
maxFileSize2146435071单个日志文件的最大大小(以字节为单位)
minimumRequiredSpace524288000所需的最小可用空间(以字节为单位)。为避免数据损坏,当可用空间降至此值以下时,文件通道将停止接受获取/放入请求
capacity1000000通道最大容量
keep-alive3await 放置操作的时间(以秒为单位)
use-log-replay-v1falsemaven:使用旧的重播逻辑
use-fast-replayfalsemaven:无需队列即可重播
checkpointOnClosetrue控制通道关闭时是否创建检查点。在关闭位置创建检查点可以避免重播,从而加快了文件通道的后续启动速度。
encryption.activeKey密钥名称,用于加密新数据
encryption.cipherProvider密码提供者类型,支持的类型:AESCTRNOPADDING
encryption.keyProvider密钥提供者类型,支持的类型:JCEKSFILE
encryption.keyProvider.keyStoreFile密钥库文件的路径
encrpytion.keyProvider.keyStorePasswordFile密钥库密码文件的路径
encryption.keyProvider.keys所有按键的列表(例如 activeKey 设置的历史记录)
encyption.keyProvider.keys.*.passwordFile可选密钥密码文件的路径

Note

默认情况下,文件通道使用上面指定的用户目录中检查点和数据目录的路径。因此,如果代理中有多个活动的文件通道实例,则只有一个能够锁定目录并导致另一个通道初始化失败。因此,有必要提供到所有已配置通道的显式路径,最好是在不同的磁盘上。此外,由于文件通道将在每次提交后同步到磁盘,因此在多个磁盘无法用于检查点和数据目录的情况下,可能需要将文件通道与将事件分批处理的接收器/源耦合在一起,以提供良好的性能。

名为 a1 的代理的示例:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

Encryption

以下是一些示例配置:

生成具有与密钥库密码分开的密码的密钥:

keytool -genseckey -alias key-0 -keypass keyPassword -keyalg AES \
  -keysize 128 -validity 9000 -keystore test.keystore \
  -storetype jceks -storepass keyStorePassword

使用与密钥存储区密码相同的密码生成密钥:

keytool -genseckey -alias key-1 -keyalg AES -keysize 128 -validity 9000 \
  -keystore src/test/resources/test.keystore -storetype jceks \
  -storepass keyStorePassword
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = key-provider-0
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0

假设您已经淘汰了 key-0,并且应该使用 key-1 对新文件进行加密:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1

与上述相同的场景,但是 key-0 有其自己的密码:

a1.channels.c1.encryption.activeKey = key-1
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
a1.channels.c1.encryption.keyProvider.keys = key-0 key-1
a1.channels.c1.encryption.keyProvider.keys.key-0.passwordFile = /path/to/key-0.password

溢出内存通道

事件存储在内存队列中和磁盘上。内存中队列充当主存储,磁盘充当溢出。使用嵌入式文件通道 Management 磁盘存储。当内存队列已满时,其他传入事件将存储在文件通道中。该通道非常适合在正常操作期间需要高存储通道吞吐量的流,但同时又需要较大的文件通道容量,以更好地容忍间歇性接收器侧中断或排水速率下降。在这种异常情况下,吞吐量将大约降低到文件通道速度。如果代理崩溃或重新启动,则当代理联机时,只会恢复磁盘上存储的事件。 该 Channels 目前处于实验阶段,不建议在 Producing 使用.

必填属性以**表示。请参考文件通道以获取其他必需属性。

Property NameDefaultDescription
type组件类型名称,必须为SPILLABLEMEMORY
memoryCapacity10000存储在内存队列中的最大事件数。要禁用内存队列的使用,请将其设置为零。
overflowCapacity100000000存储在溢出磁盘(即文件通道)中的最大事件数。要禁用溢出功能,请将其设置为零。
overflowTimeout3内存已满时启用磁盘溢出之前要 await 的秒数。
byteCapacityBufferPercentage20定义 byteCapacity 和通道中所有事件的估计总大小之间的缓冲区百分比,以说明报头中的数据。见下文。
byteCapacitysee description允许的最大 字节 内存作为内存队列中所有事件的总和。该实现仅计算事件body,这也是提供byteCapacityBufferPercentage配置参数的原因。默认值为计算值等于 JVM 可用最大内存的 80%(即,命令行中传递的-Xmx 值的 80%)。请注意,如果您在单个 JVM 上有多个内存通道,并且它们恰好持有相同的物理事件(即,如果您使用的是来自单个源的复制通道 selectors),那么出于通道字节容量的目的,可以对这些事件大小进行重复计算。将此值设置为0将导致该值回落到大约 200 GB 的内部硬限制。
avgEventSize500进入通道的事件的估计平均大小(以字节为单位)
\查看文件 Channels可以使用任何文件通道属性,但“ keep-alive”和“ capacity”除外。文件通道的保持活动由可溢出内存通道 Management。使用“ overflowCapacity”设置文件通道的容量。

如果达到 memoryCapacity 或 byteCapacity 限制,则认为内存中队列已满。

名为 a1 的代理的示例:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

要禁用内存队列并使用类似于文件通道的功能:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 0
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

要禁用溢出磁盘并仅用作内存通道,请执行以下操作:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 100000
a1.channels.c1.overflowCapacity = 0

伪 TransactionChannels

Warning

伪事务通道仅用于单元测试目的,并不用于生产用途。

必填属性以**表示。

Property NameDefaultDescription
type组件类型名称,必须为org\.apache\.flume\.channel\.PseudoTxnMemoryChannel
capacity50通道中存储的最大事件数
keep-alive3添加或删除事件的超时时间(以秒为单位)

Custom Channel

自定义通道是您自己对 Channel 接口的实现。启动 Flume 代理时,自定义 Channel 的类及其依赖项必须包含在代理的 Classpath 中。自定义 Channel 的类型为其 FQCN。必填属性以**表示。

Property NameDefaultDescription
type组件类型名称,必须为 FQCN

名为 a1 的代理的示例:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

Sink 通道 selectors

如果未指定类型,则默认为“复制”。

复制 Channelsselectors(默认)

必填属性以**表示。

Property NameDefaultDescription
selector.typereplicating组件类型名称,必须为replicating
selector.optional一组 Channels 将被标记为optional

名为 a1 的代理的示例,其来源为 r1:

a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3

在上述配置中,c3 是可选通道。写入 c3 的失败将被忽略。由于 c1 和 c2 没有标记为可选,因此无法写入这些通道将导致事务失败。

多路通道 selectors

必填属性以**表示。

Property NameDefaultDescription
selector.typereplicating组件类型名称,必须为multiplexing
selector.headerflume.selector.header
selector.default
selector.mapping.*

名为 a1 的代理的示例,其来源为 r1:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

自定义 Channelsselectors

自定义通道 selectors 是您自己的 ChannelSelector 接口实现。启动 Flume 代理时,自定义通道 selectors 的类及其依赖项必须包含在代理的 Classpath 中。自定义 Channelsselectors 的类型为其 FQCN。

Property NameDefaultDescription
selector.type组件类型名称,必须是您的 FQCN

名为 a1 的代理及其源称为 r1 的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

SinkSink 处理器

接收器组允许用户将多个接收器分组为一个实体。接收器处理器可用于在组内的所有接收器上提供负载平衡功能,或在出现临时故障的情况下实现从一个接收器到另一个接收器的故障转移。

必填属性以**表示。

Property NameDefaultDescription
sinks参与组的接收器的以空格分隔的列表
processor.typedefault组件类型名称,必须为defaultfailoverload_balance

名为 a1 的代理的示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

默认接收器处理器

默认接收器处理器仅接受单个接收器。不必强迫用户为单个接收器创建处理器(接收器组)。取而代之的是,用户可以遵循本用户指南中上面说明的源-通道-接收器模式。

故障转移接收器处理器

故障转移接收器处理器维护接收器的优先级列表,以确保只要有一个可用的事件都将被处理(传递)。

故障转移机制的工作原理是将失败的接收器委派给一个池,在该池中为其分配一个冷却期,并在重试之前随着 Sequences 出现的故障而增加。接收器成功发送事件后,它将还原到活动池。接收器具有与之关联的优先级,数量越大,优先级越高。如果接收器在发送事件时失败,则下一个具有最高优先级的接收器应尝试发送事件。例如,优先级为 100 的接收器在优先级为 80 的接收器之前被激活。如果未指定优先级,则根据配置中指定接收器的 Sequences 确定优先级。

要进行配置,请将接收器组处理器设置为failover并为所有单个接收器设置优先级。所有指定的优先级必须唯一。此外,可以使用maxpenalty属性设置故障转移时间的上限(以毫秒为单位)。

必填属性以**表示。

Property NameDefaultDescription
sinks参与组的接收器的以空格分隔的列表
processor.typedefault组件类型名称,必须为failover
processor.priority.<sinkName>优先级值。<sinkName>必须是与当前接收器组关联的接收器实例之一。优先级较高的接收器较早被激活。绝对值越大表示优先级越高
processor.maxpenalty30000失败的接收器的最大退避时间(以毫秒为单位)

名为 a1 的代理的示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

负载均衡接收器处理器

负载均衡接收器处理器提供了负载均衡多个接收器上的流的能力。它维护必须在其上分配负载的活动接收器的索引列表。实现支持通过round_robinrandom选择机制来分配负载。选择机制的选择默认为round_robin类型,但可以通过配置覆盖。通过从AbstractSinkSelector继承的自定义类支持自定义选择机制。

调用时,此 selectors 使用其配置的选择机制选择下一个接收器并调用它。对于round_robinrandom,如果所选接收器无法传递事件,则处理器通过其配置的选择机制选择下一个可用接收器。此实现不会将出现故障的接收器列入黑名单,而是 continue 乐观地尝试每个可用的接收器。如果所有接收器调用均导致失败,那么 selectors 会将故障传播到接收器运行器。

如果启用了backoff,则接收器处理器会将失败的接收器列入黑名单,并将其删除以选择给定的超时时间。超时结束后,如果接收器仍未响应,则超时将成倍增加,以避免潜在地陷入对无响应接收器的长时间 await 中。禁用此功能后,在循环中所有失败的接收器负载都将传递到线路中的下一个接收器,因此无法均衡地平衡

必填属性以**表示。

Property NameDefaultDescription
processor.sinks参与组的接收器的以空格分隔的列表
processor.typedefault组件类型名称,必须为load_balance
processor.backofffalse失败的接收器应以指数方式减少。
processor.selectorround_robin选择机制。必须是从AbstractSinkSelector继承的自定义类的round_robinrandom或 FQCN
processor.selector.maxTimeOut30000退避 selectors 使用它来限制指数退避(以毫秒为单位)

名为 a1 的代理的示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

自定义接收器处理器

目前不支持自定义接收器处理器。

Event Serializers

file_roll接收器和hdfs接收器均支持EventSerializer接口。以下提供了 Flume 随附的 EventSerializers 的详细信息。

正文文本序列化器

别名:text。该拦截器将事件的主体写入输出流,而无需任何转换或修改。事件标题将被忽略。配置选项如下:

Property NameDefaultDescription
appendNewlinetrue是否在写时将换行符附加到每个事件。缺省值为 true,假定事件出于传统原因不包含换行符。

名为 a1 的代理的示例:

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = text
a1.sinks.k1.sink.serializer.appendNewline = false

“Sink 事件” Avro 事件序列化器

别名:avro_event

该拦截器将 Flume 事件序列化为 Avro 容器文件。使用的架构与 Avro RPC 机制中的 Flume 事件使用的架构相同。

该序列化器继承自AbstractAvroEventSerializer类。

配置选项如下:

Property NameDefaultDescription
syncIntervalBytes2048000Avro 同步间隔,以大约字节为单位。
compressionCodecnullAvro 压缩编解码器。有关受支持的编解码器,请参阅 Avro 的 CodecFactory 文档。

名为 a1 的代理的示例:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = avro_event
a1.sinks.k1.serializer.compressionCodec = snappy

Avro 事件序列化器

别名:此序列化器没有别名,必须使用完全限定的类名类名来指定。

这会将 Flume 事件序列化为 Avro 容器文件,例如“ Flume Event” Avro 事件序列化程序,但是记录模式是可配置的。记录模式可以指定为 Flume 配置属性,也可以在事件 Headers 中传递。

要将记录模式作为 Flume 配置的一部分进行传递,请使用下面列出的属性schemaURL

要在事件标题中传递记录模式,请指定包含事件格式的 JSON 格式表示的事件标题flume\.avro\.schema\.literal或带有可在其中找到该模式的 URL 的flume\.avro\.schema\.url(支持hdfs:/\.\.\. URI)。

该序列化器继承自AbstractAvroEventSerializer类。

配置选项如下:

Property NameDefaultDescription
syncIntervalBytes2048000Avro 同步间隔,以大约字节为单位。
compressionCodecnullAvro 压缩编解码器。有关受支持的编解码器,请参阅 Avro 的 CodecFactory 文档。
schemaURLnullAvro 模式 URL。标题 ovverride 中指定的模式。

名为 a1 的代理的示例:

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
a1.sinks.k1.serializer.compressionCodec = snappy
a1.sinks.k1.serializer.schemaURL = hdfs://namenode/path/to/schema.avsc

Flume Interceptors

Flume 能够实时修改/删除事件。这是在拦截器的帮助下完成的。拦截器是实现org\.apache\.flume\.interceptor\.Interceptor接口的类。拦截器可以根据拦截器开发人员选择的任何标准来修改甚至删除事件。 Flume 支持拦截器链接。通过在配置中指定拦截器生成器类名称的列表,可以实现此目的。拦截器在源配置中指定为用空格分隔的列表。指定拦截器的 Sequences 就是调用它们的 Sequences。由一个拦截器返回的事件列表将传递到链中的下一个拦截器。拦截器可以修改或删除事件。如果拦截器需要丢弃事件,则它不会在返回的列表中返回该事件。如果要删除所有事件,则只返回一个空列表。拦截器称为组件,这是如何通过配置创建拦截器的示例:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sinks.k1.filePrefix = FlumeData.%{CollectorHost}.%Y-%m-%d
a1.sinks.k1.channel = c1

请注意,拦截器生成器将传递给 type config 参数。拦截器本身是可配置的,可以像传递到其他任何可配置组件一样传递配置值。在上面的示例中,事件首先传递给 HostInterceptor,然后 HostInterceptor 返回的事件再传递给 TimestampInterceptor。您可以指定完全限定的类名(FQCN)或别名timestamp。如果您有多个收集器写入同一 HDFS 路径,则也可以使用 HostInterceptor。

Timestamp Interceptor

该拦截器会将事件处理的时间(以毫秒为单位)插入到事件 Headers 中。该拦截器将插入一个具有关键字timestamp(或由header属性指定)的 Headers,其值是相关的时间戳。如果配置中已经存在该时间戳,则该拦截器可以保留现有时间戳。

Property NameDefaultDescription
type组件类型名称必须为timestamp或 FQCN
headerNametimestamp放置生成的时间戳的标题的名称。
preserveExistingfalse如果时间戳已经存在,则应保留它-是还是否

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

Host Interceptor

该拦截器插入此代理运行所在的主机的主机名或 IP 地址。它会根据配置插入带有密钥host或已配置密钥的 Headers,该密钥的值是主机的主机名或 IP 地址。

Property NameDefaultDescription
type组件类型名称,必须为host
preserveExistingfalse如果主机 Headers 已经存在,则应保留它-true 或 false
useIPtrue如果为 true,请使用 IP 地址,否则请使用主机名。
hostHeaderhost要使用的标题密钥。

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host

Static Interceptor

静态拦截器允许用户将具有静态值的静态 Headers 附加到所有事件。

当前实现不允许一次指定多个 Headers。相反,用户可以链接多个静态拦截器,每个拦截器都定义一个静态头。

Property NameDefaultDescription
type组件类型名称,必须为static
preserveExistingtrue如果已配置的 Headers 已经存在,则应保留它-true 或 false
keykey应创建的标题名称
valuevalue应该创建的静态值

名为 a1 的代理的示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK

删除标题拦截器

该拦截器通过删除一个或多个 Headers 来操纵 Flume 事件 Headers。它可以删除静态定义的 Headers,基于正则表达式的 Headers 或列表中的 Headers。如果未定义这些事件,或者没有标题与条件匹配,则不会修改 Flume 事件。

请注意,如果只需要删除一个 Headers,则通过名称指定它可以比其他两种方法提供性能上的好处。

Property NameDefaultDescription
type组件类型名称必须为remove_header
withName要删除的标题的名称
fromList要删除的标题列表,用fromListSeparator指定的分隔符分隔
fromListSeparator\s*,\s*用于分隔fromList指定的列表中的多个 Headers 名称的正则表达式。默认为逗号,由任意数量的空格字符包围
matching名称与该正则表达式匹配的所有 Headers 都将被删除

UUID Interceptor

该拦截器在所有被拦截的事件上设置一个通用的唯一标识符。 UUID 示例为b5755073\-77a9\-43c1\-8fad\-b7a586fc1b97,它表示一个 128 位值。

如果没有可用的事件级别的应用程序唯一键,请考虑使用 UUIDInterceptor 为事件自动分配 UUID。在事件进入 Flume 网络后立即为事件分配 UUID,这一点很重要。也就是说,在流的第一个 Flume Source 中。面对旨在实现高可用性和高性能的 Flume 网络中的复制和重新交付,可以实现事件的后续重复数据删除。如果应用程序级别密钥可用,则它比自动生成的 UUID 更好,因为它允许使用所述众所周知的应用程序级别密钥对数据存储中的事件进行后续更新和删除。

Property NameDefaultDescription
type组件类型名称必须为org\.apache\.flume\.sink\.solr\.morphline\.UUIDInterceptor$Builder
headerNameid要修改的 FlumeHeaders 的名称
preserveExistingtrue如果 UUIDHeaders 已经存在,则应保留它-true 或 false
prefix""前缀字符串常量,以前缀到每个生成的 UUID

Morphline Interceptor

该拦截器通过morphline 配置文件过滤事件,该morphline 配置文件定义了一系列转换命令,这些记录将记录从一个命令传递到另一个命令。例如,morphline 可以通过基于正则表达式的模式匹配来忽略某些事件或更改或插入某些事件 Headers,或者可以通过 Apache Tika 对截获的事件自动检测并设置 MIME 类型。例如,这种数据包嗅探可用于 Flume 拓扑中基于内容的动态路由。 MorphlineInterceptor 还可以帮助实现对多个 Apache Solr 集合的动态路由(例如,用于多租户)。

当前,存在一个限制,即拦截器的 morphline 必须为每个 Importing 事件生成一个以上的输出记录。此拦截器不适用于重型 ETL 处理-如果您需要这样做,请考虑将 ETL 处理从 Flume Source 移至 Flume Sink,例如到 MorphlineSolrSink。

必填属性以**表示。

Property NameDefaultDescription
type组件类型名称必须为org\.apache\.flume\.sink\.solr\.morphline\.MorphlineInterceptor$Builder
morphlineFile本地文件系统上到吗啉配置文件的相对或绝对路径。示例:/etc/flume\-ng/conf/morphline\.conf
morphlineIdnull如果在 morphline 配置文件中有多个 morphline,则用于标识 morphline 的可选名称

示例 flume.conf 文件:

a1.sources.avroSrc.interceptors = morphlineinterceptor
a1.sources.avroSrc.interceptors.morphlineinterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.avroSrc.interceptors.morphlineinterceptor.morphlineId = morphline1

搜索和替换拦截器

该拦截器基于 Java 正则表达式提供简单的基于字符串的搜索和替换功能。还可以进行回溯/组捕获。该拦截器使用与 Java Matcher.replaceAll()方法相同的规则。

Property NameDefaultDescription
type组件类型名称必须为search_replace
searchPattern搜索和替换的模式。
replaceString替换字符串。
charsetUTF-8事件主体的字符集。默认情况下假定为 UTF-8.

Example configuration:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Remove leading alphanumeric characters in an event body.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = ^[A-Za-z0-9_]+
a1.sources.avroSrc.interceptors.search-replace.replaceString =

Another example:

a1.sources.avroSrc.interceptors = search-replace
a1.sources.avroSrc.interceptors.search-replace.type = search_replace

# Use grouping operators to reorder and munge words on a line.
a1.sources.avroSrc.interceptors.search-replace.searchPattern = The quick brown ([a-z]+) jumped over the lazy ([a-z]+)
a1.sources.avroSrc.interceptors.search-replace.replaceString = The hungry $2 ate the careless $1

正则表达式过滤拦截器

该拦截器通过将事件主体解释为文本并将文本与已配置的正则表达式进行匹配来选择性地过滤事件。提供的正则表达式可用于包含事件或排除事件。

Property NameDefaultDescription
type组件类型名称必须为regex_filter
regex".*"用于与事件匹配的正则表达式
excludeEventsfalse如果为 true,则正则表达式确定要排除的事件,否则正则表达式确定要包括的事件。

正则表达式提取器拦截器

此拦截器使用指定的正则表达式提取正则表达式匹配组,并将匹配组附加为事件的 Headers。它还支持可插入序列化程序,用于在将匹配组添加为事件 Headers 之前对其进行格式化。

Property NameDefaultDescription
type组件类型名称必须为regex_extractor
regex用于与事件匹配的正则表达式
serializers用空格分隔的序列化程序列表,用于将匹配项 Map 到 Headers 名称并序列化其值。 (请参见下面的示例)Flume 为以下序列化器提供了内置支持:org\.apache\.flume\.interceptor\.RegexExtractorInterceptorPassThroughSerializer org\.apache\.flume\.interceptor\.RegexExtractorInterceptorMillisSerializer
serializers.<s1>.typedefault必须为default(org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer),org\.apache\.flume\.interceptor\.RegexExtractorInterceptorMillisSerializer或实现org\.apache\.flume\.interceptor\.RegexExtractorInterceptorSerializer的自定义类的 FQCN
serializers.<s1>. name
serializers.*Serializer-specific properties

序列化程序用于将匹配项 Map 到 Headers 名称和格式化的 Headers 值。默认情况下,您只需要指定标题名称,就会使用默认的org\.apache\.flume\.interceptor\.RegexExtractorInterceptorPassThroughSerializer。该序列化器仅将匹配项 Map 到指定的 Headers 名称,然后将值传递给正则表达式提取的值。您可以使用完全限定的类名(FQCN)将自定义序列化程序实现插入提取程序,以任意方式格式化匹配项。

Example 1:

如果 Flume 事件主体包含1:2:3\.4foobar5,并且使用了以下配置

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three

提取的事件将包含相同的正文,但将添加以下 Headersone=\>1, two=\>2, three=\>3

Example 2:

如果 Flume 事件主体包含2012\-10\-18 18:47:57,614 some log line,并且使用了以下配置

a1.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
a1.sources.r1.interceptors.i1.serializers = s1
a1.sources.r1.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
a1.sources.r1.interceptors.i1.serializers.s1.name = timestamp
a1.sources.r1.interceptors.i1.serializers.s1.pattern = yyyy-MM-dd HH:mm

提取的事件将包含相同的正文,但将添加以下 Headerstimestamp=\>1350611220000

Flume Properties

Property NameDefaultDescription
flume.called.from.service如果指定了此属性,则即使在预期位置找不到配置文件,Flume 代理也会 continue 轮询配置文件。否则,如果配置在预期位置不存在,则 Flume 代理将终止。设置此属性时不需要属性值(例如,仅指定-Dflume.call.from.service 就足够了)

Property: flume.called.from.service

Flume 每 30 秒定期轮询一次,以查看对指定配置文件的更改。如果是第一次轮询现有文件,或者自上次轮询以来,现有文件的修改日期已更改,Flume 代理将从配置文件中加载新配置。重命名或移动文件不会更改其修改时间。当 Flume 代理轮询一个不存在的文件时,会发生以下两种情况之一:1.当该代理第一次轮询一个不存在的配置文件时,该代理将根据 flume.call.from.service 属性运行。如果设置了该属性,则代理将 continue 轮询(总是在同一时间段-每 30 秒)。如果未设置该属性,则代理将立即终止。 ...或... 2.当代理轮询一个不存在的配置文件并且这不是第一次轮询该文件时,则代理在此轮询期间不进行任何配置更改。代理 continue 轮询而不是终止。

Configuration Filters

Flume 提供了一种工具,用于以配置过滤器的形式将敏感数据或生成的数据注入到配置中。可以将配置键设置为配置属性的值,并且它将由配置过滤器替换为其表示的值。

配置过滤器的常用用法

该格式类似于 Java 表达式语言,但是目前它不是一个可以正常工作的 EL 表达式解析器,只是一种看起来像它的格式。

<agent_name>.configfilters = <filter_name>
<agent_name>.configfilters.<filter_name>.type = <filter_type>

<agent_name>.sources.<source_name>.parameter = ${<filter_name>['<key_for_sensitive_or_generated_data>']}
<agent_name>.sinks.<sink_name>.parameter = ${<filter_name>['<key_for_sensitive_or_generated_data>']}
<agent_name>.<component_type>.<component_name>.parameter = ${<filter_name>['<key_for_sensitive_or_generated_data>']}
#or
<agent_name>.<component_type>.<component_name>.parameter = ${<filter_name>["<key_for_sensitive_or_generated_data>"]}
#or
<agent_name>.<component_type>.<component_name>.parameter = ${<filter_name>[<key_for_sensitive_or_generated_data>]}
#or
<agent_name>.<component_type>.<component_name>.parameter = some_constant_data${<filter_name>[<key_for_sensitive_or_generated_data>]}

环境变量配置过滤器

Property NameDefaultDescription
type组件类型名称必须为env

Example

要在配置中隐藏密码,请按照以下示例设置其值。

a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = env

a1.sources.r1.channels =  c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123

在这里a1\.sources\.r1\.keystorePassword配置属性将获取my_keystore_password环境变量的值。设置环境变量的一种方法是像这样运行 flume 代理:

$ my_keystore_password=Secret123 bin/flume\-ng agent \-\-conf conf \-\-conf\-file example\.conf \.\.\.

外部流程配置过滤器

Property NameDefaultDescription
type组件类型名称必须为external
command将执行以获取给定键值的命令。该命令将被调用为:\<command\> \<key\>并预期返回带有退出代码0的单行值。
charsetUTF-8返回字符串的字符集。

Example

要在配置中隐藏密码,请按照以下示例设置其值。

a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/passwordResolver.sh
a1.configfilters.f1.charset = UTF-8

a1.sources.r1.channels =  c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value Secret123

在此示例中,Sink 将运行以下命令以获取值

$ /usr/bin/passwordResolver\.sh my_keystore_password

passwordResolver\.sh将返回Secret123,并带有退出代码0

Example 2

要生成用于滚动文件接收器的目录的一部分,请按照以下示例设置其值。

a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = external
a1.configfilters.f1.command = /usr/bin/generateUniqId.sh
a1.configfilters.f1.charset = UTF-8

a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume/agent_${f1['agent_name']} # will be /var/log/flume/agent_1234

在此示例中,Sink 将运行以下命令以获取值

$ /usr/bin/generateUniqId\.sh agent_name

generateUniqId\.sh将返回1234,并带有退出代码0

Hadoop 凭据存储配置过滤器

此功能的 Classpath 上需要一个 hadoop-common 库(2.6 版)。如果已安装 hadoop,则代理会自动将其添加到 Classpath 中

Property NameDefaultDescription
type组件类型名称必须为hadoop
credential.provider.path提供者路径。请参阅 hadoop 文档_here:https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/CredentialProviderAPI.html#Configuring_the_Provider_Path
credstore.java-keystore-provider.password-file密码文件的名称(如果使用文件来存储密码)。该文件必须位于 Classpath 上。可以使用 HADOOP_CREDSTORE_PASSWORD 环境变量设置提供者密码,或者将其保留为空。

Example

要在配置中隐藏密码,请按照以下示例设置其值。

a1.sources = r1
a1.channels = c1
a1.configfilters = f1

a1.configfilters.f1.type = hadoop
a1.configfilters.f1.credential.provider.path = jceks://file/<path_to_jceks file>

a1.sources.r1.channels =  c1
a1.sources.r1.type = http
a1.sources.r1.keystorePassword = ${f1['my_keystore_password']} #will get the value from the credential store

Log4J Appender

将 Log4j 事件附加到 Sink 代理的 avro 源。使用此附加程序的 Client 端必须在 Classpath 中具有 flume-ng-sdk(例如 flume-ng-sdk-1.9.0.jar)。必填属性以**表示。

Property NameDefaultDescription
Hostname运行 Flume 代理和 avro 源的主机名。
Port远程 Flume 代理的 Avro 源正在侦听的端口。
UnsafeModefalse如果为 true,则添加程序不会在发送事件失败时引发异常。
AvroReflectionEnabledfalse使用 Avro Reflection 序列化 Log4j 事件。 (当用户记录字符串时请勿使用)
AvroSchemaUrl可以从中检索 Avro 模式的 URL。

sampleslog4j.properties 文件:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

默认情况下,通过调用toString\(\)或使用 Log4j 布局(如果指定)将每个事件转换为字符串。

如果事件是org\.apache\.avro\.generic\.GenericRecordorg\.apache\.avro\.specific\.SpecificRecord的实例,或者如果属性AvroReflectionEnabled设置为true,则将使用 Avro 序列化对事件进行序列化。

使用其 Avro 模式序列化每个事件效率很低,因此,最好提供一个模式 URL,下游接收器(通常是 HDFS 接收器)可以从中检索模式。如果未指定AvroSchemaUrl,则该架构将作为 FlumeHeaders 包含在内。

配置为使用 Avro 序列化的 sampleslog4j.properties 文件:

#...
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = example.com
log4j.appender.flume.Port = 41414
log4j.appender.flume.AvroReflectionEnabled = true
log4j.appender.flume.AvroSchemaUrl = hdfs://namenode/path/to/schema.avsc

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Log4J Appender 负载平衡

将 Log4j 事件附加到 Sink 代理的 avro 源列表。使用此附加程序的 Client 端必须在 Classpath 中具有 flume-ng-sdk(例如 flume-ng-sdk-1.9.0.jar)。该追加器支持循环和随机方案以执行负载平衡。它还支持可配置的退避超时,以便将 Down 代理暂时从主机集中删除。必填属性为 bold

Property NameDefaultDescription
Hosts由 Flume(通过 AvroSource)侦听事件的 host:port 的空格分隔列表
SelectorROUND_ROBIN选择机制。对于从 LoadBalancingSelector 继承的类,必须为 ROUND_ROBIN,RANDOM 或自定义 FQDN。
MaxBackoff一个长值,表示负载平衡 Client 端将从未能消耗事件的节点退避的最长时间(以毫秒为单位)。默认为无退避
UnsafeModefalse如果为 true,则添加程序不会在发送事件失败时引发异常。
AvroReflectionEnabledfalse使用 Avro Reflection 序列化 Log4j 事件。
AvroSchemaUrl可以从中检索 Avro 模式的 URL。

使用默认配置的 sampleslog4j.properties 文件:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

使用 RANDOM 负载平衡配置的 sampleslog4j.properties 文件:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431
log4j.appender.out2.Selector = RANDOM

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

使用回退配置的 sampleslog4j.properties 文件:

#...
log4j.appender.out2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.out2.Hosts = localhost:25430 localhost:25431 localhost:25432
log4j.appender.out2.Selector = ROUND_ROBIN
log4j.appender.out2.MaxBackoff = 30000

# configure a class's logger to output to the flume appender
log4j.logger.org.example.MyClass = DEBUG,flume
#...

Security

HDFS 接收器,HBase 接收器,Thrift 源,Thrift 接收器和 Kite Dataset 接收器均支持 Kerberos 身份验证。请参考相应部分以配置与 Kerberos 相关的选项。

Flume 代理将作为单个主体向 kerberos KDC 进行身份验证,将由需要 kerberos 身份验证的不同组件使用。为 Thrift 源,Thrift 接收器,HDFS 接收器,HBase 接收器和 DataSet 接收器配置的主体和密钥表应相同,否则组件将无法启动。

Monitoring

Flume 中的监视仍在进行中。变化可能经常发生。几个 Flume 组件向 JMX 平台 MBean 服务器报告 Metrics。可以使用 Jconsole 查询这些 Metrics。

可用组件 Metrics

下表显示了可用于组件的度量标准。每个组件仅维护一组 Metrics,用“ x”表示,未维护的 Metrics 显示默认值,即 0.这些表告诉您可以在哪里预期有意义的数据。Metrics 的名称应具有足够的描述性,有关更多信息,您必须深入了解组件的源代码。

Sources 1

AvroExecHTTPJMSKafkaMultiportSyslogTCPScribe
AppendAcceptedCountx
AppendBatchAcceptedCountx xx
AppendBatchReceivedCountx xx
AppendReceivedCountx
ChannelWriteFailx xxxxx
EventAcceptedCountxxxxxxx
EventReadFail xxxxx
EventReceivedCountxxxxxxx
GenericProcessingFail x x
KafkaCommitTimer x
KafkaEmptyCount x
KafkaEventGetTimer x
OpenConnectionCountx

Sources 2

SequenceGeneratorSpoolDirectorySyslogTcpSyslogUDPTaildirThrift
AppendAcceptedCount x
AppendBatchAcceptedCountxx xx
AppendBatchReceivedCount x xx
AppendReceivedCount x
ChannelWriteFailxxxxxx
EventAcceptedCountxxxxxx
EventReadFail xxxx
EventReceivedCount xxxxx
GenericProcessingFail x x
KafkaCommitTimer
KafkaEmptyCount
KafkaEventGetTimer
OpenConnectionCount

Sinks 1

Avro/ThriftAsyncHBaseElasticSearchHBaseHBase2
BatchCompleteCountxxxxx
BatchEmptyCountxxxxx
BatchUnderflowCountxxxxx
ChannelReadFailx x
ConnectionClosedCountxxxxx
ConnectionCreatedCountxxxxx
ConnectionFailedCountxxxxx
EventDrainAttemptCountxxxxx
EventDrainSuccessCountxxxxx
EventWriteFailx x
KafkaEventSendTimer
RollbackCount

Sinks 2

HDFSEventHiveHttpKafkaMorphlineRollingFile
BatchCompleteCountxx x
BatchEmptyCountxx xx
BatchUnderflowCountxx xx
ChannelReadFailxxxxxx
ConnectionClosedCountxx x
ConnectionCreatedCountxx x
ConnectionFailedCountxx x
EventDrainAttemptCountxxx xx
EventDrainSuccessCountxxxxxx
EventWriteFailxxxxxx
KafkaEventSendTimer x
RollbackCount x

Channels

FileKafkaMemoryPseudoTxnMemorySpillableMemory
ChannelCapacityx x x
ChannelSizex xxx
CheckpointBackupWriteErrorCountx
CheckpointWriteErrorCountx
EventPutAttemptCountxxxxx
EventPutErrorCountx
EventPutSuccessCountxxxxx
EventTakeAttemptCountxxxxx
EventTakeErrorCountx
EventTakeSuccessCountxxxxx
KafkaCommitTimer x
KafkaEventGetTimer x
KafkaEventSendTimer x
Openx
RollbackCounter x
Unhealthyx

JMX Reporting

可以通过使用 flume-env.sh 在 JAVA_OPTS 环境变量中指定 JMX 参数来启用 JMX 报告。

Note

export JAVA_OPTS =“-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port = 5445 -Dcom.sun.management.jmxremote.authenticate = false -Dcom.sun.management.jmxremote.ssl = false”

注意:上面的示例禁用了安全性。要启用安全性,请参阅http://docs.oracle.com/javase/6/docs/technotes/guides/management/agent.html

Ganglia Reporting

Flume 还可以将这些 Metrics 报告给 Ganglia 3 或 Ganglia 3.1 元节点。要将 Metrics 报告给 Ganglia,必须在此支持下启动 Sink 代理。 Flume 代理必须通过传递以下参数作为系统属性(以flume\.monitoring\.开头)来启动,并且可以在 flume-env.sh 中指定:

Property NameDefaultDescription
type组件类型名称,必须为ganglia
hosts以逗号分隔的hostname:port个 Ganglia 服务器列表
pollFrequency60连续报告给 Ganglia 服务器之间的时间(以秒为单位)
isGanglia3falseGanglia 服务器版本为 3.默认情况下,Flume 以 Ganglia 3.1 格式发送

我们可以通过 Ganglia 支持启动 Flume,如下所示:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=com.example:1234,com.example2:5455

JSON Reporting

Flume 还可以以 JSON 格式报告 Metrics。为了启用 JSON 格式的报告,Flume 在可配置端口上托管了 Web 服务器。 Flume 以以下 JSON 格式报告 Metrics:

{
"typeName1.componentName1" : {"metric1" : "metricValue1", "metric2" : "metricValue2"},
"typeName2.componentName2" : {"metric3" : "metricValue3", "metric4" : "metricValue4"}
}

这是一个例子:

{
"CHANNEL.fileChannel":{"EventPutSuccessCount":"468085",
                      "Type":"CHANNEL",
                      "StopTime":"0",
                      "EventPutAttemptCount":"468086",
                      "ChannelSize":"233428",
                      "StartTime":"1344882233070",
                      "EventTakeSuccessCount":"458200",
                      "ChannelCapacity":"600000",
                      "EventTakeAttemptCount":"458288"},
"CHANNEL.memChannel":{"EventPutSuccessCount":"22948908",
                   "Type":"CHANNEL",
                   "StopTime":"0",
                   "EventPutAttemptCount":"22948908",
                   "ChannelSize":"5",
                   "StartTime":"1344882209413",
                   "EventTakeSuccessCount":"22948900",
                   "ChannelCapacity":"100",
                   "EventTakeAttemptCount":"22948908"}
}
Property NameDefaultDescription
type组件类型名称,必须为http
port41414用于启动服务器的端口。

我们可以使用 JSON Reporting 支持来启动 Flume,如下所示:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545

然后,可以在 http://: /metrics 网页上获得 Metrics。自定义组件可以报告 Metrics,如上文“Ganglia”部分所述。

Custom Reporting

通过编写执行报告的服务器,可以向其他系统报告 Metrics。任何报告类都必须实现org\.apache\.flume\.instrumentation\.MonitorService接口。可以像使用 GangliaServer 进行报告一样使用此类。他们可以轮询平台 mbean 服务器以轮询 mbean 以获取 Metrics。例如,如果可以按如下方式使用名为HTTPReporting的 HTTP 监视服务:

$ bin/flume-ng agent --conf-file example.conf --name a1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
Property NameDefaultDescription
type组件类型名称,必须为 FQCN

自定义组件的报告 Metrics

任何自定义的 Sink 组件都应继承org\.apache\.flume\.instrumentation\.MonitoredCounterGroup类。然后,该类应为其公开的每个 Metrics 提供 getter 方法。请参见下面的代码。 MonitoredCounterGroup 需要一个此类可以公开其度量的属性列表。到目前为止,此类仅支持将 Metrics 公开为长值。

public class SinkCounter extends MonitoredCounterGroup implements
    SinkCounterMBean {

  private static final String COUNTER_CONNECTION_CREATED =
    "sink.connection.creation.count";

  private static final String COUNTER_CONNECTION_CLOSED =
    "sink.connection.closed.count";

  private static final String COUNTER_CONNECTION_FAILED =
    "sink.connection.failed.count";

  private static final String COUNTER_BATCH_EMPTY =
    "sink.batch.empty";

  private static final String COUNTER_BATCH_UNDERFLOW =
      "sink.batch.underflow";

  private static final String COUNTER_BATCH_COMPLETE =
    "sink.batch.complete";

  private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
    "sink.event.drain.attempt";

  private static final String COUNTER_EVENT_DRAIN_SUCCESS =
    "sink.event.drain.sucess";

  private static final String[] ATTRIBUTES = {
    COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
    COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
    COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
  };

  public SinkCounter(String name) {
    super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
  }

  @Override
  public long getConnectionCreatedCount() {
    return get(COUNTER_CONNECTION_CREATED);
  }

  public long incrementConnectionCreatedCount() {
    return increment(COUNTER_CONNECTION_CREATED);
  }

}

Tools

文件通道完整性工具

文件通道完整性工具可验证文件通道中各个事件的完整性,并删除损坏的事件。

这些工具可以按以下方式运行:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir

其中 datadir 是要验证的数据目录的逗号分隔列表。

以下是可用的选项

Option NameDescription
h/helpDisplays help
l/dataDirs该工具必须验证的数据目录的逗号分隔列表

事件验证器工具

事件验证器工具可用于以特定于应用程序的方式验证文件通道事件。该工具将用户提供者验证登录名应用于每个事件,并将未确认的事件丢弃到逻辑中。

这些工具可以按以下方式运行:

$bin/flume-ng tool --conf ./conf FCINTEGRITYTOOL -l ./datadir -e org.apache.flume.MyEventValidator -DmaxSize 2000

其中 datadir 是要验证的数据目录的逗号分隔列表。

以下是可用的选项

Option NameDescription
h/helpDisplays help
l/dataDirs该工具必须验证的数据目录的逗号分隔列表
e/eventValidator事件验证程序实现的全限定名称。罐子必须在 FlumeClasspath 上

事件验证器实现必须实现 EventValidator 接口。建议不要在实现中抛出任何异常,因为它们被视为无效事件。可以通过-D 选项将其他参数传递给 EventValitor 实现。

让我们看一个基于简单大小的事件验证器的示例,该示例将拒绝大于指定最大大小的事件。

public static class MyEventValidator implements EventValidator {

  private int value = 0;

  private MyEventValidator(int val) {
    value = val;
  }

  @Override
  public boolean validateEvent(Event event) {
    return event.getBody() <= value;
  }

  public static class Builder implements EventValidator.Builder {

    private int sizeValidator = 0;

    @Override
    public EventValidator build() {
      return new DummyEventVerifier(sizeValidator);
    }

    @Override
    public void configure(Context context) {
      binaryValidator = context.getInteger("maxSize");
    }
  }
}

拓扑设计注意事项

Flume 非常灵活,并允许各种可能的部署方案。如果您打算在大型的生产部署中使用 Flume,则应花一些时间考虑如何用 Flume 拓扑表达问题。本节涵盖一些注意事项。

Flume 非常适合您的问题吗?

如果您需要将文本日志数据提取到 Hadoop/HDFS 中,那么 Flume 非常适合您的问题,请停下来。对于其他用例,以下是一些准则:

Flume 旨在在相对稳定,可能复杂的拓扑结构上传输和吸收定期生成的事件数据。 “事件数据”的概念定义非常广泛。对 Flume 而言,事件只是字节的一般 Blob。事件的大小有一些限制-例如,事件的大小不能大于您在单个计算机上的内存或磁盘中存储的大小-但实际上,Sink 事件可以是从文本日志条目到图像文件的所有内容。事件的关键属性是它们以连续的流方式生成。如果您的数据不是定期生成的(即您正在尝试将单个数据批量加载到 Hadoop 集群中),那么 Flume 仍然可以正常工作,但是对于您的情况而言可能是过大了。 Flume 喜欢相对稳定的拓扑。您的拓扑不需要是不变的,因为 Flume 可以处理拓扑变化而不会丢失数据,并且还可以容忍由于故障转移或配置而导致的定期重新配置。如果您每天都要更改拓扑,则可能无法很好地工作,因为重新配置需要一些考虑和开销。

Flume 中的流量可靠性

Flume 流量的可靠性取决于多个因素。通过调整这些因素,您可以使用 Flume 实现多种可靠性选项。

您使用哪种类型的通道. Flume 同时具有持久性通道(那些将数据持久保存到磁盘)和非持久性通道(如果机器出现故障,这些通道将丢失数据)。耐用的通道使用基于磁盘的存储,并且在这些通道中存储的数据将在计算机重新启动或与磁盘无关的故障之间保持不变。

您的通道是否已为工作负载充分配置. Flume 中的通道充当各种跃点的缓冲区。这些缓冲区具有固定的容量,一旦容量已满,您将在流中较早的点上产生背压。如果此压力传播到流源,则 Flume 将不可用,并且可能会丢失数据。

是否使用冗余拓扑. Flume 让您跨冗余拓扑复制流。这可以提供非常容易的容错资源,并且可以克服磁盘或计算机故障。

*在 Flume 拓扑中考虑可靠性的最佳方法是考虑各种故障情况及其后果.*如果磁盘发生故障会怎样?如果机器出现故障会怎样?如果您的终端接收器(例如 HDFS)下降了一段时间并承受了背压怎么办?可能的设计空间很大,但是您需要提出的基本问题很少。

Flume 拓扑设计

设计 Flume 拓扑的第一步是枚举数据的所有源和目标(终端接收器)。这些将定义拓扑的边缘点。接下来要考虑的是引入中间聚合层还是事件路由。如果要从大量来源收集数据,则汇总数据以简化终端接收器的接收可能会很有帮助。聚合层还可以充当缓冲区,从而消除源中的突发性或接收器上的不可用。如果要在不同位置之间路由数据,则可能还需要在各个点拆分流:这会创建子拓扑,这些子拓扑本身可能包括聚合点。

调整 Flume 部署的规模

一旦了解了拓扑的外观,下一个问题就是需要多少硬件和网络容量。首先要量化生成的数据量。这并不总是一件容易的事!大多数数据流都是突发性的(例如,由于昼夜模式),并且可能不可预测。一个好的出发点是考虑拓扑的每一层中的最大吞吐量,无论是每秒事件数还是每秒字节数。一旦知道给定层的所需吞吐量,就可以计算该层所需的节点数的下限。为了确定可达到的吞吐量,最好在硬件上使用合成或采样事件数据在 Flume 上进行实验。通常,基于磁盘的通道应获得 10 的 MB/s,而基于内存的通道应获得 100 的 MB/s 或更多。性能将有很大的不同,但是取决于硬件和操作环境。

调整总吞吐量的大小可以使您对每层所需的节点数有一个下限。有额外的节点有几个原因,例如增加的冗余性和更好的吸收负载突发的能力。

Troubleshooting

处理代理失败

如果 Flume 代理关闭,则该代理上托管的所有流都将中止。代理重新启动后,流程将恢复。使用文件通道或其他稳定通道的流将在中断处恢复处理事件。如果无法在同一硬件上重新启动代理,则可以选择将数据库迁移到另一硬件并设置新的 Flume 代理,该代理可以 continue 处理数据库中保存的事件。可以利用数据库 HA 期货将 Flume 代理移至另一台主机。

Compatibility

HDFS

目前,Flume 支持 HDFS 0.20.2 和 0.23.

AVRO

TBD

其他版本要求

TBD

Tracing

TBD

更多示例配置

TBD

Component Summary

Component InterfaceType AliasImplementation Class
org.apache.flume.Channelmemoryorg.apache.flume.channel.MemoryChannel
org.apache.flume.Channeljdbcorg.apache.flume.channel.jdbc.JdbcChannel
org.apache.flume.Channelfileorg.apache.flume.channel.file.FileChannel
org.apache.flume.Channelorg.apache.flume.channel.PseudoTxnMemoryChannel
org.apache.flume.Channelorg.example.MyChannel
org.apache.flume.Sourceavroorg.apache.flume.source.AvroSource
org.apache.flume.Sourcenetcatorg.apache.flume.source.NetcatSource
org.apache.flume.Sourceseqorg.apache.flume.source.SequenceGeneratorSource
org.apache.flume.Sourceexecorg.apache.flume.source.ExecSource
org.apache.flume.Sourcesyslogtcporg.apache.flume.source.SyslogTcpSource
org.apache.flume.Sourcemultiport_syslogtcporg.apache.flume.source.MultiportSyslogTCPSource
org.apache.flume.Sourcesyslogudporg.apache.flume.source.SyslogUDPSource
org.apache.flume.Sourcespooldirorg.apache.flume.source.SpoolDirectorySource
org.apache.flume.Sourcehttporg.apache.flume.source.http.HTTPSource
org.apache.flume.Sourcethriftorg.apache.flume.source.ThriftSource
org.apache.flume.Sourcejmsorg.apache.flume.source.jms.JMSSource
org.apache.flume.Sourceorg.apache.flume.source.avroLegacy.AvroLegacySource
org.apache.flume.Sourceorg.apache.flume.source.thriftLegacy.ThriftLegacySource
org.apache.flume.Sourceorg.example.MySource
org.apache.flume.Sinknullorg.apache.flume.sink.NullSink
org.apache.flume.Sinkloggerorg.apache.flume.sink.LoggerSink
org.apache.flume.Sinkavroorg.apache.flume.sink.AvroSink
org.apache.flume.Sinkhdfsorg.apache.flume.sink.hdfs.HDFSEventSink
org.apache.flume.Sinkhbaseorg.apache.flume.sink.hbase.HBaseSink
org.apache.flume.Sinkhbase2org.apache.flume.sink.hbase2.HBase2Sink
org.apache.flume.Sinkasynchbaseorg.apache.flume.sink.hbase.AsyncHBaseSink
org.apache.flume.Sinkelasticsearchorg.apache.flume.sink.elasticsearch.ElasticSearchSink
org.apache.flume.Sinkfile_rollorg.apache.flume.sink.RollingFileSink
org.apache.flume.Sinkircorg.apache.flume.sink.irc.IRCSink
org.apache.flume.Sinkthriftorg.apache.flume.sink.ThriftSink
org.apache.flume.Sinkorg.example.MySink
org.apache.flume.ChannelSelectorreplicatingorg.apache.flume.channel.ReplicatingChannelSelector
org.apache.flume.ChannelSelectormultiplexingorg.apache.flume.channel.MultiplexingChannelSelector
org.apache.flume.ChannelSelectororg.example.MyChannelSelector
org.apache.flume.SinkProcessordefaultorg.apache.flume.sink.DefaultSinkProcessor
org.apache.flume.SinkProcessorfailoverorg.apache.flume.sink.FailoverSinkProcessor
org.apache.flume.SinkProcessorload_balanceorg.apache.flume.sink.LoadBalancingSinkProcessor
org.apache.flume.SinkProcessor
org.apache.flume.interceptor.Interceptortimestamporg.apache.flume.interceptor.TimestampInterceptor$Builder
org.apache.flume.interceptor.Interceptorhostorg.apache.flume.interceptor.HostInterceptor$Builder
org.apache.flume.interceptor.Interceptorstaticorg.apache.flume.interceptor.StaticInterceptor$Builder
org.apache.flume.interceptor.Interceptorregex_filterorg.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.interceptor.Interceptorregex_extractororg.apache.flume.interceptor.RegexFilteringInterceptor$Builder
org.apache.flume.channel.file.encryption.KeyProvider$Builderjceksfileorg.apache.flume.channel.file.encryption.JCEFileKeyProvider
org.apache.flume.channel.file.encryption.KeyProvider$Builderorg.example.MyKeyProvider
org.apache.flume.channel.file.encryption.CipherProvideraesctrnopaddingorg.apache.flume.channel.file.encryption.AESCTRNoPaddingProvider
org.apache.flume.channel.file.encryption.CipherProviderorg.example.MyCipherProvider
org.apache.flume.serialization.EventSerializer$Buildertextorg.apache.flume.serialization.BodyTextEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builderavro_eventorg.apache.flume.serialization.FlumeEventAvroEventSerializer$Builder
org.apache.flume.serialization.EventSerializer$Builderorg.example.MyEventSerializer$Builder

Alias Conventions

这些别名的约定在上面的特定于组件的示例中使用,以使名称简短且在所有示例中保持一致。

Alias NameAlias Type
aa gent
cc hannel
r资源
ksin k
g下沉 g
ii nterceptor
yke y
hh ost
ss erializer