1. ItemReaders 和 ItemWriters


XML

Java

所有批处理都可以用最简单的形式描述为读取大量数据,执行某种类型的计算或转换,以及将结果写出来。 Spring Batch 提供了三个 key 接口来帮助执行批量读取和写入:ItemReaderItemProcessorItemWriter

1.1. ItemReader

虽然是一个简单的概念,ItemReader是从许多不同类型的输入提供数据的手段。最一般的例子包括:

  • 平面文件:Flat-file item readers 从平面文件中读取数据的 lines,该文件通常描述具有由文件中的固定位置定义的数据字段或由某些特殊字符(例如逗号)分隔的数据字段的记录。

  • XML:XML ItemReaders process XML 独立于用于解析,映射和验证 objects 的技术。输入数据允许针对 XSD schema 验证 XML 文件。

  • 数据库:访问数据库资源以返回 return 结果集,该结果集可以映射到 objects 进行处理。默认的 SQL ItemReader implementations 调用RowMapper到 return objects,如果需要重新启动,则跟踪当前行,store 基本统计信息,并提供稍后解释的一些 transaction 增强功能。

还有更多的可能性,但我们将重点放在本章的基本内容上。可以在附录 A.中找到所有可用ItemReader __mplement 的完整列表。

ItemReader是通用输入操作的基本接口,如以下接口定义所示:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

read方法定义ItemReader的最重要的 contract。如果没有剩余项目,则调用它将返回一个 item 或null。 item 可能表示文件中的 line,数据库中的行或 XML 文件中的元素。通常期望这些映射到可用的 domain object(例如TradeFoo或其他),但 contract 中没有要求这样做。

预计ItemReader接口的_implement 只是前向的。但是,如果底层资源是 transactional(例如 JMS 队列),则在回滚方案中,调用read可能会在后续 calls 上返回相同的逻辑 item。还值得注意的是缺少 process 的项不会导致抛出 exception。对于 example,使用返回 0 结果的查询配置的数据库ItemReader在第一次调用 read 时返回null

1.2. ItemWriter

ItemWriter在功能上类似于ItemReader但具有反向操作。资源仍然需要定位,打开和关闭,但它们的不同之处在于ItemWriter写出而不是读入。在数据库或队列的情况下,这些操作可能是插入,更新或发送。输出序列化的格式特定于每个批 job。

ItemReader一样,ItemWriter是一个相当通用的接口,如以下接口定义所示:

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

ItemReader上的read一样,write提供了ItemWriter的基本 contract。它会尝试写出以 long 打开时传入的项目列表。因为通常期望将项目“批处理”到一个块然后输出,所以接口接受项目列表,而不是 item 本身。在写出列表之后,可以在从 write 方法返回之前执行任何可能需要的刷新。例如,如果写入 Hibernate DAO,则可以进行多次 calls 写入,每个 item 一次。然后 writer 可以在 hibernate session 上调用flush,然后返回。

1.3. ItemProcessor 中

ItemReaderItemWriter接口对于它们的特定任务都非常有用,但是如果你想在写入之前插入业务逻辑怎么办?读取和写入的一个选项是使用复合 pattern:创建包含另一个ItemWriterItemWriter或包含另一个ItemReaderItemReader。以下 code 显示了一个 example:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

前面的 class 包含在提供一些业务逻辑后委托给它的另一个ItemWriter。这个 pattern 也可以很容易地用于ItemReader,也许是为了根据 main ItemReader提供的输入获得更多的 reference 数据。如果您需要自己控制对write的调用,这也很有用。但是,如果您只想在实际写入之前“转换”传入的 item 进行写入,则不需要write自己。你可以修改 item。对于此场景,Spring Batch 提供ItemProcessor接口,如以下接口定义所示:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor很简单。给定一个 object,转换它并 return 另一个。提供的 object 可能是也可能不是同一类型。关键是业务逻辑可以在 process 中应用,完全取决于开发人员创建该逻辑。 ItemProcessor可以直接连接到 step。对于 example,假设ItemReader提供类型Foo的 class,并且在写出之前需要将其转换为Bar类型。以下 example 显示了执行转换的ItemProcessor

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在前面的 example 中,有一个 class Foo,一个 class Bar和一个 class FooProcessor,它们遵循ItemProcessor接口。转换很简单,但任何类型的转换都可以在这里完成。 BarWriter写入Bar objects,如果提供任何其他类型则抛出 exception。类似地,如果提供了除之外的任何内容,将抛出 exception。然后可以将FooProcessor注入Step,如下面的示例所示:

XML Configuration

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJOb")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(barWriter())
                                .build();
}

1.3.1. 链接 ItemProcessors

在许多情况下执行单个转换很有用,但如果要将多个ItemProcessor 实现“链接”在一起会怎么样?这可以使用前面提到的复合 pattern 来完成。要更新先前的单个转换,example,Foo将转换为Bar,转换为Foobar并写出,如下面的 example 所示:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,Foobar>{
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}

FooProcessorBarProcessor可以“链接”在一起以得到Foobar,如下面的 example 所示:

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

与前面的示例一样,复合处理器可以配置为Step

XML Configuration

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(compositeProcessor())
                                .writer(foobarWriter())
                                .build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(new FooProcessor());
        delegates.add(new BarProcessor());

        CompositeItemProcessor processor = new CompositeItemProcessor();

        processor.setDelegates(delegates);

        return processor;
}

1.3.2. 过滤记录

item 处理器的一个典型用途是在将记录传递给ItemWriter之前过滤掉记录。过滤是一种与跳过不同的操作。跳过表示 record 无效,而过滤只表示不应写入 record。

例如,考虑一个批处理 job,它读取包含三种不同类型记录的文件:insert 记录,要更新的记录和要删除的记录。如果系统不支持 record 删除,那么我们不希望向ItemWriter发送任何“删除”记录。但是,由于这些记录实际上并不是坏记录,我们希望将它们过滤掉而不是跳过它们。因此,ItemWriter只会收到“insert”和“update”记录。

要过滤 record,您可以从ItemProcessor return null。 framework 检测到结果为null,并避免将 item 添加到传递给ItemWriter的记录列表中。像往常一样,从ItemProcessor抛出的 exception 导致跳过。

1.3.3. 容错

回滚块时,可以重新处理在读取期间缓存的项目。如果 step 配置为容错(通常通过使用跳过或重试处理),则任何使用的ItemProcessor都应以幂等的方式实现。通常,这将包括对ItemProcessor的输入 item 不执行任何更改,并且仅更新作为结果的实例。

1.4. ItemStream

ItemReadersItemWriters都很好地服务于他们的个人目的,但是他们两个都需要另外一个界面。通常,作为批处理 job 范围的一部分,需要打开,关闭 readers 和 writers,并且需要一种持久化 state 的机制。 ItemStream接口用于此目的,如以下 example 所示:

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

在描述每种方法之前,我们应该提到ExecutionContext。 的_Cl也实现ItemStream的客户端应该在 calls 之前调用openread,在 order 中打开任何资源,如 files 或获取连接。类似的限制适用于实现ItemStreamItemWriter。如第 2 章所述,如果在ExecutionContext中找到预期数据,则可以使用它在初始 state 以外的位置启动ItemReaderItemWriter。相反,调用close以确保在打开期间分配的任何资源都安全释放。主要调用update以确保当前持有的任何 state 被加载到提供的ExecutionContext中。在提交之前调用此方法,以确保在提交之前当前 state 持久保存在数据库中。

ItemStream的 client 是Step(来自 Spring Batch Core)的特殊情况下,为每个 StepExecution 创建一个ExecutionContext,以允许用户存储特定执行的 state,并期望在返回时返回它。 JobInstance再次启动。对于熟悉 Quartz 的人来说,语义与 Quartz JobDataMap非常相似。

1.5. 委托 Pattern 并注册 Step

请注意,CompositeItemWriter是委托 pattern 的 example,在 Spring Batch 中是 common。委托自己可以实现回调接口,例如StepListener。如果他们这样做,并且如果它们与 Spring Batch Core 一起用作JobStep的一部分,那么它们几乎肯定需要用Step手动注册。直接连接到Step的 reader,writer 或处理器如果实现ItemStreamStepListener接口,将自动注册。但是,因为Step不知道委托,所以需要将它们作为 listeners 或 stream(或者两者都适当)注入,如下面的示例所示:

XML Configuration

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
                   commit-interval="2">
                <streams>
                    <stream ref="barWriter" />
                </streams>
            </chunk>
        </tasklet>
    </step>
</job>

<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
    <property name="delegate" ref="barWriter" />
</bean>

<bean id="barWriter" class="...BarWriter" />

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(compositeItemWriter())
                                .stream(barWriter())
                                .build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

        CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

        writer.setDelegate(barWriter());

        return writer;
}

@Bean
public BarWriter barWriter() {
        return new BarWriter();
}

1.6. 平 Files

用于交换批量数据的最常见机制之一一直是平面文件。与 XML 有着一致的标准来定义它的结构(XSD),任何阅读平面文件的人都必须在 time 之前准确理解文件的结构。通常,所有 flat files 都分为两种类型:分隔和固定长度。分隔的 files 是其中字段由分隔符分隔的那些文件,例如逗号。固定长度 files 具有设定长度的字段。

