1. Repeat
1.1. RepeatTemplate
批处理与重复操作有关,可以作为简单的优化或作为工作的一部分。为了对重复进行策略化和通用化,并提供相当于迭代器框架的内容,Spring Batch 具有RepeatOperations
接口。 RepeatOperations
接口具有以下定义:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回调是一个接口,如以下定义所示,该接口使您可以插入一些要重复的业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
回调将重复执行,直到实现确定迭代应结束为止。这些接口中的返回值是一个枚举,可以是RepeatStatus.CONTINUABLE
或RepeatStatus.FINISHED
。 RepeatStatus
枚举将有关是否还有更多工作要做的信息传达给重复操作的调用方。一般来说,RepeatOperations
的实现应检查RepeatStatus
并将其用作结束迭代的决定的一部分。任何希望向调用者发送 signal 的通知,它们都可以返回RepeatStatus.FINISHED
。
RepeatOperations
最简单的通用实现是RepeatTemplate
,如以下示例所示:
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的示例中,我们返回RepeatStatus.CONTINUABLE
,以表明还有更多工作要做。回调函数还可以返回RepeatStatus.FINISHED
,以向调用者发出 signal,表明没有其他工作要做。某些迭代可以通过回调中完成的工作固有的考虑来终止。就上述回调而言,其他有效地是无限循环,并将完成决策委派给外部策略,如上例所示。
1.1.1. RepeatContext
RepeatCallback
的方法参数是RepeatContext
。许多回调忽略上下文。但是,如有必要,可以将其用作属性包,以在迭代过程中存储瞬态数据。 iterate
方法返回后,上下文不再存在。
如果正在进行嵌套的迭代,则RepeatContext
具有父上下文。父上下文有时对于存储需要在iterate
的调用之间共享的数据很有用。例如,如果要计算迭代中事件的发生次数并在后续调用中记住该事件,就属于这种情况。
1.1.2. RepeatStatus
RepeatStatus
是 Spring Batch 用来表示处理是否完成的枚举。它有两个可能的RepeatStatus
值,如下表所示:
表 1. RepeatStatus 属性
Value | Description |
CONTINUABLE | 还有更多工作要做。 |
FINISHED | 不再重复。 |
也可以使用RepeatStatus
中的and()
方法将RepeatStatus
值与逻辑 AND 运算结合。这样做的结果是对可持续标志进行逻辑与。换句话说,如果任一状态为FINISHED
,则结果为FINISHED
。
1.2.完成 Policy
在RepeatTemplate
内部,iterate
方法中循环的终止由CompletionPolicy
确定,CompletionPolicy
也是RepeatContext
的工厂。 RepeatTemplate
负责使用当前策略来创建RepeatContext
并将其在迭代的每个阶段传递给RepeatCallback
。回调完成其doInIteration
之后,RepeatTemplate
必须调用CompletionPolicy
以要求其更新其状态(将存储在RepeatContext
中)。然后,它询问策略迭代是否完成。
Spring Batch 提供了CompletionPolicy
的一些简单通用实现。 SimpleCompletionPolicy
允许执行多达固定次数(RepeatStatus.FINISHED
随时强制提前完成)。
用户可能需要实施自己的完成策略以做出更复杂的决定。例如,一个阻止批处理作业在联机系统正在使用时执行的批处理窗口将需要自定义策略。
1.3.异常处理
如果RepeatCallback
内抛出异常,则RepeatTemplate
咨询ExceptionHandler
,后者可以决定是否重新抛出该异常。
以下 Lists 显示了ExceptionHandler
接口定义:
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一个常见的用例是计算给定类型的异常数,并在达到限制时失败。为此,Spring Batch 提供了SimpleLimitExceptionHandler
和更灵活的RethrowOnThresholdExceptionHandler
。 SimpleLimitExceptionHandler
具有 limit 属性和应与当前异常进行比较的异常类型。提供类型的所有子类也将被计数。给定类型的异常将被忽略,直到达到限制,然后将其重新抛出。总是会抛出其他类型的异常。
SimpleLimitExceptionHandler
的一个重要的可选属性是称为useParent
的布尔标志。默认情况下为false
,因此限制仅在当前RepeatContext
中考虑。当设置为true
时,限制将在嵌套迭代中跨兄弟上下文保留(例如,步骤中的一组块)。
1.4. Listeners
通常,能够接收跨多个不同迭代的跨领域关注点的其他回调是很有用的。为此,Spring Batch 提供了RepeatListener
接口。 RepeatTemplate
允许用户注册RepeatListener
实现,并且在迭代过程中为他们提供RepeatContext
和RepeatStatus
的回调。
RepeatListener
接口具有以下定义:
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open
和close
回调在整个迭代之前和之后出现。 before
,after
和onError
适用于各个RepeatCallback
调用。
请注意,当侦听器不止一个时,它们在列表中,因此有一个 Sequences。在这种情况下,open
和before
的调用 Sequences 相同,而after
,onError
和close
的调用 Sequences 相反。
1.5.并行处理
RepeatOperations
的实现不限于 Sequences 执行回调。某些实现能够并行执行其回调非常重要。为此,Spring Batch 提供了TaskExecutorRepeatTemplate
,它使用 Spring TaskExecutor
策略来运行RepeatCallback
。默认是使用SynchronousTaskExecutor
,它具有在同一线程中执行整个迭代的效果(与普通RepeatTemplate
相同)。
1.6.声明式迭代
有时,您知道某些业务处理需要在每次发生时重复进行。典型的例子是消息管道的优化。如果要处理一批消息(如果它们经常到达),则比承担每条消息的单独事务处理的成本更为有效。 Spring Batch 为此提供了一个 AOP 拦截器,该拦截器将方法调用包装在RepeatOperations
对象中。 RepeatOperationsInterceptor
执行拦截的方法,并根据提供的RepeatTemplate
中的CompletionPolicy
重复。
以下示例显示了使用 Spring AOP 名称空间进行声明式迭代,以重复调用名为processMessage
的方法的服务(有关如何配置 AOP 拦截器的更多详细信息,请参见 Spring 用户指南):
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
以下示例演示如何使用 Java 配置重复对称为processMessage
的方法的服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参见 Spring 用户指南):
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
前面的示例在拦截器内部使用默认值RepeatTemplate
。要更改策略,侦听器和其他详细信息,您可以将RepeatTemplate
的实例注入拦截器。
如果拦截的方法返回void
,则拦截器总是返回RepeatStatus.CONTINUABLE
(因此,如果CompletionPolicy
没有 endpoints,则存在无限循环的危险)。否则,它将返回RepeatStatus.CONTINUABLE
,直到被拦截方法的返回值为null
为止,此时它将返回RepeatStatus.FINISHED
。因此,目标方法内部的业务逻辑可以通过返回null
或引发由提供的RepeatTemplate
中的ExceptionHandler
抛出的异常来发出 signal,表明没有其他工作要做。