1. ItemReaders 和 ItemWriters

所有批处理都可以以其最简单的形式描述为读取大量数据,执行某种类型的计算或转换并写出结果。 Spring Batch 提供了三个关键接口来帮助执行批量读取和写入:ItemReaderItemProcessorItemWriter

1.1. ItemReader

尽管是一个简单的概念,但ItemReader是用于从许多不同类型的 Importing 中提供数据的方法。最一般的示例包括:

还有更多的可能性,但在本章中我们将重点介绍基本的可能性。 Appendix A中可以找到所有可用的ItemReader实现的完整列表。

ItemReader是通用 Importing 操作的基本接口,如以下接口定义所示:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

read方法定义ItemReader的最基本 Contract。调用它会返回一个 Item;如果没有更多 Item,则返回null。一个 Item 可能代表文件中的一行,数据库中的一行或 XML 文件中的元素。通常希望将它们 Map 到可用的域对象(例如TradeFoo或其他),但是 Contract 中没有要求这样做。

预期ItemReader接口的实现仅向前。但是,如果基础资源是事务性的(例如 JMS 队列),则在回滚场景中,调用read可能会在后续调用中返回相同的逻辑项。还值得注意的是,ItemReader缺少要处理的 Item 不会导致引发异常。例如,配置有返回 0 结果的查询的数据库ItemReader在第一次调用 read 时将返回null

1.2. ItemWriter

ItemWriter在功能上与ItemReader类似,但是具有相反的运算。资源仍然需要定位,打开和关闭,但是它们的区别在于ItemWriter是写出而不是读入。对于数据库或队列,这些操作可能是插入,更新或发送。输出序列化的格式特定于每个批处理作业。

ItemReader一样,ItemWriter是一个相当通用的接口,如以下接口定义所示:

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

ItemReader上的read一样,write提供ItemWriter的基本 Contract。只要打开,它就会尝试写出传入 Item 的列表。因为通常期望将 Item“分批”在一起,然后输出,所以接口接受 Item 列表,而不是 Item 本身。写入列表后,可以执行任何必要的刷新操作,然后再从 write 方法返回。例如,如果写入一个 Hibernate DAO,则可以进行多个写入操作,每个 Item 一个。然后,Writer 可以在休眠会话上调用flush,然后再返回。

1.3. ItemProcessor

ItemReaderItemWriter接口对于它们的特定任务都非常有用,但是如果要在编写之前插入业务逻辑怎么办?读写的一种选择是使用复合模式:创建包含另一个ItemWriterItemWriter或包含另一个ItemReaderItemReader。以下代码显示了一个示例:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

上一类包含另一个ItemWriter,它在提供了一些业务逻辑后将其委托给该ItemWriter。该模式也可以很容易地用于ItemReader,也许可以基于主ItemReader提供的 Importing 来获取更多参考数据。如果您需要自己控制对write的调用,它也很有用。但是,如果您只想在实际写入之前“转换”传递给写入的 Item,则无需自己write。您可以只修改 Item。对于这种情况,Spring Batch 提供了ItemProcessor接口,如以下接口定义所示:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor很简单。给定一个对象,对其进行转换,然后返回另一个。提供的对象可以是或可以不是相同的类型。关键是可以在流程中应用业务逻辑,并且完全由开发人员来创建该逻辑。 ItemProcessor可以直接连接到步骤中。例如,假设ItemReader提供了Foo类型的类,并且在将其写出之前需要将其转换为Bar类型。以下示例显示了执行转换的ItemProcessor

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在前面的示例中,存在一个类Foo,一个类Bar和一个FooProcessor,它们坚持ItemProcessor接口。转换很简单,但是任何类型的转换都可以在这里完成。 BarWriter写入Bar对象,如果提供任何其他类型,则抛出异常。同样,如果提供了Foo以外的内容,则FooProcessor引发异常。然后可以将FooProcessor注入到Step中,如以下示例所示:

XML Configuration

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJOb")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(barWriter())
                                .build();
}

1.3.1. 链接项处理器

执行单个转换在许多情况下很有用,但是如果要将多个ItemProcessor实现“链接”在一起怎么办?这可以使用前面提到的复合图案来完成。要更新先前的单个转换,例如Foo转换为Bar,然后转换为Foobar并写出,如以下示例所示:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,Foobar>{
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}

FooProcessorBarProcessor可以“链接”在一起以得到结果Foobar,如以下示例所示:

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

与前面的示例一样,可以将复合处理器配置为Step

XML Configuration

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(compositeProcessor())
                                .writer(foobarWriter())
                                .build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
        List<ItemProcessor> delegates = new ArrayList<>(2);
        delegates.add(new FooProcessor());
        delegates.add(new BarProcessor());

        CompositeItemProcessor processor = new CompositeItemProcessor();

        processor.setDelegates(delegates);

        return processor;
}

1.3.2. 筛选记录

Item 处理器的一种典型用法是在将记录传递给ItemWriter之前过滤掉记录。过滤是不同于跳过的动作。跳过表示一条记录无效,而过滤只是表明不应写入一条记录。

例如,考虑一个批处理作业,该作业读取一个包含三种不同类型记录的文件:要插入的记录,要更新的记录和要删除的记录。如果系统不支持删除记录,则我们不希望将任何“删除”记录发送到ItemWriter。但是,由于这些记录实际上并不是不良记录,因此我们希望将其过滤掉而不是跳过它们。结果,ItemWriter将仅接收“插入”和“更新”记录。

要过滤记录,您可以从ItemProcessor返回null。框架检测到结果为null,并避免将该 Item 添加到传递给ItemWriter的记录列表中。与往常一样,从ItemProcessor引发的异常导致跳过。

1.3.3. 容错能力

回滚块时,可能会重新处理读取期间已缓存的 Item。如果将某个步骤配置为容错的(通常通过使用跳过或重试处理),则所使用的任何ItemProcessor都应以幂等的方式实现。通常,这包括对ItemProcessor的 Importing 项不执行任何更改,仅更新作为结果的实例。

1.4. ItemStream

ItemReadersItemWriters都可以很好地满足其各自的目的,但是它们之间存在一个共同的问题,即需要另一个接口。通常,作为批处理作业范围的一部分,需要打开,关闭读取器和写入器,并需要一种持久化状态的机制。 ItemStream接口可达到此目的,如以下示例所示:

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

在描述每种方法之前,我们应该提到ExecutionContext。同样实现ItemStreamItemReader的 Client 端应在对read的任何调用之前先调用open,以打开文件等任何资源或获得连接。类似的限制适用于实现ItemStreamItemWriter。如第 2 章所述,如果在ExecutionContext中找到了预期的数据,则可以使用它在初始状态以外的位置启动ItemReaderItemWriter。相反,调用close以确保安全释放打开期间分配的所有资源。调用update主要是为了确保当前保持的任何状态都已加载到提供的ExecutionContext中。在提交之前调用此方法,以确保在提交之前将当前状态保留在数据库中。

ItemStream的 Client 端是Step(来自 Spring Batch Core)的特殊情况下,将为每个 StepExecution 创建一个ExecutionContext,以允许用户存储特定执行的状态,并期望如果相同则返回该状态。 JobInstance重新开始。对于那些熟悉 Quartz 的人来说,其语义与 Quartz JobDataMap非常相似。

1.5. 委托模式和步骤注册

请注意,CompositeItemWriter是委派模式的示例,在 Spring Batch 中很常见。委托本身可以实现回调接口,例如StepListener。如果这样做了,并且它们与 Spring Batch Core 一起作为JobStep的一部分使用,那么几乎可以肯定他们需要在Step中手动注册。如果直接实现ItemStreamStepListener接口,则直接连接到Step的读取器,写入器或处理器会自动注册。但是,由于Step未知委托,因此需要将它们作为侦听器或流(或在适当时将两者同时注入)注入,如以下示例所示:

XML Configuration

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
                   commit-interval="2">
                <streams>
                    <stream ref="barWriter" />
                </streams>
            </chunk>
        </tasklet>
    </step>
</job>

<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
    <property name="delegate" ref="barWriter" />
</bean>

<bean id="barWriter" class="...BarWriter" />

Java Configuration

@Bean
public Job ioSampleJob() {
        return this.jobBuilderFactory.get("ioSampleJob")
                                .start(step1())
                                .end()
                                .build();
}

@Bean
public Step step1() {
        return this.stepBuilderFactory.get("step1")
                                .<String, String>chunk(2)
                                .reader(fooReader())
                                .processor(fooProcessor())
                                .writer(compositeItemWriter())
                                .stream(barWriter())
                                .build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

        CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

        writer.setDelegate(barWriter());

        return writer;
}

@Bean
public BarWriter barWriter() {
        return new BarWriter();
}

1.6. 平面文件

