26. Programming Model

This section describes Spring Cloud Stream’s programming model. Spring Cloud Stream provides a number of predefined annotations for declaring bound input and output channels and for listening to those channels.

26.1 Declaring and Binding Channels

26.1.1 Triggering Binding Via @EnableBinding

You can turn a Spring application into a Spring Cloud Stream application by applying the @EnableBinding annotation to one of the application’s configuration classes. The @EnableBinding annotation itself is meta-annotated with @Configuration and triggers the configuration of Spring Cloud Stream infrastructure:

...
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}

The @EnableBinding annotation can take as parameters one or more interface classes that contain methods which represent bindable components (typically message channels).

The @EnableBinding annotation is required only on your configuration classes. You can provide as many binding interfaces as you need, for instance @EnableBinding(value={Orders.class, Payment.class}), where both the Orders and Payment interfaces declare @Input and @Output channels.

26.1.2 @Input and @Output

A Spring Cloud Stream application can have an arbitrary number of input and output channels defined in an interface as @Input and @Output methods:

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

Using this interface as a parameter to @EnableBinding will trigger the creation of three bound channels named orders, hotDrinks, and coldDrinks, respectively.

@EnableBinding(Barista.class)
public class CafeConfiguration {

   ...
}

In Spring Cloud Stream, the bindable MessageChannel components are the Spring Messaging MessageChannel (for outbound) and its extension SubscribableChannel (for inbound). Other bindable components can be supported using the same mechanism. KStream support in the Spring Cloud Stream Kafka binder is one such example, where KStream is used as the inbound/outbound bindable component. In this documentation, we will continue to refer to MessageChannels as the bindable components.

Customizing Channel Names

Using the @Input and @Output annotations, you can specify a customized channel name for the channel, as shown in the following example:

public interface Barista {
    ...
    @Input("inboundOrders")
    SubscribableChannel orders();
}

In this example, the created bound channel will be named inboundOrders .
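The naming rule described above (use the annotation value when one is given, otherwise fall back to the method name) can be illustrated without any Spring dependency. The following is only a minimal sketch, not the framework's implementation: the Input and Output annotations are local stand-ins declared for this example, and ChannelNameSketch.channelNames is a hypothetical helper.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

final class ChannelNameSketch {

    // Stand-in annotations for this sketch only; they are NOT the Spring Cloud Stream types.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Input { String value() default ""; }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Output { String value() default ""; }

    // Hypothetical binding interface mirroring the Barista example.
    interface Barista {
        @Input("inboundOrders") Object orders();
        @Output Object hotDrinks();
        @Output Object coldDrinks();
    }

    // Returns the channel names, sorted alphabetically: the annotation value
    // if one was given, otherwise the name of the annotated method.
    static Set<String> channelNames(Class<?> bindingInterface) {
        Set<String> names = new TreeSet<>();
        for (Method method : bindingInterface.getMethods()) {
            Input in = method.getAnnotation(Input.class);
            Output out = method.getAnnotation(Output.class);
            if (in == null && out == null) {
                continue; // not a channel declaration
            }
            String custom = (in != null) ? in.value() : out.value();
            names.add(custom.isEmpty() ? method.getName() : custom);
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [coldDrinks, hotDrinks, inboundOrders]
        System.out.println(channelNames(Barista.class));
    }
}
```

In the sketch, only orders() carries a custom name, so it is the only method whose channel name differs from its method name.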

Source, Sink, and Processor

For easy addressing of the most common use cases, which involve either an input channel, an output channel, or both, Spring Cloud Stream provides three predefined interfaces out of the box.

Source can be used for an application which has a single outbound channel.

public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}

Sink can be used for an application which has a single inbound channel.

public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();

}

Processor can be used for an application which has both an inbound channel and an outbound channel.

public interface Processor extends Source, Sink {
}

Spring Cloud Stream provides no special handling for any of these interfaces; they are provided only as a convenience.

26.1.3 Accessing Bound Channels

Injecting the Bound Interfaces

For each bound interface, Spring Cloud Stream will generate a bean that implements the interface. Invoking an @Input-annotated or @Output-annotated method of one of these beans will return the relevant bound channel.

The bean in the following example sends a message on the output channel when its sayHello method is invoked. It invokes output() on the injected Source bean to retrieve the target channel.

@Component
public class SendingBean {

    private Source source;

    @Autowired
    public SendingBean(Source source) {
        this.source = source;
    }

    public void sayHello(String name) {
         source.output().send(MessageBuilder.withPayload(name).build());
    }
}

Injecting Channels Directly

Bound channels can also be injected directly:

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         output.send(MessageBuilder.withPayload(name).build());
    }
}

If the name of the channel is customized on the declaring annotation, that name should be used instead of the method name. Given the following declaration:

public interface CustomSource {
    ...
    @Output("customOutput")
    MessageChannel output();
}

The channel will be injected as shown in the following example:

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(@Qualifier("customOutput") MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         this.output.send(MessageBuilder.withPayload(name).build());
    }
}

26.1.4 Producing and Consuming Messages

You can write a Spring Cloud Stream application using either Spring Integration annotations or Spring Cloud Stream’s @StreamListener annotation. The @StreamListener annotation is modeled after other Spring Messaging annotations (such as @MessageMapping, @JmsListener, and @RabbitListener) but adds content type management and type coercion features.

Native Spring Integration Support

Because Spring Cloud Stream is based on Spring Integration, it completely inherits Spring Integration’s foundation and infrastructure, as well as its components. For example, you can attach the output channel of a Source to a MessageSource:

@EnableBinding(Source.class)
public class TimerSource {

  @Value("${format}")
  private String format;

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
  }
}

Or you can use a processor’s channels in a transformer:

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public Object transform(String message) {
    return message.toUpperCase();
  }
}

It’s important to understand that when you consume from the same binding using @StreamListener, a pub-sub model is used: each method annotated with @StreamListener receives its own copy of the message, and each one has its own consumer group. However, if you share a bindable channel as an input for @Aggregator, @Transformer, or @ServiceActivator, those will consume in a competing model, and no individual consumer group is created for each subscription.
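The difference between the two delivery models can be pictured with a small plain-Java sketch. It does not use Spring Cloud Stream or Spring Integration; DeliveryModelSketch and its methods are hypothetical names, and the round-robin selection used for the competing case is an assumption made for this illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class DeliveryModelSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Publish-subscribe: every subscriber receives its own copy of the message.
    void publishToAll(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // Competing consumers: exactly one subscriber receives each message.
    // Round-robin selection is an assumption of this sketch.
    void publishToOne(String message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        subscribers.get(next).accept(message);
        next = (next + 1) % subscribers.size();
    }

    public static void main(String[] args) {
        DeliveryModelSketch channel = new DeliveryModelSketch();
        List<String> log = new ArrayList<>();
        channel.subscribe(m -> log.add("A:" + m));
        channel.subscribe(m -> log.add("B:" + m));

        channel.publishToAll("m1"); // both A and B see m1
        channel.publishToOne("m2"); // only A sees m2
        channel.publishToOne("m3"); // only B sees m3

        // prints [A:m1, B:m1, A:m2, B:m3]
        System.out.println(log);
    }
}
```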

Spring Integration Error Channel Support

Spring Cloud Stream supports publishing error messages received by the Spring Integration global error channel. Error messages sent to the errorChannel can be published to a specific destination at the broker by configuring a binding for the outbound target named error . For example, to publish error messages to a broker destination named "myErrors", provide the following property: spring.cloud.stream.bindings.error.destination=myErrors .

Message Channel Binders and Error Channels

Starting with version 1.3, some MessageChannel-based binders publish errors to a discrete error channel for each destination. In addition, these error channels are bridged to the global Spring Integration errorChannel mentioned above. You can therefore consume errors for specific destinations and/or for all destinations, using a standard Spring Integration flow (IntegrationFlow, @ServiceActivator, etc.).

On the consumer side, the listener thread catches any exceptions and forwards an ErrorMessage to the destination’s error channel. The payload of the message is a MessagingException with the normal failedMessage and cause properties. Usually, the raw data received from the broker is included in a header. For binders that support (and are configured with) a dead letter destination, a MessagePublishingErrorHandler is subscribed to the channel, and the raw data is forwarded to the dead letter destination.

On the producer side, for binders that support some kind of async result after publishing messages (e.g. RabbitMQ, Kafka), you can enable an error channel by setting the …producer.errorChannelEnabled property to true. The payload of the ErrorMessage depends on the binder implementation but will be a MessagingException with the normal failedMessage property, as well as additional properties about the failure. Refer to the binder documentation for complete details.

Using @StreamListener for Automatic Content Type Handling

Complementary to its Spring Integration support, Spring Cloud Stream provides its own @StreamListener annotation, modeled after other Spring Messaging annotations (e.g. @MessageMapping , @JmsListener , @RabbitListener , etc.). The @StreamListener annotation provides a simpler model for handling inbound messages, especially when dealing with use cases that involve content type management and type coercion.

Spring Cloud Stream provides an extensible MessageConverter mechanism for handling data conversion by bound channels and for, in this case, dispatching to methods annotated with @StreamListener . The following is an example of an application which processes external Vote events:

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

The distinction between @StreamListener and a Spring Integration @ServiceActivator is seen when considering an inbound Message that has a String payload and a contentType header of application/json . In the case of @StreamListener , the MessageConverter mechanism will use the contentType header to parse the String payload into a Vote object.

As with other Spring Messaging methods, method arguments can be annotated with @Payload , @Headers and @Header .

For methods which return data, you must use the @SendTo annotation to specify the output binding destination for data returned by the method:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

Using @StreamListener for dispatching messages to multiple methods

Since version 1.2, Spring Cloud Stream supports dispatching messages to multiple @StreamListener methods registered on an input channel, based on a condition.

To be eligible for conditional dispatching, a method must satisfy the following conditions:

  • it must not return a value

  • it must be an individual message handling method (reactive API methods are not supported)

The condition is specified via a SpEL expression in the condition attribute of the annotation and is evaluated for each message. All the handlers that match the condition will be invoked in the same thread and no assumption must be made about the order in which the invocations take place.

An example of using @StreamListener with dispatching conditions can be seen below. In this example, all the messages bearing a header type with the value foo will be dispatched to the receiveFoo method, and all the messages bearing a header type with the value bar will be dispatched to the receiveBar method.

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")
    public void receiveFoo(@Payload FooPojo fooPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")
    public void receiveBar(@Payload BarPojo barPojo) {
       // handle the message
    }
}

Dispatching via @StreamListener conditions is only supported for handlers of individual messages, and not for reactive programming support (described below).
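The dispatching behavior described above (every condition is evaluated for each message, and all matching handlers are invoked) can be sketched in plain Java. This is an illustration under stated assumptions rather than the framework's implementation: ConditionalDispatchSketch is a hypothetical class, and a java.util.function.Predicate over the headers stands in for the SpEL condition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class ConditionalDispatchSketch {

    // Pairs a condition on the message headers with a handler for the payload.
    private static final class Registration {
        final Predicate<Map<String, Object>> condition;
        final Consumer<String> handler;

        Registration(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    void register(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
        registrations.add(new Registration(condition, handler));
    }

    // Evaluates every condition for the message and invokes all matching
    // handlers on the calling thread. Returns the number of handlers invoked.
    int dispatch(Map<String, Object> headers, String payload) {
        int invoked = 0;
        for (Registration registration : registrations) {
            if (registration.condition.test(headers)) {
                registration.handler.accept(payload);
                invoked++;
            }
        }
        return invoked;
    }

    // Convenience helper that builds a header map with a single "type" entry.
    static Map<String, Object> typeHeader(String type) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("type", type);
        return headers;
    }

    public static void main(String[] args) {
        ConditionalDispatchSketch dispatcher = new ConditionalDispatchSketch();
        List<String> log = new ArrayList<>();
        dispatcher.register(h -> "foo".equals(h.get("type")), p -> log.add("receiveFoo:" + p));
        dispatcher.register(h -> "bar".equals(h.get("type")), p -> log.add("receiveBar:" + p));

        dispatcher.dispatch(typeHeader("foo"), "a");
        dispatcher.dispatch(typeHeader("bar"), "b");

        // prints [receiveFoo:a, receiveBar:b]
        System.out.println(log);
    }
}
```

A message whose type header matches neither condition is simply not delivered to any handler in this sketch.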

26.1.5 Reactive Programming Support

Spring Cloud Stream also supports the use of reactive APIs, where incoming and outgoing data is handled as continuous data flows. Support for reactive APIs is available via the spring-cloud-stream-reactive module, which needs to be added explicitly to your project.

The programming model with reactive APIs is declarative, where instead of specifying how each individual message should be handled, you can use operators that describe functional transformations from inbound to outbound data flows.

Spring Cloud Stream supports the following reactive APIs:

  • Reactor

  • RxJava 1.x

In the future, it is intended to support a more generic model based on Reactive Streams.

The reactive programming model also uses the @StreamListener annotation for setting up reactive handlers. The differences are that:

  • the @StreamListener annotation must not specify an input or output, as they are provided as arguments and return values from the method;

  • the arguments of the method must be annotated with @Input and @Output, indicating which input or output the incoming and outgoing data flows, respectively, connect to;

  • the return value of the method, if any, will be annotated with @Output, indicating the output where data shall be sent.

Reactive programming support requires Java 1.8.

As of Spring Cloud Stream 1.1.1 (starting with release train Brooklyn.SR2), reactive programming support requires Reactor 3.0.4.RELEASE or higher. Earlier Reactor versions (including 3.0.1.RELEASE, 3.0.2.RELEASE and 3.0.3.RELEASE) are not supported. spring-cloud-stream-reactive will transitively retrieve the proper version, but it is possible for the project structure to manage the version of io.projectreactor:reactor-core to an earlier release, especially when using Maven. This is the case for projects generated via Spring Initializr with Spring Boot 1.x, which will override the Reactor version to 2.0.8.RELEASE. In such cases you must ensure that the proper version of the artifact is used. This can be achieved simply by adding a direct dependency on io.projectreactor:reactor-core with a version of 3.0.4.RELEASE or later to your project.

The term reactive currently refers to the reactive APIs being used and not to the execution model being reactive (i.e. the bound endpoints still use a 'push' rather than a 'pull' model). While some backpressure support is provided by the use of Reactor, we intend in the long run to support entirely reactive pipelines through native reactive clients for the connected middleware.

Reactor-based handlers

A Reactor-based handler can have the following argument types:

  • For arguments annotated with @Input, it supports the Reactor type Flux. The parameterization of the inbound Flux follows the same rules as in the case of individual message handling: it can be the entire Message, a POJO which can be the Message payload, or a POJO which is the result of a transformation based on the Message content-type header. Multiple inputs are supported.

  • For arguments annotated with @Output, it supports the type FluxSender, which connects a Flux produced by the method with an output. Generally speaking, specifying outputs as arguments is only recommended when the method can have multiple outputs.

A Reactor-based handler supports a return type of Flux, in which case it must be annotated with @Output. We recommend using the return value of the method when a single output flux is available.

Here is an example of a simple Reactor-based Processor.

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  @Output(Processor.OUTPUT)
  public Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
    return input.map(s -> s.toUpperCase());
  }
}

The same processor using output arguments looks like this:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  public void receive(@Input(Processor.INPUT) Flux<String> input,
     @Output(Processor.OUTPUT) FluxSender output) {
     output.send(input.map(s -> s.toUpperCase()));
  }
}

RxJava 1.x support

RxJava 1.x handlers follow the same rules as Reactor-based ones, but use Observable and ObservableSender arguments and return types.

So the first example above will become:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  @Output(Processor.OUTPUT)
  public Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
    return input.map(s -> s.toUpperCase());
  }
}

The second example above will become:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  public void receive(@Input(Processor.INPUT) Observable<String> input,
     @Output(Processor.OUTPUT) ObservableSender output) {
     output.send(input.map(s -> s.toUpperCase()));
  }
}

Reactive Sources

Spring Cloud Stream reactive support also provides the ability to create reactive sources through the @StreamEmitter annotation. Using the @StreamEmitter annotation, a regular source may be converted to a reactive one. @StreamEmitter is a method-level annotation that marks a method as an emitter to outputs declared via @EnableBinding. Using the @Input annotation along with @StreamEmitter is not allowed, because methods marked with this annotation do not listen on any input but rather generate data for an output. Following the same programming model used in @StreamListener, @StreamEmitter also allows flexible ways of using the @Output annotation, depending on whether the method has any arguments, a return type, and so on.

Here are some examples of using @StreamEmitter in various styles.

The following example will emit the "Hello World" message every millisecond and publish it to a Flux. In this case, the resulting messages in the Flux will be sent to the output channel of the Source.

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public Flux<String> emit() {
    return Flux.intervalMillis(1)
            .map(l -> "Hello World");
  }
}

The following is another flavor of the same sample. Instead of returning a Flux, this method uses a FluxSender to programmatically send a Flux from a source.

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public void emit(FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

The following is exactly the same as the above snippet in functionality and style. However, instead of using an explicit @Output annotation at the method level, it uses the annotation at the method parameter level.

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  public void emit(@Output(Source.OUTPUT) FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

Here is yet another flavor of writing reactive sources, using the Reactive Streams Publisher API and the support for it in the Spring Integration Java DSL. The Publisher still uses Reactor Flux under the hood, but that is transparent to the application, which only needs Reactive Streams and the Java DSL for Spring Integration.

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  @Bean
  public Publisher<Message<String>> emit() {
    return IntegrationFlows.from(() ->
                new GenericMessage<>("Hello World"),
        e -> e.poller(p -> p.fixedDelay(1)))
        .toReactivePublisher();
  }
}

26.1.6 Aggregation

Spring Cloud Stream provides support for aggregating multiple applications together, connecting their input and output channels directly and avoiding the additional cost of exchanging messages via a broker. As of version 1.0 of Spring Cloud Stream, aggregation is supported only for the following types of applications:

  • sources - applications with a single output channel named output , typically having a single binding of the type org.springframework.cloud.stream.messaging.Source

  • sinks - applications with a single input channel named input , typically having a single binding of the type org.springframework.cloud.stream.messaging.Sink

  • processors - applications with a single input channel named input and a single output channel named output , typically having a single binding of the type org.springframework.cloud.stream.messaging.Processor .

They can be aggregated together by creating a sequence of interconnected applications, in which the output channel of an element in the sequence is connected to the input channel of the next element, if it exists. A sequence can start with either a source or a processor, can contain an arbitrary number of processors, and must end with either a processor or a sink.

Depending on the nature of the starting and ending element, the sequence may have one or more bindable channels, as follows:

  • if the sequence starts with a source and ends with a sink, all communication between the applications is direct and no channels will be bound

  • if the sequence starts with a processor, then its input channel will become the input channel of the aggregate and will be bound accordingly

  • if the sequence ends with a processor, then its output channel will become the output channel of the aggregate and will be bound accordingly
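The rules above can be expressed as a short plain-Java sketch. It is only an illustration of the stated rules, not how the framework implements aggregation: AggregateBindingSketch, Kind, and boundChannels are hypothetical names, and the channel names input and output are the ones used in the definitions above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AggregateBindingSketch {

    enum Kind { SOURCE, PROCESSOR, SINK }

    // Validates the sequence against the rules above and returns the names
    // of the channels of the aggregate that remain to be bound to the broker.
    static List<String> boundChannels(List<Kind> sequence) {
        if (sequence.isEmpty()) {
            throw new IllegalArgumentException("sequence must not be empty");
        }
        Kind first = sequence.get(0);
        Kind last = sequence.get(sequence.size() - 1);
        if (first == Kind.SINK) {
            throw new IllegalArgumentException("must start with a source or a processor");
        }
        if (last == Kind.SOURCE) {
            throw new IllegalArgumentException("must end with a processor or a sink");
        }
        for (int i = 1; i < sequence.size() - 1; i++) {
            if (sequence.get(i) != Kind.PROCESSOR) {
                throw new IllegalArgumentException("intermediate elements must be processors");
            }
        }
        List<String> bound = new ArrayList<>();
        if (first == Kind.PROCESSOR) {
            bound.add("input");  // the aggregate's input comes from the leading processor
        }
        if (last == Kind.PROCESSOR) {
            bound.add("output"); // the aggregate's output comes from the trailing processor
        }
        return bound;
    }

    public static void main(String[] args) {
        // prints []
        System.out.println(boundChannels(Arrays.asList(Kind.SOURCE, Kind.PROCESSOR, Kind.SINK)));
        // prints [output]
        System.out.println(boundChannels(Arrays.asList(Kind.SOURCE, Kind.PROCESSOR)));
        // prints [input]
        System.out.println(boundChannels(Arrays.asList(Kind.PROCESSOR, Kind.SINK)));
    }
}
```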

Aggregation is performed using the AggregateApplicationBuilder utility class, as in the following example. Let’s consider a project in which we have a source, a processor, and a sink, which may be defined in the project or may be contained in one of the project’s dependencies.

Each component (source, sink or processor) in an aggregate application must be provided in a separate package if the configuration classes use @SpringBootApplication . This is required to avoid cross-talk between applications, due to the classpath scanning performed by @SpringBootApplication on the configuration classes inside the same package. In the example below, it can be seen that the Source, Processor and Sink application classes are grouped in separate packages. A possible alternative is to provide the source, sink or processor configuration in a separate @Configuration class, avoid the use of @SpringBootApplication / @ComponentScan and use those for aggregation.

package com.app.mysink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

	private static Logger logger = LoggerFactory.getLogger(SinkApplication.class);

	@ServiceActivator(inputChannel=Sink.INPUT)
	public void loggerSink(Object payload) {
		logger.info("Received: " + payload);
	}
}

package com.app.myprocessor;

// Imports omitted

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {

	@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
	public String transform(String payload) {
		return payload.toUpperCase();
	}
}

package com.app.mysource;

// Imports omitted

@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {

	@InboundChannelAdapter(value = Source.OUTPUT)
	public String timerMessageSource() {
		return new SimpleDateFormat().format(new Date());
	}
}

Each configuration can be used for running a separate component, but in this case they can be aggregated together as follows:

package com.app;

// Imports omitted

@SpringBootApplication
public class SampleAggregateApplication {

	public static void main(String[] args) {
		new AggregateApplicationBuilder()
			.from(SourceApplication.class).args("--fixedDelay=5000")
			.via(ProcessorApplication.class)
			.to(SinkApplication.class).args("--debug=true").run(args);
	}
}

The starting component of the sequence is provided as argument to the from() method. The ending component of the sequence is provided as argument to the to() method. Intermediate processors are provided as argument to the via() method. Multiple processors of the same type can be chained together (e.g. for pipelining transformations with different configurations). For each component, the builder can provide runtime arguments for Spring Boot configuration.

Configuring aggregate application

Spring Cloud Stream supports passing properties to the individual applications inside the aggregate application by using a 'namespace' as a prefix.

The namespace can be set for applications as follows:

@SpringBootApplication
public class SampleAggregateApplication {

	public static void main(String[] args) {
		new AggregateApplicationBuilder()
			.from(SourceApplication.class).namespace("source").args("--fixedDelay=5000")
			.via(ProcessorApplication.class).namespace("processor1")
			.to(SinkApplication.class).namespace("sink").args("--debug=true").run(args);
	}
}

Once the 'namespace' is set for the individual applications, the application properties with the namespace as prefix can be passed to the aggregate application using any supported property source (command line, environment properties, etc.).

For instance, to override the default fixedDelay and debug properties of 'source' and 'sink' applications:

java -jar target/MyAggregateApplication-0.0.1-SNAPSHOT.jar --source.fixedDelay=10000 --sink.debug=false
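The prefix convention can be pictured with a small plain-Java sketch that picks out the arguments addressed to one namespace and strips the prefix. It only illustrates the naming convention; it is not how the aggregate builder resolves properties, and NamespacePropertiesSketch and propertiesFor are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NamespacePropertiesSketch {

    // Collects the "--<namespace>.<key>=<value>" arguments for one namespace
    // and returns them as key/value pairs with the namespace prefix removed.
    static Map<String, String> propertiesFor(String namespace, String... args) {
        String prefix = "--" + namespace + ".";
        Map<String, String> result = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith(prefix) && eq > prefix.length()) {
                result.put(arg.substring(prefix.length(), eq), arg.substring(eq + 1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] commandLine = {"--source.fixedDelay=10000", "--sink.debug=false"};
        // prints {fixedDelay=10000}
        System.out.println(propertiesFor("source", commandLine));
        // prints {debug=false}
        System.out.println(propertiesFor("sink", commandLine));
    }
}
```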

Configuring binding service properties for non-self-contained aggregate application

A non-self-contained aggregate application is bound to an external broker via its inbound components, its outbound components (typically message channels), or both, while the applications inside the aggregate application are directly bound to each other. For example, a source application’s output and a processor application’s input are directly bound, while the processor’s output channel is bound to an external destination at the broker. When passing binding service properties for a non-self-contained aggregate application, you must pass them to the aggregate application itself instead of setting them as 'args' on an individual child application. For instance:

@SpringBootApplication
public class SampleAggregateApplication {

	public static void main(String[] args) {
		new AggregateApplicationBuilder()
			.from(SourceApplication.class).namespace("source").args("--fixedDelay=5000")
			.via(ProcessorApplication.class).namespace("processor1").args("--debug=true").run(args);
	}
}

Binding properties such as --spring.cloud.stream.bindings.output.destination=processor-output need to be specified as one of the external configuration properties (command-line arguments, etc.).