On this page
1. ItemReaders 和 ItemWriters
所有批处理都可以以其最简单的形式描述为读取大量数据,执行某种类型的计算或转换并写出结果。 Spring Batch 提供了三个关键接口来帮助执行批量读取和写入:ItemReader
,ItemProcessor
和ItemWriter
。
1.1. ItemReader
尽管是一个简单的概念,但ItemReader
是用于从许多不同类型的 Importing 中提供数据的方法。最一般的示例包括:
平面文件:平面文件 Item 读取器从平面文件中读取数据行,该文件通常描述记录,这些记录的数据字段由文件中的固定位置定义或由某些特殊字符(例如逗号)分隔。
XML:XML
ItemReaders
独立于用于解析,Map 和验证对象的技术来处理 XML。Importing 数据允许根据 XSD 模式验证 XML 文件。数据库:访问数据库资源以返回结果集,该结果集可以 Map 到对象以进行处理。默认的 SQL
ItemReader
实现调用RowMapper
来返回对象,如果需要重新启动,则跟踪当前行,存储基本统计信息,并提供一些事务增强功能,这些将在后面进行说明。
还有更多的可能性,但在本章中我们将重点介绍基本的可能性。 Appendix A中可以找到所有可用的ItemReader
实现的完整列表。
ItemReader
是通用 Importing 操作的基本接口,如以下接口定义所示:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
read
方法定义ItemReader
的最基本 Contract。调用它会返回一个 Item;如果没有更多 Item,则返回null
。一个 Item 可能代表文件中的一行,数据库中的一行或 XML 文件中的元素。通常希望将它们 Map 到可用的域对象(例如Trade
,Foo
或其他),但是 Contract 中没有要求这样做。
预期ItemReader
接口的实现仅向前。但是,如果基础资源是事务性的(例如 JMS 队列),则在回滚场景中,调用read
可能会在后续调用中返回相同的逻辑项。还值得注意的是,ItemReader
缺少要处理的 Item 不会导致引发异常。例如,配置有返回 0 结果的查询的数据库ItemReader
在第一次调用 read 时将返回null
。
1.2. ItemWriter
ItemWriter
在功能上与ItemReader
类似,但是具有相反的运算。资源仍然需要定位,打开和关闭,但是它们的区别在于ItemWriter
是写出而不是读入。对于数据库或队列,这些操作可能是插入,更新或发送。输出序列化的格式特定于每个批处理作业。
与ItemReader
一样,ItemWriter
是一个相当通用的接口,如以下接口定义所示:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
与ItemReader
上的read
一样,write
提供ItemWriter
的基本 Contract。只要打开,它就会尝试写出传入 Item 的列表。因为通常期望将 Item“分批”在一起,然后输出,所以接口接受 Item 列表,而不是 Item 本身。写入列表后,可以执行任何必要的刷新操作,然后再从 write 方法返回。例如,如果写入一个 Hibernate DAO,则可以进行多个写入操作,每个 Item 一个。然后,Writer 可以在休眠会话上调用flush
,然后再返回。
1.3. ItemProcessor
ItemReader
和ItemWriter
接口对于它们的特定任务都非常有用,但是如果要在编写之前插入业务逻辑怎么办?读写的一种选择是使用复合模式:创建包含另一个ItemWriter
的ItemWriter
或包含另一个ItemReader
的ItemReader
。以下代码显示了一个示例:
public class CompositeItemWriter<T> implements ItemWriter<T> {
ItemWriter<T> itemWriter;
public CompositeItemWriter(ItemWriter<T> itemWriter) {
this.itemWriter = itemWriter;
}
public void write(List<? extends T> items) throws Exception {
//Add business logic here
itemWriter.write(items);
}
public void setDelegate(ItemWriter<T> itemWriter){
this.itemWriter = itemWriter;
}
}
上一类包含另一个ItemWriter
,它在提供了一些业务逻辑后将其委托给该ItemWriter
。该模式也可以很容易地用于ItemReader
,也许可以基于主ItemReader
提供的 Importing 来获取更多参考数据。如果您需要自己控制对write
的调用,它也很有用。但是,如果您只想在实际写入之前“转换”传递给写入的 Item,则无需自己write
。您可以只修改 Item。对于这种情况,Spring Batch 提供了ItemProcessor
接口,如以下接口定义所示:
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
ItemProcessor
很简单。给定一个对象,对其进行转换,然后返回另一个。提供的对象可以是或可以不是相同的类型。关键是可以在流程中应用业务逻辑,并且完全由开发人员来创建该逻辑。 ItemProcessor
可以直接连接到步骤中。例如,假设ItemReader
提供了Foo
类型的类,并且在将其写出之前需要将其转换为Bar
类型。以下示例显示了执行转换的ItemProcessor
:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class FooProcessor implements ItemProcessor<Foo,Bar>{
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarWriter implements ItemWriter<Bar>{
public void write(List<? extends Bar> bars) throws Exception {
//write bars
}
}
在前面的示例中,存在一个类Foo
,一个类Bar
和一个FooProcessor
,它们坚持ItemProcessor
接口。转换很简单,但是任何类型的转换都可以在这里完成。 BarWriter
写入Bar
对象,如果提供任何其他类型,则抛出异常。同样,如果提供了Foo
以外的内容,则FooProcessor
引发异常。然后可以将FooProcessor
注入到Step
中,如以下示例所示:
XML Configuration
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
Java Configuration
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJOb")
.start(step1())
.end()
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(fooReader())
.processor(fooProcessor())
.writer(barWriter())
.build();
}
1.3.1. 链接项处理器
执行单个转换在许多情况下很有用,但是如果要将多个ItemProcessor
实现“链接”在一起怎么办?这可以使用前面提到的复合图案来完成。要更新先前的单个转换,例如Foo
转换为Bar
,然后转换为Foobar
并写出,如以下示例所示:
public class Foo {}
public class Bar {
public Bar(Foo foo) {}
}
public class Foobar {
public Foobar(Bar bar) {}
}
public class FooProcessor implements ItemProcessor<Foo,Bar>{
public Bar process(Foo foo) throws Exception {
//Perform simple transformation, convert a Foo to a Bar
return new Bar(foo);
}
}
public class BarProcessor implements ItemProcessor<Bar,Foobar>{
public Foobar process(Bar bar) throws Exception {
return new Foobar(bar);
}
}
public class FoobarWriter implements ItemWriter<Foobar>{
public void write(List<? extends Foobar> items) throws Exception {
//write items
}
}
FooProcessor
和BarProcessor
可以“链接”在一起以得到结果Foobar
,如以下示例所示:
CompositeItemProcessor<Foo,Foobar> compositeProcessor =
new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);
与前面的示例一样,可以将复合处理器配置为Step
:
XML Configuration
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
commit-interval="2"/>
</tasklet>
</step>
</job>
<bean id="compositeItemProcessor"
class="org.springframework.batch.item.support.CompositeItemProcessor">
<property name="delegates">
<list>
<bean class="..FooProcessor" />
<bean class="..BarProcessor" />
</list>
</property>
</bean>
Java Configuration
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJob")
.start(step1())
.end()
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(fooReader())
.processor(compositeProcessor())
.writer(foobarWriter())
.build();
}
@Bean
public CompositeItemProcessor compositeProcessor() {
List<ItemProcessor> delegates = new ArrayList<>(2);
delegates.add(new FooProcessor());
delegates.add(new BarProcessor());
CompositeItemProcessor processor = new CompositeItemProcessor();
processor.setDelegates(delegates);
return processor;
}
1.3.2. 筛选记录
Item 处理器的一种典型用法是在将记录传递给ItemWriter
之前过滤掉记录。过滤是不同于跳过的动作。跳过表示一条记录无效,而过滤只是表明不应写入一条记录。
例如,考虑一个批处理作业,该作业读取一个包含三种不同类型记录的文件:要插入的记录,要更新的记录和要删除的记录。如果系统不支持删除记录,则我们不希望将任何“删除”记录发送到ItemWriter
。但是,由于这些记录实际上并不是不良记录,因此我们希望将其过滤掉而不是跳过它们。结果,ItemWriter
将仅接收“插入”和“更新”记录。
要过滤记录,您可以从ItemProcessor
返回null
。框架检测到结果为null
,并避免将该 Item 添加到传递给ItemWriter
的记录列表中。与往常一样,从ItemProcessor
引发的异常导致跳过。
1.3.3. 容错能力
回滚块时,可能会重新处理读取期间已缓存的 Item。如果将某个步骤配置为容错的(通常通过使用跳过或重试处理),则所使用的任何ItemProcessor
都应以幂等的方式实现。通常,这包括对ItemProcessor
的 Importing 项不执行任何更改,仅更新作为结果的实例。
1.4. ItemStream
ItemReaders
和ItemWriters
都可以很好地满足其各自的目的,但是它们之间存在一个共同的问题,即需要另一个接口。通常,作为批处理作业范围的一部分,需要打开,关闭读取器和写入器,并需要一种持久化状态的机制。 ItemStream
接口可达到此目的,如以下示例所示:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
在描述每种方法之前,我们应该提到ExecutionContext
。同样实现ItemStream
的ItemReader
的 Client 端应在对read
的任何调用之前先调用open
,以打开文件等任何资源或获得连接。类似的限制适用于实现ItemStream
的ItemWriter
。如第 2 章所述,如果在ExecutionContext
中找到了预期的数据,则可以使用它在初始状态以外的位置启动ItemReader
或ItemWriter
。相反,调用close
以确保安全释放打开期间分配的所有资源。调用update
主要是为了确保当前保持的任何状态都已加载到提供的ExecutionContext
中。在提交之前调用此方法,以确保在提交之前将当前状态保留在数据库中。
在ItemStream
的 Client 端是Step
(来自 Spring Batch Core)的特殊情况下,将为每个 StepExecution 创建一个ExecutionContext
,以允许用户存储特定执行的状态,并期望如果相同则返回该状态。 JobInstance
重新开始。对于那些熟悉 Quartz 的人来说,其语义与 Quartz JobDataMap
非常相似。
1.5. 委托模式和步骤注册
请注意,CompositeItemWriter
是委派模式的示例,在 Spring Batch 中很常见。委托本身可以实现回调接口,例如StepListener
。如果这样做了,并且它们与 Spring Batch Core 一起作为Job
中Step
的一部分使用,那么几乎可以肯定他们需要在Step
中手动注册。如果直接实现ItemStream
或StepListener
接口,则直接连接到Step
的读取器,写入器或处理器会自动注册。但是,由于Step
未知委托,因此需要将它们作为侦听器或流(或在适当时将两者同时注入)注入,如以下示例所示:
XML Configuration
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
commit-interval="2">
<streams>
<stream ref="barWriter" />
</streams>
</chunk>
</tasklet>
</step>
</job>
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter" />
</bean>
<bean id="barWriter" class="...BarWriter" />
Java Configuration
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJob")
.start(step1())
.end()
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(fooReader())
.processor(fooProcessor())
.writer(compositeItemWriter())
.stream(barWriter())
.build();
}
@Bean
public CustomCompositeItemWriter compositeItemWriter() {
CustomCompositeItemWriter writer = new CustomCompositeItemWriter();
writer.setDelegate(barWriter());
return writer;
}
@Bean
public BarWriter barWriter() {
return new BarWriter();
}
1.6. 平面文件
交换批量数据的最常见机制之一一直是平面文件。与 XML 具有定义其结构化(XSD)的公认标准不同,任何阅读平面文件的人都必须提前了解文件的结构。通常,所有平面文件都分为两种:定界文件和定长文件。分隔文件是指用逗号分隔分隔符的字段。固定长度文件具有设置长度的字段。
1.6.1. FieldSet
在 Spring Batch 中使用平面文件时,无论是用于 Importing 还是输出,最重要的类之一是FieldSet
。许多体系结构和库都包含用于帮助您从文件读入的抽象,但是它们通常返回String
或String
对象的数组。这真的只会让您半途而废。 FieldSet
是 Spring Batch 的抽象,用于启用文件资源中字段的绑定。它使开发人员可以像处理数据库 Importing 一样使用文件 Importing。 FieldSet
在概念上与 JDBC ResultSet
类似。 FieldSet
仅需要一个参数:String
令牌数组。 (可选)您还可以配置字段的名称,以便可以按ResultSet
之后的模式通过索引或名称来访问字段,如以下示例所示:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
FieldSet
界面上还有更多选项,例如Date
,long,BigDecimal
等等。 FieldSet
的最大优点是它提供了平面文件 Importing 的一致解析。在处理由格式异常引起的错误或进行简单的数据转换时,它可以保持一致,而不是使每个批处理作业以潜在的意外方式进行不同的解析。
1.6.2. FlatFileItemReader
平面文件是最多包含二维(表格)数据的任何类型的文件。名为FlatFileItemReader
的类有助于在 Spring Batch 框架中读取平面文件,该类提供了用于读取和解析平面文件的基本功能。 FlatFileItemReader
的两个最重要的必需依存关系是Resource
和LineMapper
。 LineMapper
接口将在下一部分中进行详细介绍。 resource 属性表示一个 Spring Core Resource
。可以在Spring 框架,第 5 章。资源中找到说明如何创建此类 bean 的文档。因此,除了显示以下简单示例之外,本指南不介绍创建Resource
对象的详细信息:
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的批处理环境中,目录结构通常由 EAI 基础结构 Management,在 EAI 基础结构中,构建了用于外部接口的放置区,以将文件从 FTP 位置移动到批处理位置,反之亦然。文件移动 Util 超出了 Spring Batch 体系结构的范围,但是批处理作业流中包括文件移动 Util 作为作业流中的步骤并不少见。批处理体系结构只需要知道如何找到要处理的文件。 Spring Batch 从此起点开始将数据馈入管道的过程。但是,Spring Integration提供了许多此类服务。
FlatFileItemReader
中的其他属性使您可以进一步指定数据的解释方式,如下表所述:
表 1. FlatFileItemReader
属性
Property | Type | Description |
---|---|---|
comments | String[] | 指定指示 Comments 行的行前缀。 |
encoding | String | 指定要使用的文本编码。默认值为Charset.defaultCharset() 。 |
lineMapper | LineMapper |
将表示 Item 的String 转换为Object 。 |
linesToSkip | int | 文件顶部要忽略的行数。 |
recordSeparatorPolicy | RecordSeparatorPolicy | 用于确定行尾的位置,并执行诸如在带引号的字符串中 continue 到行尾的操作。 |
resource | Resource |
从中读取资源。 |
skippedLinesCallback | LineCallbackHandler | 传递要跳过的文件中各行的原始行内容的接口。如果linesToSkip 设置为 2,则此接口将被调用两次。 |
strict | boolean | 在严格模式下,如果 Importing 资源不存在,则读取器将在ExecutionContext 上引发异常。否则,它将记录问题并 continue。 |
LineMapper
与RowMapper
一样,它采用诸如ResultSet
之类的低级构造并返回Object
,平面文件处理需要相同的构造才能将String
行转换为Object
,如以下接口定义所示:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
基本约定是,给定当前行及其关联的行号,Map 器应返回结果域对象。这类似于RowMapper
,因为每一行都与其行号相关联,就像ResultSet
中的每一行都与其行号绑定一样。这允许将行号绑定到结果域对象,以进行身份比较或提供更多信息。但是,与RowMapper
不同,LineMapper
被赋予了原始行,如上所述,该原始行只会使您到达中间。该行必须标记为FieldSet
,然后可以将其 Map 到一个对象,如本文档后面所述。
LineTokenizer
必须将 Importing 行转换为FieldSet
的抽象,因为可能需要将多种格式的平面文件数据转换为FieldSet
。在 Spring Batch 中,此接口是LineTokenizer
:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
LineTokenizer
的约定是这样的:给定 Importing 行(理论上String
可以包含多条线),则返回代表该行的FieldSet
。然后可以将此FieldSet
传递给FieldSetMapper
。 Spring Batch 包含以下LineTokenizer
实现:
DelimitedLineTokenizer
:用于 Logging 的字段由定界符分隔的文件。最常见的定界符是逗号,但是也经常使用竖线或分号。FixedLengthTokenizer
:用于 Logging 的字段均为“固定宽度”的文件。必须为每种记录类型定义每个字段的宽度。PatternMatchingCompositeLineTokenizer
:通过检查模式,确定应在特定行上使用标记器列表中的哪个LineTokenizer
。
FieldSetMapper
FieldSetMapper
接口定义单个方法mapFieldSet
,该方法采用FieldSet
对象并将其内容 Map 到对象。根据作业的需要,此对象可以是自定义 DTO,域对象或数组。 FieldSetMapper
与LineTokenizer
结合使用,可将一行数据从资源转换为所需类型的对象,如以下接口定义所示:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
使用的模式与JdbcTemplate
使用的RowMapper
相同。
DefaultLineMapper
既然已经定义了读取平面文件的基本接口,那么很明显,需要三个基本步骤:
从文件中读取一行。
将
String
行传递到LineTokenizer#tokenize()
方法中以检索FieldSet
。将标记化返回的
FieldSet
传递给FieldSetMapper
,并从ItemReader#read()
方法返回结果。
上述两个接口代表两个单独的任务:将线转换为FieldSet
并将FieldSet
Map 到域对象。因为LineTokenizer
的 Importing 与LineMapper
的 Importing(一条线)匹配,并且FieldSetMapper
的输出与LineMapper
的输出匹配,所以提供了同时使用LineTokenizer
和FieldSetMapper
的默认实现。下列类定义中显示的DefaultLineMapper
表示大多数用户需要的行为:
public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
以上功能是默认实现中提供的,而不是内置于 Reader 本身中(如在框架的先前版本中所做的那样),以使用户在控制解析过程时具有更大的灵 Active,尤其是在需要访问原始行的情况下。
简单分隔文件读取示例
以下示例说明了如何使用实际域方案读取平面文件。这个特定的批处理作业从以下文件中读取足球运动员:
ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"
该文件的内容 Map 到以下Player
域对象:
public class Player implements Serializable {
private String ID;
private String lastName;
private String firstName;
private String position;
private int birthYear;
private int debutYear;
public String toString() {
return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
",First Name=" + firstName + ",Position=" + position +
",Birth Year=" + birthYear + ",DebutYear=" +
debutYear;
}
// setters and getters...
}
要将FieldSet
Map 到Player
对象,需要定义一个返回玩家的FieldSetMapper
,如以下示例所示:
protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) {
Player player = new Player();
player.setID(fieldSet.readString(0));
player.setLastName(fieldSet.readString(1));
player.setFirstName(fieldSet.readString(2));
player.setPosition(fieldSet.readString(3));
player.setBirthYear(fieldSet.readInt(4));
player.setDebutYear(fieldSet.readInt(5));
return player;
}
}
然后可以通过正确构造FlatFileItemReader
并调用read
来读取文件,如以下示例所示:
FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();
每次对read
的调用都会从文件的每一行返回一个新的Player
对象。当到达文件末尾时,返回null
。
按名称 Map 字段
DelimitedLineTokenizer
和FixedLengthTokenizer
都允许有一项附加功能,其功能与 JDBC ResultSet
类似。字段的名称可以被注入到这些LineTokenizer
实现中,以提高 Map 函数的可读性。首先,将平面文件中所有字段的列名注入令牌生成器,如以下示例所示:
tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});
FieldSetMapper
可以使用以下信息:
public class PlayerMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fs) {
if(fs == null){
return null;
}
Player player = new Player();
player.setID(fs.readString("ID"));
player.setLastName(fs.readString("lastName"));
player.setFirstName(fs.readString("firstName"));
player.setPosition(fs.readString("position"));
player.setDebutYear(fs.readInt("debutYear"));
player.setBirthYear(fs.readInt("birthYear"));
return player;
}
}
将字段集自动 Map 到域对象
对于许多人来说,必须编写特定的FieldSetMapper
与为JdbcTemplate
编写特定的RowMapper
一样麻烦。 Spring Batch 通过提供FieldSetMapper
来简化此过程,该FieldSetMapper
通过使用 JavaBean 规范将字段名称与对象上的设置器进行匹配来自动 Map 字段。再次使用 Football 示例,BeanWrapperFieldSetMapper
配置类似于以下代码片段:
XML Configuration
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="player" />
</bean>
<bean id="player"
class="org.springframework.batch.sample.domain.Player"
scope="prototype" />
Java Configuration
@Bean
public FieldSetMapper fieldSetMapper() {
BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();
fieldSetMapper.setPrototypeBeanName("player");
return fieldSetMapper;
}
@Bean
@Scope("prototype")
public Player player() {
return new Player();
}
对于FieldSet
中的每个条目,Map 器都在Player
对象的新实例上寻找对应的 setter(因此,需要原型作用域),就像 Spring 容器寻找与属性名称匹配的 setter 一样。MapFieldSet
中的每个可用字段,并返回生成的Player
对象,而无需代码。
定长文件格式
到目前为止,仅详细讨论了定界文件。但是,它们仅代表文件读取图片的一半。许多使用平面文件的组织都使用固定长度格式。固定长度文件示例如下:
UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5
虽然这 Watch 起来像一个大字段,但实际上代表了 4 个不同的字段:
ISIN:所 Order 商品的唯一标识符-12 个字符长。
数量:所 Order 商品的数量-3 个字符长。
价格:商品价格-5 个字符长。
Client:Order 商品的 Client 的 ID-9 个字符长。
配置FixedLengthLineTokenizer
时,必须以范围的形式提供每个长度,如以下示例所示:
XML Configuration
<bean id="fixedLengthLineTokenizer"
class="org.springframework.batch.io.file.transform.FixedLengthTokenizer">
<property name="names" value="ISIN,Quantity,Price,Customer" />
<property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>
因为FixedLengthLineTokenizer
使用与上面讨论的相同的LineTokenizer
接口,所以它返回的FieldSet
就像使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper
。
Note
要支持上述范围语法,需要在ApplicationContext
中配置专门的属性编辑器RangeArrayPropertyEditor
。但是,此 Bean 是在使用批处理名称空间的ApplicationContext
中自动声明的。
Java Configuration
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
tokenizer.setColumns(new Range(1-12),
new Range(13-15),
new Range(16-20),
new Range(21-29));
return tokenizer;
}
因为FixedLengthLineTokenizer
使用与上面讨论的相同的LineTokenizer
接口,所以它返回的FieldSet
就像使用了分隔符一样。这样就可以使用相同的方法来处理其输出,例如使用BeanWrapperFieldSetMapper
。
单个文件中的多种记录类型
到目前为止,为简单起见,所有文件读取示例都作了一个关键假设:文件中的所有记录都具有相同的格式。但是,并非总是如此。通常,文件中的记录可能具有不同的格式,需要对其进行不同的标记和 Map 到不同的对象。以下文件摘录对此进行了说明:
USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI
在此文件中,我们有三种类型的记录:“ USER”,“ LINEA”和“ LINEB”。 “ USER”行对应于User
对象。尽管“ LINEA”比“ LINEB”具有更多信息,但“ LINEA”和“ LINEB”都对应于Line
个对象。
ItemReader
分别读取每一行,但是我们必须指定不同的LineTokenizer
和FieldSetMapper
对象,以便ItemWriter
接收正确的 Item。 PatternMatchingCompositeLineMapper
允许配置模式到LineTokenizer
实例的 Map 以及模式到FieldSetMapper
实例的 Map,因此很容易,如以下示例所示:
XML Configuration
<bean id="orderFileLineMapper"
class="org.spr...PatternMatchingCompositeLineMapper">
<property name="tokenizers">
<map>
<entry key="USER*" value-ref="userTokenizer" />
<entry key="LINEA*" value-ref="lineATokenizer" />
<entry key="LINEB*" value-ref="lineBTokenizer" />
</map>
</property>
<property name="fieldSetMappers">
<map>
<entry key="USER*" value-ref="userFieldSetMapper" />
<entry key="LINE*" value-ref="lineFieldSetMapper" />
</map>
</property>
</bean>
Java Configuration
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
PatternMatchingCompositeLineMapper lineMapper =
new PatternMatchingCompositeLineMapper();
Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
tokenizers.put("USER*", userTokenizer());
tokenizers.put("LINEA*", lineATokenizer());
tokenizers.put("LINEB*", lineBTokenizer());
lineMapper.setTokenizers(tokenizers);
Map<String, FieldSetMapper> mappers = new HashMap<>(2);
mappers.put("USER*", userFieldSetMapper());
mappers.put("LINE*", lineFieldSetMapper());
lineMapper.setFieldSetMappers(mappers);
return lineMapper;
}
在此示例中,“ LINEA”和“ LINEB”具有单独的LineTokenizer
实例,但它们都使用相同的FieldSetMapper
。
PatternMatchingCompositeLineMapper
使用PatternMatcher#match
方法以便为每行选择正确的委托。 PatternMatcher
允许使用两个具有特殊含义的通配符:问号(“?”)恰好匹配一个字符,而星号(“ *”)则匹配零个或多个字符。请注意,在上述配置中,所有模式都以星号结尾,从而使它们有效地成为行的前缀。 PatternMatcher
始终匹配最可能的特定模式,而不考虑配置中的 Sequences。因此,如果将“ LINE *”和“ LINEA *”都列为模式,则“ LINEA”将匹配模式“ LINEA *”,而“ LINEB”将匹配模式“ LINE *”。此外,单个星号(“ *”)可以通过匹配未与任何其他模式匹配的任何行作为默认值,如以下示例所示。
XML Configuration
<entry key="*" value-ref="defaultLineTokenizer" />
Java Configuration
...
tokenizers.put("*", defaultLineTokenizer());
...
还有一个PatternMatchingCompositeLineTokenizer
可以单独用于令牌化。
平面文件包含每个跨越多行的记录也是很常见的。为了处理这种情况,需要更复杂的策略。 multiLineRecords
示例中提供了这种常见模式的演示。
平面文件中的异常处理
在很多情况下,对行进行标记可能会引发异常。许多平面文件并不完美,并且包含格式错误的记录。许多用户选择在记录问题,原始行和行号时跳过这些错误的行。以后可以手动或通过其他批处理作业检查这些日志。因此,Spring Batch 提供了一个用于处理解析异常的异常层次结构:FlatFileParseException
和FlatFileFormatException
。尝试读取文件时遇到任何错误,FlatFileItemReader
会引发FlatFileParseException
。 FlatFileFormatException
由LineTokenizer
接口的实现抛出,并指示在标记化时遇到的更具体的错误。
IncorrectTokenCountException
DelimitedLineTokenizer
和FixedLengthLineTokenizer
都可以指定可用于创建FieldSet
的列名。但是,如果列名的数量与对行进行标记时找到的列数不匹配,则无法创建FieldSet
,并抛出IncorrectTokenCountException
,其中包含遇到的标记数和期望的数,如以下示例:
tokenizer.setNames(new String[] {"A", "B", "C", "D"});
try {
tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
assertEquals(4, e.getExpectedCount());
assertEquals(3, e.getActualCount());
}
因为令牌化程序配置了 4 个列名,但是在文件中仅找到 3 个令牌,所以抛出了IncorrectTokenCountException
。
IncorrectLineLengthException
解析为固定长度格式的文件在解析时还有其他要求,因为与分隔格式不同,每一列必须严格遵守其 sched 义宽度。如果总行长不等于此列的最宽值,则将引发异常,如以下示例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5),
new Range(6, 10),
new Range(11, 15) });
try {
tokenizer.tokenize("12345");
fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
assertEquals(15, ex.getExpectedLength());
assertEquals(5, ex.getActualLength());
}
上面标记器的配置范围是:1-5、6-10 和 11-15.因此,该行的总长度为 15.但是,在前面的示例中,传入了一条长度为 5 的行,从而引发了IncorrectLineLengthException
。在这里抛出异常,而不是仅 Map 第一列,可以使行的处理更早地失败,并且具有比尝试在FieldSetMapper
的第 2 列中读取失败时包含的更多信息。但是,在某些情况下,线的长度并不总是恒定的。因此,可以通过'strict'属性关闭行长的验证,如以下示例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));
前面的示例几乎与之前的示例相同,只是调用了tokenizer.setStrict(false)
。此设置告诉令牌化程序在对行进行令牌化时不要强制行长。现在已正确创建并返回FieldSet
。但是,对于其余值,它仅包含空标记。
1.6.3. FlatFileItemWriter
写入平面文件具有相同的问题和必须从文件中读取的问题。步骤必须能够以事务方式编写定界或定长格式。
LineAggregator
就像需要LineTokenizer
接口来获取一项并将其变成String
一样,文件写入必须具有一种将多个字段聚合到单个字符串中以写入文件的方法。在 Spring Batch 中,这是LineAggregator
,如以下接口定义所示:
public interface LineAggregator<T> {
public String aggregate(T item);
}
LineAggregator
与LineTokenizer
逻辑相反。 LineTokenizer
取String
并返回FieldSet
,而LineAggregator
取item
并返回String
。
PassThroughLineAggregator
LineAggregator
接口的最基本实现是PassThroughLineAggregator
,它假定对象已经是一个字符串或它的字符串表示形式可以接受写入,如以下代码所示:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
如果需要直接控制创建字符串,但是FlatFileItemWriter
的优点(例如事务和重新启动支持)是必需的,则上述实现很有用。
简化文件编写示例
既然已经定义了LineAggregator
接口及其最基本的实现PassThroughLineAggregator
,那么可以说明基本的编写流程:
要写入的对象被传递到
LineAggregator
以获得String
。返回的
String
将被写入配置的文件。
FlatFileItemWriter
的以下摘录用代码表示:
public void write(T item) throws Exception {
write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
一个简单的配置可能如下所示:
XML Configuration
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean class="org.spr...PassThroughLineAggregator"/>
</property>
</bean>
Java Configuration
@Bean
public FlatFileItemWriter itemWriter() {
return new FlatFileItemWriterBuilder<Foo>()
.name("itemWriter")
.resource(new FileSystemResource("target/test-outputs/output.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
FieldExtractor
前面的示例对于写入文件的最基本用途可能很有用。但是,FlatFileItemWriter
的大多数用户都有一个域对象,需要将该域对象写出,因此必须将其转换为一行。在读取文件时,需要满足以下条件:
从文件中读取一行。
将行传递到
LineTokenizer#tokenize()
方法中,以便检索FieldSet
。将标记化返回的
FieldSet
传递给FieldSetMapper
,并从ItemReader#read()
方法返回结果。
文件写入具有相似但相反的步骤:
将要写入的 Item 传递给 Writer。
将 Item 上的字段转换为数组。
将结果数组聚合为一行。
因为框架没有办法知道需要写出对象中的哪些字段,所以必须写一个FieldExtractor
来完成将 Item 变成数组的任务,如以下接口定义所示:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
FieldExtractor
接口的实现应从提供的对象的字段中创建一个数组,然后可以使用元素之间的分隔符将其写出,也可以将其写为固定宽度的行的一部分。
PassThroughFieldExtractor
在许多情况下,需要写出集合,例如数组Collection
或FieldSet
。从这些集合类型之一中“提取”数组非常简单。为此,请将集合转换为数组。因此,在这种情况下应使用PassThroughFieldExtractor
。应当注意,如果传入的对象不是集合的类型,则PassThroughFieldExtractor
返回仅包含要提取的 Item 的数组。
BeanWrapperFieldExtractor
与在文件读取部分中介绍的BeanWrapperFieldSetMapper
一样,通常最好配置如何将域对象转换为对象数组,而不是自己编写转换。 BeanWrapperFieldExtractor
提供了此功能,如以下示例所示:
BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);
此提取器实现只有一个必需的属性:要 Map 的字段的名称。就像BeanWrapperFieldSetMapper
需要字段名称将FieldSet
上的字段 Map 到所提供对象上的 setter 一样,BeanWrapperFieldExtractor
也需要名称 Map 到 getter 来创建对象数组。值得注意的是,名称的 Sequences 决定了数组中字段的 Sequences。
分隔文件写入示例
最基本的平面文件格式是其中所有字段都由定界符分隔的格式。这可以使用DelimitedLineAggregator
完成。下面的示例写出一个简单的域对象,该对象代表 Client 帐户的贷方:
public class CustomerCredit {
private int id;
private String name;
private BigDecimal credit;
//getters and setters removed for clarity
}
由于正在使用域对象,因此必须提供FieldExtractor
接口的实现以及要使用的分隔符,如以下示例所示:
XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit"/>
</bean>
</property>
</bean>
</property>
</bean>
Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
在前面的示例中,本章前面介绍的BeanWrapperFieldExtractor
用于将CustomerCredit
中的名称和贷方字段转换为对象数组,然后将其写成每个字段之间的逗号。
也可以使用FlatFileItemWriterBuilder.DelimitedBuilder
自动创建BeanWrapperFieldExtractor
和DelimitedLineAggregator
,如以下示例所示:
Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.delimited()
.delimiter("|")
.names(new String[] {"name", "credit"})
.build();
}
定宽文件写入示例
分隔不是平面文件格式的唯一类型。许多人更喜欢为每个列使用固定宽度来在字段之间划定轮廓,这通常称为“固定宽度”。 Spring Batch 通过FormatterLineAggregator
支持此功能。使用上述相同的CustomerCredit
域对象,可以将其配置如下:
XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...FormatterLineAggregator">
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit" />
</bean>
</property>
<property name="format" value="%-9s%-2.0f" />
</bean>
</property>
</bean>
Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
前面的大多数示例应该 Watch 起来很熟悉。但是,format 属性的值是 new,并显示在以下元素中:
<property name="format" value="%-9s%-2.0f" />
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...
使用与 Java 5 相同的Formatter
构建底层实现。Java Formatter
基于 C 编程语言的printf
功能。有关如何配置格式化程序的大多数详细信息,请参见Formatter的 Javadoc。
也可以使用FlatFileItemWriterBuilder.FormattedBuilder
自动创建BeanWrapperFieldExtractor
和FormatterLineAggregator
,如以下示例所示:
Java Configuration
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.formatted()
.format("%-9s%-2.0f")
.names(new String[] {"name", "credit"})
.build();
}
处理文件创建
FlatFileItemReader
与文件资源的关系非常简单。初始化 Reader 后,它将打开文件(如果存在),如果没有,则引发异常。文件写入并不是那么简单。乍一 Watch,似乎应该为FlatFileItemWriter
构建类似的简单约定:如果文件已经存在,则引发异常,如果不存在,则创建它并开始写入。但是,潜在地重新启动Job
可能会导致问题。在正常的重新启动方案中,Contract 被撤销:如果文件存在,则从最后一个已知的好的位置开始对其进行写入,如果不存在,则引发异常。但是,如果此作业的文件名始终相同会怎样?在这种情况下,您希望删除该文件(如果存在),除非重新启动。由于这种可能性,FlatFileItemWriter
包含属性shouldDeleteIfExists
。将此属性设置为 true 会导致在打开编写器时删除具有相同名称的现有文件。
1.7. XML Item 读取器和写入器
Spring Batch 提供了用于读取 XML 记录并将它们 Map 到 Java 对象以及将 Java 对象编写为 XML 记录的事务性基础结构。
Definition of Actuator
StAX API 用于 I/O,因为其他标准 XML 解析 API 不能满足批处理要求(DOM 一次将整个 Importing 加载到内存中,而 SAX 通过允许用户仅提供回调来控制解析过程)。
我们需要考虑在 Spring Batch 中 XML Importing 和输出如何工作。首先,有一些概念与文件读写不同,但在 Spring Batch XML 处理中很常见。使用 XML 处理时,代替需要标记的记录行(FieldSet
个实例),假定 XML 资源是与各个记录相对应的“片段”的集合,如下图所示:
图 1. XML Importing
在上述方案中,“贸易”标签被定义为“根元素”。 ''和'</ trade>'之间的所有内容都被视为一个“片段”。 Spring Batch 使用对象/ XML Map(OXM)将片段绑定到对象。但是,Spring Batch 不与任何特定的 XML 绑定技术绑定。典型的用法是委托Spring OXM,它为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 的依赖关系是可选的,如果需要,您可以选择实现特定于 Spring Batch 的接口。下图显示了与 OXM 支持的技术的关系:
图 2. OXM 绑定
通过对 OXM 的介绍以及如何使用 XML 片段表示记录,我们现在可以更仔细地检查 Reader 和 Writer。
1.7.1. StaxEventItemReader
StaxEventItemReader
配置提供了用于处理 XML Importing 流中的记录的典型设置。首先,考虑StaxEventItemReader
可以处理的以下 XML 记录集:
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
为了能够处理 XML 记录,需要满足以下条件:
根元素名称:构成要 Map 的对象的片段的根元素的名称。示例配置通过贸易价值展示了这一点。
资源:表示要读取文件的 Spring 资源。
Unmarshaller
:Spring OXM 提供的解组工具,用于将 XML 片段 Map 到对象。
下面的示例显示如何定义与名为trade
的根元素,org/springframework/batch/item/xml/domain/trades.xml
的资源以及称为tradeMarshaller
的解组器一起使用的StaxEventItemReader
。
XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
<property name="fragmentRootElementName" value="trade" />
<property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>
Java Configuration
@Bean
public StaxEventItemReader itemReader() {
return new StaxEventItemReaderBuilder<Trade>()
.name("itemReader")
.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
.addFragmentRootElements("trade")
.unmarshaller(tradeMarshaller())
.build();
}
请注意,在此示例中,我们选择使用XStreamMarshaller
,它接受作为 Map 传入的别名,其中第一个键和值是片段的名称(即根元素)以及要绑定的对象类型。然后,类似于FieldSet
,Map 到对象类型内字段的其他元素的名称在 Map 中描述为键/值对。在配置文件中,我们可以使用 Spring 配置 Util 来描述所需的别名,如下所示:
XML Configuration
<bean id="tradeMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
Java Configuration
@Bean
public XStreamMarshaller tradeMarshaller() {
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
return marshaller;
}
在 Importing 时,Reader 读取 XML 资源,直到它识别出一个新的片段即将开始。默认情况下,读取器将匹配元素名称以识别新片段即将开始。读取器从该片段创建一个独立的 XML 文档,并将该文档传递给解串器(通常是 Spring OXM Unmarshaller
的包装器),以将 XML Map 到 Java 对象。
总之,此过程类似于以下 Java 代码,该代码使用 Spring 配置提供的注入:
StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = true;
Trade trade = null;
while (hasNext) {
trade = xmlStaxEventItemReader.read();
if (trade == null) {
hasNext = false;
}
else {
System.out.println(trade);
}
}
1.7.2. StaxEventItemWriter
输出与 Importing 对称地工作。 StaxEventItemWriter
需要Resource
,编组和rootTagName
。将 Java 对象传递到编组器(通常是标准的 Spring OXM Marshaller),编组器通过使用自定义事件编写器写入Resource
来过滤 OXM 工具为每个片段生成的StartDocument
和EndDocument
事件。以下示例使用StaxEventItemWriter
:
XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" ref="outputResource" />
<property name="marshaller" ref="tradeMarshaller" />
<property name="rootTagName" value="trade" />
<property name="overwriteOutput" value="true" />
</bean>
Java Configuration
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
return new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(tradeMarshaller())
.resource(outputResource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
}
前面的配置设置了三个必需的属性,并设置了可选的overwriteOutput=true
属性,本章前面已经提到了该属性,用于指定是否可以覆盖现有文件。应当注意,以下示例中用于编写程序的编组器与本章前面的阅读示例中使用的编组器完全相同:
XML Configuration
<bean id="customerCreditMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="customer"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
Java Configuration
@Bean
public XStreamMarshaller customerCreditMarshaller() {
XStreamMarshaller marshaller = new XStreamMarshaller();
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
marshaller.setAliases(aliases);
return marshaller;
}
总结一下 Java 示例,以下代码说明了所有讨论的要点,展示了所需属性的编程设置:
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
StaxEventItemWriter staxItemWriter =
new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(marshaller)
.resource(resource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
staxItemWriter.afterPropertiesSet();
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);
1.8. JSON Item 读取器和写入器
Spring Batch 提供了以下格式的读取和写入 JSON 资源的支持:
[
{
"isin": "123",
"quantity": 1,
"price": 1.2,
"customer": "foo"
},
{
"isin": "456",
"quantity": 2,
"price": 1.4,
"customer": "bar"
}
]
假定 JSON 资源是对应于各个 Item 的 JSON 对象数组。 Spring Batch 未绑定到任何特定的 JSON 库。
1.8.1. JsonItemReader
JsonItemReader
将 JSON 解析和绑定委托给org.springframework.batch.item.json.JsonObjectReader
接口的实现。该接口旨在通过使用流 API 读取大块的 JSON 对象来实现。当前提供了两种实现:
Jackson到
org.springframework.batch.item.json.JacksonJsonObjectReader
Jackson到
org.springframework.batch.item.json.JacksonJsonObjectReader
为了能够处理 JSON 记录,需要满足以下条件:
Resource
:一个表示要读取的 JSON 文件的 Spring 资源。JsonObjectReader
:JSON 对象读取器,用于解析 JSON 对象并将其绑定到 Item
以下示例显示了如何定义与先前的 JSON 资源org/springframework/batch/item/json/trades.json
一起使用的JsonItemReader
和基于 Jackson 的JsonObjectReader
:
@Bean
public JsonItemReader<Trade> jsonItemReader() {
return new JsonItemReaderBuilder<Trade>()
.jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonItemReader")
.build();
}
1.8.2. JsonFileItemWriter
JsonFileItemWriter
将 Item 的编组委派到org.springframework.batch.item.json.JsonObjectMarshaller
界面。该接口的约定是获取一个对象并将其编组为 JSON String
。当前提供了两种实现:
Jackson到
org.springframework.batch.item.json.JacksonJsonObjectReader
Jackson到
org.springframework.batch.item.json.JacksonJsonObjectReader
为了能够写入 JSON 记录,需要满足以下条件:
Resource
:SpringResource
,代表要写入的 JSON 文件JsonObjectMarshaller
:将 JSON 对象编组为 JSON 格式的 JSON 对象编组器
以下示例显示了如何定义JsonFileItemWriter
:
@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
return new JsonFileItemWriterBuilder<Trade>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonFileItemWriter")
.build();
}
1.9. 多文件 Importing
通常在单个Step
中处理多个文件。假设文件格式相同,则MultiResourceItemReader
支持 XML 和平面文件处理的这种类型的 Importing。考虑目录中的以下文件:
file-1.txt file-2.txt ignored.txt
file-1.txt
和file-2.txt
的格式相同,并且出于业务原因,应一起处理。可以使用通配符使用MultiResourceItemReader
读取两个文件,如以下示例所示:
XML Configuration
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
Java Configuration
@Bean
public MultiResourceItemReader multiResourceReader() {
return new MultiResourceItemReaderBuilder<Foo>()
.delegate(flatFileItemReader())
.resources(resources())
.build();
}
引用的委托是简单的FlatFileItemReader
。上面的配置从两个文件读取 Importing,处理回滚和重新启动方案。应当注意,与任何ItemReader
一样,添加额外的 Importing(在这种情况下为文件)可能会在重新启动时引起潜在的问题。建议批处理作业使用其各自的目录,直到成功完成为止。
Note
通过使用MultiResourceItemReader#setComparator(Comparator)
对 Importing 资源进行排序,以确保在重新启动方案中在作业运行之间保留资源排序。
1.10. Database
像大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,批处理与其他应用程序样式不同,这是由于系统必须使用的数据集的绝对大小。如果 SQL 语句返回 100 万行,则结果集可能将所有返回的结果保存在内存中,直到读取了所有行。 Spring Batch 针对此问题提供了两种类型的解决方案:
1.10.1. 基于游标的 ItemReader 实现
通常,使用数据库游标是大多数批处理开发人员的默认方法,因为它是数据库解决“流式”关系数据问题的方法。 Java ResultSet
类实质上是一种用于操纵游标的面向对象的机制。 ResultSet
将光标保留到当前数据行。在ResultSet
上调用next
会将光标移到下一行。基于 Spring Batch 游标的ItemReader
实现会在初始化时打开游标,并将对read
的每次调用都将游标向前移动一行,返回可用于处理的 Map 对象。然后调用close
方法以确保释放所有资源。 Spring 核心JdbcTemplate
通过使用回调模式完全 MapResultSet
中的所有行并在将控制权返回给方法调用者之前关闭来解决此问题。但是,必须成批处理,直到步骤完成。下图显示了基于游标的ItemReader
的工作原理图。请注意,尽管示例使用 SQL(因为 SQL 众所周知),但是任何技术都可以实现基本方法。
图 3.游标示例
本示例说明了基本模式。给定一个'FOO'表,该表具有三列:ID
,NAME
和BAR
,选择 ID 大于 1 但小于 7 的所有行。这会将光标的开始(行 1)放在 ID 2 上。结果该行的Foo
应该完全 Map。再次调用read()
会将光标移动到 ID 为 3 的下一行,即Foo
。这些读取的结果将在每个read
之后写出,从而可以对对象进行垃圾回收(假定没有实例变量维护对其的引用) )。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 JDBC 实现。它直接与ResultSet
一起使用,并且需要 SQL 语句针对从DataSource
获得的连接运行。以以下数据库模式为例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人更喜欢为每行使用一个域对象,因此以下示例使用RowMapper
接口的实现来 MapCustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为JdbcCursorItemReader
与JdbcTemplate
共享关键接口,所以 Watch 一个示例如何使用JdbcTemplate
读取此数据以与ItemReader
进行对比很有用。就本示例而言,假设CUSTOMER
数据库中有 1,000 行。第一个示例使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行前面的代码段后,customerCredits
列表包含 1,000 个CustomerCredit
对象。在查询方法中,从DataSource
获得连接,对它运行提供的 SQL,并为ResultSet
中的每一行调用mapRow
方法。将此与JdbcCursorItemReader
的方法进行对比,如下面的示例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行前面的代码段后,计数器等于 1,000.如果上面的代码已将返回的customerCredit
放入列表,则结果将与JdbcTemplate
示例完全相同。但是,ItemReader
的最大优点是它允许“流式传输”Item。可以一次调用read
方法,可以用ItemWriter
写出该 Item,然后可以使用read
获得下一个 Item。这样就可以在“块”中进行 Item 读取和写入,并定期进行,这是高性能批处理的本质。此外,它很容易配置为注入到 Spring Batch Step
中,如以下示例所示:
XML Configuration
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
Java Configuration
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
Additional Properties
由于在 Java 中有很多用于打开游标的选项,因此JdbcCursorItemReader
上可以设置许多属性,如下表所示:
表 2. JdbcCursorItemReader 属性
ignoreWarnings | 确定是否记录 SQLWarnings 或导致异常。默认值为true (表示已记录警告)。 |
fetchSize | 当ItemReader 使用的ResultSet 对象需要更多行时,为 JDBC 驱动程序提供提示,提示应从数据库中获取的行数。默认情况下,不给出任何提示。 |
maxRows | 设置基础ResultSet 一次可以容纳的最大行数限制。 |
queryTimeout | 设置驱动程序 awaitStatement 对象运行的秒数。如果超出限制,则抛出DataAccessException 。 (有关详细信息,请咨询您的驱动程序供应商文档)。 |
verifyCursorPosition | 由于ItemReader 持有的ResultSet 被传递给RowMapper ,因此用户可以自己调用ResultSet.next() ,这可能会导致读取器的内部计数出现问题。将此值设置为true 会导致抛出异常,如果RowMapper 调用之后的游标位置与之前不同。 |
saveState | 指示是否将 Reader 的状态保存在ItemStream#update(ExecutionContext) 提供的ExecutionContext 中。默认值为true 。 |
driverSupportsAbsolute | 指示 JDBC 驱动程序是否支持在ResultSet 上设置绝对行。对于支持ResultSet.absolute() 的 JDBC 驱动程序,建议将其设置为true ,因为它可以提高性能,特别是如果在处理大型数据集时某个步骤失败时。默认为false 。 |
setUseSharedExtendedConnection | 指示用于游标的连接是否应由所有其他处理使用,从而共享同一事务。如果将其设置为false ,那么游标将以其自己的连接打开,并且不参与其余步骤处理中启动的任何事务。如果将此标志设置为true ,则必须将数据源包装在ExtendedConnectionDataSourceProxy 中,以防止每次提交后关闭和释放连接。当将此选项设置为true 时,将同时使用'READ_ONLY'和'HOLD_CURSORS_OVER_COMMIT'选项创建用于打开游标的语句。这样就可以使游标在事务开始时保持打开状态,并在步骤处理中执行提交。要使用此功能,您需要一个支持此功能的数据库和一个支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认为false 。 |
HibernateCursorItemReader
就像普通的 Spring 用户在决定是否使用 ORM 解决方案(这会影响他们使用JdbcTemplate
还是HibernateTemplate
)上做出重要决定一样,Spring Batch 用户也具有相同的选择。 HibernateCursorItemReader
是光标技术的 Hibernate 实现。 Hibernate 的批量使用方式一直存在争议。这主要是因为 Hibernate 最初是为支持在线应用程序样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession
而不是标准会话。这消除了 Hibernate 使用的所有缓存和脏检查,并且可能在批处理方案中引起问题。有关 Stateless 和正常休眠会话之间差异的更多信息,请参阅特定休眠版本的文档。 HibernateCursorItemReader
允许您声明 HQL 语句并传递SessionFactory
,这将使每个调用返回一个 Item,以与JdbcCursorItemReader
相同的基本方式进行读取。以下示例配置使用与 JDBC Reader 相同的“Client 信用”示例:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
假设已为Customer
表正确创建了休眠 Map 文件,此配置的ItemReader
以与JdbcCursorItemReader
所述完全相同的方式返回CustomerCredit
个对象。 “ useStatelessSession”属性默认为 true,但已在此处添加,以引起人们注意打开或关闭该属性的能力。还值得注意的是,可以通过setFetchSize
属性设置基础游标的获取大小。与JdbcCursorItemReader
一样,配置非常简单,如以下示例所示:
XML Configuration
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
Java Configuration
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
StoredProcedureItemReader
有时有必要通过使用存储过程来获取游标数据。 StoredProcedureItemReader
的工作方式类似于JdbcCursorItemReader
,不同之处在于,它运行查询返回游标的存储过程,而不是运行查询以获取游标。存储过程可以通过三种不同的方式返回游标:
作为返回的
ResultSet
(由 SQL Server,Sybase,DB2,Derby 和 MySQL 使用)。作为参考光标,作为 out 参数返回(由 Oracle 和 PostgreSQL 使用)。
作为存储函数调用的返回值。
以下示例配置使用与先前示例相同的“Client 信用”示例:
XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
前面的示例依赖于存储过程来提供ResultSet
作为返回结果(先前的选项 1)。
如果存储过程返回了ref-cursor
(选项 2),则我们需要提供 out 参数的位置,即返回的ref-cursor
。以下示例显示如何将第一个参数用作 ref 游标:
XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
如果游标是从存储的函数(选项 3)返回的,则需要将属性“ function”设置为true
。默认为false
。以下示例显示了如下内容:
XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
在所有这些情况下,我们都需要定义一个RowMapper
以及一个DataSource
以及实际的过程名称。
如果存储过程或函数接受参数,则必须通过parameters
属性对其进行声明和设置。对于 Oracle,以下示例声明了三个参数。第一个是返回参量的 out 参数,第二个和第三个是采用INTEGER
类型值的 in 参数。
XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
除了参数声明之外,我们还需要指定PreparedStatementSetter
实现,该实现为调用设置参数值。这与上面的JdbcCursorItemReader
相同。 Additional Properties中列出的所有其他属性也适用于StoredProcedureItemReader
。
1.10.2. 分页 ItemReader 实现
使用数据库游标的一种替代方法是运行多个查询,其中每个查询获取一部分结果。我们将此部分称为页面。每个查询必须指定起始行号和我们要在页面中返回的行数。
JdbcPagingItemReader
寻呼ItemReader
的一种实现是JdbcPagingItemReader
。 JdbcPagingItemReader
需要一个PagingQueryProvider
负责提供用于检索组成页面的行的 SQL 查询。由于每个数据库都有其提供分页支持的策略,因此我们需要为每种受支持的数据库类型使用不同的PagingQueryProvider
。还有SqlPagingQueryProviderFactoryBean
自动检测正在使用的数据库并确定适当的PagingQueryProvider
实现。这简化了配置,是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean
要求您指定select
子句和from
子句。您还可以提供一个可选的where
子句。这些子句和必需的sortKey
用于构建 SQL 语句。
Note
重要的是在sortKey
上具有唯一的键约束,以确保两次执行之间不会丢失任何数据。
打开 Reader 后,它会按照每次调用ItemReader
的基本方式,将每次调用将一项返回给read
。当需要其他行时,分页将在幕后进行。
以下示例配置使用与前面显示的基于光标的ItemReaders
类似的“Client 信用”示例:
XML Configuration
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
Java Configuration
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
此配置的ItemReader
使用RowMapper
返回CustomerCredit
对象,必须指定该对象。 “ pageSize”属性确定每个查询运行从数据库读取的实体数。
'parameterValues'属性可用于为查询指定参数值的Map
。如果在where
子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果使用传统的“?”占位符,则每个条目的键应为占位符的编号,从 1 开始。
JpaPagingItemReader
寻呼ItemReader
的另一种实现是JpaPagingItemReader
。 JPA 没有类似于 Hibernate StatelessSession
的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时,这是自然的选择。读取每个页面后,实体将分离,并清除持久性上下文,以允许在处理页面后对实体进行垃圾回收。
JpaPagingItemReader
可让您声明 JPQL 语句并传递EntityManagerFactory
。然后,它每次调用都传回一项内容,以与其他任何ItemReader
相同的基本方式进行读取。当需要其他实体时,分页发生在幕后。以下示例配置使用与前面显示的 JDBC Reader 相同的“Client 信用”示例:
XML Configuration
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
Java Configuration
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
假设CustomerCredit
对象具有正确的 JPA 注解或 ORM Map 文件,则此配置的ItemReader
以与上述JdbcPagingItemReader
所述完全相同的方式返回CustomerCredit
个对象。 “ pageSize”属性确定每次查询执行时从数据库读取的实体数。
1.10.3. 数据库 ItemWriters
虽然平面文件和 XML 文件都具有特定的ItemWriter
实例,但是在数据库世界中没有确切的实例。这是因为事务提供了所有必需的功能。 ItemWriter
实现对于文件来说是必需的,因为它们必须表现得像是事务性的,并跟踪书面 Item 并在适当的时候进行刷新或清除。数据库不需要此功能,因为写入已包含在事务中。用户可以创建自己的 DAO,以实现ItemWriter
接口,也可以使用自定义ItemWriter
中的一个 DAO 来处理一般的处理问题。无论哪种方式,它们都应该正常工作。需要注意的一件事是批处理输出所提供的性能和错误处理功能。这在使用 hibernate 作为ItemWriter
时最常见,但是在使用 JDBC 批处理模式时可能会有相同的问题。假设我们要小心刷新并且数据中没有错误,则批处理数据库的输出没有任何固有的缺陷。但是,写入时发生的任何错误都可能引起混乱,因为无法知道是哪一个单独的 Item 导致了异常,甚至没有任何一个单独的 Item 负责,如下图所示:
图 4.冲洗错误
如果在写入之前对 Item 进行了缓冲,则在提交之前刷新缓冲区之前,不会引发任何错误。例如,假设每个块写入 20 个 Item,而第 15 个 Item 则抛出DataIntegrityViolationException
。就Step
而言,所有 20 个 Item 均已成功写入,因为在实际写入之前无法知道发生错误。 Session#flush()
被调用后,缓冲区将被清空,并会触发异常。此时,Step
无能为力。事务必须回滚。通常,此异常可能会导致 Item 被跳过(取决于跳过/重试策略),然后不会再次写入。但是,在批处理方案中,无法知道是哪个 Item 引起了问题。发生故障时将写入整个缓冲区。解决此问题的唯一方法是在每个 Item 之后冲洗,如下图所示:
图 5.写入错误
这是一个常见的用例,尤其是在使用 Hibernate 时,实现ItemWriter
的简单准则是在每次对write()
的调用时刷新。这样做可以使 Item 可靠地被跳过,Spring Batch 在内部负责处理发生错误后对ItemWriter
的调用的粒度。
1.11. 重用现有服务
批处理系统通常与其他应用程序样式结合使用。最常见的是在线系统,但它也可以通过移动每种应用程序样式使用的必要批量数据来支持集成甚至胖 Client 端应用程序。因此,许多用户通常都想在其批处理作业中重用现有的 DAO 或其他服务。通过允许注入任何必需的类,Spring 容器本身使此操作相当容易。但是,在某些情况下,现有服务需要充当ItemReader
或ItemWriter
,以满足另一个 Spring Batch 类的依赖关系,或者因为它确实是某个步骤的主要ItemReader
。为每个需要包装的服务编写一个适配器类很简单,但是由于这是一个普遍的问题,Spring Batch 提供了ItemReaderAdapter
和ItemWriterAdapter
的实现。这两个类都通过调用委托模式来实现标准的 Spring 方法,并且设置起来非常简单。以下示例使用ItemReaderAdapter
:
XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
Java Configuration
@Bean
public ItemReaderAdapter itemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(fooService());
reader.setTargetMethod("generateFoo");
return reader;
}
@Bean
public FooService fooService() {
return new FooService();
}
需要注意的重要一点是targetMethod
的 Contract 必须与read
的 Contract 相同:用尽后,它将返回null
。否则,它返回Object
。根据ItemWriter
的实现,其他任何因素都会阻止框架知道处理应何时结束,从而导致无限循环或错误失败。以下示例使用ItemWriterAdapter
:
XML Configuration
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
Java Configuration
@Bean
public ItemWriterAdapter itemWriter() {
ItemWriterAdapter writer = new ItemWriterAdapter();
writer.setTargetObject(fooService());
writer.setTargetMethod("processFoo");
return writer;
}
@Bean
public FooService fooService() {
return new FooService();
}
1.12. 验证 Importing
在本章的过程中,讨论了多种解析 Importing 的方法。如果每个主要实现的格式都不正确,则将引发异常。如果缺少一系列数据,FixedLengthTokenizer
将引发异常。同样,尝试访问RowMapper
或FieldSetMapper
的索引不存在或格式与预期的索引不同,则将引发异常。所有这些类型的异常都在read
返回之前引发。但是,它们不能解决退回商品是否有效的问题。例如,如果字段之一是年龄,则显然不能为负。它可以正确解析,因为它存在并且是一个数字,但是不会引起异常。由于已经有大量的验证框架,因此 Spring Batch 不会尝试提供其他验证框架。而是,它提供了一个名为Validator
的简单接口,可以通过任意数量的框架来实现,如以下接口定义所示:
public interface Validator<T> {
void validate(T value) throws ValidationException;
}
约定是,如果对象无效,则validate
方法将引发异常,如果有效,则正常返回。 Spring Batch 提供了开箱即用的ValidatingItemProcessor
,如以下 bean 定义所示:
XML Configuration
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<property name="validator" ref="validator" />
</bean>
<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
<property name="validator">
<bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/>
</property>
</bean>
Java Configuration
@Bean
public ValidatingItemProcessor itemProcessor() {
ValidatingItemProcessor processor = new ValidatingItemProcessor();
processor.setValidator(validator());
return processor;
}
@Bean
public SpringValidator validator() {
SpringValidator validator = new SpringValidator();
validator.setValidator(new TradeValidator());
return validator;
}
您还可以使用BeanValidatingItemProcessor
来验证使用 Bean Validation API(JSR-303)注解进行注解的 Item。例如,给定以下类型Person
:
class Person {
@NotEmpty
private String name;
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
您可以通过在应用程序上下文中声明`` bean 来验证 Item,并在面向块的步骤中将其注册为处理器:
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
beanValidatingItemProcessor.setFilter(true);
return beanValidatingItemProcessor;
}
1.13. 防止国家持续存在
默认情况下,所有ItemReader
和ItemWriter
实现都在提交前将其当前状态存储在ExecutionContext
中。但是,这可能并不总是所需的行为。例如,许多开发人员选择使用过程指示器来使其数据库读取器“可重新运行”。在 Importing 数据中添加了一个额外的列,以指示是否已对其进行处理。当读取(或写入)特定记录时,已处理标志从false
翻转到true
。然后,SQL 语句可以在where
子句中包含一个额外的语句,例如where PROCESSED_IND = false
,从而确保在重新启动的情况下仅返回未处理的记录。在这种情况下,最好不要存储任何状态,例如当前行号,因为它与重新启动无关。因此,所有读取器和写入器都包含'saveState'属性,如以下示例所示:
XML Configuration
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
</property>
<property name="saveState" value="false" />
<property name="sql">
<value>
SELECT games.player_id, games.year_no, SUM(COMPLETES),
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
from games, players where players.player_id =
games.player_id group by games.player_id, games.year_no
</value>
</property>
</bean>
Java Configuration
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<PlayerSummary>()
.dataSource(dataSource)
.rowMapper(new PlayerSummaryMapper())
.saveState(false)
.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
+ "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
+ "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
+ "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
+ "from games, players where players.player_id ="
+ "games.player_id group by games.player_id, games.year_no")
.build();
}
上面配置的ItemReader
不会对其参与的执行在ExecutionContext
中进行任何 Importing。
1.14. 创建自定义 ItemReaders 和 ItemWriters
到目前为止,本章已经讨论了 Spring Batch 中读写的基本契约以及这样做的一些常见实现。但是,这些都是相当通用的,开箱即用的实现可能无法涵盖许多潜在的场景。本节通过一个简单的示例说明如何创建自定义ItemReader
和ItemWriter
实现并正确实现其 Contract。 ItemReader
还实现了ItemStream
,以说明如何使读取器或写入器可重启。
1.14.1. 自定义 ItemReader 示例
就此示例而言,我们创建一个简单的ItemReader
实现,该实现从提供的列表中读取。我们首先实现ItemReader
,read
方法的最基本协定,如以下代码所示:
public class CustomItemReader<T> implements ItemReader<T>{
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
上一类获取 Item 列表,然后一次返回一个,将其从列表中删除。当列表为空时,它将返回null
,从而满足ItemReader
的最基本要求,如以下测试代码所示:
List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使 ItemReader 可重新启动
最后的挑战是使ItemReader
可重新启动。当前,如果处理中断并再次开始,则ItemReader
必须从头开始。这实际上在许多情况下都是有效的,但有时最好将批处理作业从中断处重新开始。关键的区别通常是 Reader 是有状态的还是 Stateless 的。Stateless 读取器无需担心可重新启动性,但是有状态读取器必须尝试在重新启动时重新构造其最后一个已知状态。因此,我们建议您尽可能使自定义 Reader 保持 Stateless,因此您不必担心可重新启动性。
如果确实需要存储状态,则应使用ItemStream
接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if(executionContext.containsKey(CURRENT_INDEX)){
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else{
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
在每次调用ItemStream
update
方法时,ItemReader
的当前索引都使用键“ current.index”存储在提供的ExecutionContext
中。调用ItemStream
open
方法时,将检查ExecutionContext
以查 Watch 其是否包含具有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个非常琐碎的示例,但仍然符合一般约定:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<String>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数ItemReaders
具有更复杂的重启逻辑。例如,JdbcCursorItemReader
将最后处理的行的行 ID 存储在游标中。
还值得注意的是,ExecutionContext
中使用的密钥不应太小。这是因为Step
中的所有ItemStreams
使用相同的ExecutionContext
。在大多数情况下,只需在键之前加上类名就足以保证唯一性。但是,在极少数情况下,在同一步骤中使用了两个相同类型的ItemStream
(如果需要两个文件进行输出,可能会发生这种情况),则需要一个更唯一的名称。因此,许多 Spring Batch ItemReader
和ItemWriter
实现都具有setName()
属性,该属性可以覆盖此键名。
1.14.2. 自定义 ItemWriter 示例
实现自定义ItemWriter
的方式与上述ItemReader
的示例在很多方面相似,但是在足以保证其自己的示例方面的差异也很大。但是,添加可重新启动性本质上是相同的,因此在本示例中未涉及。与ItemReader
示例一样,使用List
是为了使示例尽可能简单:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(List<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使 ItemWriter 可重新启动
为了使ItemWriter
可重新启动,我们将遵循与ItemReader
相同的过程,添加并实现ItemStream
接口以同步执行上下文。在示例中,我们可能必须计算已处理的 Item 数,并将其添加为页脚记录。如果需要这样做,则可以在ItemWriter
中实现ItemStream
,以便在重新打开流时从执行上下文重构计数器。
在许多实际情况下,自定义ItemWriters
还会委派给另一个本身可重新启动的编写器(例如,当写入文件时),或者它写入事务性资源,因此不需要重新启动,因为它是 Stateless 的。当您有状态 Writer 时,您可能应该确保同时实现_和ItemWriter
。还请记住,Writer 的 Client 端需要注意ItemStream
,因此您可能需要将其注册为流,并在配置中。
1.15. Item 读取器和写入器实现
在本节中,我们将向您介绍之前各节中未讨论的 Reader 和 Writer。
1.15.1. Decorators
在某些情况下,用户需要将特定的行为附加到预先存在的ItemReader
。 Spring Batch 提供了一些现成的装饰器,它们可以为ItemReader
和ItemWriter
实现添加其他行为。
Spring Batch 包含以下装饰器:
SynchronizedItemStreamReader
当使用不是线程安全的ItemReader
时,Spring Batch 提供了SynchronizedItemStreamReader
装饰器,可用于使ItemReader
线程安全。 Spring Batch 提供了SynchronizedItemStreamReaderBuilder
来构造SynchronizedItemStreamReader
的实例。
SingleItemPeekableItemReader
Spring Batch 包含一个装饰器,该装饰器向ItemReader
添加了 peek 方法。这种窥视方法使用户可以窥视前面的一项。重复调用 peek 会返回相同的 Item,这是从read
方法返回的下一个 Item。 Spring Batch 提供了SingleItemPeekableItemReaderBuilder
来构造SingleItemPeekableItemReader
的实例。
Note
SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为不可能在多个线程中使用 peek。偷 Watch 的线程中只有一个会在下一次调用 read 中得到该 Item。
MultiResourceItemWriter
当当前资源中写入的 Item 数超过itemCountLimitPerResource
时,MultiResourceItemWriter
包装ResourceAwareItemWriterItemStream
并创建一个新的输出资源。 Spring Batch 提供了MultiResourceItemWriterBuilder
来构造MultiResourceItemWriter
的实例。
ClassifierCompositeItemWriter
ClassifierCompositeItemWriter
根据通过提供的Classifier
实现的 Router 模式,为每个 Item 调用ItemWriter
实现的集合之一。如果所有委托都是线程安全的,则实现是线程安全的。 Spring Batch 提供了ClassifierCompositeItemWriterBuilder
来构造ClassifierCompositeItemWriter
的实例。
ClassifierCompositeItemProcessor
ClassifierCompositeItemProcessor
是ItemProcessor
,它基于通过提供的Classifier
实现的 Router 模式调用ItemProcessor
实现的集合之一。 Spring Batch 提供了ClassifierCompositeItemProcessorBuilder
来构造ClassifierCompositeItemProcessor
的实例。
1.15.2. 消息 Reader 和 Writer
Spring Batch 为常用消息传递系统提供以下读取器和写入器:
AmqpItemReader
AmqpItemReader
是使用AmqpTemplate
来接收或转换来自交换机的消息的ItemReader
。 Spring Batch 提供了AmqpItemReaderBuilder
来构造AmqpItemReader
的实例。
AmqpItemWriter
AmqpItemWriter
是使用AmqpTemplate
将消息发送到 AMQP 交换机的ItemWriter
。如果未在提供的AmqpTemplate
中指定名称,则消息将发送到无名交换机。 Spring Batch 提供了AmqpItemWriterBuilder
来构造AmqpItemWriter
的实例。
JmsItemReader
JmsItemReader
是使用JmsTemplate
的 JMS 的ItemReader
。模板应具有默认目标,该目标用于提供read()
方法的 Item。 Spring Batch 提供了JmsItemReaderBuilder
来构造JmsItemReader
的实例。
JmsItemWriter
JmsItemWriter
是使用JmsTemplate
的 JMS 的ItemWriter
。模板应具有默认目的地,该目的地用于发送write(List)
中的 Item。 Spring Batch 提供了JmsItemWriterBuilder
来构造JmsItemWriter
的实例。
1.15.3. 数据库 Reader
Spring Batch 提供以下数据库 Reader:
Neo4jItemReader
Neo4jItemReader
是ItemReader
,它通过使用分页技术从图形数据库 Neo4j 中读取对象。 Spring Batch 提供了Neo4jItemReaderBuilder
来构造Neo4jItemReader
的实例。
MongoItemReader
MongoItemReader
是ItemReader
,它通过使用分页技术从 MongoDB 中读取文档。 Spring Batch 提供了MongoItemReaderBuilder
来构造MongoItemReader
的实例。
HibernateCursorItemReader
HibernateCursorItemReader
是ItemStreamReader
,用于读取基于 Hibernate 构建的数据库记录。它执行 HQL 查询,然后在初始化时在调用read()
方法时对结果集进行迭代,依次返回与当前行相对应的对象。 Spring Batch 提供了HibernateCursorItemReaderBuilder
来构造HibernateCursorItemReader
的实例。
HibernatePagingItemReader
HibernatePagingItemReader
是ItemReader
,用于读取构建在 Hibernate 之上的数据库记录,并且一次最多只能读取固定数量的 Item。 Spring Batch 提供了HibernatePagingItemReaderBuilder
来构造HibernatePagingItemReader
的实例。
RepositoryItemReader
RepositoryItemReader
是使用PagingAndSortingRepository
读取记录的ItemReader
。 Spring Batch 提供了RepositoryItemReaderBuilder
来构造RepositoryItemReader
的实例。
1.15.4. 数据库 Writer
Spring Batch 提供以下数据库编写器:
Neo4jItemWriter
Neo4jItemWriter
是ItemWriter
实现,可写入 Neo4j 数据库。 Spring Batch 提供了Neo4jItemWriterBuilder
来构造Neo4jItemWriter
的实例。
MongoItemWriter
MongoItemWriter
是ItemWriter
实现,它使用 Spring Data 的MongoOperations
实现写入 MongoDB 存储。 Spring Batch 提供了MongoItemWriterBuilder
来构造MongoItemWriter
的实例。
RepositoryItemWriter
RepositoryItemWriter
是 Spring Data 中CrudRepository
的ItemWriter
包装器。 Spring Batch 提供了RepositoryItemWriterBuilder
来构造RepositoryItemWriter
的实例。
HibernateItemWriter
HibernateItemWriter
是ItemWriter
,它使用 Hibernate 会话保存或更新不属于当前 Hibernate 会话的实体。 Spring Batch 提供了HibernateItemWriterBuilder
来构造HibernateItemWriter
的实例。
JdbcBatchItemWriter
JdbcBatchItemWriter
是ItemWriter
,它使用NamedParameterJdbcTemplate
中的批处理功能为提供的所有 Item 执行一批语句。 Spring Batch 提供了JdbcBatchItemWriterBuilder
来构造JdbcBatchItemWriter
的实例。
JpaItemWriter
JpaItemWriter
是使用 JPA EntityManagerFactory
合并不属于持久性上下文一部分的任何实体的ItemWriter
。 Spring Batch 提供了JpaItemWriterBuilder
来构造JpaItemWriter
的实例。
GemfireItemWriter
GemfireItemWriter
是使用GemfireTemplate
并将GemfireTemplate
存储在 GemFire 中作为键/值对的ItemWriter
。 Spring Batch 提供了GemfireItemWriterBuilder
来构造GemfireItemWriter
的实例。
1.15.5. 专业 Reader
Spring Batch 提供以下专业 Reader:
LdifReader
LdifReader
从Resource
读取 LDIF(LDAP 数据交换格式)记录,对其进行解析,然后为每个执行的read
返回LdapAttribute
对象。 Spring Batch 提供了LdifReaderBuilder
来构造LdifReader
的实例。
MappingLdifReader
MappingLdifReader
从Resource
读取 LDIF(LDAP 数据交换格式)记录,对其进行解析,然后将每个 LDIF 记录 Map 到 POJO(普通的旧 Java 对象)。每次读取都会返回一个 POJO。 Spring Batch 提供了MappingLdifReaderBuilder
来构造MappingLdifReader
的实例。
1.15.6. 专业 Writer
Spring Batch 提供以下专业 Writer:
SimpleMailMessageItemWriter
SimpleMailMessageItemWriter
是可以发送邮件的ItemWriter
。它将消息的实际发送委派给MailSender
的实例。 Spring Batch 提供了SimpleMailMessageItemWriterBuilder
来构造SimpleMailMessageItemWriter
的实例。
1.15.7. 专用处理器
Spring Batch 提供以下专用处理器:
ScriptItemProcessor
ScriptItemProcessor
是ItemProcessor
,它将当前 Item 传递给提供的脚本以进行处理,并且脚本的结果由处理器返回。 Spring Batch 提供了ScriptItemProcessorBuilder
来构造ScriptItemProcessor
的实例。