交换批量数据的最常见机制之一一直是平面文件。与 XML 具有定义其结构化(XSD)的公认标准不同,任何阅读平面文件的人都必须提前了解文件的结构。通常,所有平面文件都分为两种:定界文件和定长文件。分隔文件是指用逗号分隔分隔符的字段。固定长度文件具有设置长度的字段。

1.6.1. FieldSet

在 Spring Batch 中使用平面文件时,无论是用于 Importing 还是输出,最重要的类之一是FieldSet。许多体系结构和库都包含用于帮助您从文件读入的抽象,但是它们通常返回StringString对象的数组。这真的只会让您半途而废。 FieldSet是 Spring Batch 的抽象,用于启用文件资源中字段的绑定。它使开发人员可以像处理数据库 Importing 一样使用文件 Importing。 FieldSet在概念上与 JDBC ResultSet类似。 FieldSet仅需要一个参数:String令牌数组。 (可选)您还可以配置字段的名称,以便可以按ResultSet之后的模式通过索引或名称来访问字段,如以下示例所示:

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSet界面上还有更多选项,例如Date,long,BigDecimal等等。 FieldSet的最大优点是它提供了平面文件 Importing 的一致解析。在处理由格式异常引起的错误或进行简单的数据转换时,它可以保持一致,而不是使每个批处理作业以潜在的意外方式进行不同的解析。

1.6.2. FlatFileItemReader

平面文件是最多包含二维(表格)数据的任何类型的文件。名为FlatFileItemReader的类有助于在 Spring Batch 框架中读取平面文件,该类提供了用于读取和解析平面文件的基本功能。 FlatFileItemReader的两个最重要的必需依存关系是ResourceLineMapperLineMapper接口将在下一部分中进行详细介绍。 resource 属性表示一个 Spring Core Resource。可以在Spring 框架,第 5 章。资源中找到说明如何创建此类 bean 的文档。因此,除了显示以下简单示例之外,本指南不介绍创建Resource对象的详细信息:

Resource resource = new FileSystemResource("resources/trades.csv");

在复杂的批处理环境中,目录结构通常由 EAI 基础结构 Management,在 EAI 基础结构中,构建了用于外部接口的放置区,以将文件从 FTP 位置移动到批处理位置,反之亦然。文件移动 Util 超出了 Spring Batch 体系结构的范围,但是批处理作业流中包括文件移动 Util 作为作业流中的步骤并不少见。批处理体系结构只需要知道如何找到要处理的文件。 Spring Batch 从此起点开始将数据馈入管道的过程。但是,Spring Integration提供了许多此类服务。

FlatFileItemReader中的其他属性使您可以进一步指定数据的解释方式,如下表所述:

表 1. FlatFileItemReader属性

Property Type Description
comments String[] 指定指示 Comments 行的行前缀。
encoding String 指定要使用的文本编码。默认值为Charset.defaultCharset()
lineMapper LineMapper 将表示 Item 的String转换为Object
linesToSkip int 文件顶部要忽略的行数。
recordSeparatorPolicy RecordSeparatorPolicy 用于确定行尾的位置,并执行诸如在带引号的字符串中 continue 到行尾的操作。
resource Resource 从中读取资源。
skippedLinesCallback LineCallbackHandler 传递要跳过的文件中各行的原始行内容的接口。如果linesToSkip设置为 2,则此接口将被调用两次。
strict boolean 在严格模式下,如果 Importing 资源不存在,则读取器将在ExecutionContext上引发异常。否则,它将记录问题并 continue。
LineMapper

RowMapper一样,它采用诸如ResultSet之类的低级构造并返回Object,平面文件处理需要相同的构造才能将String行转换为Object,如以下接口定义所示:

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

基本约定是,给定当前行及其关联的行号,Map 器应返回结果域对象。这类似于RowMapper,因为每一行都与其行号相关联,就像ResultSet中的每一行都与其行号绑定一样。这允许将行号绑定到结果域对象,以进行身份比较或提供更多信息。但是,与RowMapper不同,LineMapper被赋予了原始行,如上所述,该原始行只会使您到达中间。该行必须标记为FieldSet,然后可以将其 Map 到一个对象,如本文档后面所述。

LineTokenizer

必须将 Importing 行转换为FieldSet的抽象,因为可能需要将多种格式的平面文件数据转换为FieldSet。在 Spring Batch 中,此接口是LineTokenizer

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

LineTokenizer的约定是这样的:给定 Importing 行(理论上String可以包含多条线),则返回代表该行的FieldSet。然后可以将此FieldSet传递给FieldSetMapper。 Spring Batch 包含以下LineTokenizer实现:

FieldSetMapper

FieldSetMapper接口定义单个方法mapFieldSet,该方法采用FieldSet对象并将其内容 Map 到对象。根据作业的需要,此对象可以是自定义 DTO,域对象或数组。 FieldSetMapperLineTokenizer结合使用,可将一行数据从资源转换为所需类型的对象,如以下接口定义所示:

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

使用的模式与JdbcTemplate使用的RowMapper相同。

DefaultLineMapper

既然已经定义了读取平面文件的基本接口,那么很明显,需要三个基本步骤:

上述两个接口代表两个单独的任务:将线转换为FieldSet并将FieldSetMap 到域对象。因为LineTokenizer的 Importing 与LineMapper的 Importing(一条线)匹配,并且FieldSetMapper的输出与LineMapper的输出匹配,所以提供了同时使用LineTokenizerFieldSetMapper的默认实现。下列类定义中显示的DefaultLineMapper表示大多数用户需要的行为:

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

以上功能是默认实现中提供的,而不是内置于 Reader 本身中(如在框架的先前版本中所做的那样),以使用户在控制解析过程时具有更大的灵 Active,尤其是在需要访问原始行的情况下。

简单分隔文件读取示例

以下示例说明了如何使用实际域方案读取平面文件。这个特定的批处理作业从以下文件中读取足球运动员:

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"

该文件的内容 Map 到以下Player域对象:

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

要将FieldSetMap 到Player对象,需要定义一个返回玩家的FieldSetMapper,如以下示例所示:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

然后可以通过正确构造FlatFileItemReader并调用read来读取文件,如以下示例所示:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

每次对read的调用都会从文件的每一行返回一个新的Player对象。当到达文件末尾时,返回null

按名称 Map 字段

DelimitedLineTokenizerFixedLengthTokenizer都允许有一项附加功能,其功能与 JDBC ResultSet类似。字段的名称可以被注入到这些LineTokenizer实现中,以提高 Map 函数的可读性。首先,将平面文件中所有字段的列名注入令牌生成器,如以下示例所示:

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

FieldSetMapper可以使用以下信息:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if(fs == null){
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}
将字段集自动 Map 到域对象

对于许多人来说,必须编写特定的FieldSetMapper与为JdbcTemplate编写特定的RowMapper一样麻烦。 Spring Batch 通过提供FieldSetMapper来简化此过程,该FieldSetMapper通过使用 JavaBean 规范将字段名称与对象上的设置器进行匹配来自动 Map 字段。再次使用 Football 示例,BeanWrapperFieldSetMapper配置类似于以下代码片段:

XML Configuration

<bean id="fieldSetMapper"
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
    <property name="prototypeBeanName" value="player" />
</bean>

<bean id="player"
      class="org.springframework.batch.sample.domain.Player"
      scope="prototype" />

Java Configuration

@Bean
public FieldSetMapper fieldSetMapper() {
        BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

        fieldSetMapper.setPrototypeBeanName("player");

        return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
        return new Player();
}

对于FieldSet中的每个条目,Map 器都在Player对象的新实例上寻找对应的 setter(因此,需要原型作用域),就像 Spring 容器寻找与属性名称匹配的 setter 一样。MapFieldSet中的每个可用字段,并返回生成的Player对象,而无需代码。

定长文件格式

到目前为止,仅详细讨论了定界文件。但是,它们仅代表文件读取图片的一半。许多使用平面文件的组织都使用固定长度格式。固定长度文件示例如下:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

虽然这 Watch 起来像一个大字段,但实际上代表了 4 个不同的字段:

配置FixedLengthLineTokenizer时,必须以范围的形式提供每个长度,如以下示例所示:

XML Configuration

<bean id="fixedLengthLineTokenizer"
      class="org.springframework.batch.io.file.transform.FixedLengthTokenizer">
    <property name="names" value="ISIN,Quantity,Price,Customer" />
    <property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>

因为FixedLengthLineTokenizer使用与上面讨论的相同的LineTokenizer接口,所以它返回的FieldSet就像使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper

Note

要支持上述范围语法,需要在ApplicationContext中配置专门的属性编辑器RangeArrayPropertyEditor。但是,此 Bean 是在使用批处理名称空间的ApplicationContext中自动声明的。

Java Configuration

@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
        FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

        tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
        tokenizer.setColumns(new Range(1-12),
                                                new Range(13-15),
                                                new Range(16-20),
                                                new Range(21-29));

        return tokenizer;
}