1.6.1. FieldSet

在 Spring Batch 中使用 flat files 时,无论是输入还是输出,最重要的类之一是FieldSet。许多体系结构和 libraries 包含帮助您从文件读入的抽象,但它们通常_ret @这真的只能让你到达那里。 FieldSet是 Spring Batch 的抽象,用于从文件资源启用 binding 字段。它允许开发人员使用文件输入,就像使用数据库输入一样。 FieldSet在概念上类似于 JDBC ResultSetFieldSet只需要一个参数:一个String array 的标记。或者,您也可以配置字段的名称,以便在ResultSet之后可以通过索引或 name 访问字段,如下面的 example 所示:

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSet接口上还有更多选项,例如Date,long,BigDecimal等。 FieldSet的最大优点是它提供了对平面文件输入的一致解析。而不是每个批处理 job 以可能意外的方式进行不同的解析,它在处理由格式 exception 引起的错误或进行简单的数据转换时都是一致的。

1.6.2. FlatFileItemReader

平面文件是包含最多 two-dimensional(表格)数据的任何类型的文件。在 Spring Batch framework 中读取平 files 是由 class FlatFileItemReader促进的,它提供了读取和解析 flat files 的基本功能。 FlatFileItemReader的两个最重要的必需依赖项是ResourceLineMapper。在下一节中将更多地探讨LineMapper接口。资源 property 表示 Spring Core Resource。可以在Spring Framework,第 5 章。资源中找到解释如何创建此类型 beans 的文档。因此,除了显示以下简单的 example 之外,本指南不会详细介绍 creating Resource objects:

Resource resource = new FileSystemResource("resources/trades.csv");

在复杂的批处理环境中,目录结构通常由 EAI 基础结构管理,其中建立用于外部接口的 drop zones,用于将 files 从 FTP 位置移动到批处理位置,反之亦然。文件移动实用程序超出了 Spring Batch architecture 的范围,但批处理 job 流包含文件移动实用程序作为 job 流中的步骤并不罕见。批处理 architecture 只需知道如何找到要处理的 files。 Spring Batch 开始从这个起点将数据输入管道的 process。但是,Spring Integration提供了许多这类服务。

FlatFileItemReader中的其他 properties 允许您进一步指定数据的解释方式,如下面的 table 中所述:

属性类型描述
评论String [169]指定指示 comment 行的 line 前缀。
编码指定要使用的文本编码。默认值为Charset.defaultCharset()的 value。
lineMapperLineMapperString转换为表示 item 的Object
linesToSkipINT要在文件顶部忽略的 lines 数。
recordSeparatorPolicyRecordSeparatorPolicy用于确定 line 结尾的位置,并且如果在引用的 string 中,则执行_继续 line 结束。
资源Resource要从中读取的资源。
skippedLinesCallbackLineCallbackHandler传递要跳过的文件中 lines 的原始 line 内容的接口。如果linesToSkip设置为 2,则调用此接口两次。
严格boolean在严格模式下,如果输入资源不存在,reader 将在ExecutionContext上抛出 exception。否则,它会记录问题并继续。
LineMapper

RowMapper一样,它采用 low-level 构造如ResultSet并返回Object,平面文件处理需要相同的构造将String line 转换为Object,如以下接口定义所示:

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

基本的 contract 是,给定当前 line 和与之关联的 line 数,mapper 应该 return 一个结果 domain object。这类似于RowMapper,因为每个 line 与其 line 编号相关联,就像ResultSet中的每一行都与其行号相关联一样。这允许将 line 编号绑定到生成的 domain object 以进行身份比较或获取更多信息 logging。然而,与RowMapper不同,LineMapper被赋予一个原始 line,如上所述,它只会让你到达中途。必须将 line 标记为FieldSet,然后可以将其映射到 object,如本文档后面所述。

LineTokenizer

将输入的 line 转换为FieldSet的抽象是必要的,因为可能有许多格式的平面文件数据需要转换为FieldSet。在 Spring Batch 中,此接口是LineTokenizer

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

的 contract 是这样的,给定一个 line 输入(理论上String可以包含多个 line),返回表示 line 的FieldSet。这个FieldSet然后可以传递给FieldSetMapper。 Spring Batch 包含以下LineTokenizer implementations:

  • DelimitedLineTokenizer:用于 files,其中 record 中的字段由分隔符分隔。最常见的分隔符是逗号,但通常也使用管道或分号。

  • FixedLengthTokenizer:用于 files,其中 record 中的字段均为“固定宽度”。必须为每个 record 类型定义每个字段的宽度。

  • PatternMatchingCompositeLineTokenizer:通过检查 pattern 来确定应该在特定 line 上使用的 tokenizer 列表中的LineTokenizer

FieldSetMapper

FieldSetMapper接口定义了一个方法mapFieldSet,它将FieldSet object 和 maps 的内容带到 object。这个 object 可能是自定义 DTO,domain object 或 array,具体取决于 job 的需要。 FieldSetMapperLineTokenizer结合使用,将数据的 line 转换为所需类型的 object,如以下接口定义所示:

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

使用的 pattern 与JdbcTemplate使用的RowMapper相同。

DefaultLineMapper

现在已经定义了用于读取 flat files 的基本接口,很明显需要三个基本步骤:

  • 从文件中读取一个 line。

  • String line 传递给LineTokenizer#tokenize()方法以检索FieldSet

  • 将从标记化返回的FieldSet传递给FieldSetMapper,从ItemReader#read()方法返回结果。

上面描述的两个接口代表两个独立的任务:将 line 转换为FieldSet并将FieldSet映射到 domainobject。因为LineTokenizer的输入与LineMapper(line)的输入匹配,并且FieldSetMapper的输出与LineMapper的输出匹配,所以提供了同时使用LineTokenizerFieldSetMapper的默认 implementation。 DefaultLineMapper,如以下 class 定义所示,表示大多数用户需要的行为:

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

上述功能是在默认的 implementation 中提供的,而不是内置在 reader 本身中(就像以前版本的 framework 中所做的那样),以便用户在控制解析 process 时具有更大的灵活性,尤其是在需要访问原始 line 时。

简单分隔文件读取示例

以下 example 说明了如何使用实际的 domain 方案读取平面文件。这个特定批 job 从以下文件中读取足球运动员:

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"

此文件的内容映射到以下Player domain object:

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

要将FieldSet map 映射到Player object,需要定义返回玩家的FieldSetMapper,如下面的示例所示:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

然后可以通过正确构造FlatFileItemReader并调用read来读取该文件,如下面的示例所示:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

每次调用read都会从文件中的每个 line 返回一个新的Player object。到达文件末尾时,返回null

按 Name 映射字段

DelimitedLineTokenizerFixedLengthTokenizer都允许一个额外的功能,它在 function 中与 JDBC ResultSet类似。可以将这些字段的名称注入到这些LineTokenizer __mplement 中的任何一个中,以增加映射 function 的可读性。首先,将平面文件中所有字段的列名注入到 tokenizer 中,如下面的示例所示:

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

FieldSetMapper可以使用以下信息:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if(fs == null){
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}
自动将 FieldSet 设置为 Domain Objects

对于许多人来说,必须编写特定的FieldSetMapper与为JdbcTemplate编写特定的RowMapper一样繁琐。 Spring Batch 通过使用 JavaBean 规范通过在 object 上使用 setter 匹配字段 name 来自动 maps 字段来使这更容易。再次使用 football example,BeanWrapperFieldSetMapper configuration 看起来像下面的代码片段:

XML Configuration

<bean id="fieldSetMapper"
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
    <property name="prototypeBeanName" value="player" />
</bean>

<bean id="player"
      class="org.springframework.batch.sample.domain.Player"
      scope="prototype" />

Java Configuration

@Bean
public FieldSetMapper fieldSetMapper() {
        BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

        fieldSetMapper.setPrototypeBeanName("player");

        return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
        return new Player();
}

对于FieldSet中的每个条目,映射器在Player object 的新实例上查找相应的 setter(因此,需要原型范围),就像 Spring 容器查找匹配 property name 的 setter 一样。映射FieldSet中的每个可用字段,并返回结果Player object,不需要 code。

固定长度文件格式

到目前为止,只详细讨论了分隔的 files。但是,它们只占文件阅读图片的一半。许多使用 flat files 的组织使用固定长度格式。一个 example 定长文件如下:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

虽然这看起来像一个大字段,但它实际上代表了 4 个不同的字段:

  • ISIN:正在订购的 item 的唯一标识符 - 12 个字符 long。

  • 数量:订购的 item 的数量 - 3 个字符 long。

  • 价格:item 的价格 - 5 个字符 long。

  • 客户:订购 item 的客户 ID - 9 个字符 long。

配置FixedLengthLineTokenizer时,必须以范围的形式提供每个长度,如下面的示例所示:

XML Configuration

<bean id="fixedLengthLineTokenizer"
      class="org.springframework.batch.io.file.transform.FixedLengthTokenizer">
    <property name="names" value="ISIN,Quantity,Price,Customer" />
    <property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>

