8. 重复

8.1 RepeatTemplate

批处理是关于重复的操作 - 作为简单的优化,或作为 job 的一部分。为了策略化和概括重复以及提供迭代器 framework 的数量,Spring Batch 具有RepeatOperations接口。 RepeatOperations接口如下所示:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调是一个简单的接口,允许您插入一些要重复的业务逻辑:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

重复执行回调,直到 implementation 决定迭代应该结束。这些接口中的 return value 是一个枚举,可以是RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus向重复操作的调用者传达关于是否还有其他工作要做的信息。一般来说,RepeatOperations的实现应该检查RepeatStatus并将其用作结束迭代的决定的一部分。任何希望向调用者发出信号表明没有更多工作要做的回调可以 return RepeatStatus.FINISHED

RepeatOperations的最简单的通用 implementation 是RepeatTemplate。它可以像这样使用:

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new FixedChunkSizeCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public ExitStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return ExitStatus.CONTINUABLE;
    }

});

在 example 我们 return RepeatStatus.CONTINUABLE表明还有更多工作要做。如果回调函数想要向调用者发出信号表明没有其他工作要做,那么回调函数也可以回显ExitStatus.FINISHED。一些迭代可以通过在回调中完成的工作固有的考虑来终止,其他迭代实际上是无限循环,就回调而言,完成决策被委托给外部 policy,如上面的情况。

8.1.1 RepeatContext

RepeatCallback的方法参数是RepeatContext。许多回调将简单地忽略 context,但如果需要,它可以用作属性包来在迭代期间存储瞬态数据。返回iterate方法后,context 将不再存在。

如果正在进行嵌套迭代,则RepeatContext将具有 parent context。 parent context 偶尔用于存储需要在 calls 和iterate之间共享的数据。例如,如果要计算迭代中 event 的出现次数并在后续 calls 中记住它,就会出现这种情况。

8.1.2 RepeatStatus

RepeatStatus是 Spring Batch 用于指示处理是否已完成的枚举。这些是可能的RepeatStatus值:

表格 1_.ExitStatus Properties

描述
可持续还有更多工作要做。
FINISHED不应再发生重复。

RepeatStatus值也可以使用RepeatStatus中的and()方法与逻辑 AND 运算组合。这样做的结果是在可持续的 flag 上进行逻辑 AND。换句话说,如果任一状态为FINISHED,则结果为FINISHED

8.2 完成政策

RepeatTemplate内,iterate方法中循环的终止由CompletionPolicy确定,CompletionPolicy也是RepeatContext的工厂。 RepeatTemplate有责任使用当前的 policy 创建RepeatContext并在迭代的每个阶段将其传递给RepeatCallback。回调完成doInIteration后,RepeatTemplate必须调用CompletionPolicy,要求它更新 state(将存储在RepeatContext中)。然后,如果迭代完成,它会询问 policy。

Spring Batch 提供了CompletionPolicy的一些简单的通用 implementations。 SimpleCompletionPolicy只允许执行最多固定次数(RepeatStatus.FINISHED强制在任何 time 时间提前完成)。

用户可能需要实现自己的完成 policies 以进行更复杂的决策。例如,一个批处理窗口阻止批处理作业在使用在线系统后执行,这需要一个自定义 policy。

8.3 Exception 处理

如果在RepeatCallback中抛出了 exception,则RepeatTemplate会查询ExceptionHandler,这可以决定是否 exception。

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws RuntimeException;

}

common 用例是计算给定类型的 exceptions 的数量,并在达到限制时失败。为此,Spring Batch 提供了SimpleLimitExceptionHandler和稍微灵活的RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler有一个限制 property 和一个应该与当前 exception 进行比较的 exception 类型 - 所有类型的子类也被计算在内。 在达到限制之前,将忽略给定类型的异常,然后重新抛出。其他类型的那些总是被重新抛出。

SimpleLimitExceptionHandler的一个重要的可选 property 是 boolean flag useParent。默认情况下为 false,因此仅在当前RepeatContext中考虑限制。当设置为 true 时,限制将在嵌套迭代中保持在兄弟上下文中(e.g .step 中的一组块)。

8.4 Listeners

通常,能够在多个不同的迭代中接收针对交叉切割问题的额外回调是有用的。为此,Spring Batch 提供了RepeatListener接口。 RepeatTemplate允许用户注册RepeatListener,并且它们将在迭代期间使用RepeatContextRepeatStatus进行回调。

界面如下所示:

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose回调在整个迭代之前和之后。 beforeafteronError适用于单个 RepeatCallback calls。

请注意,当有多个 listener 时,它们位于列表中,因此存在 order。在这种情况下,openbefore在同一 order 中被调用,而afteronErrorclose在 reverse order 中被调用。

8.5 并行处理

的实现不限于顺序执行回调。一些 implementations 能够在 parallel 中执行它们的回调非常重要。为此,Spring Batch 提供TaskExecutorRepeatTemplate,它使用 Spring TaskExecutor策略来运行RepeatCallback。默认是使用SynchronousTaskExecutor,它具有在同一个线程中执行整个迭代的效果(与普通RepeatTemplate相同)。

8.6 声明性迭代

有时会有一些业务处理,你知道你想要在每次发生时重复这些业务处理。这样的经典示例是消息管道的优化 - 如果频繁到达,则处理一批消息比处理每个消息的单独 transaction 的成本更有效。 Spring Batch 提供了一个 AOP 拦截器,它为了这个目的而在RepeatOperations中包装一个方法调用。 RepeatOperationsInterceptor执行截获的方法,并根据提供的RepeatTemplate中的CompletionPolicy重复。

下面是使用 Spring AOP 命名空间重复对名为processMessage的方法的服务调用的声明性迭代的示例(有关如何配置 AOP 拦截器的详细信息,请参阅 Spring 用户指南):

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

上面的 example 在拦截器中使用默认的RepeatTemplate。要更改 policies,listeners 等,您只需要将RepeatTemplate的实例注入拦截器。

如果截获的方法返回void,则拦截器总是返回 ExitStatus.CONTINUABLE(因此如果CompletionPolicy没有有限的终点,则存在无限循环的危险)。否则返回ExitStatus.CONTINUABLE,直到截取方法的 return value 为 null,此时返回ExitStatus.FINISHED。因此,目标方法中的业务逻辑可以通过返回null,或者通过在提供的RepeatTemplate中抛出ExceptionHandler的来表示没有更多的工作要做。

Updated at: 9 months ago
7.4.3. 将输入数据绑定到步骤Table of content9. 重试