因为FixedLengthLineTokenizer使用与上面讨论的相同的LineTokenizer接口,所以它返回的FieldSet就像使用了分隔符一样。这样就可以使用相同的方法来处理其输出,例如使用BeanWrapperFieldSetMapper

单个文件中的多种记录类型

到目前为止,为简单起见,所有文件读取示例都作了一个关键假设:文件中的所有记录都具有相同的格式。但是,并非总是如此。通常,文件中的记录可能具有不同的格式,需要对其进行不同的标记和 Map 到不同的对象。以下文件摘录对此进行了说明:

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

在此文件中,我们有三种类型的记录:“ USER”,“ LINEA”和“ LINEB”。 “ USER”行对应于User对象。尽管“ LINEA”比“ LINEB”具有更多信息,但“ LINEA”和“ LINEB”都对应于Line个对象。

ItemReader分别读取每一行,但是我们必须指定不同的LineTokenizerFieldSetMapper对象,以便ItemWriter接收正确的 Item。 PatternMatchingCompositeLineMapper允许配置模式到LineTokenizer实例的 Map 以及模式到FieldSetMapper实例的 Map,因此很容易,如以下示例所示:

XML Configuration

<bean id="orderFileLineMapper"
      class="org.spr...PatternMatchingCompositeLineMapper">
    <property name="tokenizers">
        <map>
            <entry key="USER*" value-ref="userTokenizer" />
            <entry key="LINEA*" value-ref="lineATokenizer" />
            <entry key="LINEB*" value-ref="lineBTokenizer" />
        </map>
    </property>
    <property name="fieldSetMappers">
        <map>
            <entry key="USER*" value-ref="userFieldSetMapper" />
            <entry key="LINE*" value-ref="lineFieldSetMapper" />
        </map>
    </property>
</bean>

Java Configuration

@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
        PatternMatchingCompositeLineMapper lineMapper =
                new PatternMatchingCompositeLineMapper();

        Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
        tokenizers.put("USER*", userTokenizer());
        tokenizers.put("LINEA*", lineATokenizer());
        tokenizers.put("LINEB*", lineBTokenizer());

        lineMapper.setTokenizers(tokenizers);

        Map<String, FieldSetMapper> mappers = new HashMap<>(2);
        mappers.put("USER*", userFieldSetMapper());
        mappers.put("LINE*", lineFieldSetMapper());

        lineMapper.setFieldSetMappers(mappers);

        return lineMapper;
}

在此示例中,“ LINEA”和“ LINEB”具有单独的LineTokenizer实例,但它们都使用相同的FieldSetMapper

PatternMatchingCompositeLineMapper使用PatternMatcher#match方法以便为每行选择正确的委托。 PatternMatcher允许使用两个具有特殊含义的通配符:问号(“?”)恰好匹配一个字符,而星号(“ *”)则匹配零个或多个字符。请注意,在上述配置中,所有模式都以星号结尾,从而使它们有效地成为行的前缀。 PatternMatcher始终匹配最可能的特定模式,而不考虑配置中的 Sequences。因此,如果将“ LINE *”和“ LINEA *”都列为模式,则“ LINEA”将匹配模式“ LINEA *”,而“ LINEB”将匹配模式“ LINE *”。此外,单个星号(“ *”)可以通过匹配未与任何其他模式匹配的任何行作为默认值,如以下示例所示。

XML Configuration

<entry key="*" value-ref="defaultLineTokenizer" />

Java Configuration

...
tokenizers.put("*", defaultLineTokenizer());
...

还有一个PatternMatchingCompositeLineTokenizer可以单独用于令牌化。

平面文件包含每个跨越多行的记录也是很常见的。为了处理这种情况,需要更复杂的策略。 multiLineRecords示例中提供了这种常见模式的演示。

平面文件中的异常处理

在很多情况下,对行进行标记可能会引发异常。许多平面文件并不完美,并且包含格式错误的记录。许多用户选择在记录问题,原始行和行号时跳过这些错误的行。以后可以手动或通过其他批处理作业检查这些日志。因此,Spring Batch 提供了一个用于处理解析异常的异常层次结构:FlatFileParseExceptionFlatFileFormatException。尝试读取文件时遇到任何错误,FlatFileItemReader会引发FlatFileParseExceptionFlatFileFormatExceptionLineTokenizer接口的实现抛出,并指示在标记化时遇到的更具体的错误。

IncorrectTokenCountException

DelimitedLineTokenizerFixedLengthLineTokenizer都可以指定可用于创建FieldSet的列名。但是,如果列名的数量与对行进行标记时找到的列数不匹配,则无法创建FieldSet,并抛出IncorrectTokenCountException,其中包含遇到的标记数和期望的数,如以下示例:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

因为令牌化程序配置了 4 个列名,但是在文件中仅找到 3 个令牌,所以抛出了IncorrectTokenCountException

IncorrectLineLengthException

解析为固定长度格式的文件在解析时还有其他要求,因为与分隔格式不同,每一列必须严格遵守其 sched 义宽度。如果总行长不等于此列的最宽值,则将引发异常,如以下示例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上面标记器的配置范围是:1-5、6-10 和 11-15.因此,该行的总长度为 15.但是,在前面的示例中,传入了一条长度为 5 的行,从而引发了IncorrectLineLengthException。在这里抛出异常,而不是仅 Map 第一列,可以使行的处理更早地失败,并且具有比尝试在FieldSetMapper的第 2 列中读取失败时包含的更多信息。但是,在某些情况下,线的长度并不总是恒定的。因此,可以通过'strict'属性关闭行长的验证,如以下示例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

前面的示例几乎与之前的示例相同,只是调用了tokenizer.setStrict(false)。此设置告诉令牌化程序在对行进行令牌化时不要强制行长。现在已正确创建并返回FieldSet。但是,对于其余值,它仅包含空标记。

1.6.3. FlatFileItemWriter

写入平面文件具有相同的问题和必须从文件中读取的问题。步骤必须能够以事务方式编写定界或定长格式。

LineAggregator

就像需要LineTokenizer接口来获取一项并将其变成String一样,文件写入必须具有一种将多个字段聚合到单个字符串中以写入文件的方法。在 Spring Batch 中,这是LineAggregator,如以下接口定义所示:

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregatorLineTokenizer逻辑相反。 LineTokenizerString并返回FieldSet,而LineAggregatoritem并返回String

PassThroughLineAggregator

LineAggregator接口的最基本实现是PassThroughLineAggregator,它假定对象已经是一个字符串或它的字符串表示形式可以接受写入,如以下代码所示:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

如果需要直接控制创建字符串,但是FlatFileItemWriter的优点(例如事务和重新启动支持)是必需的,则上述实现很有用。

简化文件编写示例

既然已经定义了LineAggregator接口及其最基本的实现PassThroughLineAggregator,那么可以说明基本的编写流程:

FlatFileItemWriter的以下摘录用代码表示:

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

一个简单的配置可能如下所示:

XML Configuration

<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" value="file:target/test-outputs/output.txt" />
    <property name="lineAggregator">
        <bean class="org.spr...PassThroughLineAggregator"/>
    </property>
</bean>

Java Configuration

@Bean
public FlatFileItemWriter itemWriter() {
        return  new FlatFileItemWriterBuilder<Foo>()
                                   .name("itemWriter")
                                   .resource(new FileSystemResource("target/test-outputs/output.txt"))
                                   .lineAggregator(new PassThroughLineAggregator<>())
                                   .build();
}
FieldExtractor

前面的示例对于写入文件的最基本用途可能很有用。但是,FlatFileItemWriter的大多数用户都有一个域对象,需要将该域对象写出,因此必须将其转换为一行。在读取文件时,需要满足以下条件:

文件写入具有相似但相反的步骤:

因为框架没有办法知道需要写出对象中的哪些字段,所以必须写一个FieldExtractor来完成将 Item 变成数组的任务,如以下接口定义所示:

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

FieldExtractor接口的实现应从提供的对象的字段中创建一个数组,然后可以使用元素之间的分隔符将其写出,也可以将其写为固定宽度的行的一部分。

PassThroughFieldExtractor

在许多情况下,需要写出集合,例如数组CollectionFieldSet。从这些集合类型之一中“提取”数组非常简单。为此,请将集合转换为数组。因此,在这种情况下应使用PassThroughFieldExtractor。应当注意,如果传入的对象不是集合的类型,则PassThroughFieldExtractor返回仅包含要提取的 Item 的数组。

BeanWrapperFieldExtractor

与在文件读取部分中介绍的BeanWrapperFieldSetMapper一样,通常最好配置如何将域对象转换为对象数组,而不是自己编写转换。 BeanWrapperFieldExtractor提供了此功能,如以下示例所示:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

