6. ItemReaders 和 ItemWriters
所有批处理都可以以其最简单的形式描述为读取大量数据,执行某种类型的计算或转换并写出结果。 Spring Batch 提供了三个关键接口来帮助执行批量读取和写入:ItemReader
,ItemProcessor
和ItemWriter
。
6.1 ItemReader
尽管是一个简单的概念,但ItemReader
是用于从许多不同类型的 Importing 中提供数据的方法。最一般的示例包括:
-
平面文件-平面文件 Item 读取器从平面文件中读取数据行,这些行通常描述记录的数据字段,该字段由文件中的固定位置定义或由某些特殊字符(例如逗号)分隔。
-
XML-XML ItemReaders 独立于用于解析,Map 和验证对象的技术来处理 XML。Importing 数据允许根据 XSD 模式验证 XML 文件。
-
数据库-访问数据库资源以返回结果集,该结果集可以 Map 到对象以进行处理。默认的 SQL ItemReaders 调用
RowMapper
来返回对象,如果需要重新启动,则跟踪当前行,存储基本统计信息,并提供一些事务增强功能,这将在后面进行解释。
还有更多的可能性,但本章将重点介绍基本的可能性。所有可用的 ItemReader 的完整列表可以在附录 A 中找到。
ItemReader
是通用 Importing 操作的基本界面:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException;
}
read
方法定义了ItemReader
的最基本 Contract;调用它返回一个 Item,如果没有剩余的 Item,则返回 null。一个 Item 可能代表文件中的一行,数据库中的一行或 XML 文件中的元素。通常期望将它们 Map 到可用的域对象(即 Trade,Foo 等),但 Contract 中没有要求这样做。
预计ItemReader
接口的实现将仅转发。但是,如果基础资源是事务性的(例如 JMS 队列),则在回滚方案中,调用 read 可能会在后续调用中返回相同的逻辑项。还值得注意的是,ItemReader
缺少要处理的 Item 不会导致引发异常。例如,配置有返回 0 结果的查询的数据库ItemReader
只会在第一次调用read
时返回 null。
6.2 ItemWriter
ItemWriter
在功能上与ItemReader
类似,但具有相反的运算。资源仍然需要定位,打开和关闭,但是它们的区别在于ItemWriter
是写出而不是读入。对于数据库或队列,它们可能是插入,更新或发送。输出序列化的格式特定于每个批处理作业。
与ItemReader
一样,ItemWriter
是一个相当通用的接口:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
与ItemReader
上的read
一样,write
提供ItemWriter
的基本 Contract;只要打开,它将尝试写出传入的 Item 列表。因为通常期望将 Item“分批”在一起,然后输出,所以接口接受 Item 列表,而不是 Item 本身。写入列表后,可以执行任何必要的刷新操作,然后再从 write 方法返回。例如,如果写入一个 Hibernate DAO,则可以进行多个写入操作,每个 Item 一个。然后,Writer 可以在返回之前在休眠会话上调用 close。
6.3 ItemProcessor
ItemReader
和ItemWriter
接口对于它们的特定任务都非常有用,但是如果要在编写之前插入业务逻辑怎么办?读写的一种选择是使用复合模式:创建一个包含另一个ItemWriter
的ItemWriter
或一个包含另一个ItemReader
的ItemReader
。例如:
public class CompositeItemWriter<T> implements ItemWriter<T> {
ItemWriter<T> itemWriter;
public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}
public void write(List<? extends T> items) throws Exception {
//Add business logic here
itemWriter.write(item);
}
public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
上面的类包含另一个ItemWriter
,它在提供了一些业务逻辑后将其委托给该ItemWriter
。该模式也可以很容易地用于ItemReader
,也许可以基于主ItemReader
提供的 Importing 来获取更多参考数据。如果您需要自己控制对write
的调用,它也很有用。但是,如果您只想在实际写入之前“转换”传递给写入的 Item,则您自己不需要调用write
:您只想修改该 Item。对于这种情况,Spring Batch 提供了ItemProcessor
接口:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemProcessor
非常简单;给定一个对象,对其进行转换,然后返回另一个。提供的对象可以是或可以不是相同的类型。关键是,业务逻辑可以在流程中应用,并且完全取决于开发人员来创建。 ItemProcessor
可以直接连接到步骤中,例如,假设ItemReader
提供了 Foo 类型的类,则需要先将其转换为 Bar 类型。可以编写ItemProcessor
来执行转换:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo,Bar>{
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar>{
public void write(List<? extends Bar> bars) throws Exception {
//write bars
}
}
在上面非常简单的示例中,存在一个类Foo
,一个类Bar
和一个类FooProcessor
,它们都坚持ItemProcessor
接口。转换很简单,但是任何类型的转换都可以在这里完成。 BarWriter
将用于写出Bar
对象,如果提供任何其他类型,则会引发异常。同样,如果提供了Foo
以外的任何内容,FooProcessor
将引发异常。然后可以将FooProcessor
注入到Step
中:
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
6.3.1 链接 ItemProcessor
执行单个转换在许多情况下很有用,但是如果您要“链接”多个ItemProcessor
怎么办?这可以使用前面提到的复合图案来完成。要更新先前的单个转换,例如Foo
将转换为Bar
,然后将其转换为Foobar
并写出:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar{
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo,Bar>{
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar,FooBar>{
public FooBar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<FooBar>{
public void write(List<? extends FooBar> items) throws Exception {
//write items
}
}
FooProcessor
和BarProcessor
可以“链接”在一起以得到结果Foobar
:
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);
与前面的示例一样,可以将复合处理器配置为Step
:
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="compositeProcessor" writer="foobarWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
<bean id="compositeItemProcessor"
class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor" />
<bean class="..BarProcessor" />
</list>
</property>
</bean>
6.3.2 过滤记录
Item 处理器的一种典型用法是在将记录传递给 ItemWriter 之前过滤掉记录。过滤是一种不同于跳过的动作;跳过表示记录无效,而过滤仅表示不应写入记录。
例如,考虑一个批处理作业,该作业读取一个包含三种不同类型记录的文件:要插入的记录,要更新的记录和要删除的记录。如果系统不支持删除记录,则我们不希望将任何“删除”记录发送到ItemWriter
。但是,由于这些记录实际上并不是不良记录,因此我们希望将其过滤掉,而不是跳过。结果,ItemWriter 将仅接收“插入”和“更新”记录。
要过滤记录,只需从ItemProcessor
返回“ null”。框架将检测到结果为“ null”,并避免将该 Item 添加到传递给ItemWriter
的记录列表中。与往常一样,从ItemProcessor
引发的异常将导致跳过。
6.3.3 容错
回滚块时,可能会重新处理读取期间已缓存的 Item。如果将步骤配置为容错的(通常使用跳过或重试处理),则应以幂等的方式实现所使用的任何 ItemProcessor。通常,这将包括不对 ItemProcessor 的 Importing 项执行任何更改,而仅更新作为结果的实例。
6.4 ItemStream
ItemReader
和ItemWriter
都很好地满足了各自的目的,但是它们之间存在一个共同的问题,那就是需要另一个接口。通常,作为批处理作业范围的一部分,需要打开,关闭读取器和写入器,并需要一种持久化状态的机制:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
在描述每种方法之前,我们应该提到ExecutionContext
。同样实现ItemStream
的ItemReader
的 Client 端应在对read
的任何调用之前先调用open
,以打开文件等任何资源或获得连接。类似的限制适用于实现ItemStream
的ItemWriter
。如第 2 章所述,如果在ExecutionContext
中找到了预期的数据,则可以使用它在初始状态以外的位置启动ItemReader
或ItemWriter
。相反,将调用close
以确保安全释放在open
期间分配的任何资源。调用update
主要是为了确保当前保留的任何状态都已加载到提供的ExecutionContext
中。在提交之前将调用此方法,以确保在提交之前将当前状态保留在数据库中。
在ItemStream
的 Client 端是Step
(来自 Spring Batch Core)的特殊情况下,将为每个StepExecution
创建一个ExecutionContext
,以允许用户存储特定执行的状态,并期望如果执行时返回该状态。相同的JobInstance
重新启动。对于那些熟悉 Quartz 的人,其语义与 Quartz JobDataMap
非常相似。
6.5 委托模式和步骤注册
请注意,CompositeItemWriter
是委派模式的示例,在 Spring Batch 中很常见。委托本身可以实现回调接口StepListener
。如果这样做的话,并且它们与 Spring Batch Core 一起作为Job
中Step
的一部分使用,那么几乎可以肯定他们需要手动向Step
注册。如果直接实现该接口的ItemStream
或StepListener
接口,则直接注册到该步骤中的读取器,写入器或处理器将被自动注册。但是由于Step
并不了解委托,因此需要将它们作为侦听器或流(或在适当时将两者同时注入)注入:
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
commit-interval="2">
<streams>
<stream ref="barWriter" />
</streams>
</chunk>
</tasklet>
</step>
</job>
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter" />
</bean>
<bean id="barWriter" class="...BarWriter" />
6.6 平面文件
交换批量数据的最常见机制之一一直是平面文件。与 XML 具有定义其结构化(XSD)的公认标准不同,任何阅读平面文件的人都必须提前了解文件的结构。通常,所有平面文件都分为两种:定界文件和固定长度文件。分隔文件是指用逗号分隔分隔符的字段。固定长度文件具有设置长度的字段。
6.6.1 FieldSet
在 Spring Batch 中使用平面文件时,无论是用于 Importing 还是输出,最重要的类之一是FieldSet
。许多体系结构和库都包含用于帮助您从文件读入的抽象,但是它们通常返回 String 或 String 数组。这真的只会让您半途而废。 FieldSet
是 Spring Batch 的抽象,用于启用文件资源中字段的绑定。它使开发人员可以像处理数据库 Importing 一样使用文件 Importing。 FieldSet
在概念上与 Jdbc ResultSet
非常相似。 FieldSet 仅需要一个参数,即String
标记数组。 (可选)您还可以在字段名称中进行配置,以便可以按索引或ResultSet
之后的名称访问字段:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
FieldSet
界面上还有更多选项,例如Date
,long,BigDecimal
等。FieldSet
的最大优点是,它提供了对平面文件 Importing 的一致解析。在处理由格式异常引起的错误或进行简单的数据转换时,它可以保持一致,而不是使每个批处理作业以潜在的意外方式进行不同的解析。
6.6.2 FlatFileItemReader
平面文件是最多包含二维(表格)数据的任何类型的文件。 FlatFileItemReader
类有助于在 Spring Batch 框架中读取平面文件,该类提供了用于读取和解析平面文件的基本功能。 FlatFileItemReader
的两个最重要的必需依赖项是Resource
和LineMapper.
。在下一部分中将进一步探讨LineMapper
接口。 resource 属性表示一个 Spring Core Resource
。可以在Spring 框架,第 5 章资源中找到说明如何创建此类 bean 的文档。因此,本指南将不涉及创建Resource
对象的详细信息。但是,可以在下面找到文件系统资源的简单示例:
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的批处理环境中,目录结构通常由 EAI 基础结构 Management,在 EAI 基础结构中,构建了用于外部接口的放置区,以将文件从 ftp 位置移动到批处理位置,反之亦然。文件移动 Util 超出了 Spring 批处理体系结构的范围,但是批处理作业流中包含文件移动 Util 作为作业流中的步骤并不少见。批处理体系结构只需要知道如何找到要处理的文件就足够了。 Spring Batch 从此起点开始将数据馈入管道的过程。但是,Spring Integration提供了许多此类服务。
FlatFileItemReader
中的其他属性使您可以进一步指定如何解释数据:
表 6.1. FlatFileItemReader 属性
Property | Type | Description |
---|---|---|
comments | String[] | 指定指示 Comments 行的行前缀 |
encoding | String | 指定要使用的文本编码-默认为“ ISO-8859-1” |
lineMapper | LineMapper | 将表示 Item 的String 转换为Object 。 |
linesToSkip | int | 文件顶部要忽略的行数 |
recordSeparatorPolicy | RecordSeparatorPolicy | 用于确定行尾的位置,并执行诸如在带引号的字符串中 continue 到行尾的操作。 |
resource | Resource | 从中读取资源。 |
skippedLinesCallback | LineCallbackHandler | 该接口传递要跳过的文件中各行的原始行内容。如果 linesToSkip 设置为 2,则此接口将被调用两次。 |
strict | boolean | 在严格模式下,如果 Importing 资源不存在,则读取器将在 ExecutionContext 上引发异常。 |
LineMapper
与RowMapper
一样,它采用诸如ResultSet
之类的低级构造并返回Object
,平面文件处理需要相同的构造才能将String
行转换为Object
:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
基本约定是,给定当前行及其关联的行号,Map 器应返回结果域对象。这与RowMapper
相似,因为每一行都与其行号相关联,就像ResultSet
中的每一行都与其行号相关联一样。这允许将行号绑定到结果域对象,以进行身份比较或提供更多信息。但是,与RowMapper
不同,LineMapper
被赋予了原始行,如上所述,该原始行只会使您到达中间。该行必须标记为FieldSet
,然后可以将其 Map 到对象,如下所述。
LineTokenizer
必须将 Importing 行转换为行FieldSet
的抽象,因为可能需要将许多格式的平面文件数据转换为FieldSet
。在 Spring Batch 中,此接口是LineTokenizer
:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
LineTokenizer
的约定使得在给定 Importing 行的情况下(理论上String
可以包含多行),将返回代表该行的FieldSet
。然后可以将此FieldSet
传递给FieldSetMapper
。 Spring Batch 包含以下LineTokenizer
实现:
-
DelmitedLineTokenizer
-用于 Logging 的字段由定界符分隔的文件。最常见的定界符是逗号,但是也经常使用竖线或分号。 -
FixedLengthTokenizer
-用于 Logging 字段均为“固定宽度”的文件。必须为每种记录类型定义每个字段的宽度。 -
PatternMatchingCompositeLineTokenizer
-通过检查模式,确定应在特定行上使用LineTokenizer
s 列表中的哪一个。
FieldSetMapper
FieldSetMapper
接口定义单个方法mapFieldSet
,该方法采用FieldSet
对象并将其内容 Map 到对象。根据作业的需要,此对象可以是自定义 DTO,域对象或简单数组。 FieldSetMapper
与LineTokenizer
结合使用可将一行数据从资源转换为所需类型的对象:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet);
}
使用的模式与JdbcTemplate
使用的RowMapper
相同。
DefaultLineMapper
既然已经定义了读取平面文件的基本接口,那么很明显,需要三个基本步骤:
-
从文件中读取一行。
-
将字符串行传递到
LineTokenizer#tokenize
()方法中,以便检索FieldSet
。 -
将标记化返回的
FieldSet
传递给FieldSetMapper
,并从ItemReader#read
()方法返回结果。
上面描述的两个接口代表两个单独的任务:将线转换为FieldSet
,并将FieldSet
Map 到域对象。因为LineTokenizer
的 Importing 与LineMapper
的 Importing(一行)匹配,并且FieldSetMapper
的输出与LineMapper
的输出匹配,所以提供了同时使用LineTokenizer
和FieldSetMapper
的默认实现。 DefaultLineMapper
代表大多数用户将需要的行为:
public class DefaultLineMapper<T> implements LineMapper<T>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
以上功能是默认实现中提供的,而不是内置于 Reader 本身中(如在框架的先前版本中所做的那样),以便允许用户在控制解析过程时具有更大的灵 Active,尤其是在访问原始行的情况下。需要。
简单分隔文件读取示例
以下示例将用于使用实际域方案进行说明。这个特定的批处理作业从以下文件中读取足球运动员:
ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"
该文件的内容将 Map 到以下Player
域对象:
public class Player implements Serializable {
private String ID;
private String lastName;
private String firstName;
private String position;
private int birthYear;
private int debutYear;
public String toString() {
return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
",First Name=" + firstName + ",Position=" + position +
",Birth Year=" + birthYear + ",DebutYear=" +
debutYear;
}
// setters and getters...
}
为了将FieldSet
Map 到Player
对象,需要定义返回玩家的FieldSetMapper
:
protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) {
Player player = new Player();
player.setID(fieldSet.readString(0));
player.setLastName(fieldSet.readString(1));
player.setFirstName(fieldSet.readString(2));
player.setPosition(fieldSet.readString(3));
player.setBirthYear(fieldSet.readInt(4));
player.setDebutYear(fieldSet.readInt(5));
return player;
}
}
然后可以通过正确构造FlatFileItemReader
并调用read
来读取文件:
FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();
每次对read
的调用都会从文件的每一行返回一个新的 Player 对象。到达文件末尾时,将返回 null。
按名称 Map 字段
DelimitedLineTokenizer
和FixedLengthTokenizer
允许另外一项功能,其功能类似于 Jdbc ResultSet
。字段的名称可以被注入到这些LineTokenizer
实现中,以提高 Map 函数的可读性。首先,将平面文件中所有字段的列名注入令牌生成器中:
tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});
FieldSetMapper
可以使用以下信息:
public class PlayerMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fs) {
if(fs == null){
return null;
}
Player player = new Player();
player.setID(fs.readString("ID"));
player.setLastName(fs.readString("lastName"));
player.setFirstName(fs.readString("firstName"));
player.setPosition(fs.readString("position"));
player.setDebutYear(fs.readInt("debutYear"));
player.setBirthYear(fs.readInt("birthYear"));
return player;
}
}
将字段集自动 Map 到域对象
对于许多人来说,必须编写特定的FieldSetMapper
与为JdbcTemplate
编写特定的RowMapper
一样麻烦。 Spring Batch 通过提供FieldSetMapper
来简化此过程,该FieldSetMapper
通过使用 JavaBean 规范将字段名称与对象上的设置器进行匹配来自动 Map 字段。再次使用足球示例,BeanWrapperFieldSetMapper
配置如下所示:
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="player" />
</bean>
<bean id="player"
class="org.springframework.batch.sample.domain.Player"
scope="prototype" />
对于FieldSet
中的每个条目,Map 器将在Player
对象的新实例上查找对应的 setter(因此,需要原型作用域),方式与 Spring 容器将查找与属性名称匹配的 setter 相同。 FieldSet
中的每个可用字段都将被 Map,并且将返回结果Player
对象,而无需代码。
定长文件格式
到目前为止,仅详细讨论了定界文件,但是,它们仅占文件读取图片的一半。许多使用平面文件的组织都使用固定长度格式。固定长度文件示例如下:
UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5
虽然这 Watch 起来像一个大字段,但实际上代表了 4 个不同的字段:
-
ISIN:要 Order 的商品的唯一标识符-12 个字符长。
-
数量:已 Order 的 Item 数-3 个字符长。
-
价格:商品价格-5 个字符长。
-
Client:Order 商品的 ClientID-9 个字符长。
配置FixedLengthLineTokenizer
时,必须以范围的形式提供以下每种长度:
<bean id="fixedLengthLineTokenizer"
class="org.springframework.batch.io.file.transform.FixedLengthTokenizer">
<property name="names" value="ISIN,Quantity,Price,Customer" />
<property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>
因为FixedLengthLineTokenizer
使用与上述相同的LineTokenizer
接口,所以它将返回相同的FieldSet
,就好像使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper
。
Note
要支持上述范围语法,需要在ApplicationContext
中配置专门的属性编辑器RangeArrayPropertyEditor
。但是,此 Bean 是在使用批处理名称空间的ApplicationContext
中自动声明的。
单个文件中的多种记录类型
到目前为止,为简单起见,所有文件读取示例都作了一个关键假设:文件中的所有记录都具有相同的格式。但是,并非总是如此。通常,文件中的记录可能具有不同的格式,需要对其进行不同的标记和 Map 到不同的对象。以下文件摘录对此进行了说明:
USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI
在此文件中,我们有三种类型的记录:“ USER”,“ LINEA”和“ LINEB”。 “ USER”行对应于一个 User 对象。尽管“ LINEA”比“ LINEB”具有更多信息,但“ LINEA”和“ LINEB”都对应于 Line 对象。
ItemReader
将分别读取每一行,但是我们必须指定不同的LineTokenizer
和FieldSetMapper
对象,以便ItemWriter
将接收正确的 Item。 PatternMatchingCompositeLineMapper
通过允许配置模式到LineTokenizer
的 Map 以及模式到FieldSetMapper
的 Map,使此操作变得容易:
<bean id="orderFileLineMapper"
class="org.spr...PatternMatchingCompositeLineMapper">
<property name="tokenizers">
<map>
<entry key="USER*" value-ref="userTokenizer" />
<entry key="LINEA*" value-ref="lineATokenizer" />
<entry key="LINEB*" value-ref="lineBTokenizer" />
</map>
</property>
<property name="fieldSetMappers">
<map>
<entry key="USER*" value-ref="userFieldSetMapper" />
<entry key="LINE*" value-ref="lineFieldSetMapper" />
</map>
</property>
</bean>
在此示例中,“ LINEA”和“ LINEB”具有单独的LineTokenizer
,但它们都使用相同的FieldSetMapper
。
PatternMatchingCompositeLineMapper
使用PatternMatcher
的match
方法来为每行选择正确的委托。 PatternMatcher
允许使用两个具有特殊含义的通配符:问号(“?”)恰好匹配一个字符,而星号(“ *”)则匹配零个或多个字符。请注意,在上述配置中,所有模式都以星号结尾,从而使它们有效地成为行的前缀。 PatternMatcher
将始终匹配最可能的特定模式,而不考虑配置中的 Sequences。因此,如果“ LINE *”和“ LINEA *”都被列为模式,则“ LINEA”将与模式“ LINEA *”匹配,而“ LINEB”将与模式“ LINE *”匹配。另外,单个星号(“ *”)可以通过匹配未与任何其他模式匹配的任何行作为默认值。
<entry key="*" value-ref="defaultLineTokenizer" />
还有一个PatternMatchingCompositeLineTokenizer
可以单独用于令牌化。
平面文件包含每个跨越多行的记录也是很常见的。为了处理这种情况,需要更复杂的策略。 第 11.5 节“多行记录”中提供了这种常见模式的演示。
平面文件中的异常处理
在很多情况下,对行进行标记可能会引发异常。许多平面文件并不完美,并且包含格式不正确的记录。许多用户选择跳过这些错误的行,注销问题,原始行和行号。以后可以手动或通过其他批处理作业检查这些日志。因此,Spring Batch 提供了一个用于处理解析异常的异常层次结构:FlatFileParseException
和FlatFileFormatException
。尝试读取文件时遇到任何错误,FlatFileItemReader
会引发FlatFileParseException
。 LineTokenizer
接口的实现抛出FlatFileFormatException
,并指示在标记化时遇到的更具体的错误。
IncorrectTokenCountException
DelimitedLineTokenizer
和FixedLengthLineTokenizer
都可以指定可用于创建FieldSet
的列名。但是,如果列名的数量与对行进行标记时找到的列数不匹配,则无法创建FieldSet
,并抛出IncorrectTokenCountException
,其中包含遇到的令牌数和预期的数目:
tokenizer.setNames(new String[] {"A", "B", "C", "D"});
try {
tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
assertEquals(4, e.getExpectedCount());
assertEquals(3, e.getActualCount());
}
因为令牌化器配置了 4 个列名,但是在文件中仅找到 3 个令牌,所以抛出了IncorrectTokenCountException
。
IncorrectLineLengthException
解析为固定长度格式的文件在解析时还有其他要求,因为与分隔格式不同,每一列必须严格遵守其 sched 义宽度。如果总行长不等于该列的最宽值,则抛出异常:
tokenizer.setColumns(new Range[] { new Range(1, 5),
new Range(6, 10),
new Range(11, 15) });
try {
tokenizer.tokenize("12345");
fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
assertEquals(15, ex.getExpectedLength());
assertEquals(5, ex.getActualLength());
}
上面标记器的配置范围是:1-5、6-10 和 11-15,因此预期行的总长度为 15.但是,在这种情况下,传入了长度为 5 的行,从而导致IncorrectLineLengthException
被抛出。在这里抛出异常而不是仅 Map 第一列,可以使行的处理更早地失败,并且比试图在FieldSetMapper
的第 2 列中读取失败的情况提供更多的信息。但是,在某些情况下,线的长度并不总是恒定的。因此,可以通过“ strict”属性关闭行长的验证:
tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));
上面的示例几乎与之前的示例相同,只是调用了 tokenizer.setStrict(false)。此设置告诉令牌化程序在对行进行令牌化时不要强制行长。现在已正确创建并返回FieldSet
。但是,它将仅包含剩余值的空令牌。
6.6.3 FlatFileItemWriter
写入平面文件具有相同的问题和必须从文件中读取的问题。步骤必须能够以事务方式以定界或定长格式写出。
LineAggregator
就像需要LineTokenizer
接口来获取一项并将其变成String
一样,文件写入必须具有一种将多个字段聚合到单个字符串中以写入文件的方法。在 Spring Batch 中,这是LineAggregator
:
public interface LineAggregator<T> {
public String aggregate(T item);
}
LineAggregator
与LineTokenizer
相反。 LineTokenizer
取String
并返回FieldSet
,而LineAggregator
取item
并返回String
。
PassThroughLineAggregator
LineAggregator 接口的最基本实现是PassThroughLineAggregator
,它简单地假定对象已经是一个字符串,或者它的字符串表示形式可以用于编写:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
如果需要直接控制创建字符串,但是上面的实现很有用,但是FlatFileItemWriter
的优点(例如事务和重新启动支持)是必需的。
简化文件编写示例
既然已经定义了LineAggregator
接口及其最基本的实现PassThroughLineAggregator
,那么可以说明基本的编写流程:
-
要写入的对象被传递到
LineAggregator
以获得String
。 -
返回的
String
将被写入配置的文件。
FlatFileItemWriter
的以下摘录用代码表示:
public void write(T item) throws Exception {
write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
一个简单的配置如下所示:
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean class="org.spr...PassThroughLineAggregator"/>
</property>
</bean>
FieldExtractor
上面的示例对于写入文件的最基本用途可能很有用。但是,FlatFileItemWriter
的大多数用户将具有需要写出的域对象,因此必须将其转换为一行。在读取文件时,需要满足以下条件:
-
从文件中读取一行。
-
将字符串行传递到
LineTokenizer#tokenize
()方法中,以便检索FieldSet
-
将令牌化返回的
FieldSet
传递给FieldSetMapper
,并从ItemReader#read
()方法返回结果
文件写入具有相似但相反的步骤:
-
将要写入的 Item 传递给 Writer
-
将 Item 上的字段转换为数组
-
将结果数组聚合为一行
因为框架没有办法知道对象中哪些字段需要写出,所以必须写一个FieldExtractor
来完成将 Item 变成数组的任务:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
FieldExtractor
接口的实现应从提供的对象的字段中创建一个数组,然后可以使用元素之间的分隔符将其写出,也可以将其写为字段宽度线的一部分。
PassThroughFieldExtractor
在许多情况下,需要写出集合,例如数组Collection
或FieldSet
。从这些集合类型之一中“提取”数组非常简单:只需将集合转换为数组即可。因此,在这种情况下应使用PassThroughFieldExtractor
。应当注意,如果传入的对象不是集合类型,则PassThroughFieldExtractor
将返回仅包含要提取的 Item 的数组。
BeanWrapperFieldExtractor
与在文件读取部分中介绍的BeanWrapperFieldSetMapper
一样,通常最好配置如何将域对象转换为对象数组,而不是自己编写转换。 BeanWrapperFieldExtractor
仅提供这种类型的功能:
BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);
此提取器实现只有一个必需的属性,即要 Map 的字段名称。就像BeanWrapperFieldSetMapper
需要字段名称将FieldSet
上的字段 Map 到所提供对象上的 setter 一样,BeanWrapperFieldExtractor
也需要名称 Map 到 getter 来创建对象数组。值得注意的是,名称的 Sequences 决定了数组中字段的 Sequences。
分隔文件写入示例
最基本的平面文件格式是其中所有字段都由定界符分隔的格式。这可以使用DelimitedLineAggregator
完成。下面的示例写出一个简单的域对象,该对象代表 Client 帐户的贷方:
public class CustomerCredit {
private int id;
private String name;
private BigDecimal credit;
//getters and setters removed for clarity
}
由于正在使用域对象,因此必须提供 FieldExtractor 接口的实现以及要使用的定界符:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit"/>
</bean>
</property>
</bean>
</property>
</bean>
在这种情况下,本章前面介绍的BeanWrapperFieldExtractor
用于将CustomerCredit
中的名称和贷方字段转换为对象数组,然后将其写成每个字段之间的逗号。
定宽文件写入示例
分隔不是平面文件格式的唯一类型。许多人更喜欢为每个列使用固定宽度来在字段之间划定轮廓,这通常称为“固定宽度”。 Spring Batch 通过FormatterLineAggregator
在文件写入中支持此功能。使用上述相同的CustomerCredit
域对象,可以将其配置如下:
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...FormatterLineAggregator">
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit" />
</bean>
</property>
<property name="format" value="%-9s%-2.0f" />
</bean>
</property>
</bean>
上面的大多数示例应该 Watch 起来很熟悉。但是,format 属性的值是新的:
<property name="format" value="%-9s%-2.0f" />
使用与 Java 5 相同的Formatter
构建底层实现。Java Formatter
基于 C 编程语言的printf
功能。有关如何配置格式化程序的大多数详细信息,请参见Formatter的 javadoc。
处理文件创建
FlatFileItemReader
与文件资源的关系非常简单。初始化 Reader 后,它将打开文件(如果存在),并引发异常(如果没有)。文件写入并不是那么简单。乍一 Watch,似乎对于FlatFileItemWriter
应该存在类似的直接约定:如果文件已经存在,则引发异常;如果不存在,则创建它并开始写入。但是,潜在地重新启动Job
可能会导致问题。在正常的重新启动方案中,Contract 是相反的:如果文件存在,则从最后一个已知的好的位置开始对其进行写入,如果不存在,则引发异常。但是,如果此作业的文件名始终相同会怎样?在这种情况下,您希望删除该文件(如果存在),除非重新启动。由于这种可能性,FlatFileItemWriter
包含属性shouldDeleteIfExists
。将此属性设置为 true 将导致在打开编写器时删除具有相同名称的现有文件。
6.7 XMLItem 读取器和写入器
Spring Batch 提供了用于读取 XML 记录并将它们 Map 到 Java 对象以及将 Java 对象编写为 XML 记录的事务性基础结构。
Note
StAX API 用于 I/O,因为其他标准 XML 解析 API 不符合批处理要求(DOM 将整个 Importing 立即加载到内存中,而 SAX 控制解析过程,仅允许用户提供回调)。
让我们仔细 WatchWatchSpring Batch 中 XMLImporting 和输出的工作方式。首先,有一些概念与文件读写不同,但在 Spring Batch XML 处理中很常见。通过 XML 处理,而不是需要标记的记录行(FieldSets),假定 XML 资源是与各个记录相对应的“片段”的集合:
图 3.1:XMLImporting
在上述方案中,“贸易”标签被定义为“根元素”。 ' '和' '之间的所有内容均被视为一个“片段”。 Spring Batch 使用对象/ XMLMap(OXM)将片段绑定到对象。但是,Spring Batch 不与任何特定的 XML 绑定技术绑定。典型的用法是委托Spring OXM,它为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 的依赖关系是可选的,如果需要,您可以选择实现特定于 Spring Batch 的接口。与 OXM 支持的技术的关系可以显示如下:
图 3.2:OXM 绑定
现在,对 OXM 进行了介绍,并介绍了如何使用 XML 片段来表示记录,下面让我们仔细 WatchWatchReader。
6.7.1 StaxEventItemReader
StaxEventItemReader
配置提供了用于处理 XMLImporting 流中的记录的典型设置。首先,让我们检查StaxEventItemReader
可以处理的一组 XML 记录。
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
为了能够处理 XML 记录,需要执行以下操作:
-
根元素名称-组成要 Map 对象的片段的根元素的名称。示例配置通过贸易价值展示了这一点。
-
资源-表示要读取的文件的 Spring 资源。
-
Unmarshaller
-Spring OXM 提供的解组工具,用于将 XML 片段 Map 到对象。
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
<property name="fragmentRootElementName" value="trade" />
<property name="resource" value="data/iosample/input/input.xml" />
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>
注意,在此示例中,我们选择使用XStreamMarshaller
来接受作为 Map 传入的别名,其中第一个键和值是片段的名称(即根元素)以及要绑定的对象类型。然后,类似于FieldSet
,Map 到对象类型内字段的其他元素的名称在 Map 中描述为键/值对。在配置文件中,我们可以使用 Spring 配置 Util 来描述所需的别名,如下所示:
<bean id="tradeMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade"
value="org.springframework.batch.sample.domain.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="name" value="java.lang.String" />
</util:map>
</property>
</bean>
Importing 时,Reader 将读取 XML 资源,直到它识别出一个新的片段即将开始(默认情况下通过匹配标记名称)。读取器从该片段创建一个独立的 XML 文档(或至少使它 Watch 起来如此),然后将该文档传递给解串器(通常是 Spring OXM Unmarshaller
的包装器),以将 XMLMap 到 Java 对象。
总之,此过程类似于以下脚本 Java 代码,该代码使用 Spring 配置提供的注入:
StaxEventItemReader xmlStaxEventItemReader = new StaxEventItemReader()
Resource resource = new ByteArrayResource(xmlResource.getBytes())
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = true
CustomerCredit credit = null;
while (hasNext) {
credit = xmlStaxEventItemReader.read();
if (credit == null) {
hasNext = false;
}
else {
System.out.println(credit);
}
}
6.7.2 StaxEventItemWriter
输出与 Importing 对称地工作。 StaxEventItemWriter
需要Resource
,编组和rootTagName
。将 Java 对象传递到编组器(通常是标准 Spring OXM Marshaller
),编组器使用自定义事件编写器写入Resource
,该事件编写器过滤 OXM 工具为每个片段生成的StartDocument
和EndDocument
事件。我们将在使用MarshallingEventWriterSerializer
的示例中对此进行展示。此设置的 Spring 配置如下所示:
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" ref="outputResource" />
<property name="marshaller" ref="customerCreditMarshaller" />
<property name="rootTagName" value="customers" />
<property name="overwriteOutput" value="true" />
</bean>
该配置设置了三个必需的属性,还可以选择设置 overwriteOutput = true,这在本章前面提到的用于指定是否可以覆盖现有文件。应当注意,用于编写程序的编组器与本章前面的阅读示例中使用的编组器完全相同:
<bean id="customerCreditMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="customer"
value="org.springframework.batch.sample.domain.CustomerCredit" />
<entry key="credit" value="java.math.BigDecimal" />
<entry key="name" value="java.lang.String" />
</util:map>
</property>
</bean>
总结一下 Java 示例,以下代码说明了所有讨论的要点,展示了所需属性的编程设置:
StaxEventItemWriter staxItemWriter = new StaxEventItemWriter()
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")
Map aliases = new HashMap();
aliases.put("customer","org.springframework.batch.sample.domain.CustomerCredit");
aliases.put("credit","java.math.BigDecimal");
aliases.put("name","java.lang.String");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
staxItemWriter.setResource(resource);
staxItemWriter.setMarshaller(marshaller);
staxItemWriter.setRootTagName("trades");
staxItemWriter.setOverwriteOutput(true);
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
CustomerCredit Credit = new CustomerCredit();
trade.setPrice(11.39);
credit.setName("Customer1");
staxItemWriter.write(trade);
6.8 多文件 Importing
通常在单个Step
中处理多个文件。假设文件格式相同,则MultiResourceItemReader
支持 XML 和平面文件处理的这种类型的 Importing。考虑目录中的以下文件:
file-1.txt file-2.txt ignored.txt
file-1.txt 和 file-2.txt 的格式相同,出于商业原因,应一起处理。可以使用通配符使用MuliResourceItemReader
读取两个文件:
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
引用的委托是简单的FlatFileItemReader
。上面的配置将从两个文件中读取 Importing,以处理回滚和重新启动场景。应当注意,与任何ItemReader
一样,添加额外的 Importing(在这种情况下为文件)可能会在重新启动时引起潜在的问题。建议批处理作业使用其各自的目录,直到成功完成为止。
6.9 Database
像大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,批处理与其他应用程序样式不同,这是由于系统必须使用的数据集的绝对大小。如果 SQL 语句返回 100 万行,则结果集可能将所有返回的结果保存在内存中,直到读取了所有行。 Spring Batch 针对此问题提供了两种类型的解决方案:游标和分页数据库 ItemReaders。
6.9.1 基于游标的 ItemReader
通常,使用数据库游标是大多数批处理开发人员的默认方法,因为它是数据库解决“流式”关系数据问题的方法。 Java ResultSet
类本质上是用于操纵游标的面向对象的机制。 ResultSet
将光标保留到当前数据行。在ResultSet
上调用next
会将光标移到下一行。基于 Spring Batch 游标的 ItemReaders 在初始化时打开游标,并针对每次对read
的调用将游标向前移动一行,从而返回可用于处理的 Map 对象。然后将调用close
方法以确保释放所有资源。 Spring 核心JdbcTemplate
通过使用回调模式完全 MapResultSet
中的所有行并在将控制权返回给方法调用者之前关闭来解决此问题。但是,必须分批完成,直到步骤完成。下面是基于游标的ItemReader
的工作原理的一般示意图,虽然以 SQL 语句为例,因为它广为人知,但是任何技术都可以实现基本方法:
本示例说明了基本模式。给定一个'FOO'表,该表具有三列:ID,NAME 和 BAR,选择 ID 大于 1 但小于 7 的所有行。这会将光标的开始(行 1)放在 ID 2 上。结果该行的内容应该是完全 Map 的 Foo 对象。再次调用read
()将光标移动到下一行,即 ID 为 3 的 Foo。这些读取的结果将在每个read
之后写出,从而允许对对象进行垃圾回收(假设没有实例变量是维护对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 Jdbc 实现。它直接与ResultSet
一起使用,并且需要 SQL 语句针对从DataSource
获得的连接运行。以下数据库模式将用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人喜欢为每一行使用一个域对象,因此我们将使用RowMapper
接口的实现来 MapCustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
由于JdbcTemplate
对 Spring 的用户非常熟悉,并且JdbcCursorItemReader
与它共享关键接口,因此有一个示例如何使用JdbcTemplate
读取此数据以与ItemReader
进行对比很有用。就本示例而言,我们假设 CUSTOMER 数据库中有 1,000 行。第一个示例将使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行此代码段后,customerCredits 列表将包含 1,000 个CustomerCredit
对象。在查询方法中,将从DataSource
获得连接,将针对该DataSource
运行所提供的 SQL,并对ResultSet
中的每一行调用mapRow
方法。让我们将此与JdbcCursorItemReader
的方法进行对比:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close(executionContext);
运行此代码段后,计数器将等于 1,000.如果上面的代码已将返回的 customerCredit 放入列表中,则结果将与JdbcTemplate
示例完全相同。但是,ItemReader
的最大优点是它允许“流式传输”Item。可以一次调用read
方法,然后通过ItemWriter
写出该 Item,然后通过read
获得下一个 Item。这样就可以在“块”中进行 Item 读取和写入,并定期进行,这是高性能批处理的本质。此外,它很容易配置为注入到 Spring Batch Step
中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
Additional Properties
由于在 Java 中打开光标有多种选择,因此JdbcCustorItemReader
上可以设置许多属性:
表 6.2. JdbcCursorItemReader 属性
ignoreWarnings | 确定是否记录 SQLWarning 或导致异常-默认为 true |
fetchSize | 当ItemReader 所使用的ResultSet 对象需要更多行时,向 Jdbc 驱动程序提示有关应从数据库中获取的行数。默认情况下,不给出任何提示。 |
maxRows | 设置基础ResultSet 一次可以容纳的最大行数限制。 |
queryTimeout | 将驱动程序 awaitStatement 对象执行的秒数设置为给定的秒数。如果超出限制,则抛出DataAccessEception 。 (有关详细信息,请咨询您的驱动程序供应商文档)。 |
verifyCursorPosition | 由于ItemReader 持有的ResultSet 被传递给RowMapper ,因此用户可以自己调用ResultSet.next (),这可能会导致读取器的内部计数出现问题。如果将RowMapper 调用之后的光标位置与之前的位置不同,则将此值设置为 true 将引发异常。 |
saveState | 指示是否将读取器的状态保存在ItemStream#update (ExecutionContext )提供的ExecutionContext 中。默认值为 true。 |
driverSupportsAbsolute | 默认为 false。指示 Jdbc 驱动程序是否支持在ResultSet 上设置绝对行。对于支持ResultSet.absolute ()的 Jdbc 驱动程序,建议将其设置为 true,因为它可以提高性能,特别是如果在处理大型数据集时某个步骤失败时。 |
setUseSharedExtendedConnection | 默认为 false。指示用于游标的连接是否应由所有其他处理使用,从而共享同一事务。如果将其设置为 false(这是默认值),则游标将使用其自己的连接打开,并且将不参与在其余步骤处理中启动的任何事务。如果将此标志设置为 true,则必须将DataSource 包裹在ExtendedConnectionDataSourceProxy 中,以防止每次提交后关闭和释放连接。当将此选项设置为 true 时,将同时使用'READ_ONLY'和'HOLD_CUSORS_OVER_COMMIT'选项创建用于打开游标的语句。这样就可以使游标在事务开始时保持打开状态,并在步骤处理中执行提交。要使用此功能,您需要一个支持此功能的数据库以及一个支持 Jdbc 3.0 或更高版本的 Jdbc 驱动程序。 |
HibernateCursorItemReader
就像普通的 Spring 用户在决定是否使用 ORM 解决方案(这会影响他们使用JdbcTemplate
还是HibernateTemplate
)上做出重要决定一样,Spring Batch 用户也具有相同的选择。 HibernateCursorItemReader
是光标技术的 Hibernate 实现。 Hibernate 的批量使用方式一直存在争议。这主要是因为 Hibernate 最初是为支持在线应用程序样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession
而不是标准会话。这消除了休眠使用的所有缓存和脏检查,这些检查可能会在批处理方案中引起问题。有关 Stateless 和正常休眠会话之间差异的更多信息,请参阅特定休眠版本的文档。 HibernateCursorItemReader
允许您声明 HQL 语句并传递SessionFactory
,这将以与JdbcCursorItemReader
相同的基本方式将每次调用将一项返回给read
。以下是使用与 JDBCReader 相同的“Client 信用”示例的示例配置:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close(executionContext);
假设已为 Customer 表正确创建了休眠 Map 文件,此配置的ItemReader
将以与JdbcCursorItemReader
所述完全相同的方式返回CustomerCredit
个对象。 “ useStatelessSession”属性默认为 true,但已在此处添加,以引起人们注意打开或关闭该属性的能力。还值得注意的是,可以通过 setFetchSize 属性设置基础游标的 fetchSize。与JdbcCursorItemReader
一样,配置非常简单:
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
StoredProcedureItemReader
有时有必要使用存储过程获取游标数据。 StoredProcedureItemReader
的工作方式与JdbcCursorItemReader
相似,不同之处在于,我们不执行查询来获取游标,而是执行存储过程来返回游标。存储过程可以通过三种不同的方式返回游标:
-
作为返回的 ResultSet(由 SQL Server,Sybase,DB2,Derby 和 MySQL 使用)
-
作为作为 out 参数返回的参考光标(由 Oracle 和 PostgreSQL 使用)
-
作为存储函数调用的返回值
以下是使用与之前相同的“Client 信用”示例的基本示例配置:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
本示例依赖于存储过程来提供 ResultSet 作为返回结果(上述选项 1)。
如果存储过程返回了一个 ref-cursor(选项 2),那么我们将需要提供 out 参数的位置,即返回的 ref-cursor。这是一个示例,其中第一个参数是返回的参考光标:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是从存储的函数(选项 3)返回的,则需要将属性“ function
”设置为true
。默认为false
。Watch 起来像这样:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有这些情况下,我们都需要定义RowMapper
以及DataSource
以及实际过程名称。
如果存储过程或函数接受参数,则必须通过 parameters 属性声明和设置它们。这是一个声明三个参数的 Oracle 示例。第一个是返回参量的 out 参数,第二个和第三个是采用 INTEGER 类型值的 in 参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明外,我们还需要指定一个PreparedStatementSetter
实现,以设置调用的参数值。这与上面的JdbcCursorItemReader
相同。 称为“其他属性”的部分中列出的所有其他属性也适用于StoredProcedureItemReader
。
6.9.2 分页 ItemReader
使用数据库游标的另一种方法是执行多个查询,其中每个查询都带回一部分结果。我们将此部分称为页面。每个执行的查询必须指定起始行号和我们要为页面返回的行数。
JdbcPagingItemReader
寻呼ItemReader
的一种实现是JdbcPagingItemReader
。 JdbcPagingItemReader
需要一个PagingQueryProvider
负责提供用于检索组成页面的行的 SQL 查询。由于每个数据库都有其提供分页支持的策略,因此我们需要为每种受支持的数据库类型使用不同的PagingQueryProvider
。还有SqlPagingQueryProviderFactoryBean
可以自动检测正在使用的数据库并确定适当的PagingQueryProvider
实现。这简化了配置,是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean
要求您指定一个 select 子句和一个 from 子句。您还可以提供可选的 where 子句。这些子句将用于构建与所需 sortKey 组合的 SQL 语句。
Note
重要的是在 sortKey 上具有唯一的键约束,以确保两次执行之间不会丢失任何数据。
打开 Reader 后,它将以与任何其他ItemReader
相同的基本方式将每个调用中的一项传递回read
。当需要其他行时,分页将在幕后进行。
以下是使用与上面的基于光标的 ItemReader 类似的“Client 信用”示例的示例配置:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此配置的ItemReader
将使用必须指定的RowMapper
返回CustomerCredit
个对象。 “ pageSize”属性确定每次查询执行时从数据库读取的实体数。
'parameterValues'属性可用于指定查询的参数值 Map。如果在 where 子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果使用传统的“?”占位符,则每个条目的键应为占位符的编号,从 1 开始。
JpaPagingItemReader
寻呼ItemReader
的另一种实现是JpaPagingItemReader
。 JPA 没有与 Hibernate StatelessSession
类似的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时,这是自然的选择。读取每个页面后,实体将被分离,并且持久性上下文将被清除,以允许在处理页面后对实体进行垃圾回收。
JpaPagingItemReader
允许您声明 JPQL 语句并传递EntityManagerFactory
。然后,它将以与任何其他ItemReader
相同的基本方式将每次调用将一项返回给read
。当需要其他实体时,分页发生在幕后。以下是使用与上述 JDBCReader 相同的“Client 信用”示例的示例配置:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
假设 Customer 对象具有正确的 JPA 注解或 ORMMap 文件,此配置的ItemReader
将以与上面JdbcPagingItemReader
所述完全相同的方式返回CustomerCredit
个对象。 “ pageSize”属性确定每次查询执行时从数据库读取的实体数。
IbatisPagingItemReader
Note
从 Spring Batch 3.0 开始不推荐使用该 Reader。
如果使用 IBATIS 进行数据访问,则可以使用IbatisPagingItemReader
,顾名思义,它是分页ItemReader
的实现。 IBATIS 不直接支持读取页面中的行,但是通过提供几个标准变量,您可以为 IBATIS 查询添加分页支持。
这是上面示例中的IbatisPagingItemReader
读取 CustomerCredits 的配置示例:
<bean id="itemReader" class="org.spr...IbatisPagingItemReader">
<property name="sqlMapClient" ref="sqlMapClient"/>
<property name="queryId" value="getPagedCustomerCredits"/>
<property name="pageSize" value="1000"/>
</bean>
上面的IbatisPagingItemReader
配置引用了一个名为“ getPagedCustomerCredits”的 IBATIS 查询。这是有关 MySQL 查询的示例。
<select id="getPagedCustomerCredits" resultMap="customerCreditResult">
select id, name, credit from customer order by id asc LIMIT #_skiprows#, #_pagesize#
</select>
_skiprows
和_pagesize
变量由IbatisPagingItemReader
提供,并且如有必要,还可以使用_page
变量。分页查询的语法因所使用的数据库而异。这是 Oracle 的示例(不幸的是,我们需要对某些运算符使用 CDATA,因为它属于 XML 文档):
<select id="getPagedCustomerCredits" resultMap="customerCreditResult">
select * from (
select * from (
select t.id, t.name, t.credit, ROWNUM ROWNUM_ from customer t order by id
)) where ROWNUM_ <![CDATA[ > ]]> ( #_page# * #_pagesize# )
) where ROWNUM <![CDATA[ <= ]]> #_pagesize#
</select>
6.9.3 数据库 ItemWriters
虽然平面文件和 XML 都有特定的 ItemWriter,但在数据库世界中没有确切的等效项。这是因为事务提供了所需的所有功能。 ItemWriters 对于文件来说是必需的,因为它们必须像处理事务一样工作,跟踪已写入的 Item 并在适当的时间进行刷新或清除。数据库不需要此功能,因为写入已包含在事务中。用户可以创建自己的 DAO 来实现ItemWriter
接口,也可以使用自定义ItemWriter
中的 DAO 来处理通用的处理问题,无论哪种方式,他们都应该可以正常工作。需要注意的一件事是批处理输出所提供的性能和错误处理功能。这在将 hibernate 用作ItemWriter
时最常见,但在使用 Jdbc 批处理模式时可能会有相同的问题。假设我们要小心刷新并且数据中没有错误,则批处理数据库输出没有任何固有的缺陷。但是,写出时发生的任何错误都可能引起混乱,因为无法知道哪个单个 Item 导致了异常,或者甚至没有任何单个 Item 负责,如下所示:
如果项在被写出之前被缓冲,则遇到的任何错误都不会被抛出,直到在提交之前刷新缓冲区为止。例如,假设每个块将写入 20 个 Item,第 15 个 Item 将引发 DataIntegrityViolationException。就“步骤”而言,所有 20 个 Item 都将成功写出,因为在实际写出之前没有办法知道会发生错误。调用Session#
flush
()后,缓冲区将被清空,并且将触发异常。此时,Step
无能为力,必须回滚该事务。通常,此异常可能导致 Item 被跳过(取决于跳过/重试策略),然后将不会再次将其写出。但是,在批处理方案中,无法知道是哪个 Item 导致了问题,故障发生时整个缓冲区都被写了出来。解决此问题的唯一方法是在每个 Item 之后冲洗:
这是一个常见的用例,尤其是在使用 Hibernate 时,实现ItemWriter
的简单准则是在每次调用write()
时刷新。这样做可以使 Item 可靠地被跳过,Spring Batch 在内部会在发生错误后对ItemWriter
的调用粒度进行内部维护。
6.10 重用现有服务
批处理系统通常与其他应用程序样式结合使用。最常见的是在线系统,但它也可以通过移动每种应用程序样式使用的必要批量数据来支持集成甚至胖 Client 端应用程序。因此,许多用户通常都想在其批处理作业中重用现有的 DAO 或其他服务。通过允许注入任何必需的类,Spring 容器本身使此操作相当容易。但是,在某些情况下,现有服务需要充当ItemReader
或ItemWriter
,以满足另一个 Spring Batch 类的依赖关系,或者因为它确实是某个步骤的主要ItemReader
。为每个需要包装的服务编写一个适配器类很简单,但是由于这是一个普遍的问题,Spring Batch 提供了ItemReaderAdapter
和ItemWriterAdapter
的实现。这两个类都实现了调用委托模式的标准 Spring 方法,并且设置起来非常简单。以下是 Reader 的示例:
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
需要注意的重要一点是 targetMethod 的协定必须与read
的协定相同:用尽时它将返回 null,否则返回Object
。根据ItemWriter
的实现,其他任何因素都将阻止框架知道处理何时结束,从而导致无限循环或错误失败。 ItemWriter
实现同样简单:
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
6.11 验证 Importing
在本章的过程中,讨论了多种解析 Importing 的方法。如果每个主要实现的格式都不正确,则将引发异常。如果缺少一系列数据,FixedLengthTokenizer
将引发异常。同样,尝试访问FieldSetMapper
的RowMapper
中不存在的索引或格式与预期的格式不同的索引将导致引发异常。所有这些类型的异常都将在read
返回之前引发。但是,它们无法解决退回商品是否有效的问题。例如,如果字段之一是年龄,则显然不能为负。它将正确解析,因为它已经存在并且是一个数字,但是不会引起异常。由于已经存在大量的 Validation 框架,因此 Spring Batch 不会尝试提供另一个框架,而是提供了一个非常简单的接口,可以由许多框架实现:
public interface Validator {
void validate(Object value) throws ValidationException;
}
约定是,如果对象无效,则validate
方法将引发异常,如果有效,则正常返回。 Spring Batch 提供了开箱即用的ItemProcessor:
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<property name="validator" ref="validator" />
</bean>
<bean id="validator"
class="org.springframework.batch.item.validator.SpringValidator">
<property name="validator">
<bean id="orderValidator"
class="org.springmodules.validation.valang.ValangValidator">
<property name="valang">
<value>
<![CDATA[
{ orderId : ? > 0 AND ? <= 9999999999 : 'Incorrect order ID' : 'error.order.id' }
{ totalLines : ? = size(lineItems) : 'Bad count of order lines'
: 'error.order.lines.badcount'}
{ customer.registered : customer.businessCustomer = FALSE OR ? = TRUE
: 'Business customer must be registered'
: 'error.customer.registration'}
{ customer.companyName : customer.businessCustomer = FALSE OR ? HAS TEXT
: 'Company name for business customer is mandatory'
:'error.customer.companyname'}
]]>
</value>
</property>
</bean>
</property>
</bean>
这个简单的示例显示了一个简单的ValangValidator
,用于验证订单对象。目的不是为了展示 Valang 功能,而是为了展示如何添加验证器。
6.12 防止状态持久化
默认情况下,所有ItemReader
和ItemWriter
实现都在提交前将其当前状态存储在ExecutionContext
中。但是,这可能并不总是所需的行为。例如,许多开发人员选择使用过程指示器来使其数据库读取器“可重新运行”。在 Importing 数据中添加了一个额外的列,以指示是否已对其进行处理。当读取(或写出)特定记录时,已处理标志将从 false 翻转为 true。然后,SQL 语句可以在 where 子句中包含一个额外的语句,例如“ where PROCESSED_IND = false”,从而确保在重新启动的情况下仅返回未处理的记录。在这种情况下,最好不要存储任何状态,例如当前行号,因为它在重启时将是无关紧要的。因此,所有 Reader 和 Writer 都包含“ saveState”属性:
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
</property>
<property name="saveState" value="false" />
<property name="sql">
<value>
SELECT games.player_id, games.year_no, SUM(COMPLETES),
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
from games, players where players.player_id =
games.player_id group by games.player_id, games.year_no
</value>
</property>
</bean>
上面配置的ItemReader
不会对其参与的执行在ExecutionContext
中进行任何 Importing。
6.13 创建自定义 ItemReader 和 ItemWriters
到目前为止,本章已经讨论了 Spring Batch 中存在的用于读写的基本协定以及一些常见的实现。但是,这些都是相当通用的,开箱即用的实现可能无法涵盖很多潜在的场景。本节将通过一个简单的示例说明如何创建自定义的ItemReader
和ItemWriter
实现并正确实现其 Contract。 ItemReader
还将实现ItemStream
,以说明如何使读取器或写入器可重新启动。
6.13.1 自定义 ItemReader 示例
就本示例而言,将创建一个从提供的列表中读取的简单ItemReader
实现。我们将从实现ItemReader
,read
的最基本 Contract 开始:
public class CustomItemReader<T> implements ItemReader<T>{
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NoWorkFoundException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
这个非常简单的类获取一个 Item 列表,然后一次返回一个 Item,将其从列表中删除。当列表为空时,它返回 null,从而满足ItemReader
的最基本要求,如下所示:
List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使 ItemReader 可重新启动
现在的最后一个挑战是使ItemReader
可重新启动。当前,如果断电,并且处理再次开始,则ItemReader
必须从头开始。这实际上在许多情况下都是有效的,但有时最好在批处理作业从它停止的地方开始。关键的区别通常是 Reader 是有状态的还是 Stateless 的。Stateless 读取器无需担心可重新启动性,但是有状态读取器必须尝试重新启动时重新构造其最后一个已知状态。因此,我们建议您尽可能使自定义 Reader 保持 Stateless,因此您不必担心可重新启动性。
如果确实需要存储状态,则应使用ItemStream
接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if(executionContext.containsKey(CURRENT_INDEX)){
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else{
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用ItemStream
update
方法时,ItemReader
的当前索引将通过键“ current.index”存储在提供的ExecutionContext
中。调用ItemStream
open
方法时,将检查ExecutionContext
以查 Watch 其是否包含具有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个非常琐碎的示例,但仍然符合一般约定:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<String>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数 ItemReader 具有更复杂的重启逻辑。例如JdbcCursorItemReader
,将最后处理的行的行 ID 存储在游标中。
还值得注意的是,ExecutionContext
中使用的密钥不应太小。这是因为Step
中的所有ItemStream
使用相同的ExecutionContext
。在大多数情况下,只需在键之前加上类名就足以保证唯一性。但是,在极少数情况下,在同一步骤中使用两个相同类型的ItemStream
(如果需要输出两个文件,可能会发生这种情况),那么将需要一个更唯一的名称。因此,许多 Spring Batch ItemReader
和ItemWriter
实现都具有setName
()属性,该属性允许覆盖此键名。
6.13.2 自定义 ItemWriter 示例
实现自定义ItemWriter
的方式与上述ItemReader
的示例在很多方面都相似,但是在足以保证其自己的示例方面有很多不同。但是,添加可重新启动性本质上是相同的,因此在本示例中将不涉及它。与ItemReader
示例一样,将使用List
来使示例尽可能简单:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(List<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使 ItemWriter 可重新启动
为了使 ItemWriter 可重新启动,我们将遵循与ItemReader
相同的过程,添加并实现ItemStream
接口以同步执行上下文。在该示例中,我们可能必须计算已处理的 Item 数,并将其添加为页脚记录。如果需要这样做,则可以在ItemWriter
中实现ItemStream
,以便在重新打开流时从执行上下文重新构造计数器。
在许多实际情况下,自定义 ItemWriters 还会委派给另一个本身可重新启动的编写器(例如,在写入文件时),否则它会写入事务性资源,因此不需要重新启动,因为它是 Stateless 的。当您有状态 Writer 时,您可能还应该确保实现ItemStream
和ItemWriter
。还请记住,Writer 的 Client 端需要了解ItemStream
,因此您可能需要将其作为流注册到配置 xml 中。