因为FixedLengthLineTokenizer使用与上面讨论的相同的LineTokenizer接口,所以它返回相同的FieldSet,就好像使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper

支持范围的上述语法要求在ApplicationContext中配置专用的 property 编辑器RangeArrayPropertyEditor。但是,此 bean 在ApplicationContext中自动声明,其中使用批处理命名空间。

Java Configuration

@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

        tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
        tokenizer.setColumns(new Range(1-12),
                                                new Range(13-15),
                                                new Range(16-20),
                                                new Range(21-29));

        return tokenizer;
}

因为FixedLengthLineTokenizer使用与上面讨论的相同的LineTokenizer接口,所以它返回相同的FieldSet,就好像使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper

单个文件中的多个 Record 类型

到目前为止,所有文件读取示例都为了简单起见而做出了 key 假设:文件中的所有记录都具有相同的格式。但是,情况可能并非总是如此。非常常见的是,文件可能具有不同格式的记录,需要以不同方式进行标记并映射到不同的 objects。以下文件摘录说明了这一点:

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

在这个文件中,我们有三种类型的记录,“USER”,“LINEA”和“LINEB”。 “USER”line 对应于User object。 “LINEA”和“LINEB”都对应于Line objects,尽管“LINEA”比“LINEB”具有更多信息。

ItemReader单独读取每个 line,但我们必须指定不同的LineTokenizerFieldSetMapper objects,以便ItemWriter接收正确的项目。 PatternMatchingCompositeLineMapper通过允许 maps 模式到LineTokenizer实例和模式到FieldSetMapper实例来配置,这使得这很简单,如下面的示例所示:

XML Configuration

<bean id="orderFileLineMapper"
      class="org.spr...PatternMatchingCompositeLineMapper">
    <property name="tokenizers">
        <map>
            <entry key="USER*" value-ref="userTokenizer" />
            <entry key="LINEA*" value-ref="lineATokenizer" />
            <entry key="LINEB*" value-ref="lineBTokenizer" />
        </map>
    </property>
    <property name="fieldSetMappers">
        <map>
            <entry key="USER*" value-ref="userFieldSetMapper" />
            <entry key="LINE*" value-ref="lineFieldSetMapper" />
        </map>
    </property>
</bean>

Java Configuration

@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
        PatternMatchingCompositeLineMapper lineMapper =
                new PatternMatchingCompositeLineMapper();

        Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
        tokenizers.put("USER*", userTokenizer());
        tokenizers.put("LINEA*", lineATokenizer());
        tokenizers.put("LINEB*", lineBTokenizer());

        lineMapper.setTokenizers(tokenizers);

        Map<String, FieldSetMapper> mappers = new HashMap<>(2);
        mappers.put("USER*", userFieldSetMapper());
        mappers.put("LINE*", lineFieldSetMapper());

        lineMapper.setFieldSetMappers(mappers);

        return lineMapper;
}

在此 example 中,“LINEA”和“LINEB”具有单独的LineTokenizer实例,但它们都使用相同的FieldSetMapper

PatternMatchingCompositeLineMapper使用 order 中的PatternMatcher#match方法为每个 line 选择正确的委托。 PatternMatcher允许两个具有特殊含义的通配符:问号(“?”)恰好匹配一个字符,而星号(“*”)匹配零个或多个字符。请注意,在前面的 configuration 中,所有模式都以星号结尾,这使它们成为 lines 的有效前缀。无论 configuration 中的 order 如何,PatternMatcher始终匹配可能的最具体的 pattern。因此,如果“ LINE *”和“LINEA *”都被列为模式,“LINEA”将 match pattern“LINEA *”,而“LINEB”将 match pattern“LINE ”。此外,单个星号(“”)可以通过匹配任何其他 pattern 不匹配的 line 作为默认值,如下面的 example 所示。

XML Configuration

<entry key="*" value-ref="defaultLineTokenizer" />

Java Configuration

...
tokenizers.put("*", defaultLineTokenizer());
...

还有PatternMatchingCompositeLineTokenizer可以单独用于标记化。

对于平面文件来说,包含每个 span 多个 lines 的记录也是如此。要处理这种情况,需要采用更复杂的策略。可以在multiLineRecords sample 中找到此 common pattern 的演示。

Exception 处理 Flat Files

标记 line 时可能会导致抛出 exceptions。许多平 files 不完美,包含格式不正确的记录。许多用户选择在 logging 问题,原始 line 和 line 编号时跳过这些错误的 lines。稍后可以手动或通过另一批 job 检查这些日志。因此,Spring Batch 为处理解析 exceptions 提供了 exceptions 层次结构:FlatFileParseExceptionFlatFileFormatException。当尝试读取文件时遇到任何错误时,FlatFileItemReader会抛出FlatFileParseException。 由LineTokenizer接口的_implement 抛出,表示在标记化时遇到更具体的错误。

IncorrectTokenCountException

DelimitedLineTokenizerFixedLengthLineTokenizer都能够指定可用于创建FieldSet的列名。但是,如果列名称的数量不匹配在标记 line 时找到的列数,则无法创建FieldSet,并抛出IncorrectTokenCountException,其中包含遇到的标记数和预期数,如以下 example:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

由于 tokenizer 配置了 4 个列名,但在文件中只找到 3 个令牌,因此抛出了IncorrectTokenCountException

IncorrectLineLengthException

_格式化为 fixed-length 格式的文件在解析时有其他要求,因为与分隔格式不同,每列必须严格遵守其预定义的宽度。如果总 line 长度不等于此列的最宽 value,则抛出 exception,如下面的 example 所示:

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上面的 tokenizer 的配置范围是:1-5,6-10 和 11-15。因此,line 的总长度为 15.但是,在前面的 example 中,传入了长度为 5 的 line,导致IncorrectLineLengthException被抛出。在这里抛出 exception 而不是仅映射第一列允许 line 的处理更早失败,并且如果在尝试读取FieldSetMapper中的第 2 列时失败,则允许的信息比它包含的信息多。但是,有些情况下 line 的长度并不总是恒定的。因此,可以通过'strict'property 关闭 line length 的验证,如下面的示例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

除了tokenizer.setStrict(false)被调用之外,前面的 example 几乎与之前的 example 相同。此设置告诉标记生成器在标记 line 时不强制 line 长度。现在可以正确创建并返回FieldSet。但是,它仅包含剩余值的空标记。

1.6.3. FlatFileItemWriter

写出平 files 有同样的问题和问题,从文件读入必须克服。 step 必须能够以 transactional 方式编写分隔或固定长度格式。

LineAggregator

正如LineTokenizer接口是获取 item 并将其转换为String所必需的一样,文件编写必须能够将多个字段聚合为单个 string 以写入文件。在 Spring Batch 中,这是LineAggregator,如以下接口定义所示:

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregatorLineTokenizer的逻辑相反。 LineTokenizer接受String并返回FieldSet,而LineAggregator接受item并返回String

PassThroughLineAggregator

LineAggregator接口最基本的 implementation 是PassThroughLineAggregator,它假定 object 已经是 string 或者它的 string 表示可以写入,如下面的 code 所示:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

如果需要直接控制 creating string 但是的优点(例如 transaction 和 restart support)是必要的,那么前面的 implementation 很有用。

简化文件写入示例

现在已经定义了LineAggregator接口及其最基本的 implementation PassThroughLineAggregator,可以解释基本的写入流程:

  • 要写入的 object 传递给 order 中的LineAggregator以获得String

  • 返回的String将写入配置的文件。

以下摘自FlatFileItemWriter在 code 中表示:

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

一个简单的 configuration 可能如下所示:

XML Configuration

<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" value="file:target/test-outputs/output.txt" />
    <property name="lineAggregator">
        <bean class="org.spr...PassThroughLineAggregator"/>
    </property>
</bean>

Java Configuration

@Bean
public FlatFileItemWriter itemWriter() {
        return  new FlatFileItemWriterBuilder<Foo>()
                                   .name("itemWriter")
                                   .resource(new FileSystemResource("target/test-outputs/output.txt"))
                                   .lineAggregator(new PassThroughLineAggregator<>())
                                   .build();
}
FieldExtractor

前面的 example 可能对写入文件的最基本用法很有用。但是,FlatFileItemWriter的大多数用户都有一个需要写出的 domain object,因此必须转换为 line。在文件阅读中,需要以下内容:

  • 从文件中读取一个 line。

  • 将 line 传递给LineTokenizer#tokenize()方法,在 order 中检索FieldSet

  • 将从标记化返回的FieldSet传递给FieldSetMapper,从ItemReader#read()方法返回结果。

文件编写有类似但相反的步骤:

  • 传递 item 以写入 writer。

  • 将 item 上的字段转换为 array。

  • 将生成的 array 聚合为 line。

因为 framework 无法知道需要写出 object 中的哪些字段,所以必须编写FieldExtractor来完成将 item 转换为 array 的任务,如以下接口定义所示:

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

接口的实现应该从提供的 object 的字段创建一个 array,然后可以使用元素之间的分隔符或作为 fixed-widthline 的一部分写出。

PassThroughFieldExtractor