此提取器实现只有一个必需的属性:要 Map 的字段的名称。就像BeanWrapperFieldSetMapper需要字段名称将FieldSet上的字段 Map 到所提供对象上的 setter 一样,BeanWrapperFieldExtractor也需要名称 Map 到 getter 来创建对象数组。值得注意的是,名称的 Sequences 决定了数组中字段的 Sequences。

分隔文件写入示例

最基本的平面文件格式是其中所有字段都由定界符分隔的格式。这可以使用DelimitedLineAggregator完成。下面的示例写出一个简单的域对象,该对象代表 Client 帐户的贷方:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

由于正在使用域对象,因此必须提供FieldExtractor接口的实现以及要使用的分隔符,如以下示例所示:

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...DelimitedLineAggregator">
            <property name="delimiter" value=","/>
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

在前面的示例中,本章前面介绍的BeanWrapperFieldExtractor用于将CustomerCredit中的名称和贷方字段转换为对象数组,然后将其写成每个字段之间的逗号。

也可以使用FlatFileItemWriterBuilder.DelimitedBuilder自动创建BeanWrapperFieldExtractorDelimitedLineAggregator,如以下示例所示:

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .delimited()
                                .delimiter("|")
                                .names(new String[] {"name", "credit"})
                                .build();
}
定宽文件写入示例

分隔不是平面文件格式的唯一类型。许多人更喜欢为每个列使用固定宽度来在字段之间划定轮廓,这通常称为“固定宽度”。 Spring Batch 通过FormatterLineAggregator支持此功能。使用上述相同的CustomerCredit域对象,可以将其配置如下:

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...FormatterLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit" />
                </bean>
            </property>
            <property name="format" value="%-9s%-2.0f" />
        </bean>
    </property>
</bean>

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] {"name", "credit"});
        fieldExtractor.afterPropertiesSet();

        FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
        lineAggregator.setFormat("%-9s%-2.0f");
        lineAggregator.setFieldExtractor(fieldExtractor);

        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .lineAggregator(lineAggregator)
                                .build();
}

前面的大多数示例应该 Watch 起来很熟悉。但是,format 属性的值是 new,并显示在以下元素中:

<property name="format" value="%-9s%-2.0f" />
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

使用与 Java 5 相同的Formatter构建底层实现。Java Formatter基于 C 编程语言的printf功能。有关如何配置格式化程序的大多数详细信息,请参见Formatter的 Javadoc。

也可以使用FlatFileItemWriterBuilder.FormattedBuilder自动创建BeanWrapperFieldExtractorFormatterLineAggregator,如以下示例所示:

Java Configuration

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
        return new FlatFileItemWriterBuilder<CustomerCredit>()
                                .name("customerCreditWriter")
                                .resource(outputResource)
                                .formatted()
                                .format("%-9s%-2.0f")
                                .names(new String[] {"name", "credit"})
                                .build();
}
处理文件创建

FlatFileItemReader与文件资源的关系非常简单。初始化 Reader 后,它将打开文件(如果存在),如果没有,则引发异常。文件写入并不是那么简单。乍一 Watch,似乎应该为FlatFileItemWriter构建类似的简单约定:如果文件已经存在,则引发异常,如果不存在,则创建它并开始写入。但是,潜在地重新启动Job可能会导致问题。在正常的重新启动方案中,Contract 被撤销:如果文件存在,则从最后一个已知的好的位置开始对其进行写入,如果不存在,则引发异常。但是,如果此作业的文件名始终相同会怎样?在这种情况下,您希望删除该文件(如果存在),除非重新启动。由于这种可能性,FlatFileItemWriter包含属性shouldDeleteIfExists。将此属性设置为 true 会导致在打开编写器时删除具有相同名称的现有文件。

1.7. XML Item 读取器和写入器

Spring Batch 提供了用于读取 XML 记录并将它们 Map 到 Java 对象以及将 Java 对象编写为 XML 记录的事务性基础结构。

Definition of Actuator

StAX API 用于 I/O,因为其他标准 XML 解析 API 不能满足批处理要求(DOM 一次将整个 Importing 加载到内存中,而 SAX 通过允许用户仅提供回调来控制解析过程)。

我们需要考虑在 Spring Batch 中 XML Importing 和输出如何工作。首先,有一些概念与文件读写不同,但在 Spring Batch XML 处理中很常见。使用 XML 处理时,代替需要标记的记录行(FieldSet个实例),假定 XML 资源是与各个记录相对应的“片段”的集合,如下图所示:

图 1. XML Importing

在上述方案中,“贸易”标签被定义为“根元素”。 ''和'</ trade>'之间的所有内容都被视为一个“片段”。 Spring Batch 使用对象/ XML Map(OXM)将片段绑定到对象。但是,Spring Batch 不与任何特定的 XML 绑定技术绑定。典型的用法是委托Spring OXM,它为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 的依赖关系是可选的,如果需要,您可以选择实现特定于 Spring Batch 的接口。下图显示了与 OXM 支持的技术的关系:

图 2. OXM 绑定

通过对 OXM 的介绍以及如何使用 XML 片段表示记录,我们现在可以更仔细地检查 Reader 和 Writer。

1.7.1. StaxEventItemReader

StaxEventItemReader配置提供了用于处理 XML Importing 流中的记录的典型设置。首先,考虑StaxEventItemReader可以处理的以下 XML 记录集:

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

为了能够处理 XML 记录,需要满足以下条件:

下面的示例显示如何定义与名为trade的根元素,org/springframework/batch/item/xml/domain/trades.xml的资源以及称为tradeMarshaller的解组器一起使用的StaxEventItemReader

XML Configuration

<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
    <property name="fragmentRootElementName" value="trade" />
    <property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
    <property name="unmarshaller" ref="tradeMarshaller" />
</bean>

Java Configuration

@Bean
public StaxEventItemReader itemReader() {
        return new StaxEventItemReaderBuilder<Trade>()
                        .name("itemReader")
                        .resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
                        .addFragmentRootElements("trade")
                        .unmarshaller(tradeMarshaller())
                        .build();

}

请注意,在此示例中,我们选择使用XStreamMarshaller,它接受作为 Map 传入的别名,其中第一个键和值是片段的名称(即根元素)以及要绑定的对象类型。然后,类似于FieldSet,Map 到对象类型内字段的其他元素的名称在 Map 中描述为键/值对。在配置文件中,我们可以使用 Spring 配置 Util 来描述所需的别名,如下所示:

XML Configuration

<bean id="tradeMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="trade"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

Java Configuration

@Bean
public XStreamMarshaller tradeMarshaller() {
        Map<String, Class> aliases = new HashMap<>();
        aliases.put("trade", Trade.class);
        aliases.put("price", BigDecimal.class);
        aliases.put("isin", String.class);
        aliases.put("customer", String.class);
        aliases.put("quantity", Long.class);

        XStreamMarshaller marshaller = new XStreamMarshaller();

        marshaller.setAliases(aliases);

        return marshaller;
}

在 Importing 时,Reader 读取 XML 资源,直到它识别出一个新的片段即将开始。默认情况下,读取器将匹配元素名称以识别新片段即将开始。读取器从该片段创建一个独立的 XML 文档,并将该文档传递给解串器(通常是 Spring OXM Unmarshaller的包装器),以将 XML Map 到 Java 对象。

总之,此过程类似于以下 Java 代码,该代码使用 Spring 配置提供的注入:

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;

Trade trade = null;

while (hasNext) {
    trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    }
    else {
        System.out.println(trade);
    }
}

1.7.2. StaxEventItemWriter

输出与 Importing 对称地工作。 StaxEventItemWriter需要Resource,编组和rootTagName。将 Java 对象传递到编组器(通常是标准的 Spring OXM Marshaller),编组器通过使用自定义事件编写器写入Resource来过滤 OXM 工具为每个片段生成的StartDocumentEndDocument事件。以下示例使用StaxEventItemWriter

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="marshaller" ref="tradeMarshaller" />
    <property name="rootTagName" value="trade" />
    <property name="overwriteOutput" value="true" />
</bean>

Java Configuration

@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
        return new StaxEventItemWriterBuilder<Trade>()
                        .name("tradesWriter")
                        .marshaller(tradeMarshaller())
                        .resource(outputResource)
                        .rootTagName("trade")
                        .overwriteOutput(true)
                        .build();

}

前面的配置设置了三个必需的属性,并设置了可选的overwriteOutput=true属性,本章前面已经提到了该属性,用于指定是否可以覆盖现有文件。应当注意,以下示例中用于编写程序的编组器与本章前面的阅读示例中使用的编组器完全相同:

XML Configuration

<bean id="customerCreditMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="customer"
                   value="org.springframework.batch.sample.domain.trade.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="isin" value="java.lang.String" />
            <entry key="customer" value="java.lang.String" />
            <entry key="quantity" value="java.lang.Long" />
        </util:map>
    </property>
</bean>

Java Configuration

