1. 重复


XML

Java

1.1. RepeatTemplate

批处理是关于重复操作,可以是简单优化,也可以是 job 的一部分。为了策略化和概括重复并提供迭代器 framework 的数量,Spring Batch 具有RepeatOperations接口。 RepeatOperations接口具有以下定义:

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回调是一个接口,如以下定义所示,它允许您插入一些要重复的业务逻辑:

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

重复执行回调,直到 implementation 确定迭代应该结束。这些接口中的 return value 是一个枚举,可以是RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus枚举向重复操作的调用者传达关于是否还有其他工作要做的信息。一般来说,RepeatOperations的实现应该检查RepeatStatus并将其用作结束迭代的决定的一部分。任何希望向调用者发出信号表明没有更多工作要做的回调可以 return RepeatStatus.FINISHED

RepeatOperations的最简单的通用 implementation 是RepeatTemplate,如下面的示例所示:

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在上面的例子中,我们 return RepeatStatus.CONTINUABLE,表明还有更多的工作要做。回调也可以_ret ,向呼叫者发出信号,表示没有其他工作要做。一些迭代可以通过在回调中完成的工作固有的考虑来终止。就回调而言,其他的实际上是无限循环,并且完成决策被委托给外部 policy,如前面的 example 中所示的情况。

1.1.1. RepeatContext

RepeatCallback的方法参数是RepeatContext。许多回调忽略了 context。但是,如果需要,它可以用作属性包,以便在迭代期间存储瞬态数据。返回iterate方法后,context 不再存在。

如果正在进行嵌套迭代,则RepeatContext具有 parent context。 parent context 偶尔用于存储需要在 calls 和iterate之间共享的数据。例如,如果您想计算迭代中 event 的出现次数并在后续的 calls 中记住它,就是这种情况。

1.1.2. RepeatStatus

RepeatStatus是 Spring Batch 用于指示处理是否已完成的枚举。它有两个可能的RepeatStatus值,如下面的 table 所述:

描述
可持续还有更多工作要做。
FINISHED不应再发生重复。

通过使用RepeatStatus中的and()方法,RepeatStatus值也可以与逻辑 AND 运算组合。这样做的结果是在可持续的 flag 上进行逻辑 AND。换句话说,如果任一状态为FINISHED,则结果为FINISHED

1.2. 完成 Policies

RepeatTemplate内,iterate方法中循环的终止由CompletionPolicy确定,CompletionPolicy也是RepeatContext的工厂。 RepeatTemplate有责任使用当前的 policy 创建RepeatContext并在迭代的每个阶段将其传递给RepeatCallback。回调完成doInIteration后,RepeatTemplate必须调用CompletionPolicy,要求它更新 state(将存储在RepeatContext中)。然后,如果迭代完成,它会询问 policy。

Spring Batch 提供了CompletionPolicy的一些简单的通用 implementations。 SimpleCompletionPolicy允许执行最多固定次数(RepeatStatus.FINISHED强制在任何 time 时间提前完成)。

用户可能需要实现自己的完成 policies 以进行更复杂的决策。例如,一个批处理窗口阻止批处理作业在使用在线系统后执行,这需要一个自定义 policy。

1.3. 异常处理

如果在RepeatCallback中抛出 exception,则RepeatTemplate会查询ExceptionHandler,这可以决定是否 exception。

以下清单显示了ExceptionHandler接口定义:

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

common 用例是计算给定类型的 exceptions 的数量,并在达到限制时失败。为此,Spring Batch 提供SimpleLimitExceptionHandler和稍微灵活的RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler有一个限制 property 和一个 exception 类型,应该与当前的 exception 进行比较。还计算所提供类型的所有子类。 在达到限制之前,将忽略给定类型的异常,然后重新抛出它们。 总是重新抛出其他类型的异常。

SimpleLimitExceptionHandler的一个重要的可选 property 是 boolean flag,称为useParent。它默认为false,因此限制仅在当前RepeatContext中计算。设置为true时,在嵌套迭代中(例如 step 中的一组块),跨越同级上下文保持限制。

1.4. 听众

通常,能够在多个不同的迭代中接收 cross-cutting 关注点的附加回调是有用的。为此,Spring Batch 提供RepeatListener接口。 RepeatTemplate允许用户注册RepeatListener __mplement,并且在迭代期间可以使用RepeatContextRepeatStatus给出回调。

RepeatListener接口具有以下定义:

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose回调在整个迭代之前和之后。 beforeafteronError适用于个人RepeatCallback calls。

请注意,当有多个 listener 时,它们位于列表中,因此存在 order。在这种情况下,openbefore在同一 order 中被调用,而afteronErrorclose在 reverse order 中被调用。

1.5. 并行处理

的实现不限于顺序执行回调。一些 implementations 能够在 parallel 中执行它们的回调非常重要。为此,Spring Batch 提供TaskExecutorRepeatTemplate,它使用 Spring TaskExecutor策略来运行RepeatCallback。默认是使用SynchronousTaskExecutor,它具有在同一个线程中执行整个迭代的效果(与普通RepeatTemplate相同)。

1.6. 声明性迭代

有时会有一些业务处理,你知道你想要在每次发生时重复这些业务处理。经典的例子是消息管道的优化。如果频繁到达,则处理一批消息比处理每条消息的单独 transaction 的成本更有效。 Spring Batch 提供了一个 AOP 拦截器,它为RepeatOperations object 包装一个方法调用。 RepeatOperationsInterceptor执行截获的方法,并根据提供的RepeatTemplate中的CompletionPolicy重复。

以下 example 显示了使用 Spring AOP 命名空间重复对名为processMessage的方法的服务调用的声明性迭代(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

以下 example 演示了如何使用 java configuration 重复对名为processMessage的方法的服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅 Spring 用户指南):

@Bean
public MyService myService() {
        ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
        factory.setInterfaces(MyService.class);
        factory.setTarget(new MyService());

        MyService service = (MyService) factory.getProxy();
        JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
        pointcut.setPatterns(".*processMessage.*");

        RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

        ((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

        return service;
}

前面的 example 在拦截器中使用默认的RepeatTemplate。要更改 policies,listeners 和其他详细信息,可以__ject 一个RepeatTemplate的实例进入拦截器。

如果截获的方法返回void,则拦截器总是返回RepeatStatus.CONTINUABLE(因此如果CompletionPolicy没有有限的终点,则存在无限循环的危险)。否则,它返回RepeatStatus.CONTINUABLE,直到截取方法的 return value 为null,此时它返回RepeatStatus.FINISHED。因此,目标方法中的业务逻辑可以通过返回null或通过在提供的RepeatTemplate中抛出ExceptionHandler 的 exception 来表示没有更多的工作要做。