在许多情况下,需要写出集合,例如 array,CollectionFieldSet。从这些集合类型之一“提取”array 是非常简单的。为此,请将集合转换为 array。因此,应在此方案中使用PassThroughFieldExtractor。应该注意的是,如果传入的 object 不是一种集合,那么PassThroughFieldExtractor将返回一个仅包含要提取的 item 的 array。

BeanWrapperFieldExtractor

与文件读取部分中描述的BeanWrapperFieldSetMapper一样,通常最好配置如何将 domain object 转换为 object array,而不是自己编写转换。 BeanWrapperFieldExtractor提供此功能,如以下 example 所示:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

这个提取器 implementation 只有一个必需的 property:map 的字段名称。正如BeanWrapperFieldSetMapper需要在FieldSet上的 map 字段上的 map 字段到提供的 object 上的 setter,BeanWrapperFieldExtractor需要 map 的名称来为创建 object array 的 getter。值得注意的是,名称的 order 决定了 array 中字段的 order。

分隔文件写入示例

最基本的平面文件格式是所有字段由分隔符分隔的格式。这可以使用DelimitedLineAggregator来完成。以下 example 会写出一个简单的 domain object,表示对客户帐户的信用:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

因为正在使用 domain object,所以必须提供FieldExtractor接口的 implementation,以及要使用的分隔符,如下面的示例所示:

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...DelimitedLineAggregator">
            <property name="delimiter" value=","/>
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

在上一个 example 中,本章前面描述的BeanWrapperFieldExtractor用于将CustomerCredit中的 name 和 credit 字段转换为 object array,然后在每个字段之间用逗号写出。

也可以使用FlatFileItemWriterBuilder.DelimitedBuilder自动创建BeanWrapperFieldExtractorDelimitedLineAggregator,如下面的示例所示:

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .delimited()
                                .delimiter("|")
                                .names(new String[] {"name", "credit"})
                                .build();
}
固定宽度文件写入 Example

分隔不是唯一的平面文件格式。许多人更喜欢使用每列的设定宽度来描绘字段之间,这通常被称为“固定宽度”。 Spring Batch 支持使用FormatterLineAggregator进行文件写入。使用上面描述的相同CustomerCredit domain object,可以配置如下:

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...FormatterLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit" />
                </bean>
            </property>
            <property name="format" value="%-9s%-2.0f" />
        </bean>
    </property>
</bean>

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
        lineAggregator.setFormat("%-9s%-2.0f");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

大多数前面的例子应该看起来很熟悉。但是,property 格式的 value 是新的,并显示在以下元素中:

<property name="format" value="%-9s%-2.0f" />
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

底层的 implementation 是使用与 Java 5 的一部分相同的Formatter构建的.Java Formatter基于 C 编程语言的printf功能。有关如何配置格式化程序的大多数详细信息都可以在格式化的 Javadoc 中找到。

也可以使用FlatFileItemWriterBuilder.FormattedBuilder自动创建BeanWrapperFieldExtractorFormatterLineAggregator,如下面的 example 所示:

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .formatted()
                                .format("%-9s%-2.0f")
                                .names(new String[] {"name", "credit"})
                                .build();
}
处理文件创建

FlatFileItemReader与文件资源的关系非常简单。初始化 reader 时,它会打开文件(如果存在),如果不存在则抛出 exception。文件写作并不那么简单。乍一看,似乎FlatFileItemWriter应该存在类似的简单 contract:如果文件已经存在,则抛出 exception,如果不存在,则创建它并开始编写。但是,可能重新启动Job可能会导致问题。在正常重启方案中,contract 是相反的:如果文件存在,则从最后一个已知的好位置开始写入,如果不存在,则抛出 exception。但是,如果此 job 的文件 name 始终相同,会发生什么?在这种情况下,除非重新启动,否则您希望删除该文件(如果存在)。由于这种可能性,FlatFileItemWriter包含 property,shouldDeleteIfExists。将此 property 设置为 true 会导致在打开 writer 时删除具有相同 name 的现有文件。

1.7. XML Item Readers 和 Writers

Spring Batch 提供 transactional 基础结构,既可以读取 XML 记录,也可以将它们映射到 Java objects,也可以将 Java objects 编写为 XML 记录。

流 XML 的约束

StAX API 用于 I/O,因为其他标准 XML 解析 API 不适合批处理要求(DOM 将整个输入一次加载到 memory,而 SAX 通过允许用户仅提供回调来控制解析 process)。

我们需要考虑 Spring Batch 中 XML 输入和输出的工作原理。首先,有一些概念因文件读写而异,但在 Spring Batch XML 处理中是 common。使用 XML 处理,而不是需要进行标记化的行记录(FieldSet实例),假设 XML 资源是与各个记录对应的“片段”的集合,如下图所示:

图 1. XML 输入

'trade'标签被定义为上述场景中的'根元素'。 '<trade>'和'</trade>'之间的所有内容都被视为一个'片段'。 Spring Batch 使用 Object/XML Mapping(OXM)将片段绑定到 objects。但是,Spring Batch 并不依赖于任何特定的 XML binding 技术。典型的用途是委托Spring OXM,它为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 的依赖是可选的,如果需要,您可以选择实现 Spring Batch 特定接口。与 OXM 支持的技术的关系如下图所示:

图 2. OXM Binding

通过介绍 OXM 以及如何使用 XML 片段来表示记录,我们现在可以更仔细地检查 readers 和 writers。

1.7.1. StaxEventItemReader

StaxEventItemReader configuration 提供了从 XML 输入流处理记录的典型设置。首先,考虑StaxEventItemReader可以 process 的以下 XML 记录集:

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

为了能够处理 XML 记录,需要以下内容:

  • Root Element Name:构成要映射的 object 的片段的根元素的 name。 example configuration 使用 value of trade 来演示这一点。

  • 资源:表示要读取的文件的 Spring 资源。

  • Unmarshaller:Spring OXM 提供的用于将 XML 片段映射到 object 的解组工具。

以下 example 显示了如何定义一个StaxEventItemReader,它使用名为trade的根元素,org/springframework/batch/item/xml/domain/trades.xml的资源和一个名为tradeMarshaller的 unmarshaller。

XML Configuration

<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
    <property name="fragmentRootElementName" value="trade" />
    <property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
    <property name="unmarshaller" ref="tradeMarshaller" />
</bean>

Java Configuration

@Bean
public StaxEventItemReader itemReader() {
        return new StaxEventItemReaderBuilder<Trade>()
                        .name("itemReader")
                        .resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
                        .addFragmentRootElements("trade")
                        .unmarshaller(tradeMarshaller())
                        .build();

}

请注意,在此 example 中,我们选择使用XStreamMarshaller,它接受作为 map 传入的别名,第一个 key 和 value 是片段的 name(即根元素)和 object 类型要绑定。然后,类似于FieldSet,map 类型中 map 的其他元素的名称在 map 中被描述为 key/value 对。在 configuration 文件中,我们可以使用 Spring configuration 实用程序来描述所需的别名,如下所示:

XML Configuration

<bean id="tradeMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="trade"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

Java Configuration

@Bean
public XStreamMarshaller tradeMarshaller() {
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("trade", Trade.class);
        aliases.put("price", BigDecimal.class);
        aliases.put("isin", String.class);
        aliases.put("customer", String.class);
        aliases.put("quantity", Long.class);

        XStreamMarshaller marshaller = new XStreamMarshaller();

        marshaller.setAliases(aliases);

        return marshaller;
}

在输入时,reader 读取 XML 资源,直到它识别出新片段即将开始。默认情况下,reader 与元素 name 匹配,以识别新片段即将启动。 reader 从片段创建一个独立的 XML 文档,并将文档传递给反序列化器(通常是 Spring OXM Unmarshaller周围的 wrapper),以将 XML 映射到 Java object。

总之,此过程类似于以下 Java code,它使用 Spring configuration 提供的注入:

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;

Trade trade = null;

while (hasNext) {
    trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    }
    else {
        System.out.println(trade);
    }
}

1.7.2. StaxEventItemWriter

输出与输入对称。 StaxEventItemWriter需要Resource,marshaller 和rootTagName。 Java object 被传递给 marshaller(通常是标准的 Spring OXM Marshaller),它通过使用自定义 event writer 写入Resource,该自定义 event writer 过滤 OXM 工具为每个片段生成的StartDocumentEndDocument events。以下 example 使用StaxEventItemWriter

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="marshaller" ref="tradeMarshaller" />
    <property name="rootTagName" value="trade" />
    <property name="overwriteOutput" value="true" />
</bean>

Java Configuration

@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
        return new StaxEventItemWriterBuilder<Trade>()
                        .name("tradesWriter")
                        .marshaller(tradeMarshaller())
                        .resource(outputResource)
                        .rootTagName("trade")
                        .overwriteOutput(true)
                        .build();

}

前面的 configuration 设置了三个必需的 properties 和 sets 可选overwriteOutput=true属性,本章前面提到过指定是否可以覆盖现有文件。应该注意的是,以下 example 中用于 writer 的编组器与本章前面的阅读 example 中使用的编组器完全相同:

XML Configuration

<bean id="customerCreditMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="customer"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

Java Configuration