@Bean
public XStreamMarshaller customerCreditMarshaller() {
        XStreamMarshaller marshaller = new XStreamMarshaller();

        Map<String, Class> aliases = new HashMap<>();
        aliases.put("trade", Trade.class);
        aliases.put("price", BigDecimal.class);
        aliases.put("isin", String.class);
        aliases.put("customer", String.class);
        aliases.put("quantity", Long.class);

        marshaller.setAliases(aliases);

        return marshaller;
}

总结一下 Java 示例,以下代码说明了所有讨论的要点,展示了所需属性的编程设置:

FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

StaxEventItemWriter staxItemWriter =
        new StaxEventItemWriterBuilder<Trade>()
                                .name("tradesWriter")
                                .marshaller(marshaller)
                                .resource(resource)
                                .rootTagName("trade")
                                .overwriteOutput(true)
                                .build();

staxItemWriter.afterPropertiesSet();

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);

1.8. JSON Item 读取器和写入器

Spring Batch 提供了以下格式的读取和写入 JSON 资源的支持:

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

假定 JSON 资源是对应于各个 Item 的 JSON 对象数组。 Spring Batch 未绑定到任何特定的 JSON 库。

1.8.1. JsonItemReader

JsonItemReader将 JSON 解析和绑定委托给org.springframework.batch.item.json.JsonObjectReader接口的实现。该接口旨在通过使用流 API 读取大块的 JSON 对象来实现。当前提供了两种实现:

为了能够处理 JSON 记录,需要满足以下条件:

以下示例显示了如何定义与先前的 JSON 资源org/springframework/batch/item/json/trades.json一起使用的JsonItemReader和基于 Jackson 的JsonObjectReader

@Bean
public JsonItemReader<Trade> jsonItemReader() {
   return new JsonItemReaderBuilder<Trade>()
                 .jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonItemReader")
                 .build();
}

1.8.2. JsonFileItemWriter

JsonFileItemWriter将 Item 的编组委派到org.springframework.batch.item.json.JsonObjectMarshaller界面。该接口的约定是获取一个对象并将其编组为 JSON String。当前提供了两种实现:

为了能够写入 JSON 记录,需要满足以下条件:

以下示例显示了如何定义JsonFileItemWriter

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

1.9. 多文件 Importing

通常在单个Step中处理多个文件。假设文件格式相同,则MultiResourceItemReader支持 XML 和平面文件处理的这种类型的 Importing。考虑目录中的以下文件:

file-1.txt  file-2.txt  ignored.txt

file-1.txtfile-2.txt的格式相同,并且出于业务原因,应一起处理。可以使用通配符使用MultiResourceItemReader读取两个文件,如以下示例所示:

XML Configuration

<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
    <property name="resources" value="classpath:data/input/file-*.txt" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>

Java Configuration

@Bean
public MultiResourceItemReader multiResourceReader() {
        return new MultiResourceItemReaderBuilder<Foo>()
                                        .delegate(flatFileItemReader())
                                        .resources(resources())
                                        .build();
}

引用的委托是简单的FlatFileItemReader。上面的配置从两个文件读取 Importing,处理回滚和重新启动方案。应当注意,与任何ItemReader一样,添加额外的 Importing(在这种情况下为文件)可能会在重新启动时引起潜在的问题。建议批处理作业使用其各自的目录,直到成功完成为止。

Note

通过使用MultiResourceItemReader#setComparator(Comparator)对 Importing 资源进行排序,以确保在重新启动方案中在作业运行之间保留资源排序。

1.10. Database

像大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,批处理与其他应用程序样式不同,这是由于系统必须使用的数据集的绝对大小。如果 SQL 语句返回 100 万行,则结果集可能将所有返回的结果保存在内存中,直到读取了所有行。 Spring Batch 针对此问题提供了两种类型的解决方案:

1.10.1. 基于游标的 ItemReader 实现

通常,使用数据库游标是大多数批处理开发人员的默认方法,因为它是数据库解决“流式”关系数据问题的方法。 Java ResultSet类实质上是一种用于操纵游标的面向对象的机制。 ResultSet将光标保留到当前数据行。在ResultSet上调用next会将光标移到下一行。基于 Spring Batch 游标的ItemReader实现会在初始化时打开游标,并将对read的每次调用都将游标向前移动一行,返回可用于处理的 Map 对象。然后调用close方法以确保释放所有资源。 Spring 核心JdbcTemplate通过使用回调模式完全 MapResultSet中的所有行并在将控制权返回给方法调用者之前关闭来解决此问题。但是,必须成批处理,直到步骤完成。下图显示了基于游标的ItemReader的工作原理图。请注意,尽管示例使用 SQL(因为 SQL 众所周知),但是任何技术都可以实现基本方法。

图 3.游标示例

本示例说明了基本模式。给定一个'FOO'表,该表具有三列:IDNAMEBAR,选择 ID 大于 1 但小于 7 的所有行。这会将光标的开始(行 1)放在 ID 2 上。结果该行的Foo应该完全 Map。再次调用read()会将光标移动到 ID 为 3 的下一行,即Foo。这些读取的结果将在每个read之后写出,从而可以对对象进行垃圾回收(假定没有实例变量维护对其的引用) )。

JdbcCursorItemReader

JdbcCursorItemReader是基于游标的技术的 JDBC 实现。它直接与ResultSet一起使用,并且需要 SQL 语句针对从DataSource获得的连接运行。以以下数据库模式为例:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人更喜欢为每行使用一个域对象,因此以下示例使用RowMapper接口的实现来 MapCustomerCredit对象:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

因为JdbcCursorItemReaderJdbcTemplate共享关键接口,所以 Watch 一个示例如何使用JdbcTemplate读取此数据以与ItemReader进行对比很有用。就本示例而言,假设CUSTOMER数据库中有 1,000 行。第一个示例使用JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

运行前面的代码段后,customerCredits列表包含 1,000 个CustomerCredit对象。在查询方法中,从DataSource获得连接,对它运行提供的 SQL,并为ResultSet中的每一行调用mapRow方法。将此与JdbcCursorItemReader的方法进行对比,如下面的示例所示:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

运行前面的代码段后,计数器等于 1,000.如果上面的代码已将返回的customerCredit放入列表,则结果将与JdbcTemplate示例完全相同。但是,ItemReader的最大优点是它允许“流式传输”Item。可以一次调用read方法,可以用ItemWriter写出该 Item,然后可以使用read获得下一个 Item。这样就可以在“块”中进行 Item 读取和写入,并定期进行,这是高性能批处理的本质。此外,它很容易配置为注入到 Spring Batch Step中,如以下示例所示:

XML Configuration

<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
        return new JdbcCursorItemReaderBuilder<CustomerCredit>()
                        .dataSource(this.dataSource)
                        .name("creditReader")
                        .sql("select ID, NAME, CREDIT from CUSTOMER")
                        .rowMapper(new CustomerCreditRowMapper())
                        .build();

}
Additional Properties

由于在 Java 中有很多用于打开游标的选项,因此JdbcCursorItemReader上可以设置许多属性,如下表所示:

表 2. JdbcCursorItemReader 属性

ignoreWarnings 确定是否记录 SQLWarnings 或导致异常。默认值为true(表示已记录警告)。
fetchSize ItemReader使用的ResultSet对象需要更多行时,为 JDBC 驱动程序提供提示,提示应从数据库中获取的行数。默认情况下,不给出任何提示。
maxRows 设置基础ResultSet一次可以容纳的最大行数限制。
queryTimeout 设置驱动程序 awaitStatement对象运行的秒数。如果超出限制,则抛出DataAccessException。 (有关详细信息,请咨询您的驱动程序供应商文档)。
verifyCursorPosition 由于ItemReader持有的ResultSet被传递给RowMapper,因此用户可以自己调用ResultSet.next(),这可能会导致读取器的内部计数出现问题。将此值设置为true会导致抛出异常,如果RowMapper调用之后的游标位置与之前不同。
saveState 指示是否将 Reader 的状态保存在ItemStream#update(ExecutionContext)提供的ExecutionContext中。默认值为true
driverSupportsAbsolute 指示 JDBC 驱动程序是否支持在ResultSet上设置绝对行。对于支持ResultSet.absolute()的 JDBC 驱动程序,建议将其设置为true,因为它可以提高性能,特别是如果在处理大型数据集时某个步骤失败时。默认为false
setUseSharedExtendedConnection 指示用于游标的连接是否应由所有其他处理使用,从而共享同一事务。如果将其设置为false,那么游标将以其自己的连接打开,并且不参与其余步骤处理中启动的任何事务。如果将此标志设置为true,则必须将数据源包装在ExtendedConnectionDataSourceProxy中,以防止每次提交后关闭和释放连接。当将此选项设置为true时,将同时使用'READ_ONLY'和'HOLD_CURSORS_OVER_COMMIT'选项创建用于打开游标的语句。这样就可以使游标在事务开始时保持打开状态,并在步骤处理中执行提交。要使用此功能,您需要一个支持此功能的数据库和一个支持 JDBC 3.0 或更高版本的 JDBC 驱动程序。默认为false
HibernateCursorItemReader

