Flume 1.9.0 User Guide

Introduction

Overview

Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

The use of Apache Flume is not only restricted to log data aggregation. Since data sources are customizable, Flume can be used to transport massive quantities of event data including but not limited to network traffic data, social-media-generated data, email messages and pretty much any data source possible.

Apache Flume is a top level project at the Apache Software Foundation.

System Requirements

  1. Java Runtime Environment - Java 1.8 or later
  2. Memory - Sufficient memory for configurations used by sources, channels or sinks
  3. Disk Space - Sufficient disk space for configurations used by channels or sinks
  4. Directory Permissions - Read/Write permissions for directories used by agent

Architecture

Data flow model

A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop).

Agent component diagram

A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink. A similar flow can be defined using a Thrift Flume Source to receive events from a Thrift Sink or a Flume Thrift Rpc Client or Thrift clients written in any language generated from the Flume thrift protocol.

When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sink. The file channel is one example – it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel.

Complex flows

Flume allows a user to build multi-hop flows where events travel through multiple agents before reaching the final destination. It also allows fan-in and fan-out flows, contextual routing and backup routes (fail-over) for failed hops.

Reliability

The events are staged in a channel on each agent. The events are then delivered to the next agent or terminal repository (like HDFS) in the flow. The events are removed from a channel only after they are stored in the channel of the next agent or in the terminal repository. This is how the single-hop message delivery semantics in Flume provide end-to-end reliability of the flow.

Flume uses a transactional approach to guarantee the reliable delivery of the events. Sources and sinks wrap the storage and retrieval of events, respectively, in a transaction provided by the channel. This ensures that the set of events is reliably passed from point to point in the flow. In the case of a multi-hop flow, the sink from the previous hop and the source from the next hop both have their transactions running to ensure that the data is safely stored in the channel of the next hop.

Recoverability

The events are staged in the channel, which manages recovery from failure. Flume supports a durable file channel which is backed by the local file system. There’s also a memory channel which simply stores the events in an in-memory queue, which is faster but any events still left in the memory channel when an agent process dies can’t be recovered.
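The trade-off between the two channel types comes down to the channel's type setting. The following is a minimal sketch, assuming an agent named a1 with channels c1 and c2 (illustrative names), and assuming the file channel's checkpointDir and dataDirs properties as documented in the File Channel section; the directory paths are placeholders:

```properties
a1.channels = c1 c2

# Durable channel: events are persisted on the local file system
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /var/flume/checkpoint
a1.channels.c1.dataDirs = /var/flume/data

# In-memory channel: faster, but events still queued are lost if the agent process dies
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
```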

Setup

Setting up an agent

Flume agent configuration is stored in a local configuration file. This is a text file that follows the Java properties file format. Configurations for one or more agents can be specified in the same configuration file. The configuration file includes properties of each source, sink and channel in an agent and how they are wired together to form data flows.
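As a rough illustration of several agents sharing one file, the fragment below declares two agents side by side. The agent and component names (a1, a2, r1, k1, and so on) are illustrative assumptions, and the per-component properties are omitted:

```properties
# First agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Second agent, defined in the same configuration file
a2.sources = r2
a2.sinks = k2
a2.channels = c2
```

Each property key starts with the agent name, so a Flume process started under a given agent name only picks up the entries with that prefix.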

Configuring individual components

Each component (source, sink or channel) in the flow has a name, type, and set of properties that are specific to the type and instantiation. For example, an Avro source needs a hostname (or IP address) and a port number to receive data from. A memory channel can have max queue size (“capacity”), and an HDFS sink needs to know the file system URI, path to create files, frequency of file rotation (“hdfs.rollInterval”) etc. All such attributes of a component need to be set in the properties file of the hosting Flume agent.

Wiring the pieces together

The agent needs to know what individual components to load and how they are connected in order to constitute the flow. This is done by listing the names of each of the sources, sinks and channels in the agent, and then specifying the connecting channel for each sink and source. For example, an agent flows events from an Avro source called avroWeb to HDFS sink hdfs-cluster1 via a file channel called file-channel. The configuration file will contain names of these components and file-channel as a shared channel for both avroWeb source and hdfs-cluster1 sink.
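The wiring described above could be sketched as follows. The agent name agent1 is a hypothetical placeholder; the component names come from the paragraph above, and the component types and other properties are left out:

```properties
# List the components of the agent (agent name "agent1" is assumed)
agent1.sources = avroWeb
agent1.sinks = hdfs-cluster1
agent1.channels = file-channel

# The source and the sink share the same channel
agent1.sources.avroWeb.channels = file-channel
agent1.sinks.hdfs-cluster1.channel = file-channel
```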

Starting an agent

An agent is started using a shell script called flume-ng which is located in the bin directory of the Flume distribution. You need to specify the agent name, the config directory, and the config file on the command line:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

Now the agent will start running the sources and sinks configured in the given properties file.

A simple example

Here, we give an example configuration file, describing a single-node Flume deployment. This configuration lets a user generate events and subsequently logs them to the console.

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the various components, then describes their types and configuration parameters. A given configuration file might define several named agents; when a given Flume process is launched a flag is passed telling it which named agent to manifest.

Given this configuration file, we can start Flume as follows:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

Note that in a full deployment we would typically include one more option: --conf=<conf-dir>. The <conf-dir> directory would include a shell script flume-env.sh and potentially a log4j properties file. In this example, we pass a Java option to force Flume to log to the console and we go without a custom environment script.

From a separate terminal, we can then telnet to port 44444 and send Flume an event:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

The original Flume terminal will output the event in a log message.

12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D          Hello world!. }

Congratulations - you’ve successfully configured and deployed a Flume agent! Subsequent sections cover agent configuration in much more detail.

Using environment variables in configuration files

Flume has the ability to substitute environment variables in the configuration. For example:

a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${NC_PORT}
a1.sources.r1.channels = c1

NB: this currently works for values only, not for keys (i.e. only on the “right side” of the = mark of the config lines).

This can be enabled via Java system properties on agent invocation by setting propertiesImplementation = org.apache.flume.node.EnvVarResolverProperties.

For example:

$ NC_PORT=44444 bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console -DpropertiesImplementation=org.apache.flume.node.EnvVarResolverProperties

Note that the above is just an example; environment variables can be configured in other ways, including being set in conf/flume-env.sh.

Logging raw data

Logging the raw stream of data flowing through the ingest pipeline is not desired behaviour in many production environments because this may result in leaking sensitive data or security related configurations, such as secret keys, to Flume log files. By default, Flume will not log such information. On the other hand, if the data pipeline is broken, Flume will attempt to provide clues for debugging the problem.

One way to debug problems with event pipelines is to set up an additional Memory Channel connected to a Logger Sink, which will output all event data to the Flume logs. In some situations, however, this approach is insufficient.

In order to enable logging of event- and configuration-related data, some Java system properties must be set in addition to log4j properties.

To enable configuration-related logging, set the Java system property -Dorg.apache.flume.log.printconfig=true. This can be either passed on the command line or set in the JAVA_OPTS variable in flume-env.sh.

To enable data logging, set the Java system property -Dorg.apache.flume.log.rawdata=true in the same way described above. For most components, the log4j logging level must also be set to DEBUG or TRACE to make event-specific logging appear in the Flume logs.

Here is an example of enabling both configuration logging and raw data logging while also setting the Log4j loglevel to DEBUG for console output:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

Zookeeper based Configuration

Flume supports Agent configurations via Zookeeper. This is an experimental feature. The configuration file needs to be uploaded to Zookeeper, under a configurable prefix. The configuration file is stored in Zookeeper Node data. The following is what the Zookeeper Node tree would look like for agents a1 and a2:

- /flume
 |- /a1 [Agent config file]
 |- /a2 [Agent config file]

Once the configuration file is uploaded, start the agent with the following options:

$ bin/flume-ng agent --conf conf -z zkhost:2181,zkhost1:2181 -p /flume --name a1 -Dflume.root.logger=INFO,console

Argument Name  Default  Description
z                       Zookeeper connection string. Comma separated list of hostname:port
p              /flume   Base Path in Zookeeper to store Agent configurations

Installing third-party plugins

Flume has a fully plugin-based architecture. While Flume ships with many out-of-the-box sources, channels, sinks, serializers, and the like, many implementations exist which ship separately from Flume.

While it has always been possible to include custom Flume components by adding their jars to the FLUME_CLASSPATH variable in the flume-env.sh file, Flume now supports a special directory called plugins.d which automatically picks up plugins that are packaged in a specific format. This allows for easier management of plugin packaging issues as well as simpler debugging and troubleshooting of several classes of issues, especially library dependency conflicts.

The plugins.d directory

The plugins.d directory is located at $FLUME_HOME/plugins.d. At startup time, the flume-ng start script looks in the plugins.d directory for plugins that conform to the below format and includes them in proper paths when starting up java.

Directory layout for plugins

Each plugin (subdirectory) within plugins.d can have up to three sub-directories:

  1. lib - the plugin’s jar(s)
  2. libext - the plugin’s dependency jar(s)
  3. native - any required native libraries, such as .so files

Example of two plugins within the plugins.d directory:

plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so

Data ingestion

Flume supports a number of mechanisms to ingest data from external sources.

RPC

An Avro client included in the Flume distribution can send a given file to a Flume Avro source using the Avro RPC mechanism:

$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

The above command will send the contents of /usr/logs/log.10 to the Flume source listening on that port.

Executing commands

There’s an exec source that executes a given command and consumes the output one ‘line’ at a time, i.e. text followed by carriage return (‘\r’) or line feed (‘\n’) or both together.
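A minimal sketch of such a source is shown below. It assumes the exec source's type and command properties as documented in the Exec Source section; the agent and component names and the log file path are placeholders:

```properties
a1.sources = r1
a1.channels = c1

# Run the command and consume its output line by line
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/app.log
a1.sources.r1.channels = c1
```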

Network streams

Flume supports the following mechanisms to read data from popular log stream types:

  1. Avro
  2. Thrift
  3. Syslog
  4. Netcat
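As one illustration of the mechanisms listed above, a syslog stream could be received roughly as follows. This sketch assumes the syslogtcp source type with its host and port properties as documented in the Syslog TCP Source section; the agent and component names and the port number are placeholders:

```properties
a1.sources = r1
a1.channels = c1

# Listen for syslog messages over TCP
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = localhost
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
```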

Setting multi-agent flow

Two agents communicating over Avro RPC

In order to flow the data across multiple agents or hops, the sink of the previous agent and source of the current hop need to be avro type with the sink pointing to the hostname (or IP address) and port of the source.

Consolidation

A very common scenario in log collection is a large number of log producing clients sending data to a few consumer agents that are attached to the storage subsystem. For example, logs collected from hundreds of web servers are sent to a dozen agents that write to an HDFS cluster.

A fan-in flow using Avro RPC to consolidate events in one place

This can be achieved in Flume by configuring a number of first tier agents with an avro sink, all pointing to the avro source of a single agent (again, you could use the thrift sources/sinks/clients in such a scenario). This source on the second tier agent consolidates the received events into a single channel which is consumed by a sink to its final destination.
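The two tiers could be sketched as below. The agent names web_agent and collector, the component names, and the address 10.1.1.100:10000 are illustrative assumptions; each agent would live in its own configuration on its own host, and the first tier's sources, the second tier's sink, and the channel types are omitted:

```properties
# First tier agent (one per web server); every instance points at the same collector
web_agent.sinks = avro-forward-sink
web_agent.channels = file-channel
web_agent.sinks.avro-forward-sink.type = avro
web_agent.sinks.avro-forward-sink.hostname = 10.1.1.100
web_agent.sinks.avro-forward-sink.port = 10000
web_agent.sinks.avro-forward-sink.channel = file-channel

# Second tier agent: a single avro source consolidates events into one channel
collector.sources = avro-collection-source
collector.channels = mem-channel
collector.sources.avro-collection-source.type = avro
collector.sources.avro-collection-source.bind = 10.1.1.100
collector.sources.avro-collection-source.port = 10000
collector.sources.avro-collection-source.channels = mem-channel
```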

Multiplexing the flow

Flume supports multiplexing the event flow to one or more destinations. This is achieved by defining a flow multiplexer that can replicate or selectively route an event to one or more channels.

A fan-out flow using a (multiplexing) channel selector

The above example shows a source from agent “foo” fanning out the flow to three different channels. This fan out can be replicating or multiplexing. In case of replicating flow, each event is sent to all three channels. For the multiplexing case, an event is delivered to a subset of available channels when an event’s attribute matches a preconfigured value. For example, if an event attribute called “txnType” is set to “customer”, then it should go to channel1 and channel3, if it’s “vendor” then it should go to channel2, otherwise channel3. The mapping can be set in the agent’s configuration file.
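The txnType routing described above might be expressed roughly as follows, using the channel selector properties covered in the Fan out flow section. The source name src1 is a hypothetical placeholder:

```properties
foo.sources = src1
foo.channels = channel1 channel2 channel3
foo.sources.src1.channels = channel1 channel2 channel3

# Route on the value of the "txnType" event attribute
foo.sources.src1.selector.type = multiplexing
foo.sources.src1.selector.header = txnType
foo.sources.src1.selector.mapping.customer = channel1 channel3
foo.sources.src1.selector.mapping.vendor = channel2

# Anything else goes to channel3
foo.sources.src1.selector.default = channel3
```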

Configuration

As mentioned in the earlier section, Flume agent configuration is read from a file that resembles a Java property file format with hierarchical property settings.

Defining the flow

To define the flow within a single agent, you need to link the sources and sinks via a channel. You need to list the sources, sinks and channels for the given agent, and then point the source and sink to a channel. A source instance can specify multiple channels, but a sink instance can only specify one channel. The format is as follows:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

For example, an agent named agent_foo is reading data from an external avro client and sending it to HDFS via a memory channel. The config file weblog.config could look like:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

This will make the events flow from avro-appserver-src-1 to hdfs-sink-1 through the memory channel mem-channel-1. When the agent is started with weblog.config as its config file, it will instantiate that flow.

Configuring individual components

After defining the flow, you need to set properties of each source, sink and channel. This is done in the same hierarchical namespace fashion where you set the component type and other values for the properties specific to each component:

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>

The property “type” needs to be set for each component for Flume to understand what kind of object it needs to be. Each source, sink and channel type has its own set of properties required for it to function as intended. All those need to be set as needed. The following example configures each component of a flow like the previous one, here running from avro-AppSrv-source to hdfs-Cluster1-sink through the memory channel mem-channel-1:

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

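# set channel for sources, sinks
agent_foo.sources.avro-AppSrv-source.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink.channel = mem-channel-1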

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

#...

Adding multiple flows in an agent

A single Flume agent can contain several independent flows. You can list multiple sources, sinks and channels in a config. These components can be linked to form multiple flows:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

Then you can link the sources and sinks to their corresponding channels (for sources) or channel (for sinks) to set up two different flows. For example, if you need to set up two flows in an agent, one going from an external avro client to external HDFS and another from the output of a tail to an avro sink, then here’s a config to do that:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

Configuring a multi agent flow

To set up a multi-tier flow, you need to have an avro/thrift sink of the first hop pointing to an avro/thrift source of the next hop. This will result in the first Flume agent forwarding events to the next Flume agent. For example, if you are periodically sending files (1 file per event) using the avro client to a local Flume agent, then this local agent can forward it to another agent that has the storage mounted.

Weblog agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

# configure other pieces
#...

HDFS agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

Here we link the avro-forward-sink from the weblog agent to the avro-collection-source of the hdfs agent. This will result in the events coming from the external appserver source eventually getting stored in HDFS.

Fan out flow

As discussed in the previous section, Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan out, replicating and multiplexing. In the replicating flow, the event is sent to all the configured channels. In case of multiplexing, the event is sent to only a subset of qualifying channels. To fan out the flow, one needs to specify a list of channels for a source and the policy for fanning it out. This is done by adding a channel “selector” that can be replicating or multiplexing. Then further specify the selection rules if it’s a multiplexer. If you don’t specify a selector, then by default it’s replicating:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

The multiplexing selector has a further set of properties to bifurcate the flow. This requires specifying a mapping of an event attribute to a set of channels. The selector checks for each configured attribute in the event header. If it matches the specified value, then that event is sent to all the channels mapped to that value. If there’s no match, then the event is sent to the set of channels configured as default:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

The mapping allows overlapping the channels for each value.

The following example has a single flow that is multiplexed to two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector checks for a header called “State”. If the value is “CA” then it’s sent to mem-channel-1, if it’s “AZ” then it goes to file-channel-2, and if it’s “NY” then it goes to both. If the “State” header is not set or doesn’t match any of the three, then it goes to mem-channel-1, which is designated as ‘default’.

The selector also supports optional channels. To specify optional channels for a header, the config parameter ‘optional’ is used in the following way:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector will attempt to write to the required channels first and will fail the transaction if even one of these channels fails to consume the events. The transaction is reattempted on all of the channels. Once all required channels have consumed the events, then the selector will attempt to write to the optional channels. A failure by any of the optional channels to consume the event is simply ignored and not retried.

If there is an overlap between the optional channels and required channels for a specific header, the channel is considered to be required, and a failure in the channel will cause the entire set of required channels to be retried. For instance, in the above example, for the header “CA” mem-channel-1 is considered to be a required channel even though it is marked both as required and optional, and a failure to write to this channel will cause that event to be retried on all channels configured for the selector.

Note that if a header does not have any required channels, the event is written to the default channels, and the selector also attempts to write it to the optional channels for that header. Specifying optional channels therefore still causes the event to be written to the default channels when no required channels are specified. If no channels are designated as default and there are no required channels, the selector attempts to write the events to the optional channels only. Any failures are simply ignored in that case.
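The first case can be sketched with a header value that has only optional channels. The value “TX” is a hypothetical addition to the earlier example; the property names follow the selector configuration shown above:

```properties
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State

# "TX" has no required mapping, only an optional channel
agent_foo.sources.avro-AppSrv-source1.selector.optional.TX = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
```

Under the rules described above, an event whose “State” header is “TX” would be written to the default channel mem-channel-1, while the write to file-channel-2 would only be attempted and any failure there ignored.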

SSL/TLS support

Several Flume components support the SSL/TLS protocols in order to communicate with other systems securely.

Component                    SSL server or client
Avro Source                  server
Avro Sink                    client
Thrift Source                server
Thrift Sink                  client
Kafka Source                 client
Kafka Channel                client
Kafka Sink                   client
HTTP Source                  server
JMS Source                   client
Syslog TCP Source            server
Multiport Syslog TCP Source  server

The SSL compatible components have several configuration parameters to set up SSL, like the enable SSL flag, keystore / truststore parameters (location, password, type) and additional SSL parameters (e.g. disabled protocols).

Enabling SSL for a component is always specified at component level in the agent configuration file. So some components may be configured to use SSL while others are not (even with the same component type).

The keystore / truststore setup can be specified at component level or globally.

In case of the component level setup, the keystore / truststore is configured in the agent configuration file through component specific parameters. The advantage of this method is that the components can use different keystores (if this would be needed). The disadvantage is that the keystore parameters must be copied for each component in the agent configuration file. The component level setup is optional, but if it is defined, it has higher precedence than the global parameters.
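A component level setup might look like the sketch below for an Avro source. It uses the ssl, keystore and keystore-password parameters described in the Avro Source section; the agent and source names, the port, the keystore path and the password are placeholders:

```properties
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Enable SSL for this source and give it its own keystore
a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystore-password = password
```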

With the global setup, it is enough to define the keystore / truststore parameters once and use the same settings for all components, which means less and more centralized configuration.

The global setup can be configured either through system properties or through environment variables.

System property                   Environment variable            Description
javax.net.ssl.keyStore            FLUME_SSL_KEYSTORE_PATH         Keystore location
javax.net.ssl.keyStorePassword    FLUME_SSL_KEYSTORE_PASSWORD     Keystore password
javax.net.ssl.keyStoreType        FLUME_SSL_KEYSTORE_TYPE         Keystore type (by default JKS)
javax.net.ssl.trustStore          FLUME_SSL_TRUSTSTORE_PATH       Truststore location
javax.net.ssl.trustStorePassword  FLUME_SSL_TRUSTSTORE_PASSWORD   Truststore password
javax.net.ssl.trustStoreType      FLUME_SSL_TRUSTSTORE_TYPE       Truststore type (by default JKS)
flume.ssl.include.protocols       FLUME_SSL_INCLUDE_PROTOCOLS     Protocols to include when calculating enabled protocols. A comma (,) separated list. Excluded protocols will be excluded from this list if provided.
flume.ssl.exclude.protocols       FLUME_SSL_EXCLUDE_PROTOCOLS     Protocols to exclude when calculating enabled protocols. A comma (,) separated list.
flume.ssl.include.cipherSuites    FLUME_SSL_INCLUDE_CIPHERSUITES  Cipher suites to include when calculating enabled cipher suites. A comma (,) separated list. Excluded cipher suites will be excluded from this list if provided.
flume.ssl.exclude.cipherSuites    FLUME_SSL_EXCLUDE_CIPHERSUITES  Cipher suites to exclude when calculating enabled cipher suites. A comma (,) separated list.

The SSL system properties can either be passed on the command line or by setting the JAVA_OPTS environment variable in conf/flume-env.sh. (Although, using the command line is inadvisable because the commands including the passwords will be saved to the command history.)

export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore.jks"
export JAVA_OPTS="$JAVA_OPTS -Djavax.net.ssl.keyStorePassword=password"

Flume uses the system properties defined in JSSE (Java Secure Socket Extension), so this is a standard way for setting up SSL. On the other hand, specifying passwords in system properties means that the passwords can be seen in the process list. For cases where that is not acceptable, it is also possible to define the parameters in environment variables. Flume initializes the JSSE system properties from the corresponding environment variables internally in this case.

The SSL environment variables can either be set in the shell environment before starting Flume or in conf/flume-env.sh. (Although, using the command line is inadvisable because the commands including the passwords will be saved to the command history.)

export FLUME_SSL_KEYSTORE_PATH=/path/to/keystore.jks
export FLUME_SSL_KEYSTORE_PASSWORD=password

Please note:

  • SSL must be enabled at component level. Specifying the global SSL parameters alone will not have any effect.
  • If the global SSL parameters are specified at multiple levels, the priority is the following (from higher to lower):
    • component parameters in agent config
    • system properties
    • environment variables
  • If SSL is enabled for a component, but the SSL parameters are not specified in any of the ways described above, then
    • in case of keystores: configuration error
    • in case of truststores: the default truststore will be used (jssecacerts / cacerts in Oracle JDK)
  • The truststore password is optional in all cases. If not specified, then no integrity check will be performed on the truststore when it is opened by the JDK.
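
As a minimal sketch of the first two notes above, the fragment below enables SSL on a single component and sets the keystore at component level, which takes priority over any system property or environment variable. It assumes an Avro source named r1 on agent a1 (its SSL properties are described in the Avro Source section); the path and password are placeholders.

```properties
# SSL has to be switched on per component; global parameters alone have no effect.
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.ssl = true
# Component-level values win over system properties and environment variables.
# Placeholder path and password, replace with your own.
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystore-password = password
```

If the two keystore lines are omitted, the source should fall back to the global keystore settings, and fail with a configuration error if none are defined.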

Source and sink batch sizes and channel transaction capacities

Sources and sinks can have a batch size parameter that determines the maximum number of events they process in one batch. This happens within a channel transaction that has an upper limit called transaction capacity. Batch size must not exceed the channel’s transaction capacity. There is an explicit check to prevent incompatible settings. This check happens whenever the configuration is read.
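
A compatible combination might look like the sketch below. It assumes a memory channel c1 (its capacity and transactionCapacity properties are described in the channel sections of this guide) and an Exec source r1; the numbers are illustrative only.

```properties
a1.sources = r1
a1.channels = c1
# Channel: up to 1000 events may take part in a single transaction.
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# Source: each batch of 100 events fits into one channel transaction.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
a1.sources.r1.batchSize = 100
```

Raising batchSize above transactionCapacity in this fragment would be expected to trip the compatibility check when the configuration is read.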

Flume Sources

Avro Source

Listens on Avro port and receives events from external Avro client streams. When paired with the built-in Avro Sink on another (previous hop) Flume agent, it can create tiered collection topologies. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be avro
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space-separated list of interceptors
interceptors.*    
compression-type none This can be “none” or “deflate”. The compression-type must match the compression-type of the matching Avro Sink
ssl false Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites Space-separated list of cipher suites to exclude.
include-cipher-suites Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If include-cipher-suites is empty, every supported cipher suite is included.
ipFilter false Set this to true to enable ipFiltering for netty
ipFilterRules Define N netty ipFilter pattern rules with this config.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

Example of ipFilterRules

ipFilterRules defines N netty ipFilters separated by commas. Each pattern rule must be in this format:

<‘allow’ or ‘deny’>:<‘ip’ or ‘name’ for computer name>:<pattern>, i.e. allow/deny:ip/name:pattern

example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*

Note that the first rule to match will apply, as the examples below show for a client on localhost:

This will allow the client on localhost but deny clients from any other IP: “allow:name:localhost,deny:ip:*”

This will deny the client on localhost but allow clients from any other IP: “deny:name:localhost,allow:ip:*”
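
Put together, an Avro source with IP filtering enabled might be configured as in the sketch below. It reuses the rule string from the example above; the agent, source and channel names are the same placeholders used elsewhere in this section.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# ipFilterRules only takes effect when ipFilter is true.
a1.sources.r1.ipFilter = true
# Rules are evaluated in order and the first match applies:
# loopback addresses and localhost are allowed, everything else is denied.
a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*
```

Because the first matching rule applies, the catch-all deny rule belongs at the end of the list.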

Thrift Source

Listens on Thrift port and receives events from external Thrift client streams. When paired with the built-in ThriftSink on another (previous hop) Flume agent, it can create tiered collection topologies. Thrift source can be configured to start in secure mode by enabling kerberos authentication. agent-principal and agent-keytab are the properties used by the Thrift source to authenticate to the kerberos KDC. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be thrift
bind hostname or IP address to listen on
port Port # to bind to
threads Maximum number of worker threads to spawn
selector.type    
selector.*    
interceptors Space separated list of interceptors
interceptors.*    
ssl false Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section)
keystore This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites Space-separated list of cipher suites to exclude.
include-cipher-suites Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
kerberos false Set to true to enable kerberos authentication. In kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode, will accept connections only from Thrift clients that have kerberos enabled and are successfully authenticated to the kerberos KDC.
agent-principal The kerberos principal used by the Thrift Source to authenticate to the kerberos KDC.
agent-keytab The keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the kerberos KDC.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
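
A secure-mode variant of the example above could look like the following sketch. The principal and keytab path are hypothetical placeholders and must be replaced with values valid for your own Kerberos realm.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# In kerberos mode both agent-principal and agent-keytab are required.
a1.sources.r1.kerberos = true
# Hypothetical principal and keytab location.
a1.sources.r1.agent-principal = flume/flumehost1.example.com@EXAMPLE.COM
a1.sources.r1.agent-keytab = /etc/flume/conf/flume.keytab
```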

Exec Source

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out (stderr is simply discarded, unless property logStdErr is set to true). If the process exits for any reason, the source also exits and will produce no further data. This means configurations such as cat [named pipe] or tail -F [file] are going to produce the desired results whereas date will probably not: the former two commands produce streams of data whereas the latter produces a single event and exits.

Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be exec
command The command to execute
shell A shell invocation used to run the command. e.g. /bin/sh -c. Required only for commands relying on shell features like wildcards, back ticks, pipes etc.
restartThrottle 10000 Amount of time (in millis) to wait before attempting a restart
restart false Whether the executed cmd should be restarted if it dies
logStdErr false Whether the command’s stderr should be logged
batchSize 20 The max number of lines to read and send to the channel at a time
batchTimeout 3000 Amount of time (in milliseconds) to wait, if the buffer size was not reached, before data is pushed downstream
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Warning

The problem with ExecSource and other asynchronous sources is that the source cannot guarantee that, if there is a failure to put the event into the Channel, the client knows about it. In such cases, the data will be lost. For instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there’s an obvious problem: what happens if the channel fills up and Flume can’t send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn’t been sent for some reason. If this doesn’t make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning - and to be completely clear - there is absolutely zero guarantee of event delivery when using this source. For stronger reliability guarantees, consider the Spooling Directory Source, Taildir Source or direct integration with Flume via the SDK.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

The ‘shell’ config is used to invoke the ‘command’ through a command shell (such as Bash or Powershell). The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, back ticks, pipes, loops, conditionals etc. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’ : ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, ‘powershell -Command’, etc.

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
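
The restart and batching properties from the table above can be combined as in the sketch below. The values are illustrative only, and restarting the command does not change the delivery caveats described in the warning.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
# Restart the command if it dies, waiting 5 seconds before each attempt.
a1.sources.r1.restart = true
a1.sources.r1.restartThrottle = 5000
# Log the command's stderr instead of discarding it.
a1.sources.r1.logStdErr = true
# Send up to 50 lines per batch, or whatever has arrived after 1 second.
a1.sources.r1.batchSize = 50
a1.sources.r1.batchTimeout = 1000
```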

JMS Source

JMS Source reads messages from a JMS destination such as a queue or topic. Being a JMS application it should work with any JMS provider but has only been tested with ActiveMQ. The JMS source provides configurable batch size, message selector, user/pass, and message to flume event converter. Note that the vendor provided JMS jars should be included in the Flume classpath using plugins.d directory (preferred), --classpath on command line, or via FLUME_CLASSPATH variable in flume-env.sh.

Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be jms
initialContextFactory Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory The JNDI name the connection factory should appear as
providerURL The JMS provider URL
destinationName Destination name
destinationType Destination type (queue or topic)
messageSelector Message selector to use when creating the consumer
userName Username for the destination/provider
passwordFile File containing the password for the destination/provider
batchSize 100 Number of messages to consume in one batch
converter.type DEFAULT Class to use to convert messages to flume events. See below.
converter.* Converter properties.
converter.charset UTF-8 Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription false Whether to create durable subscription. Durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified.
clientId JMS client identifier set on Connection right after it is created. Required for durable subscriptions.
durableSubscriptionName Name used to identify the durable subscription. Required for durable subscriptions.

JMS message converter

The JMS source allows pluggable converters, though it’s likely the default converter will work for most purposes. The default converter is able to convert Bytes, Text, and Object messages to FlumeEvents. In all cases, the properties in the message are added as headers to the FlumeEvent.

BytesMessage:
Bytes of message are copied to body of the FlumeEvent. Cannot convert more than 2GB of data per message.
TextMessage:
Text of message is converted to a byte array and copied to the body of the FlumeEvent. The default converter uses UTF-8 by default but this is configurable.
ObjectMessage:
Object is written out to a ByteArrayOutputStream wrapped in an ObjectOutputStream and the resulting array is copied to the body of the FlumeEvent.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
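
For a topic with a durable subscription, the same source might be configured as in the sketch below. The destination name, clientId and durableSubscriptionName values are hypothetical; only the property names come from the table above.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
# Durable subscriptions can only be used with destinationType topic.
a1.sources.r1.destinationName = BUSINESS_EVENTS
a1.sources.r1.destinationType = TOPIC
a1.sources.r1.createDurableSubscription = true
# Both of the following are required once createDurableSubscription is true.
a1.sources.r1.clientId = flume-agent-a1
a1.sources.r1.durableSubscriptionName = flume-business-events
```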

SSL and JMS Source

JMS client implementations typically support configuring SSL/TLS via some Java system properties defined by JSSE (Java Secure Socket Extension). By specifying these system properties for Flume’s JVM, JMS Source (or more precisely the JMS client implementation used by the JMS Source) can connect to the JMS server through SSL (of course only when the JMS server has also been set up to use SSL). It should work with any JMS provider and has been tested with ActiveMQ, IBM MQ and Oracle WebLogic.

The following sections describe the SSL configuration steps needed on the Flume side only. You can find more detailed descriptions about the server side setup of the different JMS providers and also full working configuration examples on Flume Wiki.

SSL transport / server authentication:

If the JMS server uses a self-signed certificate or its certificate is signed by a non-trusted CA (e.g. the company’s own CA), then a truststore (containing the right certificate) needs to be set up and passed to Flume. It can be done via the global SSL parameters. For more details about the global SSL setup, see the SSL/TLS support section.

Some JMS providers require SSL-specific JNDI Initial Context Factory and/or Provider URL settings when using SSL (e.g. ActiveMQ uses the ssl:// URL prefix instead of tcp://). In this case the source properties (initialContextFactory and/or providerURL) have to be adjusted in the agent config file.
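
For ActiveMQ, the adjustment might be as small as the sketch below. The port number is an assumption and depends on how the broker's SSL connector is configured; the truststore itself is passed through the global SSL parameters and does not appear in the agent config.

```properties
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
# ssl:// replaces tcp:// in the provider URL; the port shown is an assumption.
a1.sources.r1.providerURL = ssl://mqserver:61617
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE
```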

Client certificate authentication (two-way SSL):

JMS Source can authenticate to the JMS server through client certificate authentication instead of the usual user/password login (when SSL is used and the JMS server is configured to accept this kind of authentication).

The keystore containing Flume’s key used for the authentication needs to be configured via the global SSL parameters again. For more details about the global SSL setup, see the SSL/TLS support section.

The keystore should contain only one key (if multiple keys are present, then the first one will be used). The key password must be the same as the keystore password.

In case of client certificate authentication, there is no need to specify the userName / passwordFile properties for the JMS Source in the Flume agent config file.

Please note:

Unlike other components, JMS Source has no component level SSL configuration parameters, and there is no enable-SSL flag either. SSL setup is controlled by JNDI/Provider URL settings (ultimately the JMS server settings) and by the presence / absence of the truststore / keystore.

Spooling Directory Source

This source lets you ingest data by placing files to be ingested into a “spooling” directory on disk. This source will watch the specified directory for new files, and will parse events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, completion is indicated by default by renaming the file. Alternatively, the file can be deleted, or the trackerDir can be used to keep track of processed files.

Unlike the Exec source, this source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files must be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:

  1. If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
  2. If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory.

Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components.

Property Name Default Description
channels  
type The component type name, needs to be spooldir.
spoolDir The directory from which to read files.
fileSuffix .COMPLETED Suffix to append to completely ingested files
deletePolicy never When to delete completed files: never or immediate
fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.
basenameHeader false Whether to add a header storing the basename of the file.
basenameHeaderKey basename Header Key to use when appending basename of file to event header.
includePattern ^.*$ Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
ignorePattern ^$ Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern and includePattern regex, the file is ignored.
trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
trackingPolicy rename The tracking policy defines how file processing is tracked. It can be “rename” or “tracker_dir”. This parameter is only effective if the deletePolicy is “never”. “rename” - After processing files they get renamed according to the fileSuffix parameter. “tracker_dir” - Files are not renamed but a new empty file is created in the trackerDir. The new tracker file name is derived from the ingested one plus the fileSuffix.
consumeOrder oldest The order in which files in the spooling directory will be consumed: oldest, youngest or random. In case of oldest and youngest, the last modified time of the files will be used to compare the files. In case of a tie, the file with smallest lexicographical order will be consumed first. In case of random any file will be picked randomly. When using oldest and youngest the whole directory will be scanned to pick the oldest/youngest file, which might be slow if there are a large number of files, while using random may cause old files to be consumed very late if new files keep coming in the spooling directory.
pollDelay 500 Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch false Whether to monitor sub directories for new files to read.
maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter.
batchSize 100 Granularity at which to batch transfer to the channel
inputCharset UTF-8 Character set used by deserializers that treat the input file as text.
decodeErrorPolicy FAIL What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence.
deserializer LINE Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder.
deserializer.*   Varies per event deserializer.
bufferMaxLines (Obsolete) This option is now ignored.
bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for an agent named a1:

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
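
As a sketch of the tracker_dir policy described in the table above, the fragment below leaves ingested files untouched and records completion in the trackerDir instead. The include and ignore patterns are illustrative assumptions; character classes are used rather than backslash escapes because backslashes have special meaning in Java properties files.

```properties
a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
# trackingPolicy is only effective while deletePolicy is never.
a1.sources.src-1.deletePolicy = never
a1.sources.src-1.trackingPolicy = tracker_dir
# A relative trackerDir is interpreted relative to spoolDir.
a1.sources.src-1.trackerDir = .flumespool
# Illustrative patterns: pick up *.log files, skip *.tmp files.
a1.sources.src-1.includePattern = ^.*[.]log$
a1.sources.src-1.ignorePattern = ^.*[.]tmp$
```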

Event Deserializers

The following event deserializers ship with Flume.

LINE

This deserializer generates one event per line of text input.

Property Name Default Description
deserializer.maxLineLength 2048 Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event.
deserializer.outputCharset UTF-8 Charset to use for encoding events put into the channel.
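
For input with long lines, the LINE deserializer limit can be raised on the Spooling Directory Source as in this sketch. The source name src-1 follows the earlier example and the length is an arbitrary illustrative value.

```properties
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.deserializer = LINE
# Lines longer than this are split across several events.
a1.sources.src-1.deserializer.maxLineLength = 8192
a1.sources.src-1.deserializer.outputCharset = UTF-8
```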

AVRO

This deserializer is able to read an Avro container file, and it generates one event per Avro record in the file. Each event is annotated with a header that indicates the schema used. The body of the event is the binary Avro record data, not including the schema or the rest of the container file elements.

Note that if the spool directory source must retry putting one of these events onto a channel (for example, because the channel is full), then it will reset and retry from the most recent Avro container file sync point. To reduce potential event duplication in such a failure scenario, write sync markers more frequently in your Avro input files.

Property Name Default Description
deserializer.schemaType HASH How the schema is represented. By default, or when the value HASH is specified, the Avro schema is hashed and the hash is stored in every event in the event header “flume.avro.schema.hash”. If LITERAL is specified, the JSON-encoded schema itself is stored in every event in the event header “flume.avro.schema.literal”. Using LITERAL mode is relatively inefficient compared to HASH mode.

BlobDeserializer

This deserializer reads a Binary Large Object (BLOB) per event, typically one BLOB per file. For example a PDF or JPG file. Note that this approach is not suitable for very large objects because the entire BLOB is buffered in RAM.

Property Name Default Description
deserializer The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
deserializer.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
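
A Spooling Directory Source that emits one event per file might be set up as in the sketch below. The spool directory and the reduced maxBlobLength are illustrative assumptions; the class name is the FQCN given in the table above.

```properties
a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/spool/flume/blobs
a1.sources.src-1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
# The whole BLOB is buffered in RAM, so cap it at roughly 50 MB here.
a1.sources.src-1.deserializer.maxBlobLength = 50000000
```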

Taildir Source

Note

This source is provided as a preview feature. It does not work on Windows.

Watches the specified files and tails them in near real-time once it detects new lines appended to each file. If new lines are still being written, this source will retry reading them until the write completes.

This source is reliable and will not miss data even when the tailed files rotate. It periodically writes the last read position of each file to the given position file in JSON format. If Flume is stopped or goes down for some reason, it can resume tailing from the position recorded in the existing position file.

As another use case, this source can also start tailing each file from an arbitrary position using a given position file. When there is no position file at the specified path, it starts tailing from the first line of each file by default.

Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first.

This source does not rename or delete or do any modifications to the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.

Property Name Default Description
channels  
type The component type name, needs to be TAILDIR.
filegroups Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName> Absolute path of the file group. Regular expression (and not file system patterns) can be used for filename only.
positionFile ~/.flume/taildir_position.json File in JSON format to record the inode, the absolute path and the last position of each tailing file.
headers.<filegroupName>.<headerKey> Header value to set for the given header key. Multiple headers can be specified for one file group.
byteOffsetHeader false Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.
skipToEnd false Whether to skip the position to EOF in the case of files not written on the position file.
idleTimeout 120000 Time (ms) to close inactive files. If new lines are appended to a closed file, this source will automatically re-open it.
writePosInterval 3000 Interval time (ms) to write the last position of each file on the position file.
batchSize 100 Max number of lines to read and send to the channel at a time. Using the default is usually fine.
maxBatchCount Long.MAX_VALUE Controls the number of batches being read consecutively from the same file. If the source is tailing multiple files and one of them is written at a fast rate, it can prevent other files from being processed, because the busy file would be read in an endless loop. In this case lower this value.
backoffSleepIncrement 1000 The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
maxBackoffSleep 5000 The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
cachePatternMatching true Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.
fileHeader false Whether to add a header storing the absolute path filename.
fileHeaderKey file Header key to use when appending absolute path filename to event header.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.r1.maxBatchCount = 1000
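
When only new data matters, the sketch below starts at the end of files that are not yet recorded in the position file and adds the byte offset of each line as a header. The file group path and the tuning values are illustrative assumptions.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# The regular expression applies to the file name only, not the directory.
a1.sources.r1.filegroups.f1 = /var/log/app/.*log.*
# Skip to EOF for files that have no entry in the position file.
a1.sources.r1.skipToEnd = true
# Adds the 'byteoffset' header to each event.
a1.sources.r1.byteOffsetHeader = true
# Close files after one minute of inactivity; they are re-opened on new data.
a1.sources.r1.idleTimeout = 60000
# Limit consecutive batches per file so a busy file cannot starve the others.
a1.sources.r1.maxBatchCount = 100
```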

Twitter 1% firehose Source (experimental)

Warning

This source is highly experimental and may change between minor versions of Flume. Use at your own risk.

Experimental source that connects via Streaming API to the 1% sample twitter firehose, continuously downloads tweets, converts them to Avro format and sends Avro events to a downstream Flume sink. Requires the consumer and access tokens and secrets of a Twitter developer account. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.twitter.TwitterSource
consumerKey OAuth consumer key
consumerSecret OAuth consumer secret
accessToken OAuth access token
accessTokenSecret OAuth token secret
maxBatchSize 1000 Maximum number of twitter messages to put in a single batch
maxBatchDurationMillis 1000 Maximum number of milliseconds to wait before closing a batch

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.twitter.TwitterSource
a1.sources.r1.channels = c1
a1.sources.r1.consumerKey = YOUR_TWITTER_CONSUMER_KEY
a1.sources.r1.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
a1.sources.r1.accessToken = YOUR_TWITTER_ACCESS_TOKEN
a1.sources.r1.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
a1.sources.r1.maxBatchSize = 10
a1.sources.r1.maxBatchDurationMillis = 200

Kafka Source

Kafka Source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topics. This currently supports Kafka server releases 0.10.1.0 or higher. Testing was done up to 2.0.1, which was the highest available version at the time of the release.

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id flume Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group
kafka.topics Comma-separated list of topics the kafka consumer will read messages from.
kafka.topics.regex Regex that defines the set of topics the source is subscribed to. This property has higher priority than kafka.topics and overrides kafka.topics if both are set.
batchSize 1000 Maximum number of messages written to Channel in one batch
batchDurationMillis 1000 Maximum time (in ms) before a batch will be written to Channel. The batch will be written as soon as either the size or the time limit is reached.
backoffSleepIncrement 1000 Initial and incremental wait time that is triggered when a Kafka Topic appears to be empty. Wait period will reduce aggressive pinging of an empty Kafka Topic. One second is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.
maxBackoffSleep 5000 Maximum wait time that is triggered when a Kafka Topic appears to be empty. Five seconds is ideal for ingestion use cases but a lower value may be required for low latency operations with interceptors.
useFlumeEventFormat false By default events are taken as bytes from the Kafka topic directly into the event body. Set to true to read events as the Flume Avro binary format. Used in conjunction with the same property on the KafkaSink or with the parseAsFlumeEvent property on the Kafka Channel this will preserve any Flume headers sent on the producing side.
setTopicHeader true When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
topicHeader topic Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true. Care should be taken if combining with the Kafka Sink topicHeader property so as to avoid sending the message back to the same topic in a loop.
kafka.consumer.security.protocol PLAINTEXT Set to SASL_PLAINTEXT, SASL_SSL or SSL if reading from Kafka using some level of security. See below for additional info on secure setup.
more consumer security props   If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on consumer.
Other Kafka Consumer Properties These properties are used to configure the Kafka Consumer. Any consumer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.consumer. For example: kafka.consumer.auto.offset.reset

Note

The Kafka Source overrides the Kafka consumer parameter auto.commit.enable: it is set to “false” by the source, and every batch is committed. The Kafka Source guarantees at-least-once message retrieval, so duplicates may be present when the source starts. The Kafka Source also provides defaults for the key.deserializer (org.apache.kafka.common.serialization.StringDeserializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer). Modification of these parameters is not recommended.

Deprecated Properties

Property Name Default Description
topic Use kafka.topics
groupId flume Use kafka.consumer.group.id
zookeeperConnect Is no longer supported by kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish connection with kafka cluster
migrateZookeeperOffsets true When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found, the Kafka configuration kafka.consumer.auto.offset.reset defines how offsets are handled. Check Kafka documentation for details
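As a sketch of how a configuration written with the deprecated names maps onto the current ones (the agent name a1, the source name source1 and the host names are placeholders chosen for illustration):

```properties
# Deprecated form, commented out and shown only for comparison
# a1.sources.source1.topic = mytopic
# a1.sources.source1.groupId = flume
# a1.sources.source1.zookeeperConnect = zk-1:2181

# Current equivalents
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume
# The consumer connects to the brokers directly instead of going through Zookeeper
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9092
```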

Example for topic subscription by comma-separated topic list.

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id

Example for topic subscription by regex

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
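The tuning properties from the table above can be combined in the same way. The following is only a sketch: the values are illustrative rather than recommendations, and the header name sourceTopic is a made-up example.

```properties
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1
# Shorter back-off than the defaults (1000 / 5000 ms), as may be needed for low-latency flows
tier1.sources.source1.backoffSleepIncrement = 200
tier1.sources.source1.maxBackoffSleep = 1000
# Read events in the Flume Avro binary format so headers set on the producing side survive
tier1.sources.source1.useFlumeEventFormat = true
# Store the originating topic in a header named "sourceTopic" instead of "topic"
tier1.sources.source1.setTopicHeader = true
tier1.sources.source1.topicHeader = sourceTopic
# Any Kafka consumer property can be passed with the kafka.consumer. prefix
tier1.sources.source1.kafka.consumer.auto.offset.reset = earliest
```

Choosing a topicHeader name that differs from the one a downstream Kafka Sink reads is one way to avoid the loop mentioned in the topicHeader description.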

Security and Kafka Source:

Secure authentication as well as data encryption is supported on the communication channel between Flume and Kafka. For secure authentication SASL/GSSAPI (Kerberos V5) or SSL (even though the parameter is named SSL, the actual protocol is a TLS implementation) can be used from Kafka version 0.9.0.

As of now data encryption is solely provided by SSL/TLS.

Setting kafka.consumer.security.protocol to one of the following values has the following meaning:

  • SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
  • SASL_SSL - Kerberos or plaintext authentication with data encryption
  • SSL - TLS based encryption with optional authentication.

Warning

There is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation. Reference: Kafka security overview and the jira for tracking this issue: KAFKA-2561

TLS and Kafka Source:

Please read the steps described in Configuring Kafka Clients SSL to learn about additional configuration settings for fine tuning, for example any of the following: security provider, cipher suites, enabled protocols, truststore or keystore types.

Example configuration with server side authentication and data encryption.

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Specifying the truststore is optional here; the global truststore can be used instead. For more details about the global SSL setup, see the SSL/TLS support section.

Note: By default the property ssl.endpoint.identification.algorithm is not defined, so hostname verification is not performed. In order to enable hostname verification, set the following property:

a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS

Once enabled, clients will verify the server’s fully qualified domain name (FQDN) against one of the following two fields:

  1. Common Name (CN) https://tools.ietf.org/html/rfc6125#section-2.3
  2. Subject Alternative Name (SAN) https://tools.ietf.org/html/rfc5280#section-4.2.1.6

If client side authentication is also required, then the following additionally needs to be added to the Flume agent configuration, or the global SSL setup can be used (see SSL/TLS support section). Each Flume agent has to have its own client certificate, which has to be trusted by the Kafka brokers either individually or by their signature chain. A common example is to sign each client certificate with a single Root CA which in turn is trusted by the Kafka brokers.

# optional, the global keystore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>

If the keystore and the key use different password protection, then the ssl.key.password property will provide the required additional secret for the consumer keystore:

a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
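Putting the fragments above together, a source that verifies the brokers' host names and also presents a client certificate might be configured as in the following sketch. The paths, passwords and host names are placeholders.

```properties
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SSL
# Server authentication: trust the brokers' certificates and check their host names
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>
a1.sources.source1.kafka.consumer.ssl.endpoint.identification.algorithm=HTTPS
# Client authentication: present this agent's certificate to the brokers
a1.sources.source1.kafka.consumer.ssl.keystore.location=/path/to/client.keystore.jks
a1.sources.source1.kafka.consumer.ssl.keystore.password=<password to access the keystore>
# Only needed if the key is protected with a different password than the keystore
a1.sources.source1.kafka.consumer.ssl.key.password=<password to access the key>
```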

Kerberos and Kafka Source:

To use Kafka source with a Kafka cluster secured with Kerberos, set the kafka.consumer.security.protocol property noted above for the consumer. The Kerberos keytab and principal to be used with Kafka brokers are specified in a JAAS file’s “KafkaClient” section. The “Client” section describes the Zookeeper connection if needed. See the Kafka documentation for information on the JAAS file contents. The location of this JAAS file and optionally the system-wide Kerberos configuration can be specified via JAVA_OPTS in flume-env.sh:

JAVA_OPTS="$JAVA_OPTS -Djava.security.krb5.conf=/path/to/krb5.conf"
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/path/to/flume_jaas.conf"

Example secure configuration using SASL_PLAINTEXT:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka

Example secure configuration using SASL_SSL:

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093
a1.sources.source1.kafka.topics = mytopic
a1.sources.source1.kafka.consumer.group.id = flume-consumer
a1.sources.source1.kafka.consumer.security.protocol = SASL_SSL
a1.sources.source1.kafka.consumer.sasl.mechanism = GSSAPI
a1.sources.source1.kafka.consumer.sasl.kerberos.service.name = kafka
# optional, the global truststore can be used alternatively
a1.sources.source1.kafka.consumer.ssl.truststore.location=/path/to/truststore.jks
a1.sources.source1.kafka.consumer.ssl.truststore.password=<password to access the truststore>

Sample JAAS file. For reference of its content please see the client config sections of the desired authentication mechanism (GSSAPI/PLAIN) in the Kafka documentation of SASL configuration. Since the Kafka Source may also connect to Zookeeper for offset migration, the “Client” section was also added to this example. This won’t be needed unless you require offset migration, or you require this section for other secure components. Also please make sure that the operating system user of the Flume processes has read privileges on the JAAS and keytab files.

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/keytabs/flume.keytab"
  principal="flume/flumehost1.example.com@YOURKERBEROSREALM";
};

NetCat TCP Source

A netcat-like source that listens on a given port and turns each line of text into an event. Acts like nc -k -l [host] [port]. In other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline separated text. Each line of text is turned into a Flume event and sent via the connected channel.

Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be netcat
bind Host name or IP address to bind to
port Port # to bind to
max-line-length 512 Max line length per event body (in bytes)
ack-every-event true Respond with an “OK” for every event received
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1
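The optional properties from the table can be added to the same example. The sketch below raises the line limit and turns off the per-event “OK” reply; the values are illustrative.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
# Allow event bodies of up to 2048 bytes instead of the default 512
a1.sources.r1.max-line-length = 2048
# Do not send "OK" back to the client after each event
a1.sources.r1.ack-every-event = false
a1.sources.r1.channels = c1
```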

NetCat UDP Source

Like the original NetCat (TCP) source, this source listens on a given port, turns each line of text into an event, and sends it via the connected channel. Acts like nc -u -k -l [host] [port].

Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be netcatudp
bind Host name or IP address to bind to
port Port # to bind to
remoteAddressHeader  
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Sequence Generator Source

A simple sequence generator that continuously generates events with a counter that starts from 0, increments by 1 and stops at totalEvents. Retries when it can’t send events to the channel. Useful mainly for testing. During retries it keeps the body of the retried messages the same as before so that the number of unique events - after de-duplication at destination - is expected to be equal to the specified totalEvents. Required properties are in bold.

Property Name Default Description
channels  
type The component type name, needs to be seq
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
batchSize 1 Number of events to attempt to process per request loop.
totalEvents Long.MAX_VALUE Number of unique events sent by the source.

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
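For a bounded test run, the optional properties from the table can be set as well. A minimal sketch with illustrative values:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
# Stop after 10000 unique events instead of running up to Long.MAX_VALUE
a1.sources.r1.totalEvents = 10000
# Attempt to process 100 events per request loop instead of 1
a1.sources.r1.batchSize = 100
a1.sources.r1.channels = c1
```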

Syslog Sources

Reads syslog data and generates Flume events. The UDP source treats an entire message as a single event. The TCP sources create a new event for each string of characters separated by a newline (‘\n’).

Required properties are in bold.

Syslog TCP Source

The original, tried-and-true syslog TCP source.

Property Name Default Description
channels  
type The component type name, needs to be syslogtcp
host Host name or IP address to bind to
port Port # to bind to
eventSize 2500 Maximum size of a single event line, in bytes
keepFields none Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.
clientIPHeader If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
clientHostnameHeader If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
ssl false Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites Space-separated list of cipher suites to exclude.
include-cipher-suites Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If include-cipher-suites is empty, every supported cipher suite is included.

For example, a syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
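
The SSL and keepFields properties from the table could be combined as in the following sketch. The keystore path and password are placeholders, and TLSv1 is used only as an example of an additional protocol to exclude.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
# Preserve the timestamp and hostname fields in the event body
a1.sources.r1.keepFields = timestamp hostname
# Enable SSL with a component-level keystore (the global keystore could be used instead)
a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystore-password = <password to access the keystore>
a1.sources.r1.keystore-type = JKS
# SSLv3 is always excluded; additionally exclude TLSv1
a1.sources.r1.exclude-protocols = TLSv1
```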

Multiport Syslog TCP Source

This is a newer, faster, multi-port capable version of the Syslog TCP source. Note that the ports configuration setting has replaced port. Multi-port capability means that it can listen on many ports at once in an efficient manner. This source uses the Apache Mina library to do that. Provides support for RFC-3164 and many common RFC-5424 formatted messages. Also provides the capability to configure the character set used on a per-port basis.

Property Name Default Description
channels  
type The component type name, needs to be multiport_syslogtcp
host Host name or IP address to bind to.
ports Space-separated list (one or more) of ports to bind to.
eventSize 2500 Maximum size of a single event line, in bytes.
keepFields none Setting this to ‘all’ will preserve the Priority, Timestamp and Hostname in the body of the event. A space-separated list of fields to include is allowed as well. Currently, the following fields can be included: priority, version, timestamp, hostname. The values ‘true’ and ‘false’ have been deprecated in favor of ‘all’ and ‘none’.
portHeader If specified, the port number will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the incoming port.
clientIPHeader If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
clientHostnameHeader If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
charset.default UTF-8 Default character set used while parsing syslog events into strings.
charset.port.<port> Character set is configurable on a per-port basis.
batchSize 100 Maximum number of events to attempt to process per request loop. Using the default is usually fine.
readBufferSize 1024 Size of the internal Mina read buffer. Provided for performance tuning. Using the default is usually fine.
numProcessors (auto-detected) Number of processors available on the system for use while processing messages. Default is to auto-detect # of CPUs using the Java Runtime API. Mina will spawn 2 request-processing threads per detected CPU, which is often reasonable.
selector.type replicating replicating, multiplexing, or custom
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors.
interceptors.*    
ssl false Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).
keystore This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type JKS The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites Space-separated list of cipher suites to exclude.
include-cipher-suites Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If include-cipher-suites is empty, every supported cipher suite is included.

For example, a multiport syslog TCP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
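
The per-port character set mentioned above is configured through the charset.port.<port> properties. A minimal sketch, assuming for illustration that senders on port 10002 use ISO-8859-1:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port
# Parse events as UTF-8 unless a port-specific setting says otherwise
a1.sources.r1.charset.default = UTF-8
# Events arriving on port 10002 are parsed as ISO-8859-1
a1.sources.r1.charset.port.10002 = ISO-8859-1
```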

Syslog UDP Source

Property Name Default Description
channels  
type The component type name, needs to be syslogudp
host Host name or IP address to bind to
port Port # to bind to
keepFields false Setting this to true will preserve the Priority, Timestamp and Hostname in the body of the event.
clientIPHeader If specified, the IP address of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the IP address of the client. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
clientHostnameHeader If specified, the host name of the client will be stored in the header of each event using the header name specified here. This allows for interceptors and channel selectors to customize routing logic based on the host name of the client. Retrieving the host name may involve a name service reverse lookup which may affect the performance. Do not use the standard Syslog header names here (like _host_) because the event header will be overridden in that case.
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

For example, a syslog UDP source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
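
To record where each message came from, the client header properties can be set as in the sketch below. The header names clientIp and clientHostname are arbitrary examples; any name that does not collide with the standard Syslog header names should work.

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.channels = c1
# Store the sender's IP address in a header named "clientIp"
a1.sources.r1.clientIPHeader = clientIp
# Store the sender's host name in a header named "clientHostname" (may trigger a reverse lookup)
a1.sources.r1.clientHostnameHeader = clientHostname
```

Interceptors and channel selectors can then route events based on these headers.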

HTTP Source

A source which accepts Flume Events by HTTP POST and GET. GET should be used for experimentation only. HTTP requests are converted into flume events by a pluggable “handler” which must implement the HTTPSourceHandler interface. This handler takes a HttpServletRequest and returns a list of flume events. All events handled from one Http request are committed to the channel in one transaction, thus allowing for increased efficiency on channels like the file channel. If the handler throws an exception, this source will return a HTTP status of 400. If the channel is full, or the source is unable to append events to the channel, the source will return a HTTP 503 - Temporarily unavailable status.

All events sent in one post request are considered to be one batch and inserted into the channel in one transaction.

This source is based on Jetty 9.4 and offers the ability to set additional Jetty-specific parameters which will be passed directly to the Jetty components.

Property Name Default Description
type   The component type name, needs to be http
port The port the source should bind to.
bind 0.0.0.0 The hostname or IP address to listen on
handler org.apache.flume.source.http.JSONHandler The FQCN of the handler class.
handler.* Config parameters for the handler
selector.type replicating replicating or multiplexing
selector.*   Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    
ssl false Set this property to true to enable SSL. HTTP Source does not support SSLv3.
exclude-protocols SSLv3 Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
include-protocols Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If include-protocols is empty, every supported protocol is included.
exclude-cipher-suites Space-separated list of cipher suites to exclude.
include-cipher-suites Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites.
keystore   Location of the keystore including keystore file name. If SSL is enabled but the keystore is not specified here, then the global keystore will be used (if defined, otherwise configuration error).
keystore-password   Keystore password. If SSL is enabled but the keystore password is not specified here, then the global keystore password will be used (if defined, otherwise configuration error).
keystore-type JKS Keystore type. This can be “JKS” or “PKCS12”.
QueuedThreadPool.*   Jetty specific settings to be set on org.eclipse.jetty.util.thread.QueuedThreadPool. N.B. QueuedThreadPool will only be used if at least one property of this class is set.
HttpConfiguration.*   Jetty specific settings to be set on org.eclipse.jetty.server.HttpConfiguration
SslContextFactory.*   Jetty specific settings to be set on org.eclipse.jetty.util.ssl.SslContextFactory (only applicable when ssl is set to true).
ServerConnector.*   Jetty specific settings to be set on org.eclipse.jetty.server.ServerConnector

Deprecated Properties

Property Name Default Description
keystorePassword Use keystore-password. Deprecated value will be overwritten with the new one.
excludeProtocols SSLv3 Use exclude-protocols. Deprecated value will be overwritten with the new one.
enableSSL false Use ssl. Deprecated value will be overwritten with the new one.

N.B. Jetty-specific settings are set using the setter methods on the objects listed above. For full details see the Javadoc for these classes (QueuedThreadPool, HttpConfiguration, SslContextFactory and ServerConnector).

When using Jetty-specific settings, the named properties above will take precedence (for example excludeProtocols will take precedence over SslContextFactory.ExcludeProtocols). All properties will be initial lower case.

An example http source for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.example.rest.RestHandler
a1.sources.r1.handler.nickname = random props
a1.sources.r1.HttpConfiguration.sendServerVersion = false
a1.sources.r1.ServerConnector.idleTimeout = 300
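
An HTTPS variant of the example could look like the following sketch. The keystore path and password are placeholders, the default JSONHandler is used because no handler is set, and QueuedThreadPool.maxThreads is included only to illustrate a Jetty-specific setting (it is assumed to map to the setMaxThreads setter of QueuedThreadPool).

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
# Enable SSL with a component-level keystore (the global keystore could be used instead)
a1.sources.r1.ssl = true
a1.sources.r1.keystore = /path/to/keystore.jks
a1.sources.r1.keystore-password = <password to access the keystore>
# SSLv3 is always excluded; additionally exclude TLSv1
a1.sources.r1.exclude-protocols = TLSv1
# Jetty-specific setting; setting it also causes QueuedThreadPool to be used
a1.sources.r1.QueuedThreadPool.maxThreads = 50
```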

JSONHandler

A handler is provided out of the box which can handle events represented in JSON format, and supports the UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, it has to be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.

[{
  "headers" : {
             "timestamp" : "434324343",
             "host" : "random_host.example.com"
             },
  "body" : "random_body"
  },
  {
  "headers" : {
             "namenode" : "namenode.example.com",
             "datanode" : "random_datanode.example.com"
             },
  "body" : "really_random_body"
  }]

To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#toJson(Object, Type) method. The type token to pass as the second argument of this method for a list of events can be created by:

Type type = new TypeToken<List<JSONEvent>>() {}.getType();

BlobHandler

By default HTTPSource splits JSON input into Flume events. As an alternative, BlobHandler is a handler for HTTPSource that returns an event that contains the request parameters as well as the Binary Large Object (BLOB) uploaded with this request, for example a PDF or JPG file. Note that this approach is not suitable for very large objects because it buffers up the entire BLOB in RAM.

Property Name Default Description
handler The FQCN of this class: org.apache.flume.sink.solr.morphline.BlobHandler
handler.maxBlobLength 100000000 The maximum number of bytes to read and buffer for a given request
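
A minimal sketch of an HTTP source that uses this handler, with the agent, source and channel names and the port chosen for illustration:

```properties
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = http
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1
a1.sources.r1.handler = org.apache.flume.sink.solr.morphline.BlobHandler
# Read and buffer at most 10000000 bytes per request instead of the default 100000000
a1.sources.r1.handler.maxBlobLength = 10000000
```

Lowering maxBlobLength limits how much RAM a single request can occupy, which matters because the whole BLOB is buffered in memory.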

Stress Source

StressSource is an internal load-generating source implementation which is very useful for stress tests. It allows the user to configure the size of the event payload, with empty headers. The user can configure the total number of events to be sent as well as the maximum number of successful events to be delivered.

Required properties are in bold.

Property Name Default Description
type The component type name, needs to be org.apache.flume.source.StressSource
size 500 Payload size of each event, in bytes
maxTotalEvents -1 Maximum number of Events to be sent
maxSuccessfulEvents -1 Maximum number of Events successfully sent
batchSize 1 Number of Events to be sent in one batch
maxEventsPerSecond 0 When set to an integer greater than zero, enforces a rate limiter onto the source.

Example for agent named a1:

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
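
The remaining properties from the table can be used to bound and throttle a test run. A sketch with illustrative values:

```properties
a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 1024
# Send events in batches of 100 instead of one at a time
a1.sources.stresssource-1.batchSize = 100
# Stop once 500000 events have been sent successfully
a1.sources.stresssource-1.maxSuccessfulEvents = 500000
# Rate-limit the source to 1000 events per second
a1.sources.stresssource-1.maxEventsPerSecond = 1000
a1.sources.stresssource-1.channels = memoryChannel-1
```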

Legacy Sources

The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents. They accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel. The 0.9.4 event properties like timestamp, pri, host, nanos, etc. are converted to 1.x event header attributes. The legacy sources support both Avro and Thrift RPC connections. To use this bridge between two Flume versions, you need to start a Flume 1.x agent with the avroLegacy or thriftLegacy source. The 0.9.4 agent should have the agent Sink pointing to the host/port of the 1.x agent.

Note

The reliability semantics of Flume 1.x are different from those of Flume 0.9.x. The E2E or DFO mode of a Flume 0.9.x agent will not be supported by the legacy source. The only supported 0.9.x mode is best effort, though the reliability setting of the 1.x flow will be applicable to the events once they are saved into the Flume 1.x channel by the legacy source.

Required properties are in bold.

Avro Legacy Source

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.avroLegacy.AvroLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Thrift Legacy Source

Property Name Default Description
channels  
type The component type name, needs to be org.apache.flume.source.thriftLegacy.ThriftLegacySource
host The hostname or IP address to bind to
port The port # to listen on
selector.type replicating replicating or multiplexing
selector.* Depends on the selector.type value
interceptors Space-separated list of interceptors
interceptors.*    

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Custom Source

A custom source is your own implementation of the Source interface. A custom source’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom source is its FQCN.