1. Common Batch Patterns


XML

Java

某些批处理作业可以完全由 Spring Batch 中的 off-the-shelf 组件组装。例如,ItemReaderItemWriter_mplempleations 可以配置为涵盖各种场景。但是,对于大多数情况,必须编写自定义 code。 application 开发人员的主要 API 入口点是TaskletItemReaderItemWriter和各种 listener 接口。大多数简单的批处理作业可以使用来自 Spring Batch ItemReader的 off-the-shelf 输入,但通常情况是处理和编写中存在需要开发人员实现ItemWriterItemProcessor的自定义问题。

在本章中,我们提供了一些自定义业务逻辑中的 common 模式示例。这些示例主要 feature listener 接口。应该注意,如果合适,ItemReaderItemWriter也可以实现 listener 接口。

1.1. Logging Item 处理和失败

一个 common 用例是需要特殊处理 step,item item 中的错误,可能 logging 到特殊 channel 或将 record 插入数据库。 chunk-oriented Step(从 step 工厂 beans 创建)允许用户在read上使用简单的ItemReadListener表示错误,在write上使用ItemWriteListener表示错误。以下 code 代码段演示了一个记录读写失败的 listener:

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

实现了这个 listener 之后,它必须使用 step 注册,如下面的示例所示:

XML Configuration

<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>

Java Configuration

@Bean
public Step simpleStep() {
        return this.stepBuilderFactory.get("simpleStep")
                                ...
                                .listener(new ItemFailureLoggerListener())
                                .build();
}

如果 listener 在onError()方法中执行任何操作,则它必须位于将要回滚的 transaction 中。如果需要在onError()方法中使用 transactional 资源(如数据库),请考虑在该方法中添加声明性 transaction(有关详细信息,请参阅 Spring Core Reference Guide),并为其传播属性赋予REQUIRES_NEW的值。

1.2. 出于商业原因手动停止 Job

Spring Batch 通过JobLauncher接口提供stop()方法,但这实际上是由 operator 而不是 application 程序员使用的。有时,从业务逻辑中停止 job 执行更方便或更有意义。

最简单的方法是抛出一个RuntimeException(既不能无限期重试也不能跳过)。对于 example,可以使用自定义 exception 类型,如下面的示例所示:

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

阻止 step 执行的另一种简单方法是从ItemReader return null,如下面的 example 所示:

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

之前的 example 实际上依赖于CompletionPolicy策略的默认 implementation,当要处理的 item 为null时,它表示完整的批处理。可以实现更复杂的完成 policy 并通过SimpleStepFactoryBean注入Step,如下面的示例所示:

XML Configuration

<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

Java Configuration

@Bean
public Step simpleStep() {
        return this.stepBuilderFactory.get("simpleStep")
                                .<String, String>chunk(new SpecialCompletionPolicy())
                                .reader(reader())
                                .writer(writer())
                                .build();
}

另一种方法是在StepExecution中设置一个 flag,在 item 处理之间由 framework 中的Step __mplementations 检查。要实现这个替代方案,我们需要访问当前的StepExecution,这可以通过实现StepListener并使用Step注册来实现。以下 example 显示_setistener _set flag:

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly(true);
       }
    }

}

设置 flag 时,默认行为是 step 抛出JobInterruptedException。可以通过StepInterruptionPolicy控制此行为。但是,唯一的选择是抛出或不抛出 exception,因此这始终是 job 的异常结束。

1.3. 添加页脚 Record

通常,在写入平 files 时,必须在完成所有处理之后将“页脚”记录附加到文件的末尾。这可以使用 Spring Batch 提供的FlatFileFooterCallback接口来实现。 FlatFileFooterCallback(及其对应的FlatFileHeaderCallback)是FlatFileItemWriter的可选 properties,可以添加到 item writer 中,如下面的 example 所示:

XML Configuration

<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

Java Configuration

@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
        return new FlatFileItemWriterBuilder<String>()
                        .name("itemWriter")
                        .resource(outputResource)
                        .lineAggregator(lineAggregator())
                        .headerCallback(headerCallback())
                        .footerCallback(footerCallback())
                        .build();
}

页脚回调接口只有一个在必须写入页脚时调用的方法,如以下接口定义所示:

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

1.3.1. 编写摘要页脚

涉及页脚记录的 common 要求是在输出 process 期间聚合信息,并将此信息附加到文件的末尾。此页脚通常用作文件的摘要或提供校验和。