@Bean
public XStreamMarshaller customerCreditMarshaller() {
        XStreamMarshaller marshaller = new XStreamMarshaller();

        Map<String, Class> aliases = new HashMap<>();
        aliases.put("trade", Trade.class);
        aliases.put("price", BigDecimal.class);
        aliases.put("isin", String.class);
        aliases.put("customer", String.class);
        aliases.put("quantity", Long.class);

        marshaller.setAliases(aliases);

        return marshaller;
}

总结一下 Java example,下面的 code 说明了所讨论的所有要点,展示了所需 properties 的编程设置:

FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

StaxEventItemWriter staxItemWriter =
        new StaxEventItemWriterBuilder<Trade>()
                                .name("tradesWriter")
                                .marshaller(marshaller)
                                .resource(resource)
                                .rootTagName("trade")
                                .overwriteOutput(true)
                                .build();

staxItemWriter.afterPropertiesSet();

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);

1.8. JSON Item Readers 和 Writers

Spring Batch 支持以下列格式读取和编写 JSON 资源:

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

假设 JSON 资源是对应于各个项的 JSON object 的 array。 Spring Batch 与任何特定的 JSON library 无关。

1.8.1. JsonItemReader

JsonItemReader将 JSON 解析和 binding 委托给org.springframework.batch.item.json.JsonObjectReader接口的 implementations。此接口旨在通过使用流 API 来实现以块的形式读取 JSON objects。目前提供了两个 implementations:

  • Jackson通过org.springframework.batch.item.json.JacksonJsonObjectReader

  • GSON通过org.springframework.batch.item.json.GsonJsonObjectReader

为了能够处理 JSON 记录,需要以下内容:

  • Resource:表示要读取的 JSON 文件的 Spring 资源。

  • JsonObjectReader:JSON object reader,用于解析 JSON object 并将其绑定到项目

以下 example 显示了如何定义适用于以前的 JSON 资源org/springframework/batch/item/json/trades.jsonJsonItemReader和基于 Jackson 的JsonObjectReader

@Bean
public JsonItemReader<Trade> jsonItemReader() {
   return new JsonItemReaderBuilder<Trade>()
                 .jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonItemReader")
                 .build();
}

1.8.2. JsonFileItemWriter

JsonFileItemWriter将项目的编组委托给org.springframework.batch.item.json.JsonObjectMarshaller接口。此接口的 contract 是将 object 并将其编组为 JSON String。目前提供了两个 implementations:

  • Jackson通过org.springframework.batch.item.json.JacksonJsonObjectMarshaller

  • GSON通过org.springframework.batch.item.json.GsonJsonObjectMarshaller

为了能够编写 JSON 记录,需要以下内容:

  • Resource:Spring Resource表示要写入的 JSON 文件

  • JsonObjectMarshaller:JSON object marshaller 将 objects 编组为 JSON 格式

以下 example 显示了如何定义JsonFileItemWriter

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

1.9. Multi-File 输入

在单个Step中处理多个 files 是一个常见的要求。假设 files 都具有相同的格式,MultiResourceItemReader支持 XML 和平面文件处理的这种类型的输入。考虑目录中的以下 files:

file-1.txt  file-2.txt  ignored.txt

file-1.txtfile-2.txt的格式相同,并且出于商业原因,应该一起处理。 MultiResourceItemReader可用于通过使用通配符读取两个 files,如下面的示例所示:

XML Configuration

<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
    <property name="resources" value="classpath:data/input/file-*.txt" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>

Java Configuration

@Bean
public MultiResourceItemReader multiResourceReader() {
        return new MultiResourceItemReaderBuilder<Foo>()
                                        .delegate(flatFileItemReader())
                                        .resources(resources())
                                        .build();
}

引用的委托是一个简单的FlatFileItemReader。上述 configuration 从 files 读取输入,处理回滚和重启方案。应该注意的是,与任何ItemReader一样,添加额外输入(在这种情况下是文件)可能会在重新启动时导致潜在问题。建议批处理作业使用各自的目录,直到成功完成。

通过使用MultiResourceItemReader#setComparator(Comparator)来排序输入资源,以确保在重新启动方案中运行 job 之间保留资源 ordering。

1.10. 数据库

与大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,由于系统必须使用的数据集的大小,批处理与其他 application 样式不同。如果 SQL 语句返回 100 万行,则结果集可能会将所有返回的结果保存在 memory 中,直到读取完所有行为止。 Spring Batch 为此问题提供了两种类型的解决方案:

  • Cursor-based ItemReader Implementations

  • 分页 ItemReader Implementations

1.10.1. Cursor-based ItemReader Implementations

使用数据库游标通常是大多数批处理开发人员的默认方法,因为它是数据库解决“流”关系数据问题的方法。 Java ResultSet class 本质上是一个面向对象的机制,用于操作游标。 ResultSet将光标维持到当前数据行。在ResultSet上调用next会将此光标移动到下一行。 Spring Batch cursor-based ItemReader implementation 在初始化时打开一个游标,并在每次调用read时将光标向前移动一行,返回一个可用于处理的映射对象。然后调用close方法以确保释放所有资源。 Spring 核心JdbcTemplate通过使用回调 pattern 完全 map ResultSet中的所有行并关闭然后将控制权返回给方法调用者来解决此问题。但是,在批处理中,这必须等到 step 完成。下图显示了 cursor-based ItemReader的工作原理图。请注意,虽然 example 使用 SQL(因为 SQL 广为人知),但任何技术都可以实现基本方法。

图 3.光标 Example

这个 example 说明了基本的 pattern。给定'FOO'table,它有三列:IDNAMEBAR,选择 ID 大于 1 但小于 7 的所有行。这将光标(第 1 行)的开头放在 ID 2 上。结果这一行应该是一个完全映射的Foo object。再次调用read()会将光标移动到下一行,即 ID 为 3 的Foo。这些读取的结果在每个read之后写出,允许 objects 被垃圾收集(假设没有实例变量维护 references 给它们) )。

JdbcCursorItemReader

JdbcCursorItemReader是 cursor-based 技术的 JDBC implementation。它直接与ResultSet一起工作,并且需要一个 SQL 语句来运行从DataSource获得的连接。以下数据库 schema 用作 example:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人更喜欢为每一行使用 domain object,因此以下 example 使用RowMapper接口的 implementation 来 map object:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

因为JdbcCursorItemReaderJdbcTemplate共享 key 接口,所以查看如何使用JdbcTemplate读取此数据的 example 非常有用,在 order 中将其与ItemReader进行对比。出于此 example 的目的,假设CUSTOMER数据库中有 1,000 行。第一个 example 使用JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

在 running 前面的 code 片段后,customerCredits列表包含 1,000 个CustomerCredit objects。在查询方法中,从DataSource获取连接,提供的 SQL 是 run,并且为ResultSet中的每一行调用mapRow方法。将此与JdbcCursorItemReader的方法进行对比,如下例所示:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

在 running 前面的 code 片段后,计数器等于 1,000。如果上面的 code 将返回的customerCredit放入列表中,则结果与JdbcTemplate example 完全相同。然而,ItemReader的最大优点是它允许项目“流式传输”。 read方法可以调用一次,item 可以用ItemWriter写出,然后下一个 item 可以用read获得。这允许 item 读取和写入以“块”完成并定期提交,这是高性能批处理的本质。此外,它非常容易配置为注入 Spring Batch Step,如下面的示例所示:

XML Configuration

<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
        return new JdbcCursorItemReaderBuilder<CustomerCredit>()
                        .dataSource(this.dataSource)
                        .name("creditReader")
                        .sql("select ID, NAME, CREDIT from CUSTOMER")
                        .rowMapper(new CustomerCreditRowMapper())
                        .build();

}
其他 Properties

因为在 Java 中打开游标有很多不同的选项,所以JdbcCursorItemReader上有许多 properties 可以设置,如下面的 table 中所述:

