13. Spring Batch Integration

13.1. Spring Batch Integration Introduction

Many users of Spring Batch may encounter requirements that are outside the scope of Spring Batch, yet can be implemented efficiently and concisely using Spring Integration. Conversely, Spring Integration users may encounter Spring Batch requirements and need a way to integrate both frameworks efficiently. In this context, several patterns and use cases emerge, and Spring Batch Integration addresses those requirements.

The line between Spring Batch and Spring Integration is not always clear, but there are guidelines that one can follow. Principally, these are: think about granularity, and apply common patterns. Some of those common patterns are described in this reference manual section.

Adding messaging to a batch process enables automation of operations, and also separation and strategizing of key concerns. For example, a message might trigger a job to execute, and the sending of the message can then be exposed in a variety of ways. Or, when a job completes or fails, that event might trigger a message to be sent, and the consumers of those messages might have operational concerns that have nothing to do with the application itself. Messaging can also be embedded in a job, for example by reading or writing items for processing via channels. Remote partitioning and remote chunking provide methods to distribute workloads over a number of workers.

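Some key concepts that we will cover are:

  • Namespace Support

  • Launching Batch Jobs through Messages

  • Providing Feedback with Informational Messages

  • Asynchronous Processors

  • Externalizing Batch Process Execution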

13.1.1. Namespace Support

As of Spring Batch Integration 1.3, dedicated XML namespace support is available, with the aim of providing an easier configuration experience. To activate the namespace, add the following namespace declarations to your Spring XML Application Context file:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd">

    ...

</beans>

A fully configured Spring XML Application Context file for Spring Batch Integration may look like the following:

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:batch="http://www.springframework.org/schema/batch"
  xmlns:batch-int="http://www.springframework.org/schema/batch-integration"
  xsi:schemaLocation="
    http://www.springframework.org/schema/batch-integration
    http://www.springframework.org/schema/batch-integration/spring-batch-integration.xsd
    http://www.springframework.org/schema/batch
    http://www.springframework.org/schema/batch/spring-batch.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd">

    ...

</beans>

Appending version numbers to the referenced XSD file is also allowed. However, because a version-less declaration always uses the latest schema, we generally do not recommend appending the version number to the XSD name. Adding a version number could cause issues when updating the Spring Batch Integration dependencies, as they may require more recent versions of the XML schema.

13.1.2. Launching Batch Jobs through Messages

When starting batch jobs using the core Spring Batch API, you basically have two options:

  • Command line via the CommandLineJobRunner

  • Programmatically via either JobOperator.start() or JobLauncher.run().

For example, you may want to use the CommandLineJobRunner when invoking Batch Jobs using a shell script. Alternatively, you may use the JobOperator directly, for example when using Spring Batch as part of a web application. However, what about more complex use cases? Maybe you need to poll a remote (S)FTP server to retrieve the data for the Batch Job. Or your application has to support multiple different data sources simultaneously. For example, you may receive data files not only via the web, but also via FTP and other channels. Maybe additional transformation of the input files is needed before invoking Spring Batch.

Therefore, it would be much more powerful to execute the batch job using Spring Integration and its numerous adapters. For example, you can use a File Inbound Channel Adapter to monitor a directory in the file system and start the Batch Job as soon as the input file arrives. Additionally, you can create Spring Integration flows that use multiple different adapters to ingest data for your Batch Jobs from multiple sources simultaneously, using configuration only. Implementing all these scenarios with Spring Integration is easy, as it allows for decoupled, event-driven execution of the JobLauncher.

Spring Batch Integration provides the JobLaunchingMessageHandler class that you can use to launch batch jobs. The input for the JobLaunchingMessageHandler is provided by a Spring Integration message whose payload is of type JobLaunchRequest. This class is a wrapper around the Job that needs to be launched as well as the JobParameters necessary to launch the Batch job.

The following image illustrates the typical Spring Integration message flow used to start a Batch job. The EIP (Enterprise Integration Patterns) website provides a full overview of messaging icons and their descriptions.

Transforming a file into a JobLaunchRequest

package io.spring.sbi;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

The JobExecution Response

When a Batch Job is executed, a JobExecution instance is returned. This instance can be used to determine the status of an execution. If a JobExecution could be created successfully, it is always returned, regardless of whether the actual execution was successful.

Exactly when the JobExecution instance is returned depends on the provided TaskExecutor. If a synchronous (single-threaded) TaskExecutor implementation is used, the JobExecution response is returned only after the job completes. When using an asynchronous TaskExecutor, the JobExecution instance is returned immediately. Users can then take the id of the JobExecution instance (JobExecution.getId()) and query the JobRepository for the job's updated status using the JobExplorer. For more information, please refer to the Spring Batch reference documentation on Querying the Repository.
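The effect of the TaskExecutor choice on the returned status can be illustrated with a plain-Java analogy. This is a minimal sketch that uses only java.util.concurrent and no Spring classes; the LaunchTimingSketch, Execution and Status names are hypothetical stand-ins, not part of Spring Batch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only: none of these types belong to Spring Batch.
final class LaunchTimingSketch {

    enum Status { STARTING, COMPLETED }

    /** Stand-in for a JobExecution that only carries a status. */
    static final class Execution {
        final AtomicReference<Status> status = new AtomicReference<>(Status.STARTING);
    }

    /** Creates the execution, hands the job body to the executor, then returns the execution. */
    static Execution launch(Executor executor, Runnable jobBody) {
        Execution execution = new Execution();
        executor.execute(() -> {
            jobBody.run();
            execution.status.set(Status.COMPLETED);
        });
        return execution;
    }

    /** With a caller-thread executor, launch() returns only after the job body has run. */
    static Status statusAfterSynchronousLaunch() {
        Executor sameThread = Runnable::run;
        return launch(sameThread, () -> { }).status.get();
    }

    /** With a new-thread executor, launch() returns while the job body is still running. */
    static Status statusAfterAsynchronousLaunch() {
        CountDownLatch release = new CountDownLatch(1);
        Executor newThread = task -> new Thread(task).start();
        Execution execution = launch(newThread, () -> {
            try {
                release.await(); // keep the "job" running until the caller has looked
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Status seen = execution.status.get();
        release.countDown(); // let the job finish
        return seen;
    }
}
```

In the synchronous case the caller sees a finished execution. In the asynchronous case the caller sees an execution that is still in progress and has to query for updates later.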

The following configuration creates a file inbound-channel-adapter that listens for CSV files in the provided directory, hands them off to our transformer (FileMessageToJobRequest), launches the job via the Job Launching Gateway, and then simply logs the output of the JobExecution via the logging-channel-adapter.

Spring Batch Integration Configuration

<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

Now that we are polling for files and launching jobs, we need to configure, for example, our Spring Batch ItemReader to use the file that was found, which is represented by the job parameter "input.file.name":

Example ItemReader Configuration

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

The main points of interest here are injecting the value of #{jobParameters['input.file.name']} as the resource property value and setting the ItemReader bean to step scope, which enables the late binding support that provides access to the jobParameters variable.

Available Attributes of the Job-Launching Gateway
  • id Identifies the underlying Spring bean definition, which is an instance of either:

    • EventDrivenConsumer

    • PollingConsumer

    The exact implementation depends on whether the component's input channel is a:

    • SubscribableChannel or

    • PollableChannel

  • auto-startup Boolean flag to indicate that the endpoint should start automatically on startup. The default is true.

  • request-channel The input MessageChannel of this endpoint.

  • reply-channel Message Channel to which the resulting JobExecution payload will be sent.

  • reply-timeout Allows you to specify how long this gateway will wait for the reply message to be sent successfully to the reply channel before throwing an exception. This attribute only applies when the channel might block, for example when using a bounded queue channel that is currently full. Also, keep in mind that when sending to a DirectChannel, the invocation occurs in the sender's thread. Therefore, a failure of the send operation may be caused by other components further downstream. The reply-timeout attribute maps to the sendTimeout property of the underlying MessagingTemplate instance. If not specified, the attribute defaults to -1, meaning that the Gateway waits indefinitely. The value is specified in milliseconds.

  • job-launcher Pass in a custom JobLauncher bean reference. This attribute is optional. If not specified the adapter will re-use the instance that is registered under the id jobLauncher. If no default instance exists an exception is thrown.

  • order Specifies the order for invocation when this endpoint is connected as a subscriber to a SubscribableChannel.
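The reply-timeout behavior described in the attribute list can be pictured with a bounded queue from java.util.concurrent. This is only a sketch under stated assumptions: ReplyTimeoutSketch and its methods are hypothetical names, the queue stands in for a bounded queue channel, and the sketch reports a timed-out send by returning false where the gateway is described as throwing an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a BlockingQueue plays the role of a bounded reply channel.
final class ReplyTimeoutSketch {

    /** Tries to place a reply on the channel; gives up if it stays full for timeoutMillis. */
    static boolean send(BlockingQueue<String> replyChannel, String reply, long timeoutMillis) {
        try {
            return replyChannel.offer(reply, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Sends two replies to a channel of capacity one that nobody is consuming from. */
    static boolean[] demo() {
        BlockingQueue<String> replyChannel = new ArrayBlockingQueue<>(1);
        boolean first = send(replyChannel, "execution-1", 50);  // capacity is available
        boolean second = send(replyChannel, "execution-2", 50); // channel is full, send times out
        return new boolean[] { first, second };
    }
}
```

The timeout only matters for the second send, because only a full bounded channel can block the sender.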

Sub-Elements

When this Gateway is receiving messages from a PollableChannel, you must either provide a global default Poller or provide a Poller sub-element to the Job Launching Gateway:

<batch-int:job-launching-gateway request-channel="queueChannel"
    reply-channel="replyChannel" job-launcher="jobLauncher">
  <int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>

13.1.3. Providing Feedback with Informational Messages

As Spring Batch jobs can run for a long time, providing progress information is often critical. For example, stakeholders may want to be notified if some or all parts of a Batch Job have failed. Spring Batch provides support for gathering this information through:

  • Active polling or

  • Event-driven, using listeners.

When starting a Spring Batch job asynchronously, e.g. by using the Job Launching Gateway, a JobExecution instance is returned. Thus, JobExecution.getId() can be used to poll continuously for status updates by retrieving updated instances of the JobExecution from the JobRepository using the JobExplorer. However, this is considered sub-optimal, and an event-driven approach should be preferred.

Therefore, Spring Batch provides listeners such as:

  • StepListener

  • ChunkListener

  • JobExecutionListener

In the following example, a Spring Batch job is configured with a StepExecutionListener. Thus, Spring Integration receives and processes any before-step or after-step events. For example, the received StepExecution can be inspected using a Router. Based on the results of that inspection, various things can occur, for example routing a message to a Mail Outbound Channel Adapter so that an email notification can be sent out based on some condition.

Below is an example of how a listener is configured to send a message to a Gateway for StepExecution events and log its output to a logging-channel-adapter:

First create the notifications integration beans:

<int:channel id="stepExecutionsChannel"/>

<int:gateway id="notificationExecutionsListener"
    service-interface="org.springframework.batch.core.StepExecutionListener"
    default-request-channel="stepExecutionsChannel"/>

<int:logging-channel-adapter channel="stepExecutionsChannel"/>

Then modify your job to add a step level listener:

<job id="importPayments">
    <step id="step1">
        <tasklet ..>
            <chunk ../>
            <listeners>
                <listener ref="notificationExecutionsListener"/>
            </listeners>
        </tasklet>
        ...
    </step>
</job>

13.1.4. Asynchronous Processors

Asynchronous Processors help you to scale the processing of items. In the asynchronous processor use case, an AsyncItemProcessor serves as a dispatcher, executing the ItemProcessor's logic for an item on a new thread. The resulting Future is passed to the AsyncItemWriter to be written once the processor completes.

Therefore, you can increase performance by using asynchronous item processing, basically allowing you to implement fork-join scenarios. The AsyncItemWriter will gather the results and write back the chunk as soon as all the results become available.
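The fork-join idea can be sketched in plain Java without any Spring classes. The example below is an illustration under stated assumptions, not the AsyncItemProcessor or AsyncItemWriter implementation: the class and method names are hypothetical, String::toUpperCase stands in for the delegate ItemProcessor, and returning the list stands in for the delegate ItemWriter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical illustration only: mirrors the pattern, not the Spring Batch classes.
final class AsyncProcessingSketch {

    /** Fork: submits each item to the pool and returns the Futures in item order. */
    static <I, O> List<Future<O>> processAsync(List<I> chunk, Function<I, O> delegate,
                                               ExecutorService pool) {
        List<Future<O>> futures = new ArrayList<>();
        for (I item : chunk) {
            Callable<O> task = () -> delegate.apply(item);
            futures.add(pool.submit(task));
        }
        return futures;
    }

    /** Join: waits for every Future and collects the unwrapped results as one chunk. */
    static <O> List<O> writeAsync(List<Future<O>> futures) {
        List<O> written = new ArrayList<>();
        for (Future<O> future : futures) {
            try {
                written.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
        return written;
    }

    /** Processes a chunk of strings on four threads and returns the "written" results. */
    static List<String> run(List<String> chunk) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            return writeAsync(processAsync(chunk, String::toUpperCase, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the Futures are collected in item order, the results keep the order of the input chunk even though the items are processed concurrently.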

Configuration of both the AsyncItemProcessor and the AsyncItemWriter is simple. First, the AsyncItemProcessor:

<bean id="processor"
    class="org.springframework.batch.integration.async.AsyncItemProcessor">
  <property name="delegate">
    <bean class="your.ItemProcessor"/>
  </property>
  <property name="taskExecutor">
    <bean class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
  </property>
</bean>

The property "delegate" is actually a reference to your ItemProcessor bean and the "taskExecutor" property is a reference to the TaskExecutor of your choice.

Then we configure the AsyncItemWriter:

<bean id="itemWriter"
    class="org.springframework.batch.integration.async.AsyncItemWriter">
  <property name="delegate">
    <bean class="your.ItemWriter"/>
  </property>
</bean>

Again, the property "delegate" is actually a reference to your ItemWriter bean.

13.1.5. Externalizing Batch Process Execution

The integration approaches discussed so far suggest use-cases where Spring Integration wraps Spring Batch like an outer-shell. However, Spring Batch can also use Spring Integration internally. Using this approach, Spring Batch users can delegate the processing of items or even chunks to outside processes. This allows you to offload complex processing. Spring Batch Integration provides dedicated support for:

  • Remote Chunking

  • Remote Partitioning

Remote Chunking

Taking things one step further, you can also externalize the chunk processing using the ChunkMessageChannelItemWriter, which is provided by Spring Batch Integration. It sends items out and collects the results. Once the items are sent, Spring Batch continues the process of reading and grouping items without waiting for the results. Rather, it is the responsibility of the ChunkMessageChannelItemWriter to gather the results and integrate them back into the Spring Batch process.

Using Spring Integration, you have full control over the concurrency of your processes, for instance by using a QueueChannel instead of a DirectChannel. Furthermore, by relying on Spring Integration's rich collection of Channel Adapters (e.g. JMS or AMQP), you can distribute chunks of a Batch job to external systems for processing.
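The request/reply flow of remote chunking can be pictured with two in-memory queues. This is a rough plain-Java sketch, assuming a single worker thread in place of a remote process and BlockingQueue instances in place of the messaging middleware; RemoteChunkingSketch and its members are hypothetical names and not part of Spring Batch Integration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: queues stand in for the request and reply channels.
final class RemoteChunkingSketch {

    /** End-of-work marker, compared by identity. */
    private static final List<Integer> POISON = new ArrayList<>();

    /** Sends the items out in chunks and returns the number of items the worker reported as written. */
    static int run(List<Integer> items, int chunkSize) {
        BlockingQueue<List<Integer>> requests = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> replies = new LinkedBlockingQueue<>();

        // Worker side: takes a chunk, "writes" it, and replies with the item count.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    List<Integer> chunk = requests.take();
                    if (chunk == POISON) {
                        return;
                    }
                    replies.put(chunk.size());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            // Sending side: keeps grouping and sending without waiting for replies.
            int sent = 0;
            for (int from = 0; from < items.size(); from += chunkSize) {
                int to = Math.min(from + chunkSize, items.size());
                requests.put(new ArrayList<>(items.subList(from, to)));
                sent++;
            }
            requests.put(POISON);

            // Afterwards, gathers one reply per chunk that was sent.
            int written = 0;
            for (int i = 0; i < sent; i++) {
                Integer count = replies.poll(2, TimeUnit.SECONDS);
                if (count == null) {
                    throw new IllegalStateException("Timed out waiting for a reply");
                }
                written += count;
            }
            return written;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The sending loop never blocks on a reply, which corresponds to reading and grouping continuing while chunks are processed elsewhere.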

A simple job with a step to be remotely chunked would have a configuration similar to the following:

<job id="personJob">
  <step id="step1">
    <tasklet>
      <chunk reader="itemReader" writer="itemWriter" commit-interval="200"/>
    </tasklet>
    ...
  </step>
</job>

The ItemReader reference points to the bean you would like to use for reading data on the master. The ItemWriter reference points to the special ItemWriter "ChunkMessageChannelItemWriter", as described above. The processor (if any) is left off the master configuration, as it is configured on the slave. The following configuration provides a basic master setup. When implementing your use case, it is advisable to check any additional component properties, such as throttle limits.

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int-jms:outbound-channel-adapter id="requests" destination-name="requests"/>

<bean id="messagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate">
  <property name="defaultChannel" ref="requests"/>
  <property name="receiveTimeout" value="2000"/>
</bean>

<bean id="itemWriter"
    class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
    scope="step">
  <property name="messagingOperations" ref="messagingTemplate"/>
  <property name="replyChannel" ref="replies"/>
</bean>

<bean id="chunkHandler"
    class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
  <property name="chunkWriter" ref="itemWriter"/>
  <property name="step" ref="step1"/>
</bean>

<int:channel id="replies">
  <int:queue/>
</int:channel>

<int-jms:message-driven-channel-adapter id="jmsReplies"
    destination-name="replies"
    channel="replies"/>

This configuration provides us with a number of beans. We configure our messaging middleware using ActiveMQ and the inbound/outbound JMS adapters provided by Spring Integration. As shown, our itemWriter bean, which is referenced by our job step, uses the ChunkMessageChannelItemWriter for writing chunks over the configured middleware.

Now let's move on to the slave configuration:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<int:channel id="requests"/>
<int:channel id="replies"/>

<int-jms:message-driven-channel-adapter id="jmsIn"
    destination-name="requests"
    channel="requests"/>

<int-jms:outbound-channel-adapter id="outgoingReplies"
    destination-name="replies"
    channel="replies">
</int-jms:outbound-channel-adapter>

<int:service-activator id="serviceActivator"
    input-channel="requests"
    output-channel="replies"
    ref="chunkProcessorChunkHandler"
    method="handleChunk"/>

<bean id="chunkProcessorChunkHandler"
    class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor">
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter">
        <bean class="io.spring.sbi.PersonItemWriter"/>
      </property>
      <property name="itemProcessor">
        <bean class="io.spring.sbi.PersonItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>

Most of these configuration items should look familiar from the master configuration. Slaves do not need access to the Spring Batch JobRepository or to the actual job configuration file. The main bean of interest is the "chunkProcessorChunkHandler". The chunkProcessor property of ChunkProcessorChunkHandler takes a configured SimpleChunkProcessor, which is where you provide a reference to your ItemWriter and, optionally, your ItemProcessor that will run on the slave when it receives chunks from the master.

For more information, please also consult the Spring Batch manual, specifically the chapter on Remote Chunking.

Remote Partitioning

Remote Partitioning, on the other hand, is useful when the bottleneck is not the processing of items but the associated I/O. Using Remote Partitioning, work can be farmed out to slaves that execute complete Spring Batch steps. Thus, each slave has its own ItemReader, ItemProcessor and ItemWriter. For this purpose, Spring Batch Integration provides the MessageChannelPartitionHandler.

This implementation of the PartitionHandler interface uses MessageChannel instances to send instructions to remote workers and receive their responses. This provides a nice abstraction from the transports (e.g. JMS or AMQP) being used to communicate with the remote workers.
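The division of work in partitioning can be sketched in plain Java. The following is an illustration under stated assumptions rather than the MessageChannelPartitionHandler itself: the class and method names are hypothetical, the data set is assumed to be a contiguous range of ids, and local threads stand in for remote workers that each run a complete step over their own range.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: threads stand in for remote workers.
final class RemotePartitioningSketch {

    /** Splits the inclusive range [min, max] into at most gridSize contiguous {from, to} ranges. */
    static List<long[]> partition(long min, long max, int gridSize) {
        List<long[]> ranges = new ArrayList<>();
        long total = max - min + 1;
        long size = (total + gridSize - 1) / gridSize; // ceiling division
        for (long from = min; from <= max; from += size) {
            ranges.add(new long[] { from, Math.min(from + size - 1, max) });
        }
        return ranges;
    }

    /** One worker step: reads its own range and returns how many ids it handled. */
    static long executeStep(long[] range) {
        long written = 0;
        for (long id = range[0]; id <= range[1]; id++) {
            written++; // a real step would read, process and write the record here
        }
        return written;
    }

    /** Farms the partitions out to workers and aggregates their results. */
    static long run(long min, long max, int gridSize) {
        ExecutorService workers = Executors.newFixedThreadPool(gridSize);
        try {
            List<Future<Long>> results = new ArrayList<>();
            for (long[] range : partition(min, max, gridSize)) {
                Callable<Long> step = () -> executeStep(range);
                results.add(workers.submit(step));
            }
            long total = 0;
            for (Future<Long> result : results) {
                total += result.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            workers.shutdown();
        }
    }
}
```

Unlike the chunking case, only the range boundaries travel to the workers. Each worker performs its own reading and writing.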

The reference manual section Remote Partitioning provides an overview of the concepts and components needed to configure Remote Partitioning and shows an example of using the default TaskExecutorPartitionHandler to partition in separate local threads of execution. For Remote Partitioning to multiple JVMs, two additional components are required:

  • Remoting fabric or grid environment

  • A PartitionHandler implementation that supports the desired remoting fabric or grid environment

Similar to Remote Chunking, JMS can be used as the "remoting fabric", and the PartitionHandler implementation to use is, as described above, the MessageChannelPartitionHandler. The example shown below assumes an existing partitioned job and focuses on the MessageChannelPartitionHandler and JMS configuration:

<bean id="partitionHandler"
   class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
  <property name="stepName" value="step1"/>
  <property name="gridSize" value="3"/>
  <property name="replyChannel" ref="outbound-replies"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="outbound-requests"/>
      <property name="receiveTimeout" value="100000"/>
    </bean>
  </property>
</bean>

<int:channel id="outbound-requests"/>
<int-jms:outbound-channel-adapter destination="requestsQueue"
    channel="outbound-requests"/>

<int:channel id="inbound-requests"/>
<int-jms:message-driven-channel-adapter destination="requestsQueue"
    channel="inbound-requests"/>

<bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler">
  <property name="jobExplorer" ref="jobExplorer"/>
  <property name="stepLocator" ref="stepLocator"/>
</bean>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inbound-requests"
    output-channel="outbound-staging"/>

<int:channel id="outbound-staging"/>
<int-jms:outbound-channel-adapter destination="stagingQueue"
    channel="outbound-staging"/>

<int:channel id="inbound-staging"/>
<int-jms:message-driven-channel-adapter destination="stagingQueue"
    channel="inbound-staging"/>

<int:aggregator ref="partitionHandler" input-channel="inbound-staging"
    output-channel="outbound-replies"/>

<int:channel id="outbound-replies">
  <int:queue/>
</int:channel>

<bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />

Also ensure the partition handler attribute maps to the partitionHandler bean:

<job id="personJob">
  <step id="step1.master">
    <partition partitioner="partitioner" handler="partitionHandler"/>
    ...
  </step>
</job>