例如,如果批 job 正在将Trade记录写入平面文件,并且要求所有Trades的总金额放在页脚中,则可以使用以下ItemWriter implementation:

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(List<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

TradeItemWriter 存储一个totalAmount value,随着每个Trade item 写入amount而增加。在处理完最后一个Trade之后,framework calls writeFootertotalAmount放入文件中。请注意,write方法使用临时变量chunkTotal,该变量存储块中Trade金额的总和。这样做是为了确保如果在write方法中发生跳过,则totalAmount保持不变。只有在write方法结束时,一旦我们确保没有抛出 exceptions,我们就会更新totalAmount

在要调用writeFooter方法的 order 中,TradeItemWriter(实现FlatFileFooterCallback)必须作为footerCallback连接到FlatFileItemWriter。以下 example 显示了如何执行此操作:

XML Configuration

<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

Java Configuration

@Bean
public TradeItemWriter tradeItemWriter() {
        TradeItemWriter itemWriter = new TradeItemWriter();

        itemWriter.setDelegate(flatFileItemWriter(null));

        return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
        return new FlatFileItemWriterBuilder<String>()
                        .name("itemWriter")
                        .resource(outputResource)
                        .lineAggregator(lineAggregator())
                        .footerCallback(tradeItemWriter())
                        .build();
}

到目前为止,TradeItemWriter的编写方式只有在Step不可重启时才能正常工作。这是因为 class 是有状态的(因为它存储totalAmount),但是totalAmount不会持久保存到数据库。因此,无法在重新启动的 event 中检索它。为了使 class 可以重新启动,ItemStream接口应该与方法openupdate一起实现,如下面的 example 所示:

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

更新方法在 object 持久保存到数据库之前,将totalAmount的最新 version 存储到ExecutionContext。 open 方法从ExecutionContext中检索任何现有的totalAmount,并将其用作处理的起始点,允许TradeItemWriter在重新启动时从上一次运行。

1.4. 基于驱动查询的 ItemReaders

在关于 readers 和 writers 的章节中,讨论了使用分页的数据库输入。许多数据库供应商(例如 DB2)具有非常悲观的锁定策略,如果读取的 table 也需要由 online application 的其他部分使用,则可能会导致问题。此外,在非常大的数据集上打开游标可能会导致某些供应商的数据库出现问题。因此,许多项目更喜欢使用“驾驶查询”方法来读取数据。这种方法通过迭代键而不是需要返回的整个 object 来工作,如下图所示:

图 1.驾驶查询 Job

如您所见,上图中显示的 example 使用与 cursor-based example 中使用的相同的'FOO'table。但是,不是选择整行,而是在 SQL 语句中仅选择了 ID。因此,不是从read返回FOO object,而是返回Integer。然后可以使用此数字查询“详细信息”,这是一个完整的Foo object,如下图所示:

图 2.驱动查询 Example

应该使用ItemProcessor将从驱动查询获得的 key 转换为完整的'Foo'object。现有的 DAO 可用于根据 key 查询完整的 object。

1.5. Multi-Line 记录

虽然通常情况下使用平 files,每个 record 都被限制在一个 line 中,但是文件通常是一个文件可能包含跨越多个 lines 的记录,并且有多种格式。以下摘录自文件显示了这种安排的一个例子:

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

以“HEA”开头的 line 和以“FOT”开头的 line 之间的所有内容都被视为一个 record。必须在 order 中进行一些注意事项才能正确处理这种情况:

  • 不必在 time 读取一个 record,而是必须将 multi-line record 的每个 line 读作 group,以便它可以完整地传递给ItemWriter

  • 每个 line 类型可能需要以不同方式进行标记。

因为单个 record spans 多个 lines,并且因为我们可能不知道有多少 lines,所以ItemReader必须小心,始终读取整个 record。为了做到这一点,自定义ItemReader应该被实现为FlatFileItemReader的 wrapper,如下面的 example 所示:

XML Configuration

<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

Java Configuration

@Bean
public MultiLineTradeItemReader itemReader() {
        MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

        itemReader.setDelegate(flatFileItemReader());

        return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
        FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<Trade>()
                        .name("flatFileItemReader")
                        .resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
                        .lineTokenizer(orderFileTokenizer())
                        .fieldSetMapper(orderFieldSetMapper())
                        .build();
        return reader;
}

为了确保正确地标记每个 line,这对于 fixed-length 输入尤为重要,PatternMatchingCompositeLineTokenizer可以在委托FlatFileItemReader上使用。有关详细信息,请参阅Readers 和 Writers 章节中的 FlatFileItemReader。然后委托 reader 使用PassThroughFieldSetMapper为每个 line 传递FieldSet回到包装ItemReader,如下面的 example 所示:

XML 内容

<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

Java 内容

@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
        PatternMatchingCompositeLineTokenizer tokenizer =
                        new PatternMatchingCompositeLineTokenizer();

        Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

        tokenizers.put("HEA*", headerRecordTokenizer());
        tokenizers.put("FOT*", footerRecordTokenizer());
        tokenizers.put("NCU*", customerLineTokenizer());
        tokenizers.put("BAD*", billingAddressLineTokenizer());

        tokenizer.setTokenizers(tokenizers);

        return tokenizer;
}