ignoreWarnings确定是否记录 SQLWarning 或导致 exception。默认值为true(表示记录了警告)。
FETCHSIZEItemReader使用的ResultSet object 需要更多行时,为 JDBC 驱动程序提供有关应从数据库中提取的行数的提示。默认情况下,不提供任何提示。
maxRows 进行设置底层ResultSet在任何一个 time 时可以容纳的最大行数限制。
QueryTimeout设置驱动程序等待Statement object 为 run 的秒数。如果超出限制,则抛出DataAccessException。 (有关详细信息,请参阅驱动程序供应商文档
verifyCursorPosition因为ItemReader持有的ResultSet被传递给RowMapper,所以用户可以自己调用ResultSet.next(),这可能会导致 reader 的内部计数出现问题。如果在RowMapper调用之后光标位置与之前不同,则将此 value 设置为true会导致抛出 exception。
saveState 和指示 reader 的 state 是否应保存在ItemStream#update(ExecutionContext)提供的ExecutionContext中。默认值为true
driverSupportsAbsolute指示 JDBC 驱动程序是否支持在ResultSet上设置绝对行。对于支持ResultSet.absolute()的 JDBC 驱动程序,建议将其设置为true,因为它可能会提高 performance,尤其是在处理大型数据集时 step 失败时。默认为false
setUseSharedExtendedConnection指示是否应该由所有其他处理使用用于游标的连接,从而共享相同的 transaction。如果将其设置为false,则使用自己的连接打开游标,并且不参与为 step 处理的 rest 启动的任何 transactions。如果将此 flag 设置为true,则必须将 DataSource 包装在ExtendedConnectionDataSourceProxy中,以防止在每次提交后关闭和释放连接。将此选项设置为true时,将使用“READ_ONLY”和“HOLD_CURSORS_OVER_COMMIT”选项创建用于打开游标的语句。这允许在 transaction start 上保持游标打开,并在 step 处理中执行提交。要使用此 feature,您需要一个支持此功能的数据库和一个支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认为false
HibernateCursorItemReader

正如正常的 Spring 用户做出关于是否使用 ORM 解决方案的重要决定,这会影响他们是否使用JdbcTemplateHibernateTemplate,Spring Batch 用户具有相同的选项。 HibernateCursorItemReader是游标技术的 Hibernate implementation。 Hibernate 在批处理中的使用一直存在争议。这主要是因为 Hibernate 最初是为支持在线 application 样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession而不是标准 session。这将删除 Hibernate 使用的所有缓存和脏检查,这可能会导致批处理方案中出现问题。有关 stateless 和常规 hibernate 会话之间差异的更多信息,请参阅特定 hibernate 发行版的文档。 HibernateCursorItemReader允许您声明一个 HQL 语句并传入SessionFactory,它将以与JdbcCursorItemReader相同的基本方式传回每个调用一个 item。以下 example configuration 使用与 JDBC reader 相同的“customer credit”example:

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

这个配置的ItemReader以与JdbcCursorItemReader描述的完全相同的方式返回CustomerCredit objects,假设已经为Customer table 正确创建了 hibernate 映射 files。 'useStatelessSession'property 默认为 true,但已添加到此处以引起注意打开或关闭它的能力。还值得注意的是,可以通过setFetchSize property 设置底层游标的获取大小。与JdbcCursorItemReader一样,configuration 很简单,如下面的示例所示:

XML Configuration

<bean id="itemReader"
      class="org.springframework.batch.item.database.HibernateCursorItemReader">
    <property name="sessionFactory" ref="sessionFactory" />
    <property name="queryString" value="from CustomerCredit" />
</bean>

Java Configuration

@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
        return new HibernateCursorItemReaderBuilder<CustomerCredit>()
                        .name("creditReader")
                        .sessionFactory(sessionFactory)
                        .queryString("from CustomerCredit")
                        .build();
}
StoredProcedureItemReader

有时需要使用存储过程获取游标数据。 StoredProcedureItemReader的工作方式与JdbcCursorItemReader类似,不同之处在于,它不是运行查询来获取游标,而是运行一个返回游标的存储过程。存储过程可以以三种不同的方式返回光标:

  • 作为返回的ResultSet(由 SQL Server,Sybase,DB2,Derby 和 MySQL 使用)。

  • 作为 out 参数返回 ref-cursor(由 Oracle 和 PostgreSQL 使用)。

  • 作为存储的 function 调用的 return value。

以下 example configuration 使用与前面示例相同的“customer credit”example:

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());

        return reader;
}

前面的 example 依赖于存储过程来提供ResultSet作为返回结果(前面的选项 1)。

如果存储过程返回ref-cursor(选项 2),那么我们需要提供 out 参数的位置,即返回的ref-cursor。以下 example 显示了如何使用第一个参数 ref-cursor:

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());
        reader.setRefCursorPosition(1);

        return reader;
}

如果光标是从存储的 function(选项 3)返回的,我们需要将 property“function”设置为true。它默认为false。以下 example 显示了它的外观:

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());
        reader.setFunction(true);

        return reader;
}

在所有这些情况下,我们需要定义一个RowMapper以及一个DataSource和实际的过程 name。

如果存储过程或 function 接受参数,则必须通过parameters property 声明和设置它们。以下示例,对于 Oracle,声明了三个参数。第一个是返回 ref-cursor 的 out 参数,第二个和第三个参数是 value 类型为INTEGER的参数。

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        List<SqlParameter> parameters = new ArrayList<>();
        parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
        parameters.add(new SqlParameter("amount", Types.INTEGER);
        parameters.add(new SqlParameter("custId", Types.INTEGER);

        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("spring.cursor_func");
        reader.setParameters(parameters);
        reader.setRefCursorPosition(1);
        reader.setRowMapper(rowMapper());
        reader.setPreparedStatementSetter(parameterSetter());

        return reader;
}

除了参数声明之外,我们还需要指定一个PreparedStatementSetter implementation 来设置调用的参数值。这与上面的JdbcCursorItemReader相同。 额外的 Properties中列出的所有其他 properties 也适用于StoredProcedureItemReader

1.10.2. 分页 ItemReader Implementations

使用数据库游标的另一种方法是 running 多个查询,其中每个查询都获取一部分结果。我们将此部分称为页面。每个查询都必须指定起始行号和我们想要在页面中返回的行数。

JdbcPagingItemReader

一个实现分页ItemReaderJdbcPagingItemReaderJdbcPagingItemReader需要PagingQueryProvider负责提供用于检索构成页面的行的 SQL 查询。由于每个数据库都有自己的提供分页支持的策略,因此我们需要为每个受支持的数据库类型使用不同的PagingQueryProvider。还有SqlPagingQueryProviderFactoryBean auto-detects 正在使用的数据库并确定适当的PagingQueryProvider implementation。这简化了 configuration,是推荐的最佳实践。

SqlPagingQueryProviderFactoryBean要求您指定select子句和from子句。您还可以提供可选的where子句。这些子句和必需的sortKey用于 build SQL 语句。

sortKey上设置唯一的 key 约束非常重要,以确保执行之间不会丢失任何数据。

打开 reader 后,它会以与任何其他ItemReader相同的基本方式将每次调用一个 item 传回read。当需要额外的行时,分页发生在幕后。

以下 example configuration 使用与之前显示的 cursor-based ItemReaders类似的“customer credit”example:

XML Configuration

<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

Java Configuration

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .dataSource(dataSource)
                                           .queryProvider(queryProvider)
                                           .parameterValues(parameterValues)
                                           .rowMapper(customerCreditMapper())
                                           .pageSize(1000)
                                           .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
}

这个配置ItemReader使用RowMapper返回CustomerCredit objects,必须指定。 'pageSize'property 确定每个查询 run 从数据库读取的实体数。

'parameterValues'property 可用于为查询指定Map参数值。如果在where子句中使用命名参数,则每个条目的 key 应匹配命名参数的 name。如果你使用传统的'?'占位符,然后每个条目的 key 应该是占位符的编号,从 1 开始。

JpaPagingItemReader

另一个实现分页ItemReaderJpaPagingItemReader。 JPA 没有类似于 Hibernate StatelessSession的概念,因此我们必须使用 JPA 规范提供的其他 features。由于 JPA 支持分页,因此在使用 JPA 进行批处理时这是一个自然的选择。读取每个页面后,实体将分离并清除持久性 context,以允许在处理页面后对实体进行垃圾回收。

JpaPagingItemReader允许您声明 JPQL 语句并传入EntityManagerFactory。然后,每次调用返回一个 item,以与任何其他ItemReader相同的基本方式读取。当需要其他实体时,分页发生在幕后。以下 example configuration 使用与先前显示的 JDBC reader 相同的“customer credit”example:

XML Configuration

<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

Java Configuration

@Bean
public JpaPagingItemReader itemReader() {
        return new JpaPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .entityManagerFactory(entityManagerFactory())
                                           .queryString("select c from CustomerCredit c")
                                           .pageSize(1000)
                                           .build();
}

假设CustomerCredit object 具有正确的 JPA annotations 或 ORM 映射文件,此配置ItemReader以与上述JdbcPagingItemReader所述完全相同的方式返回CustomerCredit objects。 'pageSize'property 确定每次查询执行时从数据库读取的实体数。

1.10.3. 数据库 ItemWriters

虽然 flat files 和 XML files 都有一个特定的ItemWriter实例,但在数据库世界中没有确切的等价物。这是因为 transactions 提供了所有必需的功能。 ItemWriter __mplemplement 对于 files 是必要的,因为它们必须表现为 transactional,跟踪书面项目并在适当的时间刷新或清除。数据库不需要此功能,因为写入已包含在 transaction 中。用户可以创建自己的 DAO 来实现ItemWriter接口,也可以使用自定义ItemWriter中的一个来编写通用处理问题。无论哪种方式,他们应该没有任何问题。需要注意的一件事是通过批量输出提供的 performance 和错误处理功能。当使用 hibernate 作为ItemWriter时,这是最常见的,但在使用 JDBC 批处理模式时可能会遇到相同的问题。批处理数据库输出没有任何固有的缺陷,假设我们小心刷新并且数据中没有错误。但是,写入时的任何错误都可能导致混淆,因为无法知道哪个 item 导致 exception 或者即使任何单个 item 负责,如下图所示:

图 4.刷新时的错误