就像普通的 Spring 用户在决定是否使用 ORM 解决方案(这会影响他们使用JdbcTemplate还是HibernateTemplate)上做出重要决定一样,Spring Batch 用户也具有相同的选择。 HibernateCursorItemReader是光标技术的 Hibernate 实现。 Hibernate 的批量使用方式一直存在争议。这主要是因为 Hibernate 最初是为支持在线应用程序样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession而不是标准会话。这消除了 Hibernate 使用的所有缓存和脏检查,并且可能在批处理方案中引起问题。有关 Stateless 和正常休眠会话之间差异的更多信息,请参阅特定休眠版本的文档。 HibernateCursorItemReader允许您声明 HQL 语句并传递SessionFactory,这将使每个调用返回一个 Item,以与JdbcCursorItemReader相同的基本方式进行读取。以下示例配置使用与 JDBC Reader 相同的“Client 信用”示例:

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

假设已为Customer表正确创建了休眠 Map 文件,此配置的ItemReader以与JdbcCursorItemReader所述完全相同的方式返回CustomerCredit个对象。 “ useStatelessSession”属性默认为 true,但已在此处添加,以引起人们注意打开或关闭该属性的能力。还值得注意的是,可以通过setFetchSize属性设置基础游标的获取大小。与JdbcCursorItemReader一样,配置非常简单,如以下示例所示:

XML Configuration

<bean id="itemReader"
      class="org.springframework.batch.item.database.HibernateCursorItemReader">
    <property name="sessionFactory" ref="sessionFactory" />
    <property name="queryString" value="from CustomerCredit" />
</bean>

Java Configuration

@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
        return new HibernateCursorItemReaderBuilder<CustomerCredit>()
                        .name("creditReader")
                        .sessionFactory(sessionFactory)
                        .queryString("from CustomerCredit")
                        .build();
}
StoredProcedureItemReader

有时有必要通过使用存储过程来获取游标数据。 StoredProcedureItemReader的工作方式类似于JdbcCursorItemReader,不同之处在于,它运行查询返回游标的存储过程,而不是运行查询以获取游标。存储过程可以通过三种不同的方式返回游标:

以下示例配置使用与先前示例相同的“Client 信用”示例:

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());

        return reader;
}

前面的示例依赖于存储过程来提供ResultSet作为返回结果(先前的选项 1)。

如果存储过程返回了ref-cursor(选项 2),则我们需要提供 out 参数的位置,即返回的ref-cursor。以下示例显示如何将第一个参数用作 ref 游标:

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());
        reader.setRefCursorPosition(1);

        return reader;
}

如果游标是从存储的函数(选项 3)返回的,则需要将属性“ function”设置为true。默认为false。以下示例显示了如下内容:

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("sp_customer_credit");
        reader.setRowMapper(new CustomerCreditRowMapper());
        reader.setFunction(true);

        return reader;
}

在所有这些情况下,我们都需要定义一个RowMapper以及一个DataSource以及实际的过程名称。

如果存储过程或函数接受参数,则必须通过parameters属性对其进行声明和设置。对于 Oracle,以下示例声明了三个参数。第一个是返回参量的 out 参数,第二个和第三个是采用INTEGER类型值的 in 参数。

XML Configuration

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

Java Configuration

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
        List<SqlParameter> parameters = new ArrayList<>();
        parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
        parameters.add(new SqlParameter("amount", Types.INTEGER);
        parameters.add(new SqlParameter("custId", Types.INTEGER);

        StoredProcedureItemReader reader = new StoredProcedureItemReader();

        reader.setDataSource(dataSource);
        reader.setProcedureName("spring.cursor_func");
        reader.setParameters(parameters);
        reader.setRefCursorPosition(1);
        reader.setRowMapper(rowMapper());
        reader.setPreparedStatementSetter(parameterSetter());

        return reader;
}

除了参数声明之外,我们还需要指定PreparedStatementSetter实现,该实现为调用设置参数值。这与上面的JdbcCursorItemReader相同。 Additional Properties中列出的所有其他属性也适用于StoredProcedureItemReader

1.10.2. 分页 ItemReader 实现

使用数据库游标的一种替代方法是运行多个查询,其中每个查询获取一部分结果。我们将此部分称为页面。每个查询必须指定起始行号和我们要在页面中返回的行数。

JdbcPagingItemReader

寻呼ItemReader的一种实现是JdbcPagingItemReaderJdbcPagingItemReader需要一个PagingQueryProvider负责提供用于检索组成页面的行的 SQL 查询。由于每个数据库都有其提供分页支持的策略,因此我们需要为每种受支持的数据库类型使用不同的PagingQueryProvider。还有SqlPagingQueryProviderFactoryBean自动检测正在使用的数据库并确定适当的PagingQueryProvider实现。这简化了配置,是推荐的最佳实践。

SqlPagingQueryProviderFactoryBean要求您指定select子句和from子句。您还可以提供一个可选的where子句。这些子句和必需的sortKey用于构建 SQL 语句。

Note

重要的是在sortKey上具有唯一的键约束,以确保两次执行之间不会丢失任何数据。

打开 Reader 后,它会按照每次调用ItemReader的基本方式,将每次调用将一项返回给read。当需要其他行时,分页将在幕后进行。

以下示例配置使用与前面显示的基于光标的ItemReaders类似的“Client 信用”示例:

XML Configuration

<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

Java Configuration

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .dataSource(dataSource)
                                           .queryProvider(queryProvider)
                                           .parameterValues(parameterValues)
                                           .rowMapper(customerCreditMapper())
                                           .pageSize(1000)
                                           .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
}

此配置的ItemReader使用RowMapper返回CustomerCredit对象,必须指定该对象。 “ pageSize”属性确定每个查询运行从数据库读取的实体数。

'parameterValues'属性可用于为查询指定参数值的Map。如果在where子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果使用传统的“?”占位符,则每个条目的键应为占位符的编号,从 1 开始。

JpaPagingItemReader

寻呼ItemReader的另一种实现是JpaPagingItemReader。 JPA 没有类似于 Hibernate StatelessSession的概念,因此我们必须使用 JPA 规范提供的其他功能。由于 JPA 支持分页,因此在使用 JPA 进行批处理时,这是自然的选择。读取每个页面后,实体将分离,并清除持久性上下文,以允许在处理页面后对实体进行垃圾回收。

JpaPagingItemReader可让您声明 JPQL 语句并传递EntityManagerFactory。然后,它每次调用都传回一项内容,以与其他任何ItemReader相同的基本方式进行读取。当需要其他实体时,分页发生在幕后。以下示例配置使用与前面显示的 JDBC Reader 相同的“Client 信用”示例:

XML Configuration

<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

Java Configuration

@Bean
public JpaPagingItemReader itemReader() {
        return new JpaPagingItemReaderBuilder<CustomerCredit>()
                                           .name("creditReader")
                                           .entityManagerFactory(entityManagerFactory())
                                           .queryString("select c from CustomerCredit c")
                                           .pageSize(1000)
                                           .build();
}

假设CustomerCredit对象具有正确的 JPA 注解或 ORM Map 文件,则此配置的ItemReader以与上述JdbcPagingItemReader所述完全相同的方式返回CustomerCredit个对象。 “ pageSize”属性确定每次查询执行时从数据库读取的实体数。

1.10.3. 数据库 ItemWriters

虽然平面文件和 XML 文件都具有特定的ItemWriter实例,但是在数据库世界中没有确切的实例。这是因为事务提供了所有必需的功能。 ItemWriter实现对于文件来说是必需的,因为它们必须表现得像是事务性的,并跟踪书面 Item 并在适当的时候进行刷新或清除。数据库不需要此功能,因为写入已包含在事务中。用户可以创建自己的 DAO,以实现ItemWriter接口,也可以使用自定义ItemWriter中的一个 DAO 来处理一般的处理问题。无论哪种方式,它们都应该正常工作。需要注意的一件事是批处理输出所提供的性能和错误处理功能。这在使用 hibernate 作为ItemWriter时最常见,但是在使用 JDBC 批处理模式时可能会有相同的问题。假设我们要小心刷新并且数据中没有错误,则批处理数据库的输出没有任何固有的缺陷。但是,写入时发生的任何错误都可能引起混乱,因为无法知道是哪一个单独的 Item 导致了异常,甚至没有任何一个单独的 Item 负责,如下图所示:

图 4.冲洗错误