这个 wrapper 必须能够识别 record 的结尾,以便它可以在其委托上不断调用read(),直到到达结尾。对于每个读取的 line,wrapper 应该_build item 返回。到达页脚后,可以返回 item 以传递到ItemProcessorItemWriter,如下面的示例所示:

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

1.6. 执行系统命令

许多批处理作业都要求从批处理 job 中调用外部命令。这样的 process 可以由调度程序单独启动,但是关于 run 的 common 元数据的优点将会丢失。此外,multi-step job 也需要分成多个作业。

因为需要如此 common,Spring Batch 为调用系统命令提供了Tasklet implementation,如下面的示例所示:

XML Configuration

<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

Java Configuration

@Bean
public SystemCommandTasklet tasklet() {
        SystemCommandTasklet tasklet = new SystemCommandTasklet();

        tasklet.setCommand("echo hello");
        tasklet.setTimeout(5000);

        return tasklet;
}

1.7. 处理 Step 完成时没有找到输入

在许多批处理方案中,在数据库或文件中找不到要处理的行也不例外。简单地认为Step没有找到工作并且完成了 0 项读取。 Spring Batch 中开箱即用的所有ItemReader __mplement 都默认为这种方法。如果即使存在输入也没有写出任何内容(如果文件名称错误或出现类似问题,通常会发生这种情况),这可能会导致一些混淆。出于这个原因,应该检查元数据本身以确定 framework 被处理的工作量。但是,如果没有输入被视为特殊情况怎么办?在这种情况下,以编程方式检查元数据中没有处理的项目并导致失败是最佳解决方案。因为这是一个 common 用例,Spring Batch 提供了一个完全具有此功能的 listener,如NoWorkFoundStepExecutionListener的 class 定义所示:

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

前面的StepExecutionListener在'afterStep'阶段检查StepExecutionreadCount property,以确定是否没有读取任何项目。如果是这种情况,则返回 FAILED 的退出 code,表示Step应该失败。否则,返回null,这不会影响Step的状态。

1.8. 将数据传递给未来的步骤

将信息从一个 step 传递到另一个 step 通常很有用。这可以通过ExecutionContext来完成。问题是有两个ExecutionContexts:一个在Step level,一个在Job level。 Step ExecutionContext仍然只是 step 的 long,而Job ExecutionContext仍然贯穿整个Job。另一方面,Step ExecutionContext每更新Step提交一个块,而Job ExecutionContext仅在每个Step结束时更新。

这种分离的结果是所有数据必须在Step执行时放在Step ExecutionContext中。这样做可确保在Step运行时正确存储数据。如果数据存储到Job ExecutionContext,则在Step执行期间不会保留数据。如果Step失败,则该数据将丢失。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(List<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

为了使数据可用于将来Steps,必须在 step 完成后将其“提升”为Job ExecutionContext。 Spring Batch 为此提供了ExecutionContextPromotionListener。必须使用与必须提升的ExecutionContext中的数据相关的键配置 listener。它也可以选择配置一个应该进行促销的退出 code 模式列表(COMPLETED是默认值)。与所有 listeners 一样,它必须在Step上注册,如下面的示例所示:

XML Configuration

<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

Java Configuration

@Bean
public Job job1() {
        return this.jobBuilderFactory.get("job1")
                                .start(step1())
                                .next(step1())
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(10)
                                .reader(reader())
                                .writer(savingWriter())
                                .listener(promotionListener())
                                .build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

        listener.setKeys(new String[] {"someKey" });

        return listener;
}

最后,必须从Job ExecutionContext中检索保存的值,如下面的示例所示:

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(List<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}