如果在写入之前缓冲了项目,则在提交之前刷新缓冲区之前不会抛出任何错误。对于 example,假设每个块写入 20 个项目,并且第 15 个 item 抛出DataIntegrityViolationException。就Step而言,所有 20 个 item 都被成功写入,因为在实际写入之前无法知道错误发生。调用Session#flush()后,清空缓冲区并触发 exception。在这一点上,Step无能为力。 transaction 必须回滚。通常,此 exception 可能会导致 item 被跳过(取决于 skip/retry policies),然后不会再次写入。但是,在批处理方案中,无法知道哪个 item 导致了该问题。当故障发生时,整个缓冲区被写入。解决此问题的唯一方法是在每个 item 之后刷新,如下图所示:

图 5.写入错误

这是一个 common 用例,特别是在使用 Hibernate 时,ItemWriter的_imple 实现的简单指南是在每次调用write()时刷新。这样做可以可靠地跳过项目,Spring Batch 在发生错误后在内部处理 calls 的粒度为ItemWriter

1.11. 重用现有服务

批处理系统通常与其他 application 样式一起使用。最常见的是一个在线系统,但它也可以通过移动每个 application 样式使用的必要批量数据来支持 integration 甚至厚 client application。因此,很多用户希望在批处理作业中重用现有的 DAO 或其他服务。 Spring 容器本身通过允许注入任何必要的 class 来使这相当容易。但是,可能存在这样的情况:现有服务需要充当ItemReaderItemWriter,以满足另一个 Spring Batch class 的依赖性,或者因为它确实是_ste的主要ItemReader。为每个需要包装的服务编写一个适配器 class 是相当简单的,但由于它是一个常见的问题,Spring Batch 提供 implementations:ItemReaderAdapterItemWriterAdapter。两个 classes 都通过调用委托 pattern 来实现标准的 Spring 方法,并且设置起来相当简单。以下 example 使用ItemReaderAdapter

XML Configuration

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="generateFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

Java Configuration

@Bean
public ItemReaderAdapter itemReader() {
        ItemReaderAdapter reader = new ItemReaderAdapter();

        reader.setTargetObject(fooService());
        reader.setTargetMethod("generateFoo");

        return reader;
}

@Bean
public FooService fooService() {
        return new FooService();
}

需要注意的一点是,targetMethod的 contract 必须与read的 contract 相同:当用尽时,它返回null。否则,它返回Object。其他任何事情都会阻止 framework 知道何时应该结束处理,导致无限循环或不正确的失败,这取决于ItemWriter的 implementation。以下 example 使用ItemWriterAdapter

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="processFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

Java Configuration

@Bean
public ItemWriterAdapter itemWriter() {
        ItemWriterAdapter writer = new ItemWriterAdapter();

        writer.setTargetObject(fooService());
        writer.setTargetMethod("processFoo");

        return writer;
}

@Bean
public FooService fooService() {
        return new FooService();
}

1.12. 验证输入

在本章的过程中,讨论了多种解析输入的方法。如果不是'well-formed',每个主要的 implementation 都会抛出一个 exception。如果缺少一系列数据,FixedLengthTokenizer将抛出 exception。类似地,尝试访问RowMapperFieldSetMapper中不存在的索引或与预期格式不同的索引会导致抛出 exception。在read返回之前抛出所有这些类型的 exceptions。但是,它们没有解决返回的 item 是否有效的问题。例如,如果其中一个字段是年龄,则显然不能为负数。它可能正确解析,因为它存在且是一个数字,但它不会导致 exception。由于已经有大量的验证框架,Spring Batch 不会尝试提供另一个。相反,它提供了一个名为Validator的简单接口,可以通过任意数量的框架实现,如以下接口定义所示:

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

contract 是validate方法如果 object 无效则抛出 exception,如果有效则返回正常。 Spring Batch 提供了一个开箱即用的ValidatingItemProcessor,如下面的 bean 定义所示:

XML Configuration

<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
        <property name="validator">
                <bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/>
        </property>
</bean>

Java Configuration

@Bean
public ValidatingItemProcessor itemProcessor() {
        ValidatingItemProcessor processor = new ValidatingItemProcessor();

        processor.setValidator(validator());

        return processor;
}

@Bean
public SpringValidator validator() {
        SpringValidator validator = new SpringValidator();

        validator.setValidator(new TradeValidator());

        return validator;
}

您还可以使用BeanValidatingItemProcessor验证使用 Bean Validation API(JSR-303)注释注释的项目。对于 example,给定以下类型Person

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

您可以通过在 application context 中声明BeanValidatingItemProcessor bean 并在 chunk-oriented step 中将其注册为处理器来验证项目:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

1.13. 防止 State 持久化

默认情况下,所有ItemReaderItemWriter _mplempleations _在提交之前将当前 state 存储在ExecutionContext中。但是,这可能并不总是理想的行为。例如,许多开发人员选择使用 process 指标使其数据库 readers'可重新运行'。将一个额外的列添加到输入数据以指示它是否已被处理。当正在读取(或写入)特定的 record 时,处理后的 flag 将从false翻转到true。然后,SQL 语句可以在where子句中包含一个额外的语句,例如where PROCESSED_IND = false,从而确保在重新启动时只返回未处理的记录。在这种情况下,最好不要 store 任何 state,例如当前行号,因为它在重新启动时无关紧要。因此,所有 readers 和 writers 都包含'saveState'property,如下面的 example 所示:

XML Configuration

<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
    </property>
    <property name="saveState" value="false" />
    <property name="sql">
        <value>
            SELECT games.player_id, games.year_no, SUM(COMPLETES),
            SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
            SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
            SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
            from games, players where players.player_id =
            games.player_id group by games.player_id, games.year_no
        </value>
    </property>
</bean>

Java Configuration

@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<PlayerSummary>()
                                .dataSource(dataSource)
                                .rowMapper(new PlayerSummaryMapper())
                                .saveState(false)
                                .sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
                                  + "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
                                  + "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
                                  + "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
                                  + "from games, players where players.player_id ="
                                  + "games.player_id group by games.player_id, games.year_no")
                                .build();

}

上面配置的ItemReader不会在ExecutionContext中为其参与的任何执行创建任何条目。

1.14. Creating Custom ItemReaders 和 ItemWriters

到目前为止,本章已经讨论了 Spring Batch 中读写的基本 contracts 以及一些 common implementations。但是,这些都是相当通用的,out-of-the-box implementations 可能没有涵盖许多潜在的场景。本节通过使用简单的 example 显示如何创建自定义ItemReaderItemWriter implementation 并正确实现其 contracts。 ItemReader还在 order 中实现了ItemStream,以说明如何使 reader 或 writer 可重启。

1.14.1. 自定义 ItemReader Example

出于此 example 的目的,我们创建了一个简单的ItemReader implementation,它从提供的列表中读取。我们首先实现ItemReader的最基本 contract,read方法,如下面的 code 所示:

public class CustomItemReader<T> implements ItemReader<T>{

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的 class 获取一个项目列表,并在 time 返回一个项目,从列表中删除每个项目。当列表为空时,它返回null,从而满足ItemReader的最基本要求,如下面的 test code 所示:

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使 ItemReader 可重新启动

最后的挑战是让ItemReader重新启动。目前,如果处理被中断并再次开始,则ItemReader必须从头开始。这在许多情况下实际上都是有效的,但有时最好是批量 job 从它停止的地方重新启动。 key 判别式通常是 reader 是有状态的还是 stateless 的。 stateless reader 不需要担心可重启性,但有状态的 reader 必须尝试在重启时重建其最后一个已知的 state。出于这个原因,我们建议您尽可能保持自定义 readers stateless,因此您不必担心可重启性。

如果确实需要 store state,则应使用ItemStream接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey(CURRENT_INDEX)){
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else{
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

在每次调用ItemStream update方法时,ItemReader的当前索引存储在提供的ExecutionContext中,key 为_current.index。调用ItemStream open方法时,将检查ExecutionContext以查看它是否包含带有该 key 的条目。如果找到 key,则将当前索引移动到该位置。这是一个相当简单的例子,但它仍然符合一般的 contract:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<String>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数ItemReaders都有更复杂的重启逻辑。 JdbcCursorItemReader,对于 example,存储游标中最后处理的行的行 ID。

值得注意的是中使用的 key 不应该是微不足道的。那是因为Step用于Step中的所有ItemStreams。在大多数情况下,只需在 class name 之前加上 key 就足以保证唯一性。但是,在极少数情况下,同一个 step 中使用了两个相同类型的ItemStream(如果输出需要两个 files,则会发生这种情况),需要一个更独特的 name。出于这个原因,许多 Spring Batch ItemReaderItemWriter __mplement 都有一个setName() property,可以覆盖这个 key name。

1.14.2. 自定义 ItemWriter Example

实现自定义ItemWriter在很多方面类似于上面的ItemReader example,但在足够的方面有所不同,以保证自己的 example。但是,添加可重启性基本相同,因此本示例中未涉及。与ItemReader example 一样,order 中使用List以使 example 尽可能简单:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(List<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}
使 ItemWriter 可重新启动

为了使ItemWriter可重新启动,我们将遵循与ItemReader相同的 process,添加和实现ItemStream接口以同步执行 context。在 example 中,我们可能必须计算处理的项目数,并将其添加为页脚 record。如果我们需要这样做,我们可以在ItemWriter中实现ItemStream,这样如果流是 re-opened,则从执行 context 重构计数器。

在许多实际情况中,自定义ItemWriters也委托给另一个本身可重新启动的 writer(例如,在写入文件时),或者它写入 transactional 资源,因此不需要重新启动,因为它是 stateless。当你有一个有状态的 writer 时,你应该确保实现ItemStream以及ItemWriter。还要记住 writer 的 client 需要知道ItemStream,因此您可能需要在 configuration 中将其注册为流。

1.15. Item Reader 和 Writer Implementations

在本节中,我们将向您介绍尚未在前面部分中讨论过的 readers 和 writers。

1.15.1. 装饰

在某些情况下,用户需要将特殊行为附加到 pre-existing ItemReader。 Spring Batch 提供了一些开箱即用的装饰器,可以为ItemReaderItemWriter __mplement 添加额外的行为。

Spring Batch 包含以下装饰器:

  • SynchronizedItemStreamReader

  • SingleItemPeekableItemReader

  • MultiResourceItemWriter

  • ClassifierCompositeItemWriter

  • ClassifierCompositeItemProcessor

SynchronizedItemStreamReader

当使用非线程安全的ItemReader时,Spring Batch 提供SynchronizedItemStreamReader装饰器,可用于使ItemReader线程安全。 Spring Batch 提供SynchronizedItemStreamReaderBuilder来构造SynchronizedItemStreamReader的实例。

SingleItemPeekableItemReader

Spring Batch 包含一个装饰器,它为ItemReader添加一个 peek 方法。这个 peek 方法让用户可以向前看一个 item。重复 calls 到 peek 返回相同的 item,这是从read方法返回的下一个 item。 Spring Batch 提供SingleItemPeekableItemReaderBuilder来构造SingleItemPeekableItemReader的实例。

SingleItemPeekableItemReader 的 peek 方法不是 thread-safe,因为它无法在多个线程中实现窥视。只有一个偷看的线程会在下一次调用中获得 item。

MultiResourceItemWriter

当当前资源中写入的项数超过itemCountLimitPerResource时,MultiResourceItemWriter包装ResourceAwareItemWriterItemStream并创建新的输出资源。 Spring Batch 提供MultiResourceItemWriterBuilder来构造MultiResourceItemWriter的实例。

ClassifierCompositeItemWriter

ClassifierCompositeItemWriter _call 为每个 item 的一个ItemWriter __mplement 的集合之一,基于通过提供的Classifier实现的 router pattern。如果所有委托都是 thread-safe,则 implementation 为 thread-safe。 Spring Batch 提供ClassifierCompositeItemWriterBuilder来构造ClassifierCompositeItemWriter的实例。

ClassifierCompositeItemProcessor

ClassifierCompositeItemProcessor是一个ItemProcessor,它根据通过提供的Classifier实现的 router pattern 来调用ItemProcessor __mplementations 的集合之一。 Spring Batch 提供ClassifierCompositeItemProcessorBuilder来构造ClassifierCompositeItemProcessor的实例。

1.15.2. 消息读者和 Writers

Spring Batch 为常用的消息传递系统提供以下 readers 和 writers:

  • AmqpItemReader

  • AmqpItemWriter

  • JmsItemReader

  • JmsItemWriter

AmqpItemReader

AmqpItemReaderItemReader,它使用AmqpTemplate来接收或转换来自交换的消息。 Spring Batch 提供AmqpItemReaderBuilder来构造AmqpItemReader的实例。

AmqpItemWriter

AmqpItemWriterItemWriter,它使用AmqpTemplate将消息发送到 AMQP 交换机。如果未在提供的AmqpTemplate中指定 name,则将消息发送到无名交换。 Spring Batch 提供AmqpItemWriterBuilder来构造AmqpItemWriter的实例。

JmsItemReader

对于使用JmsTemplate的 JMS,JmsItemReaderItemReader。模板应具有默认目标,该目标用于为read()方法提供项目。 Spring Batch 提供JmsItemReaderBuilder来构造JmsItemReader的实例。

JmsItemWriter

对于使用JmsTemplate的 JMS,JmsItemWriterItemWriter。模板应该有一个默认目标,用于在write(List)中发送项目。 Spring Batch 提供JmsItemWriterBuilder来构造JmsItemWriter的实例。

1.15.3. 数据库读者

Spring Batch 提供以下数据库 readers:

  • Neo4jItemReader

  • MongoItemReader

  • HibernateCursorItemReader

  • HibernatePagingItemReader

  • RepositoryItemReader

Neo4jItemReader

Neo4jItemReaderItemReader,它通过使用分页技术从图形数据库 Neo4j 中读取 objects。 Spring Batch 提供Neo4jItemReaderBuilder来构造Neo4jItemReader的实例。

MongoItemReader

MongoItemReaderItemReader,它使用分页技术从 MongoDB 中读取文档。 Spring Batch 提供MongoItemReaderBuilder来构造MongoItemReader的实例。

HibernateCursorItemReader

HibernateCursorItemReaderItemStreamReader用于读取构建在 Hibernate 之上的数据库记录。它执行 HQL 查询,然后在初始化时,在调用read()方法时迭代结果集,连续返回对应于当前行的 object。 Spring Batch 提供HibernateCursorItemReaderBuilder来构造HibernateCursorItemReader的实例。

HibernatePagingItemReader

HibernatePagingItemReader是一个ItemReader,用于读取构建在 Hibernate 之上的数据库记录,并且只能在 time 读取固定数量的项目。 Spring Batch 提供HibernatePagingItemReaderBuilder来构造HibernatePagingItemReader的实例。

RepositoryItemReader

RepositoryItemReaderItemReader,它使用PagingAndSortingRepository读取记录。 Spring Batch 提供RepositoryItemReaderBuilder来构造RepositoryItemReader的实例。

1.15.4. 数据库 Writers

Spring Batch 提供以下数据库 writers:

  • Neo4jItemWriter

  • MongoItemWriter

  • RepositoryItemWriter

  • HibernateItemWriter

  • JdbcBatchItemWriter

  • JpaItemWriter

  • GemfireItemWriter

Neo4jItemWriter

Neo4jItemWriter是一个写入 Neo4j 数据库的ItemWriter implementation。 Spring Batch 提供Neo4jItemWriterBuilder来构造Neo4jItemWriter的实例。

MongoItemWriter

MongoItemWriter是一个ItemWriter implementation,它使用 Spring Data 的MongoOperations的 implementation 来写入 MongoDB store。 Spring Batch 提供MongoItemWriterBuilder来构造MongoItemWriter的实例。

RepositoryItemWriter

RepositoryItemWriter是来自 Spring Data 的CrudRepositoryItemWriter wrapper。 Spring Batch 提供RepositoryItemWriterBuilder来构造RepositoryItemWriter的实例。

HibernateItemWriter

HibernateItemWriterItemWriter,它使用 Hibernate session 来保存或更新不属于当前 Hibernate session 的实体。 Spring Batch 提供HibernateItemWriterBuilder来构造HibernateItemWriter的实例。

JdbcBatchItemWriter

JdbcBatchItemWriter是一个ItemWriter,它使用NamedParameterJdbcTemplate中的批处理 features 为所有提供的项目执行一批 statements。 Spring Batch 提供JdbcBatchItemWriterBuilder来构造JdbcBatchItemWriter的实例。

JpaItemWriter

JpaItemWriter是一个ItemWriter,它使用 JPA EntityManagerFactory来合并任何不属于持久性 context 的实体。 Spring Batch 提供JpaItemWriterBuilder来构造JpaItemWriter的实例。

GemfireItemWriter

GemfireItemWriter是一个ItemWriter,它使用store GemFire 中的项目作为 key/value 对。 Spring Batch 提供GemfireItemWriterBuilder来构造GemfireItemWriter的实例。

1.15.5. 专业读者

Spring Batch 提供以下专业 readers:

  • LdifReader

  • MappingLdifReader

LdifReader

LdifReaderResource读取 LDIF(LDAP 数据交换格式)记录,解析它们,并为每个read执行返回LdapAttribute object。 Spring Batch 提供LdifReaderBuilder来构造LdifReader的实例。

MappingLdifReader

MappingLdifReaderResource读取 LDIF(LDAP 数据交换格式)记录,解析它们然后 maps 每个 LDIF 记录到 POJO(Plain Old Java Object)。每次读取都返回一个 POJO。 Spring Batch 提供MappingLdifReaderBuilder来构造MappingLdifReader的实例。

1.15.6. 专业的作家

Spring Batch 提供以下专业 writers:

  • SimpleMailMessageItemWriter
SimpleMailMessageItemWriter

SimpleMailMessageItemWriter是可以发送邮件消息的ItemWriter。它将实际发送的消息委托给MailSender的实例。 Spring Batch 提供SimpleMailMessageItemWriterBuilder来构造SimpleMailMessageItemWriter的实例。

1.15.7. 专业处理器

Spring Batch 提供以下专业处理器:

  • ScriptItemProcessor
ScriptItemProcessor

ScriptItemProcessorItemProcessor,它将当前 item 传递给 process 到提供的脚本,并且处理器返回脚本的结果。 Spring Batch 提供ScriptItemProcessorBuilder来构造ScriptItemProcessor的实例。