28. Configuration Options
Spring Cloud Stream supports general configuration options as well as configuration for bindings and binders. Some binders allow additional binding properties to support middleware-specific features.
Configuration options can be provided to Spring Cloud Stream applications via any mechanism supported by Spring Boot. This includes application arguments, environment variables, and YAML or .properties files.
28.1 Spring Cloud Stream Properties
-
spring.cloud.stream.instanceCount
- The number of deployed instances of an application. Must be set for partitioning and if using Kafka.
Default: 1.
-
spring.cloud.stream.instanceIndex
- The instance index of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. Automatically set in Cloud Foundry to match the application’s instance index.
-
spring.cloud.stream.dynamicDestinations
- A list of destinations that can be bound dynamically (for example, in a dynamic routing scenario). If set, only listed destinations can be bound.
Default: empty (allowing any destination to be bound).
-
spring.cloud.stream.defaultBinder
- The default binder to use, if multiple binders are configured. See Multiple Binders on the Classpath.
Default: empty.
-
spring.cloud.stream.overrideCloudConnectors
- This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (e.g., a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually via Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (e.g., the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). This property is typically nested in a customized environment when connecting to multiple systems.
Default: false.
28.2 Binding Properties
Binding properties are supplied using the format spring.cloud.stream.bindings.<channelName>.<property>=<value>. The <channelName> represents the name of the channel being configured (e.g., output for a Source).
To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format spring.cloud.stream.default.<property>=<value>.
In what follows, we indicate where we have omitted the spring.cloud.stream.bindings.<channelName>. prefix and focus just on the property name, with the understanding that the prefix will be included at runtime.
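As a rough illustration of the two key formats described above, the following sketch composes both property names and looks up a value. The class BindingPropertyKeys and its methods are hypothetical helpers, not part of Spring Cloud Stream, and the lookup order (channel-specific value first, then the default) is an assumption made for illustration only. In a real application the framework resolves these properties itself through Spring Boot.

```java
import java.util.Properties;

// Hypothetical helper, not part of Spring Cloud Stream: it only illustrates the key formats.
public class BindingPropertyKeys {

    static final String PREFIX = "spring.cloud.stream.";

    // Key for a single channel: spring.cloud.stream.bindings.<channelName>.<property>
    public static String bindingKey(String channelName, String property) {
        return PREFIX + "bindings." + channelName + "." + property;
    }

    // Key applying to all channels: spring.cloud.stream.default.<property>
    public static String defaultKey(String property) {
        return PREFIX + "default." + property;
    }

    // Assumed lookup order for this sketch: channel-specific value first, then the default.
    public static String resolve(Properties props, String channelName, String property) {
        String value = props.getProperty(bindingKey(channelName, property));
        return value != null ? value : props.getProperty(defaultKey(property));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(defaultKey("contentType"), "application/json");
        props.setProperty(bindingKey("output", "destination"), "ticktock");

        System.out.println(resolve(props, "output", "destination"));
        System.out.println(resolve(props, "output", "contentType"));
    }
}
```

Here the output channel has its own destination entry, while contentType is only defined once under the default prefix and is picked up for any channel.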
28.2.1 Properties for Use of Spring Cloud Stream
The following binding properties are available for both input and output bindings and must be prefixed with spring.cloud.stream.bindings.<channelName>., e.g. spring.cloud.stream.bindings.input.destination=ticktock.
Default values can be set by using the prefix spring.cloud.stream.default, e.g. spring.cloud.stream.default.contentType=application/json.
-
destination
- The target destination of a channel on the bound middleware (e.g., the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it can be bound to multiple destinations, and the destination names can be specified as comma-separated String values. If not set, the channel name is used instead. The default value of this property cannot be overridden.
-
group
- The consumer group of the channel. Applies only to inbound bindings. See Consumer Groups.
Default: null (indicating an anonymous consumer).
-
contentType
- The content type of the channel.
Default: null (so that no type coercion is performed).
-
binder
- The binder used by this binding. See Section 27.4, “Multiple Binders on the Classpath” for details.
Default: null (the default binder will be used, if one exists).
28.2.2 Consumer Properties
The following binding properties are available for input bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.consumer., e.g. spring.cloud.stream.bindings.input.consumer.concurrency=3.
Default values can be set by using the prefix spring.cloud.stream.default.consumer, e.g. spring.cloud.stream.default.consumer.headerMode=raw.
-
concurrency
- The concurrency of the inbound consumer.
Default: 1.
-
partitioned
- Whether the consumer receives data from a partitioned producer.
Default: false.
-
headerMode
- When set to raw, disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when inbound data is coming from outside Spring Cloud Stream applications.
Default: embeddedHeaders.
-
maxAttempts
- If processing fails, the number of attempts to process the message (including the first). Set to 1 to disable retry.
Default: 3.
-
backOffInitialInterval
- The backoff initial interval on retry.
Default: 1000.
-
backOffMaxInterval
- The maximum backoff interval.
Default: 10000.
-
backOffMultiplier
- The backoff multiplier.
Default: 2.0.
-
instanceIndex
- When set to a value greater than or equal to zero, allows customizing the instance index of this consumer (if different from spring.cloud.stream.instanceIndex). When set to a negative value, it defaults to spring.cloud.stream.instanceIndex.
Default: -1.
-
instanceCount
- When set to a value greater than or equal to zero, allows customizing the instance count of this consumer (if different from spring.cloud.stream.instanceCount). When set to a negative value, it defaults to spring.cloud.stream.instanceCount.
Default: -1.
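To make the interplay of maxAttempts, backOffInitialInterval, backOffMultiplier, and backOffMaxInterval more concrete, the following sketch computes the wait before each retry. It assumes that the intervals are expressed in milliseconds and grow exponentially up to the maximum; the class RetryBackoffSketch is a hypothetical helper written for this illustration and is not the binder's actual retry code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models an exponential backoff schedule, it is not the binder's retry code.
public class RetryBackoffSketch {

    // Returns the wait (assumed to be in milliseconds) before each retry; the first attempt has no wait.
    public static List<Long> waits(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add(Math.min((long) next, maxInterval)); // cap each wait at the maximum interval
            next *= multiplier;
        }
        return waits;
    }

    public static void main(String[] args) {
        // Defaults from the list above: maxAttempts=3, initial 1000, multiplier 2.0, maximum 10000.
        System.out.println(waits(3, 1000, 2.0, 10000)); // prints [1000, 2000]
        System.out.println(waits(1, 1000, 2.0, 10000)); // prints [] because maxAttempts=1 disables retry
        System.out.println(waits(6, 1000, 2.0, 10000)); // prints [1000, 2000, 4000, 8000, 10000]
    }
}
```

Under these assumptions, the default settings give two retries after the initial attempt, and the maximum interval only comes into play once maxAttempts is raised to six or more.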
28.2.3 Producer Properties
The following binding properties are available for output bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.producer., e.g. spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id.
Default values can be set by using the prefix spring.cloud.stream.default.producer, e.g. spring.cloud.stream.default.producer.partitionKeyExpression=payload.id.
-
partitionKeyExpression
- A SpEL expression that determines how to partition outbound data. If set, or if partitionKeyExtractorClass is set, outbound data on this channel will be partitioned, and partitionCount must be set to a value greater than 1 to be effective. The two options are mutually exclusive. See Section 25.5, “Partitioning Support”.
Default: null.
-
partitionKeyExtractorClass
- A PartitionKeyExtractorStrategy implementation. If set, or if partitionKeyExpression is set, outbound data on this channel will be partitioned, and partitionCount must be set to a value greater than 1 to be effective. The two options are mutually exclusive. See Section 25.5, “Partitioning Support”.
Default: null.
-
partitionSelectorClass
- A PartitionSelectorStrategy implementation. Mutually exclusive with partitionSelectorExpression. If neither is set, the partition will be selected as hashCode(key) % partitionCount, where key is computed via either partitionKeyExpression or partitionKeyExtractorClass.
Default: null.
-
partitionSelectorExpression
- A SpEL expression for customizing partition selection. Mutually exclusive with partitionSelectorClass. If neither is set, the partition will be selected as hashCode(key) % partitionCount, where key is computed via either partitionKeyExpression or partitionKeyExtractorClass.
Default: null.
-
partitionCount
- The number of target partitions for the data, if partitioning is enabled. Must be set to a value greater than 1 if the producer is partitioned. On Kafka, interpreted as a hint; the larger of this and the partition count of the target topic is used instead.
Default: 1.
-
requiredGroups
- A comma-separated list of groups to which the producer must ensure message delivery even if they start after it has been created (e.g., by pre-creating durable queues in RabbitMQ).
-
headerMode
- When set to raw, disables header embedding on output. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when producing data for non-Spring Cloud Stream applications.
Default: embeddedHeaders.
-
useNativeEncoding
- When set to true, the outbound message is serialized directly by the client library, which must be configured correspondingly (e.g., by setting an appropriate Kafka producer value serializer). When this configuration is used, outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use an appropriate decoder (e.g., a Kafka consumer value deserializer) to deserialize the inbound message. Also, when native encoding/decoding is used, the headerMode property is ignored and headers are not embedded into the message.
Default: false.
-
errorChannelEnabled
- When set to true, if the binder supports asynchronous send results, send failures are sent to an error channel for the destination. See the section called “Message Channel Binders and Error Channels” for more information.
Default: false.
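The default selection rule mentioned for partitionSelectorClass and partitionSelectorExpression, hashCode(key) % partitionCount, can be sketched in a few lines. The class PartitionSelectionSketch is a hypothetical helper for illustration, not a Spring Cloud Stream type, and taking the absolute value so that negative hash codes still yield a valid partition index is an assumption of this sketch.

```java
// Hypothetical helper illustrating the default rule hashCode(key) % partitionCount.
public class PartitionSelectionSketch {

    // Math.abs keeps the index non-negative for negative hash codes; that detail is an assumption.
    public static int selectPartition(Object key, int partitionCount) {
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 4;
        // The same key always maps to the same partition, so related messages stay together.
        for (String key : new String[] {"customer-1", "customer-2", "order-1", "customer-1"}) {
            System.out.println(key + " -> partition " + selectPartition(key, partitionCount));
        }
    }
}
```

In this sketch the key stands in for whatever partitionKeyExpression or partitionKeyExtractorClass would produce from the outbound message.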
28.3 Using Dynamically Bound Destinations
Besides the channels defined via @EnableBinding, Spring Cloud Stream allows applications to send messages to dynamically bound destinations. This is useful, for example, when the target destination needs to be determined at runtime. Applications can do so by using the BinderAwareChannelResolver bean, registered automatically by the @EnableBinding annotation.
The property 'spring.cloud.stream.dynamicDestinations' can be used to restrict the dynamic destination names to a set known beforehand (whitelisting). If the property is not set, any destination can be bound dynamically.
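The whitelisting rule can be modeled in a few lines. The following sketch is a hypothetical helper (DynamicDestinationWhitelistSketch is not a Spring Cloud Stream class) that only captures the decision described above: an empty list allows every destination, a non-empty list allows only its members. It does not perform any binding.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: models the whitelist rule only, it does not bind anything.
public class DynamicDestinationWhitelistSketch {

    // An empty whitelist (property not set) allows any destination name.
    public static boolean isAllowed(Set<String> dynamicDestinations, String destination) {
        return dynamicDestinations.isEmpty() || dynamicDestinations.contains(destination);
    }

    public static void main(String[] args) {
        Set<String> whitelist = new HashSet<>(Arrays.asList("customers", "orders"));
        System.out.println(isAllowed(whitelist, "customers"));                 // prints true
        System.out.println(isAllowed(whitelist, "invoices"));                  // prints false
        System.out.println(isAllowed(Collections.emptySet(), "invoices"));     // prints true
    }
}
```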
The BinderAwareChannelResolver can be used directly, as in the following example, in which a REST controller uses a path variable to decide the target channel.
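    import static org.springframework.web.bind.annotation.RequestMethod.POST;

    import java.util.Collections;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpStatus;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.*;

    @EnableBinding
    @Controller
    public class SourceWithDynamicDestination {
        @Autowired
        private BinderAwareChannelResolver resolver;

        // The {target} path segment names the destination the request body is sent to.
        @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
        @ResponseStatus(HttpStatus.ACCEPTED)
        public void handleRequest(@RequestBody String body, @PathVariable("target") String target,
                @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
            sendMessage(body, target, contentType);
        }

        private void sendMessage(String body, String target, Object contentType) {
            resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                    new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
        }
    }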
After starting the application on the default port 8080, send the following data:
    curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers
    curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders
The destinations 'customers' and 'orders' are created in the broker (for example, as exchanges in RabbitMQ or topics in Kafka), and the data is published to the appropriate destination.
The BinderAwareChannelResolver is a general purpose Spring Integration DestinationResolver and can be injected into other components. For example, it can be used in a router that evaluates a SpEL expression based on the target field of an incoming JSON message.
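    import static org.springframework.web.bind.annotation.RequestMethod.POST;

    import java.util.Collections;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
    import org.springframework.context.annotation.Bean;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpStatus;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.router.ExpressionEvaluatingRouter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.*;

    @EnableBinding
    @Controller
    public class SourceWithDynamicDestination {
        @Autowired
        private BinderAwareChannelResolver resolver;

        @RequestMapping(path = "/", method = POST, consumes = "application/json")
        @ResponseStatus(HttpStatus.ACCEPTED)
        public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
            sendMessage(body, contentType);
        }

        // Hands the request body to the router channel instead of a fixed destination.
        private void sendMessage(Object body, Object contentType) {
            routerChannel().send(MessageBuilder.createMessage(body,
                    new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
        }

        @Bean(name = "routerChannel")
        public MessageChannel routerChannel() {
            return new DirectChannel();
        }

        // Routes each message to the destination named by payload.target, resolved through the binder.
        @Bean
        @ServiceActivator(inputChannel = "routerChannel")
        public ExpressionEvaluatingRouter router() {
            ExpressionEvaluatingRouter router =
                    new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
            router.setDefaultOutputChannelName("default-output");
            router.setChannelResolver(resolver);
            return router;
        }
    }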