28. Configuration Options

Spring Cloud Stream supports general configuration options as well as configuration for bindings and binders. Some binders allow additional binding properties to support middleware-specific features.

Configuration options can be provided to Spring Cloud Stream applications via any mechanism supported by Spring Boot. This includes application arguments, environment variables, and YAML or .properties files.

28.1 Spring Cloud Stream Properties

  • spring.cloud.stream.instanceCount

    • The number of deployed instances of an application. Must be set for partitioning and if using Kafka.

Default: 1 .

  • spring.cloud.stream.instanceIndex

    • The instance index of the application: a number from 0 to instanceCount -1. Used for partitioning and with Kafka. Automatically set in Cloud Foundry to match the application’s instance index.
  • spring.cloud.stream.dynamicDestinations

    • A list of destinations that can be bound dynamically (for example, in a dynamic routing scenario). If set, only listed destinations can be bound.

Default: empty (allowing any destination to be bound).

Default: empty.

  • spring.cloud.stream.overrideCloudConnectors

    • This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder will detect a suitable bound service (e.g. a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and will use it for creating connections (usually via Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (e.g. relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.

Default: false.

28.2 Binding Properties

Binding properties are supplied using the format spring.cloud.stream.bindings.<channelName>.<property>=<value> . The <channelName> represents the name of the channel being configured (e.g., output for a Source ).

To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format spring.cloud.stream.default.<property>=<value> .

In what follows, we indicate where we have omitted the spring.cloud.stream.bindings.<channelName>. prefix and focus just on the property name, with the understanding that the prefix will be included at runtime.

28.2.1 Properties for Use of Spring Cloud Stream

The following binding properties are available for both input and output bindings and must be prefixed with spring.cloud.stream.bindings.<channelName>. , e.g. spring.cloud.stream.bindings.input.destination=ticktock .

Default values can be set by using the prefix spring.cloud.stream.default , e.g. spring.cloud.stream.default.contentType=application/json .

  • destination

    • The target destination of a channel on the bound middleware (e.g., the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it could be bound to multiple destinations and the destination names can be specified as comma separated String values. If not set, the channel name is used instead. The default value of this property cannot be overridden.
  • group

    • The consumer group of the channel. Applies only to inbound bindings. See Consumer Groups.

Default: null (indicating an anonymous consumer).

  • contentType

    • The content type of the channel.

Default: null (so that no type coercion is performed).

Default: null (the default binder will be used, if one exists).

28.2.2 Consumer properties

The following binding properties are available for input bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.consumer. , e.g. spring.cloud.stream.bindings.input.consumer.concurrency=3 .

Default values can be set by using the prefix spring.cloud.stream.default.consumer , e.g. spring.cloud.stream.default.consumer.headerMode=raw .

  • concurrency

    • The concurrency of the inbound consumer.

Default: 1 .

  • partitioned

    • Whether the consumer receives data from a partitioned producer.

Default: false .

  • headerMode

    • When set to raw , disables header parsing on input. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when inbound data is coming from outside Spring Cloud Stream applications.

Default: embeddedHeaders .

  • maxAttempts

    • If processing fails, the number of attempts to process the message (including the first). Set to 1 to disable retry.

Default: 3 .

  • backOffInitialInterval

    • The backoff initial interval on retry.

Default: 1000 .

  • backOffMaxInterval

    • The maximum backoff interval.

Default: 10000 .

  • backOffMultiplier

    • The backoff multiplier.

Default: 2.0 .

  • instanceIndex

    • When set to a value greater than equal to zero, allows customizing the instance index of this consumer (if different from spring.cloud.stream.instanceIndex ). When set to a negative value, it will default to spring.cloud.stream.instanceIndex .

Default: -1 .

  • instanceCount

    • When set to a value greater than equal to zero, allows customizing the instance count of this consumer (if different from spring.cloud.stream.instanceCount ). When set to a negative value, it will default to spring.cloud.stream.instanceCount .

Default: -1 .

28.2.3 Producer Properties

The following binding properties are available for output bindings only and must be prefixed with spring.cloud.stream.bindings.<channelName>.producer. , e.g. spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id .

Default values can be set by using the prefix spring.cloud.stream.default.producer , e.g. spring.cloud.stream.default.producer.partitionKeyExpression=payload.id .

  • partitionKeyExpression

    • A SpEL expression that determines how to partition outbound data. If set, or if partitionKeyExtractorClass is set, outbound data on this channel will be partitioned, and partitionCount must be set to a value greater than 1 to be effective. The two options are mutually exclusive. See Section 25.5, “Partitioning Support”.

Default: null.

  • partitionKeyExtractorClass

    • A PartitionKeyExtractorStrategy implementation. If set, or if partitionKeyExpression is set, outbound data on this channel will be partitioned, and partitionCount must be set to a value greater than 1 to be effective. The two options are mutually exclusive. See Section 25.5, “Partitioning Support”.

Default: null.

  • partitionSelectorClass

    • A PartitionSelectorStrategy implementation. Mutually exclusive with partitionSelectorExpression . If neither is set, the partition will be selected as the hashCode(key) % partitionCount , where key is computed via either partitionKeyExpression or partitionKeyExtractorClass .

Default: null.

  • partitionSelectorExpression

    • A SpEL expression for customizing partition selection. Mutually exclusive with partitionSelectorClass . If neither is set, the partition will be selected as the hashCode(key) % partitionCount , where key is computed via either partitionKeyExpression or partitionKeyExtractorClass .

Default: null.

  • partitionCount

    • The number of target partitions for the data, if partitioning is enabled. Must be set to a value greater than 1 if the producer is partitioned. On Kafka, interpreted as a hint; the larger of this and the partition count of the target topic is used instead.

Default: 1 .

  • requiredGroups

    • A comma-separated list of groups to which the producer must ensure message delivery even if they start after it has been created (e.g., by pre-creating durable queues in RabbitMQ).
  • headerMode

    • When set to raw , disables header embedding on output. Effective only for messaging middleware that does not support message headers natively and requires header embedding. Useful when producing data for non-Spring Cloud Stream applications.

Default: embeddedHeaders .

  • useNativeEncoding

    • When set to true , the outbound message is serialized directly by client library, which must be configured correspondingly (e.g. setting an appropriate Kafka producer value serializer). When this configuration is being used, the outbound message marshalling is not based on the contentType of the binding. When native encoding is used, it is the responsibility of the consumer to use appropriate decoder (ex: Kafka consumer value de-serializer) to deserialize the inbound message. Also, when native encoding/decoding is used the headerMode property is ignored and headers will not be embedded into the message.

Default: false .

Default: false .

28.3 Using dynamically bound destinations

Besides the channels defined via @EnableBinding , Spring Cloud Stream allows applications to send messages to dynamically bound destinations. This is useful, for example, when the target destination needs to be determined at runtime. Applications can do so by using the BinderAwareChannelResolver bean, registered automatically by the @EnableBinding annotation.

The property 'spring.cloud.stream.dynamicDestinations' can be used for restricting the dynamic destination names to a set known beforehand (whitelisting). If the property is not set, any destination can be bound dynamicaly.

The BinderAwareChannelResolver can be used directly as in the following example, in which a REST controller uses a path variable to decide the target channel.

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;

	@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @PathVariable("target") target,
	       @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, target, contentType);
	}

	private void sendMessage(String body, String target, Object contentType) {
		resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}
}

After starting the application on the default port 8080, when sending the following data:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

The destinations 'customers' and 'orders' are created in the broker (for example: exchange in case of Rabbit or topic in case of Kafka) with the names 'customers' and 'orders', and the data is published to the appropriate destinations.

The BinderAwareChannelResolver is a general purpose Spring Integration DestinationResolver and can be injected in other components. For example, in a router using a SpEL expression based on the target field of an incoming JSON message.

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;

	@RequestMapping(path = "/", method = POST, consumes = "application/json")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, contentType);
	}

	private void sendMessage(Object body, Object contentType) {
		routerChannel().send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}

	@Bean(name = "routerChannel")
	public MessageChannel routerChannel() {
		return new DirectChannel();
	}

	@Bean
	@ServiceActivator(inputChannel = "routerChannel")
	public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
		router.setDefaultOutputChannelName("default-output");
		router.setChannelResolver(resolver);
		return router;
	}
}