如果在写入之前对 Item 进行了缓冲,则在提交之前刷新缓冲区之前,不会引发任何错误。例如,假设每个块写入 20 个 Item,而第 15 个 Item 则抛出DataIntegrityViolationException。就Step而言,所有 20 个 Item 均已成功写入,因为在实际写入之前无法知道发生错误。 Session#flush()被调用后,缓冲区将被清空,并会触发异常。此时,Step无能为力。事务必须回滚。通常,此异常可能会导致 Item 被跳过(取决于跳过/重试策略),然后不会再次写入。但是,在批处理方案中,无法知道是哪个 Item 引起了问题。发生故障时将写入整个缓冲区。解决此问题的唯一方法是在每个 Item 之后冲洗,如下图所示:

图 5.写入错误

这是一个常见的用例,尤其是在使用 Hibernate 时,实现ItemWriter的简单准则是在每次对write()的调用时刷新。这样做可以使 Item 可靠地被跳过,Spring Batch 在内部负责处理发生错误后对ItemWriter的调用的粒度。

1.11. 重用现有服务

批处理系统通常与其他应用程序样式结合使用。最常见的是在线系统,但它也可以通过移动每种应用程序样式使用的必要批量数据来支持集成甚至胖 Client 端应用程序。因此,许多用户通常都想在其批处理作业中重用现有的 DAO 或其他服务。通过允许注入任何必需的类,Spring 容器本身使此操作相当容易。但是,在某些情况下,现有服务需要充当ItemReaderItemWriter,以满足另一个 Spring Batch 类的依赖关系,或者因为它确实是某个步骤的主要ItemReader。为每个需要包装的服务编写一个适配器类很简单,但是由于这是一个普遍的问题,Spring Batch 提供了ItemReaderAdapterItemWriterAdapter的实现。这两个类都通过调用委托模式来实现标准的 Spring 方法,并且设置起来非常简单。以下示例使用ItemReaderAdapter

XML Configuration

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="generateFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

Java Configuration

@Bean
public ItemReaderAdapter itemReader() {
        ItemReaderAdapter reader = new ItemReaderAdapter();

        reader.setTargetObject(fooService());
        reader.setTargetMethod("generateFoo");

        return reader;
}

@Bean
public FooService fooService() {
        return new FooService();
}

需要注意的重要一点是targetMethod的 Contract 必须与read的 Contract 相同:用尽后,它将返回null。否则,它返回Object。根据ItemWriter的实现,其他任何因素都会阻止框架知道处理应何时结束,从而导致无限循环或错误失败。以下示例使用ItemWriterAdapter

XML Configuration

<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="processFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

Java Configuration

@Bean
public ItemWriterAdapter itemWriter() {
        ItemWriterAdapter writer = new ItemWriterAdapter();

        writer.setTargetObject(fooService());
        writer.setTargetMethod("processFoo");

        return writer;
}

@Bean
public FooService fooService() {
        return new FooService();
}

1.12. 验证 Importing

在本章的过程中,讨论了多种解析 Importing 的方法。如果每个主要实现的格式都不正确,则将引发异常。如果缺少一系列数据,FixedLengthTokenizer将引发异常。同样,尝试访问RowMapperFieldSetMapper的索引不存在或格式与预期的索引不同,则将引发异常。所有这些类型的异常都在read返回之前引发。但是,它们不能解决退回商品是否有效的问题。例如,如果字段之一是年龄,则显然不能为负。它可以正确解析,因为它存在并且是一个数字,但是不会引起异常。由于已经有大量的验证框架,因此 Spring Batch 不会尝试提供其他验证框架。而是,它提供了一个名为Validator的简单接口,可以通过任意数量的框架来实现,如以下接口定义所示:

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

约定是,如果对象无效,则validate方法将引发异常,如果有效,则正常返回。 Spring Batch 提供了开箱即用的ValidatingItemProcessor,如以下 bean 定义所示:

XML Configuration

<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
        <property name="validator">
                <bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/>
        </property>
</bean>

Java Configuration

@Bean
public ValidatingItemProcessor itemProcessor() {
        ValidatingItemProcessor processor = new ValidatingItemProcessor();

        processor.setValidator(validator());

        return processor;
}

@Bean
public SpringValidator validator() {
        SpringValidator validator = new SpringValidator();

        validator.setValidator(new TradeValidator());

        return validator;
}

您还可以使用BeanValidatingItemProcessor来验证使用 Bean Validation API(JSR-303)注解进行注解的 Item。例如,给定以下类型Person

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

您可以通过在应用程序上下文中声明`` bean 来验证 Item,并在面向块的步骤中将其注册为处理器:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

1.13. 防止国家持续存在

默认情况下,所有ItemReaderItemWriter实现都在提交前将其当前状态存储在ExecutionContext中。但是,这可能并不总是所需的行为。例如,许多开发人员选择使用过程指示器来使其数据库读取器“可重新运行”。在 Importing 数据中添加了一个额外的列,以指示是否已对其进行处理。当读取(或写入)特定记录时,已处理标志从false翻转到true。然后,SQL 语句可以在where子句中包含一个额外的语句,例如where PROCESSED_IND = false,从而确保在重新启动的情况下仅返回未处理的记录。在这种情况下,最好不要存储任何状态,例如当前行号,因为它与重新启动无关。因此,所有读取器和写入器都包含'saveState'属性,如以下示例所示:

XML Configuration

<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
    </property>
    <property name="saveState" value="false" />
    <property name="sql">
        <value>
            SELECT games.player_id, games.year_no, SUM(COMPLETES),
            SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
            SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
            SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
            from games, players where players.player_id =
            games.player_id group by games.player_id, games.year_no
        </value>
    </property>
</bean>

Java Configuration

@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
        return new JdbcCursorItemReaderBuilder<PlayerSummary>()
                                .dataSource(dataSource)
                                .rowMapper(new PlayerSummaryMapper())
                                .saveState(false)
                                .sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
                                  + "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
                                  + "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
                                  + "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
                                  + "from games, players where players.player_id ="
                                  + "games.player_id group by games.player_id, games.year_no")
                                .build();

}

上面配置的ItemReader不会对其参与的执行在ExecutionContext中进行任何 Importing。

1.14. 创建自定义 ItemReaders 和 ItemWriters

到目前为止,本章已经讨论了 Spring Batch 中读写的基本契约以及这样做的一些常见实现。但是,这些都是相当通用的,开箱即用的实现可能无法涵盖许多潜在的场景。本节通过一个简单的示例说明如何创建自定义ItemReaderItemWriter实现并正确实现其 Contract。 ItemReader还实现了ItemStream,以说明如何使读取器或写入器可重启。

1.14.1. 自定义 ItemReader 示例

就此示例而言,我们创建一个简单的ItemReader实现,该实现从提供的列表中读取。我们首先实现ItemReaderread方法的最基本协定,如以下代码所示:

