1. Spring Batch 4.1 的新功能

Spring Batch 4.1 版本添加了以下 features:

  • 一个新的@SpringBatchTest annotation,用于简化测试批处理组件

  • 一个新的@EnableBatchIntegration annotation 来简化 remote 分块和分区 configuration

  • 一个新的JsonItemReaderJsonFileItemWriter来支持 JSON 格式

  • 使用 Bean Validation API 添加对验证项目的支持

  • 添加对 JSR-305 注释的支持

  • FlatFileItemWriterBuilder API 的增强功能

1.1. @SpringBatchTest Annotation

Spring Batch 提供了一些不错的实用程序 classes(例如JobLauncherTestUtilsJobRepositoryTestUtils)和测试执行 listeners(StepScopeTestExecutionListenerJobScopeTestExecutionListener)来测试批处理组件。但是,为了使用这些实用程序,您必须显式配置它们。此版本引入了一个名为@SpringBatchTest的新注释,它自动将实用程序 beans 和 listeners 添加到 test context 并使它们可用于自动装配,如下面的 example 所示:

@RunWith(SpringRunner.class)
@SpringBatchTest
@ContextConfiguration(classes = {JobConfiguration.class})
public class JobTest {

   @Autowired
   private JobLauncherTestUtils jobLauncherTestUtils;

   @Autowired
   private JobRepositoryTestUtils jobRepositoryTestUtils;

   @Before
   public void clearMetadata() {
      jobRepositoryTestUtils.removeJobExecutions();
   }

   @Test
   public void testJob() throws Exception {
      // given
      JobParameters jobParameters =
            jobLauncherTestUtils.getUniqueJobParameters();

      // when
      JobExecution jobExecution =
            jobLauncherTestUtils.launchJob(jobParameters);

      // then
      Assert.assertEquals(ExitStatus.COMPLETED,
                          jobExecution.getExitStatus());
   }

}

有关此新注释的更多详细信息,请参阅单元测试章节。

1.2. @EnableBatchIntegration Annotation

设置 remote chunking job 需要定义一些 beans:

  • 用于从消息传递中间件(JMS,AMQP 和其他)获取连接的连接工厂

  • 将来自 master 的请求发送给 workers 并再次返回

  • 输入 channel 和 Spring Integration 的输出 channel 以从消息传递中间件获取消息

  • master 端的一个特殊 item writer(ChunkMessageChannelItemWriter)知道如何将数据块发送给 workers 进行处理和写入

  • worker 端的消息 listener(ChunkProcessorChunkHandler)用于从 master 接收数据

乍一看这可能有点令人生畏。此版本引入了一个名为@EnableBatchIntegration的新注释以及新的 API(RemoteChunkingMasterStepBuilderRemoteChunkingWorkerBuilder)来简化 configuration。以下 example 显示了如何使用新的 annotation 和 API:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemoteChunkingAppConfig {

   @Autowired
   private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;

   @Autowired
   private RemoteChunkingWorkerBuilder workerBuilder;

   @Bean
   public TaskletStep masterStep() {
         return this.masterStepBuilderFactory
                         .get("masterStep")
                         .chunk(100)
                         .reader(itemReader())
                         .outputChannel(outgoingRequestsToWorkers())
                         .inputChannel(incomingRepliesFromWorkers())
                         .build();
   }

   @Bean
   public IntegrationFlow worker() {
         return this.workerBuilder
                         .itemProcessor(itemProcessor())
                         .itemWriter(itemWriter())
                         .inputChannel(incomingRequestsFromMaster())
                         .outputChannel(outgoingRepliesToMaster())
                         .build();
   }

   // Middleware beans setup omitted
}

这个新的注释和构建器负责配置基础架构 beans 的繁重工作。您现在可以在 worker 端轻松配置 master step 以及 Spring Integration 流程。您可以在samples 模块中找到使用这些新 API 的 remote chunking sample,以及Spring Batch Integration章节中的更多详细信息。

就像 remote chunking configuration 简化一样,这个 version 也引入了新的 API 来简化 remote 分区设置:RemotePartitioningMasterStepBuilderRemotePartitioningWorkerStepBuilder。如果@EnableBatchIntegration存在,那么可以在 configuration class 中自动连接,如下面的 example 所示:

@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
public class RemotePartitioningAppConfig {

   @Autowired
   private RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;

   @Autowired
   private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;

   @Bean
   public Step masterStep() {
            return this.masterStepBuilderFactory
               .get("masterStep")
               .partitioner("workerStep", partitioner())
               .gridSize(10)
               .outputChannel(outgoingRequestsToWorkers())
               .inputChannel(incomingRepliesFromWorkers())
               .build();
   }

   @Bean
   public Step workerStep() {
            return this.workerStepBuilderFactory
               .get("workerStep")
               .inputChannel(incomingRequestsFromMaster())
               .outputChannel(outgoingRepliesToMaster())
               .chunk(100)
               .reader(itemReader())
               .processor(itemProcessor())
               .writer(itemWriter())
               .build();
   }

   // Middleware beans setup omitted
}

您可以在Spring Batch Integration章节中找到有关这些新 API 的更多详细信息。

1.3. JSON 支持

Spring Batch 4.1 添加了对 JSON 格式的支持。此版本引入了一个新的 item reader,它可以按以下格式读取 JSON 资源:

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

类似于StaxEventItemReader for XML,新的JsonItemReader使用流 API 来读取块中的 JSON objects。 Spring Batch 支持两个 libraries:

  • Jackson

  • GSON

要添加其他 libraries,可以实现JsonObjectReader接口。

通过JsonFileItemWriter也支持编写 JSON 数据。有关 JSON 支持的更多详细信息,请参阅ItemReaders 和 ItemWriters章节。

1.4. Bean Validation API 支持

此版本带来了一个名为BeanValidatingItemProcessor的新ValidatingItemProcessor implementation,它允许您验证使用 Bean Validation API(JSR-303)annotations 注释的项目。对于 example,给定以下类型Person

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}

您可以通过在 application context 中声明BeanValidatingItemProcessor bean 并在 chunk-oriented step 中将其注册为处理器来验证项目:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
        BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
        beanValidatingItemProcessor.setFilter(true);

        return beanValidatingItemProcessor;
}

1.5. JSR-305 支持

此版本增加了对 JSR-305 注释的支持。它利用 Spring Framework 的Null-safety 注释并将它们添加到 Spring Batch 的所有公共 API 上。

这些注释不仅在使用 Spring Batch API 时强制执行 null-safety,而且还可以由 IDE 用于提供与可为空性相关的有用信息。例如,如果用户想要实现ItemReader接口,任何支持 JSR-305 annotations 的 IDE 都将生成如下内容:

public class MyItemReader implements ItemReader<String> {

        @Nullable
        public String read() throws Exception {
                return null;
        }

}

read方法上出现的@Nullable 注释清楚地表明此方法的 contract 表示它可能_ret null。这强制执行 Javadoc 中的说法,当数据源耗尽时,read方法应该_ret 。

1.6. FlatFileItemWriterBuilder 增强功能

此版本中添加的另一个小 feature 是用于编写平面文件的 configuration 的简化。具体来说,这些更新简化了分隔和固定宽度文件的 configuration。以下是更改前后的示例。

// Before
@Bean
public FlatFileItemWriter<Item> itemWriter(Resource resource) {
        BeanWrapperFieldExtractor<Item> fieldExtractor =
            new BeanWrapperFieldExtractor<Item>();
        fieldExtractor.setNames(new String[] {"field1", "field2", "field3"});
        fieldExtractor.afterPropertiesSet();

        DelimitedLineAggregator aggregator = new DelimitedLineAggregator();
        aggregator.setFieldExtractor(fieldExtractor);
        aggregator.setDelimiter(";");

        return new FlatFileItemWriterBuilder<Item>()
                        .name("itemWriter")
                        .resource(resource)
                        .lineAggregator(aggregator)
                        .build();
}

// After
@Bean
public FlatFileItemWriter<Item> itemWriter(Resource resource) {
        return new FlatFileItemWriterBuilder<Item>()
                        .name("itemWriter")
                        .resource(resource)
                        .delimited()
                        .delimiter(";")
                        .names(new String[] {"field1", "field2", "field3"})
                        .build();
}