public class CustomItemReader<T> implements ItemReader<T>{

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

上一类获取 Item 列表,然后一次返回一个,将其从列表中删除。当列表为空时,它将返回null,从而满足ItemReader的最基本要求,如以下测试代码所示:

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使 ItemReader 可重新启动

最后的挑战是使ItemReader可重新启动。当前,如果处理中断并再次开始,则ItemReader必须从头开始。这实际上在许多情况下都是有效的,但有时最好将批处理作业从中断处重新开始。关键的区别通常是 Reader 是有状态的还是 Stateless 的。Stateless 读取器无需担心可重新启动性,但是有状态读取器必须尝试在重新启动时重新构造其最后一个已知状态。因此,我们建议您尽可能使自定义 Reader 保持 Stateless,因此您不必担心可重新启动性。

如果确实需要存储状态,则应使用ItemStream接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey(CURRENT_INDEX)){
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else{
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

在每次调用ItemStream update方法时,ItemReader的当前索引都使用键“ current.index”存储在提供的ExecutionContext中。调用ItemStream open方法时,将检查ExecutionContext以查 Watch 其是否包含具有该键的条目。如果找到该键,则当前索引将移动到该位置。这是一个非常琐碎的示例,但仍然符合一般约定:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<String>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数ItemReaders具有更复杂的重启逻辑。例如,JdbcCursorItemReader将最后处理的行的行 ID 存储在游标中。

还值得注意的是,ExecutionContext中使用的密钥不应太小。这是因为Step中的所有ItemStreams使用相同的ExecutionContext。在大多数情况下,只需在键之前加上类名就足以保证唯一性。但是,在极少数情况下,在同一步骤中使用了两个相同类型的ItemStream(如果需要两个文件进行输出,可能会发生这种情况),则需要一个更唯一的名称。因此,许多 Spring Batch ItemReaderItemWriter实现都具有setName()属性,该属性可以覆盖此键名。

1.14.2. 自定义 ItemWriter 示例

实现自定义ItemWriter的方式与上述ItemReader的示例在很多方面相似,但是在足以保证其自己的示例方面的差异也很大。但是,添加可重新启动性本质上是相同的,因此在本示例中未涉及。与ItemReader示例一样,使用List是为了使示例尽可能简单:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(List<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}
使 ItemWriter 可重新启动

为了使ItemWriter可重新启动,我们将遵循与ItemReader相同的过程,添加并实现ItemStream接口以同步执行上下文。在示例中,我们可能必须计算已处理的 Item 数,并将其添加为页脚记录。如果需要这样做,则可以在ItemWriter中实现ItemStream,以便在重新打开流时从执行上下文重构计数器。

在许多实际情况下,自定义ItemWriters还会委派给另一个本身可重新启动的编写器(例如,当写入文件时),或者它写入事务性资源,因此不需要重新启动,因为它是 Stateless 的。当您有状态 Writer 时,您可能应该确保同时实现_和ItemWriter。还请记住,Writer 的 Client 端需要注意ItemStream,因此您可能需要将其注册为流,并在配置中。

1.15. Item 读取器和写入器实现

在本节中,我们将向您介绍之前各节中未讨论的 Reader 和 Writer。

1.15.1. Decorators

在某些情况下,用户需要将特定的行为附加到预先存在的ItemReader。 Spring Batch 提供了一些现成的装饰器,它们可以为ItemReaderItemWriter实现添加其他行为。

Spring Batch 包含以下装饰器:

SynchronizedItemStreamReader

当使用不是线程安全的ItemReader时,Spring Batch 提供了SynchronizedItemStreamReader装饰器,可用于使ItemReader线程安全。 Spring Batch 提供了SynchronizedItemStreamReaderBuilder来构造SynchronizedItemStreamReader的实例。

SingleItemPeekableItemReader

Spring Batch 包含一个装饰器,该装饰器向ItemReader添加了 peek 方法。这种窥视方法使用户可以窥视前面的一项。重复调用 peek 会返回相同的 Item,这是从read方法返回的下一个 Item。 Spring Batch 提供了SingleItemPeekableItemReaderBuilder来构造SingleItemPeekableItemReader的实例。

Note

SingleItemPeekableItemReader 的 peek 方法不是线程安全的,因为不可能在多个线程中使用 peek。偷 Watch 的线程中只有一个会在下一次调用 read 中得到该 Item。

MultiResourceItemWriter

当当前资源中写入的 Item 数超过itemCountLimitPerResource时,MultiResourceItemWriter包装ResourceAwareItemWriterItemStream并创建一个新的输出资源。 Spring Batch 提供了MultiResourceItemWriterBuilder来构造MultiResourceItemWriter的实例。

ClassifierCompositeItemWriter

ClassifierCompositeItemWriter根据通过提供的Classifier实现的 Router 模式,为每个 Item 调用ItemWriter实现的集合之一。如果所有委托都是线程安全的,则实现是线程安全的。 Spring Batch 提供了ClassifierCompositeItemWriterBuilder来构造ClassifierCompositeItemWriter的实例。

ClassifierCompositeItemProcessor

ClassifierCompositeItemProcessorItemProcessor,它基于通过提供的Classifier实现的 Router 模式调用ItemProcessor实现的集合之一。 Spring Batch 提供了ClassifierCompositeItemProcessorBuilder来构造ClassifierCompositeItemProcessor的实例。

1.15.2. 消息 Reader 和 Writer

Spring Batch 为常用消息传递系统提供以下读取器和写入器:

AmqpItemReader

AmqpItemReader是使用AmqpTemplate来接收或转换来自交换机的消息的ItemReader。 Spring Batch 提供了AmqpItemReaderBuilder来构造AmqpItemReader的实例。

AmqpItemWriter

AmqpItemWriter是使用AmqpTemplate将消息发送到 AMQP 交换机的ItemWriter。如果未在提供的AmqpTemplate中指定名称,则消息将发送到无名交换机。 Spring Batch 提供了AmqpItemWriterBuilder来构造AmqpItemWriter的实例。

JmsItemReader

JmsItemReader是使用JmsTemplate的 JMS 的ItemReader。模板应具有默认目标,该目标用于提供read()方法的 Item。 Spring Batch 提供了JmsItemReaderBuilder来构造JmsItemReader的实例。

JmsItemWriter

JmsItemWriter是使用JmsTemplate的 JMS 的ItemWriter。模板应具有默认目的地,该目的地用于发送write(List)中的 Item。 Spring Batch 提供了JmsItemWriterBuilder来构造JmsItemWriter的实例。

1.15.3. 数据库 Reader

Spring Batch 提供以下数据库 Reader:

Neo4jItemReader

Neo4jItemReaderItemReader,它通过使用分页技术从图形数据库 Neo4j 中读取对象。 Spring Batch 提供了Neo4jItemReaderBuilder来构造Neo4jItemReader的实例。

MongoItemReader

MongoItemReaderItemReader,它通过使用分页技术从 MongoDB 中读取文档。 Spring Batch 提供了MongoItemReaderBuilder来构造MongoItemReader的实例。

HibernateCursorItemReader

HibernateCursorItemReaderItemStreamReader,用于读取基于 Hibernate 构建的数据库记录。它执行 HQL 查询,然后在初始化时在调用read()方法时对结果集进行迭代,依次返回与当前行相对应的对象。 Spring Batch 提供了HibernateCursorItemReaderBuilder来构造HibernateCursorItemReader的实例。

HibernatePagingItemReader

HibernatePagingItemReaderItemReader,用于读取构建在 Hibernate 之上的数据库记录,并且一次最多只能读取固定数量的 Item。 Spring Batch 提供了HibernatePagingItemReaderBuilder来构造HibernatePagingItemReader的实例。

RepositoryItemReader

RepositoryItemReader是使用PagingAndSortingRepository读取记录的ItemReader。 Spring Batch 提供了RepositoryItemReaderBuilder来构造RepositoryItemReader的实例。

1.15.4. 数据库 Writer

Spring Batch 提供以下数据库编写器:

Neo4jItemWriter

Neo4jItemWriterItemWriter实现,可写入 Neo4j 数据库。 Spring Batch 提供了Neo4jItemWriterBuilder来构造Neo4jItemWriter的实例。

MongoItemWriter

MongoItemWriterItemWriter实现,它使用 Spring Data 的MongoOperations实现写入 MongoDB 存储。 Spring Batch 提供了MongoItemWriterBuilder来构造MongoItemWriter的实例。

RepositoryItemWriter

RepositoryItemWriter是 Spring Data 中CrudRepositoryItemWriter包装器。 Spring Batch 提供了RepositoryItemWriterBuilder来构造RepositoryItemWriter的实例。

HibernateItemWriter

HibernateItemWriterItemWriter,它使用 Hibernate 会话保存或更新不属于当前 Hibernate 会话的实体。 Spring Batch 提供了HibernateItemWriterBuilder来构造HibernateItemWriter的实例。

JdbcBatchItemWriter

JdbcBatchItemWriterItemWriter,它使用NamedParameterJdbcTemplate中的批处理功能为提供的所有 Item 执行一批语句。 Spring Batch 提供了JdbcBatchItemWriterBuilder来构造JdbcBatchItemWriter的实例。

JpaItemWriter

JpaItemWriter是使用 JPA EntityManagerFactory合并不属于持久性上下文一部分的任何实体的ItemWriter。 Spring Batch 提供了JpaItemWriterBuilder来构造JpaItemWriter的实例。

GemfireItemWriter

GemfireItemWriter是使用GemfireTemplate并将GemfireTemplate存储在 GemFire 中作为键/值对的ItemWriter。 Spring Batch 提供了GemfireItemWriterBuilder来构造GemfireItemWriter的实例。

1.15.5. 专业 Reader

Spring Batch 提供以下专业 Reader:

LdifReader

LdifReaderResource读取 LDIF(LDAP 数据交换格式)记录,对其进行解析,然后为每个执行的read返回LdapAttribute对象。 Spring Batch 提供了LdifReaderBuilder来构造LdifReader的实例。

MappingLdifReader

MappingLdifReaderResource读取 LDIF(LDAP 数据交换格式)记录,对其进行解析,然后将每个 LDIF 记录 Map 到 POJO(普通的旧 Java 对象)。每次读取都会返回一个 POJO。 Spring Batch 提供了MappingLdifReaderBuilder来构造MappingLdifReader的实例。

1.15.6. 专业 Writer

Spring Batch 提供以下专业 Writer:

SimpleMailMessageItemWriter

SimpleMailMessageItemWriter是可以发送邮件的ItemWriter。它将消息的实际发送委派给MailSender的实例。 Spring Batch 提供了SimpleMailMessageItemWriterBuilder来构造SimpleMailMessageItemWriter的实例。

1.15.7. 专用处理器

Spring Batch 提供以下专用处理器:

ScriptItemProcessor

ScriptItemProcessorItemProcessor,它将当前 Item 传递给提供的脚本以进行处理,并且脚本的结果由处理器返回。 Spring Batch 提供了ScriptItemProcessorBuilder来构造ScriptItemProcessor的实例。

首页