6. ItemReaders 和 ItemWriters

所有批处理都可以用最简单的形式描述为读取大量数据,执行某种类型的计算或转换,以及将结果写出来。 Spring Batch 提供了三个 key 接口来帮助执行批量读写:ItemReaderItemProcessorItemWriter

6.1 ItemReader

虽然是一个简单的概念,ItemReader是从许多不同类型的输入提供数据的手段。最一般的例子包括:

  • 平面文件平面文件 Item Readers 从平面文件中读取数据的 lines,这些文件通常描述具有由文件中的固定位置定义的数据字段或由某个特殊字符(e.g. 逗号)分隔的数据字段。

  • XML - XML ItemReaders process XML 独立于用于解析,映射和验证 objects 的技术。输入数据允许针对 XSD schema 验证 XML 文件。

  • 数据库 - 访问数据库资源以返回 return 结果集,该结果集可以映射到 objects 进行处理。默认的 SQL ItemReaders 调用RowMapper到 return objects,如果需要重新启动则跟踪当前行,store 基本统计信息,并提供一些 transaction 增强功能,稍后将对此进行说明。

还有更多的可能性,但我们将专注于本章的基本内容。所有可用的 ItemReader 的完整列表可以在附录 A 中找到。

ItemReader是通用输入操作的基本接口:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException;

}

read方法定义ItemReader的最重要的 contract;调用它会返回一个 Item,如果没有剩余项,则返回 null。 item 可能表示文件中的 line,数据库中的行或 XML 文件中的元素。通常期望这些将映射到可用的 domain object(i.e.Trade,Foo 等),但 contract 中没有要求这样做。

预计ItemReader接口的_implement 只能向前进行。但是,如果底层资源是 transactional(例如 JMS 队列),则在回滚方案中,调用 read 可能会在后续 calls 上返回相同的逻辑 item。值得注意的是,ItemReader缺少 process 的项不会导致抛出 exception。对于 example,使用返回 0 结果的查询配置的数据库ItemReader将在read的第一次调用时 return null。

6.2 ItemWriter

ItemWriter在功能上类似于ItemReader,但具有反向操作。资源仍然需要定位,打开和关闭,但它们的不同之处在于ItemWriter写出而不是读入。在数据库或队列的情况下,这些可能是插入,更新或发送。输出序列化的格式特定于每个批 job。

ItemReader一样,ItemWriter是一个相当通用的接口:

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

ItemReader上的read一样,write提供了ItemWriter的基本 contract;它将尝试写出以 long 打开时传入的项目列表。因为通常期望将项目“批处理”到一个块然后输出,所以接口接受项目列表,而不是 item 本身。在写出列表之后,可以在从 write 方法返回之前执行任何可能需要的刷新。例如,如果写入 Hibernate DAO,则可以进行多次 calls 写入,每个 item 一次。然后 writer 可以在返回之前调用 hibernate Session 上的 close。

6.3 ItemProcessor

ItemReaderItemWriter接口对于它们的特定任务都非常有用,但是如果你想在写入之前插入业务逻辑怎么办?读取和写入的一个选项是使用复合 pattern:创建包含另一个ItemWriterItemWriter,或包含另一个ItemReaderItemReader。例如:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(item);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

上面的 class 包含在提供一些业务逻辑之后委托给它的另一个ItemWriter。这个 pattern 也可以很容易地用于ItemReader,也许是为了根据 main ItemReader提供的输入获得更多的 reference 数据。如果您需要自己控制对write的调用,这也很有用。但是,如果您只想在实际写入之前“转换”传入的 item 进行写入,则不需要自己调用write:您只想修改 item。对于此场景,Spring Batch 提供ItemProcessor接口:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor非常简单;给一个 object,转换它,return 另一个。提供的 object 可能是也可能不是同一类型。关键是业务逻辑可以在 process 中应用,完全由开发人员创建。 ItemProcessor可以直接连接到 step,例如,假设ItemReader提供类型为 Foo 的 class,并且在写出之前需要将其转换为类型 Bar。可以编写执行转换的ItemProcessor

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在上面非常简单的 example 中,有一个 class Foo,一个 class Bar和一个 class FooProcessor,它们遵循ItemProcessor接口。转换很简单,但任何类型的转换都可以在这里完成。 BarWriter将用于写出Bar objects,如果提供任何其他类型则抛出 exception。类似地,如果提供了除之外的任何内容,将抛出 exception。然后可以将FooProcessor注入Step

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

6.3.1 链接 ItemProcessors

在许多情况下执行单个转换很有用,但是如果要将多个ItemProcessor链接在一起会怎么样?这可以使用前面提到的复合 pattern 来完成。要更新先前的单个转换,example,Foo将转换为Bar,它将转换为Foobar并写出:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar{
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,FooBar>{
    public FooBar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<FooBar>{
    public void write(List<? extends FooBar> items) throws Exception {
        //write items
    }
}

FooProcessorBarProcessor可以“链接”在一起以得到结果Foobar

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

与前面的示例一样,复合处理器可以配置为Step

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>

6.3.2 过滤记录

item 处理器的一个典型用途是在将记录传递给 ItemWriter 之前过滤掉记录。过滤是一种与跳过不同的行为;跳过表示 record 无效,而过滤只表示不应写入 record。

例如,考虑一个批处理 job,它读取包含三种不同类型记录的文件:insert 记录,要更新的记录和要删除的记录。如果系统不支持 record 删除,那么我们不希望向ItemWriter发送任何“删除”记录。但是,由于这些记录实际上并不是坏记录,我们希望将它们过滤掉,而不是跳过。因此,ItemWriter 只会收到“insert”和“update”记录。

要过滤 record,只需从ItemProcessor返回“null”。 framework 将检测到结果为“null”并避免将 item 添加到传递给ItemWriter的记录列表中。像往常一样,从ItemProcessor抛出的 exception 将导致跳过。

6.3.3 容错

回滚块时,可以重新处理在读取期间缓存的项目。如果 step 配置为容错(通常使用跳过或重试处理),则所使用的任何 ItemProcessor 都应以幂等的方式实现。通常,这将包括对 ItemProcessor 的输入 item 不执行任何更改,并且仅更新作为结果的实例。

6.4 ItemStream

ItemReaderItemWriter都很好地服务于他们的个人目的,但是他们两个都需要另外一个接口。通常,作为批处理 job 范围的一部分,readers 和 writers 需要打开,关闭,并需要一个持久化 state 的机制:

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

在描述每种方法之前,我们应该提到ExecutionContext。 的_Cl也实现ItemStream的客户端应该在 calls 之前调用openread在 order 中打开任何资源,如 files 或获取连接。类似的限制适用于实现ItemStreamItemWriter。如第 2 章所述,如果在ExecutionContext中找到预期数据,则可以使用它在初始 state 以外的位置启动ItemReaderItemWriter。相反,将调用close以确保在open期间分配的任何资源都将被安全释放。主要调用update以确保当前保持的任何 state 被加载到提供的ExecutionContext中。在提交之前将调用此方法,以确保在提交之前当前 state 持久保存在数据库中。

ItemStream的 client 是Step(来自 Spring Batch Core)的特殊情况下,为每个StepExecution创建一个ExecutionContext,以允许用户 storestate 特定执行的 state,期望它将被返回,如果相同的JobInstance再次启动。对于熟悉 Quartz 的人来说,语义与 Quartz JobDataMap非常相似。

6.5 委托 Pattern 并注册 Step

请注意,CompositeItemWriter是委托 pattern 的 example,在 Spring Batch 中是 common。委托人自己可以实现回调接口StepListener。如果他们这样做,并且它们与 Spring Batch Core 一起用作JobStep的一部分,那么它们几乎肯定需要用Step手动注册。直接连接到 Step 的 reader,writer 或处理器如果实现ItemStreamStepListener接口,将自动注册。但是因为Step不知道委托,所以需要将它们注入为 listeners 或 stream(或两者都适当):

<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
                   commit-interval="2">
                    <streams>
                    <stream ref="barWriter" />
                </streams>
            </chunk>
        </tasklet>
    </step>
</job>

<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
    <property name="delegate" ref="barWriter" />
</bean>

<bean id="barWriter" class="...BarWriter" />

6.6 Flat Files

用于交换批量数据的最常见机制之一一直是平面文件。与 XML 有着一致的标准来定义它的结构(XSD),任何阅读平面文件的人都必须在 time 之前准确理解文件的结构。通常,所有平面 files 分为两种类型:分隔和固定长度。分隔的 files 是其中字段由分隔符分隔的那些文件,例如逗号。固定长度 files 具有设定长度的字段。

6.6.1 FieldSet

在 Spring Batch 中使用 flat files 时,无论是输入还是输出,最重要的类之一是FieldSet。许多体系结构和 libraries 包含帮助您从文件读入的抽象,但它们通常 return String 或 Strings 的 array。这真的只能让你到达那里。 FieldSet是 Spring Batch 的抽象,用于从文件资源启用 binding 字段。它允许开发人员使用文件输入,就像使用数据库输入一样。 FieldSet在概念上非常类似于 Jdbc ResultSet。 FieldSets 只需要一个参数,一个String array 的标记。或者,您也可以在字段名称中进行配置,以便在ResultSet之后可以通过索引或 name 访问字段。

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSet接口上还有更多选项,例如Date,long,BigDecimal等.FieldSet的最大优点是它提供了对平面文件输入的一致解析。而不是每个批处理 job 以可能意外的方式进行不同的解析,它在处理由格式 exception 引起的错误或进行简单的数据转换时都是一致的。

6.6.2 FlatFileItemReader

平面文件是包含最多 two-dimensional(表格)数据的任何类型的文件。在_Splass _Batch framework 中读取 flat files 是由 class FlatFileItemReader促进的,它提供了读取和解析 flat files 的基本功能。 FlatFileItemReader的两个最重要的必需依赖项是ResourceLineMapper. LineMapper接口将在下一节中进行更多探讨。资源 property 表示 Spring Core Resource。可以在Spring Framework,Chapter 5.Resources中找到解释如何创建此类型 beans 的文档。因此,本指南不会详细介绍 creating Resource objects。但是,可以在下面找到文件系统资源的简单示例:

Resource resource = new FileSystemResource("resources/trades.csv");

在复杂的批处理环境中,目录结构通常由 EAI 基础结构管理,其中建立用于外部接口的 drop zones,用于将 files 从 ftp 位置移动到批处理位置,反之亦然。文件移动实用程序超出了 spring batch architecture 的范围,但批处理 job 流包含文件移动实用程序作为 job 流中的步骤并不罕见。批处理 architecture 只需要知道如何定位要处理的 files 就足够了。 Spring Batch 开始从这个起点将数据输入管道的 process。但是,Spring Integration提供了许多这类服务。

FlatFileItemReader中的其他 properties 允许您进一步指定数据的解释方式:

表格 1_.FlatFileItemReader Properties

属性类型描述
评论String [140]指定指示 comment 行的 line 前缀
编码指定要使用的文本编码 - 默认为“ISO-8859-1”
lineMapperLineMapperString转换为表示 item 的Object
linesToSkipINT要在文件顶部忽略的 lines 数
recordSeparatorPolicyRecordSeparatorPolicy用于确定 line 结尾的位置,并且如果在引用的 string 中,则执行_继续 line 结束。
资源资源要从中读取的资源。
skippedLinesCallbackLineCallbackHandler传递要跳过的文件中 lines 的原始 line 内容的接口。如果 linesToSkip 设置为 2,则此接口将被调用两次。
严格boolean在严格模式下,如果输入资源不存在,reader 将在 ExecutionContext 上抛出 exception。

LineMapper

RowMapper一样,它采用低 level 构造(如ResultSet)并返回Object,平面文件处理需要相同的构造将String line 转换为Object

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

基本的 contract 是,给定当前 line 和与之关联的 line 数,mapper 应该 return 一个结果 domain object。这类似于RowMapper,因为每个 line 与其 line 数相关联,就像ResultSet中的每一行都与其行号相关联一样。这允许将 line 编号绑定到生成的 domain object 以进行身份比较或获取更多信息 logging。然而,与RowMapper不同,LineMapper被赋予一个原始 line,如上所述,它只会让你到达中途。必须将 line 标记为FieldSet,然后可以将其映射到 object,如下所述。

LineTokenizer

将输入的 line 转换为 line 为FieldSet的抽象是必要的,因为可能有许多格式的平面文件数据需要转换为FieldSet。在 Spring Batch 中,此接口是LineTokenizer

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

的 contract 是这样的,给定一个 line 输入(理论上String可以包含多个 line),将返回表示 line 的FieldSet。这个FieldSet然后可以传递给FieldSetMapper。 Spring Batch 包含以下LineTokenizer implementations:

  • DelmitedLineTokenizer - 用于 files,其中 record 中的字段由分隔符分隔。最常见的分隔符是逗号,但通常也使用管道或分号。

  • FixedLengthTokenizer - 用于 files,其中 record 中的字段均为“固定宽度”。必须为每个 record 类型定义每个字段的宽度。

  • PatternMatchingCompositeLineTokenizer - 通过检查 pattern 来确定应该在特定 line 上使用LineTokenizer列表中的哪一个。

FieldSetMapper

FieldSetMapper接口定义了一个方法mapFieldSet,它将FieldSet object 和 maps 的内容带到 object。这个 object 可能是自定义 DTO,domain object 或简单 array,具体取决于 job 的需要。 FieldSetMapperLineTokenizer结合使用,将数据的 line 转换为所需类型的 object:

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet);

}

使用的 pattern 与JdbcTemplate使用的RowMapper相同。

DefaultLineMapper

现在已经定义了用于读取 flat files 的基本接口,很明显需要三个基本步骤:

  • 从文件中读取一个 line。

  • 将 string line 传递给LineTokenizer#tokenize()方法,在 order 中检索FieldSet

  • 将从标记化返回的FieldSet传递给FieldSetMapper,从ItemReader#read()方法返回结果。

上面描述的两个接口代表两个单独的任务:将 line 转换为FieldSet,并将FieldSet映射到 domain object。因为LineTokenizer的输入与LineMapper(line)的输入匹配,并且FieldSetMapper的输出与LineMapper的输出匹配,所以提供了同时使用LineTokenizerFieldSetMapper的默认 implementation。 DefaultLineMapper表示大多数用户需要的行为:

public class DefaultLineMapper<T> implements LineMapper<T>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

上述功能是在默认的 implementation 中提供的,而不是在 order 中自身构建(如 framework 的早期版本中所做的那样),以便用户在控制解析 process 时具有更大的灵活性,尤其是在访问原始 line 时需要。

简单分隔文件读取示例

以下 example 将用于使用实际的 domain 场景来说明这一点。这个特定批 job 从以下文件中读取足球运动员:

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"

此文件的内容将映射到以下Player domain object:

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

在_list 中将FieldSet映射到Player object,需要定义返回玩家的FieldSetMapper

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

然后可以通过正确构造FlatFileItemReader并调用read来读取该文件:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<Player>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<Player>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

每次调用read都会_从文件中的每个 line 返回一个新的 Player object。到达文件末尾时,将返回 null。

按 Name 映射字段

DelimitedLineTokenizerFixedLengthTokenizer允许另外一项功能,它在 function 中与 Jdbc ResultSet类似。可以将这些字段的名称注入到这些LineTokenizer __mplement 中的任何一个中,以增加映射 function 的可读性。首先,将平面文件中所有字段的列名注入到 tokenizer 中:

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

FieldSetMapper可以使用以下信息:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if(fs == null){
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}

自动将 FieldSet 设置为 Domain Objects

对于许多人来说,必须编写特定的FieldSetMapper与为JdbcTemplate编写特定的RowMapper一样繁琐。 Spring Batch 通过使用 JavaBean 规范通过在 object 上使用 setter 匹配字段 name 来自动 maps 字段来使这更容易。再次使用 football example,BeanWrapperFieldSetMapper configuration 如下所示:

<bean id="fieldSetMapper"
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
    <property name="prototypeBeanName" value="player" />
</bean>

<bean id="player"
      class="org.springframework.batch.sample.domain.Player"
      scope="prototype" />

对于FieldSet中的每个条目,映射器将在Player object 的新实例上查找相应的 setter(因此,需要原型范围),就像 Spring 容器查找与 property name 匹配的 setter 一样。将映射FieldSet中的每个可用字段,并返回结果Player object,不需要 code。

固定长度文件格式

到目前为止,仅详细讨论了分隔的 files,但是,它们仅代表文件读取图片的一半。许多使用 flat files 的组织使用固定长度格式。 example 固定长度文件如下:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

虽然这看起来像一个大字段,但它实际上代表了 4 个不同的字段:

  • ISIN:item 的唯一标识符是 order - 12 个字符 long。

  • 数量:订购此 item 的数量 - 3 个字符 long。

  • 价格:item 的价格 - 5 个字符 long。

  • 客户:客户的 ID 订阅 item - 9 个字符 long。

配置FixedLengthLineTokenizer时,必须以范围的形式提供每个长度:

<bean id="fixedLengthLineTokenizer"
      class="org.springframework.batch.io.file.transform.FixedLengthTokenizer">
    <property name="names" value="ISIN,Quantity,Price,Customer" />
    <property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>

因为FixedLengthLineTokenizer使用与上面讨论的相同的LineTokenizer接口,所以将返回相同的FieldSet,就好像使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper

支持范围的上述语法要求在ApplicationContext中配置专用的 property 编辑器RangeArrayPropertyEditor。但是,此 bean 在ApplicationContext中自动声明,其中使用批处理命名空间。

单个文件中的多个 Record 类型

到目前为止,所有文件读取示例都为了简单起见而做出了 key 假设:文件中的所有记录都具有相同的格式。但是,情况可能并非总是如此。非常常见的是,文件可能具有不同格式的记录,需要以不同方式进行标记并映射到不同的 objects。以下文件摘录说明了这一点:

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

在这个文件中,我们有三种类型的记录,“USER”,“LINEA”和“LINEB”。 “USER”line 对应于 User object。 “LINEA”和“LINEB”都对应于 Line objects,尽管“LINEA”比“LINEB”具有更多信息。

ItemReader将单独读取每个 line,但我们必须指定不同的LineTokenizerFieldSetMapper objects,以便ItemWriter将接收正确的项目。 PatternMatchingCompositeLineMapper通过允许 maps 的模式到LineTokenizers 和模式到FieldSetMappers 来配置,使这变得容易:

<bean id="orderFileLineMapper"
      class="org.spr...PatternMatchingCompositeLineMapper">
    <property name="tokenizers">
        <map>
            <entry key="USER*" value-ref="userTokenizer" />
            <entry key="LINEA*" value-ref="lineATokenizer" />
            <entry key="LINEB*" value-ref="lineBTokenizer" />
        </map>
    </property>
    <property name="fieldSetMappers">
        <map>
            <entry key="USER*" value-ref="userFieldSetMapper" />
            <entry key="LINE*" value-ref="lineFieldSetMapper" />
        </map>
    </property>
</bean>

在这个例子中,“LINEA”和“LINEB”有单独的LineTokenizer,但它们都使用相同的FieldSetMapper

PatternMatchingCompositeLineMapper在 order 中使用PatternMatchermatch方法来为每个 line 选择正确的委托。 PatternMatcher允许两个具有特殊含义的通配符:问号(“?”)将匹配一个字符,而星号(“*”)将匹配零个或多个字符。请注意,在上面的 configuration 中,所有模式都以星号结尾,这使它们成为 lines 的有效前缀。无论 configuration 中的 order 如何,PatternMatcher将始终匹配最具体的 pattern。因此,如果“ LINE *”和“LINEA *”都被列为模式,“LINEA”将 match pattern“LINEA *”,而“LINEB”将 match pattern“LINE ”。此外,通过匹配任何其他 pattern 不匹配的 line,单个星号(“”)可以作为默认值。

<entry key="*" value-ref="defaultLineTokenizer" />

还有PatternMatchingCompositeLineTokenizer可以单独用于标记化。

对于平面文件来说,包含每个 span 多个 lines 的记录也是如此。要处理这种情况,需要采用更复杂的策略。可以在第 11.5 节,“Multi-Line 记录”中找到此 common pattern 的演示。

Exception 处理 Flat Files

标记 line 时可能会导致抛出 exceptions。许多平面 files 不完美,包含格式不正确的记录。许多用户选择跳过这些错误的 lines,logging out the issue,original line 和 line number。稍后可以手动或通过另一批 job 检查这些日志。因此,Spring Batch 为处理解析 exceptions 提供了 exceptions 层次结构:FlatFileParseExceptionFlatFileFormatException。当尝试读取文件时遇到任何错误时,FlatFileItemReader会抛出FlatFileParseException。 由LineTokenizer接口的_implement 引发,并指示在标记化时遇到更具体的错误。

IncorrectTokenCountException

DelimitedLineTokenizerFixedLengthLineTokenizer都能够指定可用于创建FieldSet的列名。但是,如果列名称数不匹配在标记 line 时找到的列数,则无法创建FieldSet,并抛出IncorrectTokenCountException,其中包含遇到的标记数和预期数:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

因为 tokenizer 配置了 4 个列名,但在文件中只找到了 3 个令牌,所以抛出了IncorrectTokenCountException

IncorrectLineLengthException

_以固定长度格式格式化的文件在解析时有其他要求,因为与分隔格式不同,每列必须严格遵守其预定义的宽度。如果总 line 长度未累加到此列的最宽 value,则抛出 exception:

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上面的 tokenizer 的配置范围是:1-5,6-10 和 11-15,因此 line 预期的总长度为 15.但是,在这种情况下,传入长度为 5 的 line,导致IncorrectLineLengthException被抛出。在这里抛出 exception 而不是仅映射第一列允许 line 的处理更早失败,并且比尝试读取FieldSetMapper中的第 2 列时失败的信息更多。但是,有些情况下 line 的长度并不总是恒定的。因此,可以通过'strict' property 关闭 line length 的验证:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

上面的例子几乎与之前的例子相同,只是调用了 tokenizer.setStrict(false)。此设置告诉标记生成器在标记 line 时不强制 line 长度。现在可以正确创建并返回FieldSet。但是,它只包含剩余值的空标记。

6.6.3 FlatFileItemWriter

写出平 files 有同样的问题和问题,从文件读入必须克服。 step 必须能够以 transactional 方式以分隔或固定长度格式写出。

LineAggregator

正如LineTokenizer接口是获取 item 并将其转换为String所必需的一样,文件编写必须能够将多个字段聚合为单个 string 以写入文件。在 Spring Batch 中这是LineAggregator

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregatorLineTokenizer相反。 LineTokenizer接受String并返回FieldSet,而LineAggregator接受item并返回String

PassThroughLineAggregator

LineAggregator 接口最基本的 implementation 是PassThroughLineAggregator,它只是假设 object 已经是 string,或者它的 string 表示可以接受写:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

如果需要直接控制 creating string,则上述 implementation 很有用,但的优点,如 transaction 和 restart 支持,是必要的。

简化文件写入示例

现在已经定义了LineAggregator接口及其最基本的 implementation PassThroughLineAggregator,可以解释基本的写入流程:

  • 要写入的 object 传递给 order 中的LineAggregator以获得String

  • 返回的String将写入配置的文件。

以下摘自FlatFileItemWriter在 code 中表示:

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

一个简单的 configuration 类似于以下内容:

<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" value="file:target/test-outputs/output.txt" />
    <property name="lineAggregator">
        <bean class="org.spr...PassThroughLineAggregator"/>
    </property>
</bean>

FieldExtractor

上述示例可能对写入文件的最基本用途有用。但是,FlatFileItemWriter的大多数用户都有一个需要写出的 domain object,因此必须转换为 line。在文件阅读中,需要以下内容:

  • 从文件中读取一个 line。

  • 将 string line 传递给LineTokenizer#tokenize()方法,在 order 中检索FieldSet

  • 将从标记化返回的FieldSet传递给FieldSetMapper,从ItemReader#read()方法返回结果

文件写入有类似但相反的步骤:

  • 传递 item 以写入 writer

  • 将 item 上的字段转换为 array

  • 将生成的 array 聚合成 line

因为 framework 无法知道 object 中需要写出哪些字段,所以必须编写FieldExtractor来完成将 item 转换为 array 的任务:

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

接口的实现应该从提供的 object 的字段创建一个 array,然后可以在元素之间用分隔符写出,或者作为 field-widthline 的一部分。

PassThroughFieldExtractor

在许多情况下,需要写出集合,例如 array,CollectionFieldSet。从这些集合类型之一“提取”array 非常简单:只需将集合转换为 array 即可。因此,应在此方案中使用PassThroughFieldExtractor。应该注意的是,如果传入的 object 不是一种集合,那么PassThroughFieldExtractor将_return 一个仅包含要提取的 item 的 array。

BeanWrapperFieldExtractor

与文件读取部分中描述的BeanWrapperFieldSetMapper一样,通常最好配置如何将 domain object 转换为 object array,而不是自己编写转换。 BeanWrapperFieldExtractor提供了这种类型的功能:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<Name>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

这个提取器 implementation 只有一个必需的 property,即 map 的字段名称。正如BeanWrapperFieldSetMapper需要在FieldSet上的 map 字段上的 map 字段到提供的 object 上的 setter,BeanWrapperFieldExtractor需要 map 的名称来为创建 object array 的 getter。值得注意的是,名称的 order 决定了 array 中字段的 order。

分隔文件写入示例

最基本的平面文件格式是所有字段由分隔符分隔的格式。这可以使用DelimitedLineAggregator来完成。下面的 example 写出一个简单的 domain object,表示对客户帐户的信用:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

因为正在使用 domain object,所以必须提供 FieldExtractor 接口的 implementation,以及要使用的分隔符:

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...DelimitedLineAggregator">
            <property name="delimiter" value=","/>
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

在这种情况下,本章前面描述的BeanWrapperFieldExtractor用于将CustomerCredit中的 name 和 credit 字段转换为 object array,然后在每个字段之间用逗号写出。

固定宽度文件写入 Example

分隔不是唯一的平面文件格式。许多人更喜欢使用每列的设定宽度来描绘字段之间,这通常被称为“固定宽度”。 Spring Batch 通过FormatterLineAggregator支持文件写入。使用上面描述的相同CustomerCredit domain object,可以配置如下:

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator">
        <bean class="org.spr...FormatterLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.spr...BeanWrapperFieldExtractor">
                    <property name="names" value="name,credit" />
                </bean>
            </property>
            <property name="format" value="%-9s%-2.0f" />
        </bean>
    </property>
</bean>

大多数上述示例应该看起来很熟悉。但是,property 格式的 value 是新的:

<property name="format" value="%-9s%-2.0f" />

底层的 implementation 是使用与 Java 5 的一部分相同的Formatter构建的.Java Formatter基于 C 编程语言的printf功能。有关如何配置格式化程序的大多数详细信息都可以在格式化的 javadoc 中找到。

处理文件创建

FlatFileItemReader与文件资源的关系非常简单。初始化 reader 时,它会打开文件(如果存在),如果不存在则抛出 exception。文件写作并不那么简单。乍一看,似乎FlatFileItemWriter应该存在类似的直接 contract:如果文件已经存在,则抛出 exception,如果不存在,则创建它并开始编写。但是,可能重新启动Job可能会导致问题。在正常重启场景中,contract 是相反的:如果文件存在,则从最后一个已知的好位置开始写入,如果不存在,则抛出 exception。但是,如果此 job 的文件 name 始终相同,会发生什么?在这种情况下,除非重新启动,否则您希望删除该文件(如果存在)。由于这种可能性,FlatFileItemWriter包含 property,shouldDeleteIfExists。将此 property 设置为 true 将导致在打开 writer 时删除具有相同 name 的现有文件。

6.7 XML Item Readers 和 Writers

Spring Batch 提供 transactional 基础结构,既可以读取 XML 记录,也可以将它们映射到 Java objects,也可以将 Java objects 编写为 XML 记录。

StAX API 用于 I/O,因为其他标准 XML 解析 API 不适合批处理要求(DOM 将整个输入一次加载到 memory,SAX 控制解析 process,允许用户仅提供回调)。

让我们仔细看看 XML 输入和输出在 Spring Batch 中是如何工作的。首先,有一些概念因文件读写而异,但在 Spring Batch XML 处理中是 common。使用 XML 处理,而不是需要标记化的行记录(FieldSets),假设 XML 资源是与各个记录对应的“片段”的集合:

图 3.1:XML 输入

'trade'标签被定义为上述场景中的'根元素'。 '<trade>'和'</trade>'之间的所有内容都被视为一个'片段'。 Spring Batch 使用 Object/XML Mapping(OXM)将片段绑定到 objects。但是,Spring Batch 并不依赖于任何特定的 XML binding 技术。典型的用途是委托Spring OXM,它为最流行的 OXM 技术提供统一的抽象。对 Spring OXM 的依赖是可选的,如果需要,您可以选择实现 Spring Batch 特定接口。与 OXM 支持的技术的关系可以显示如下:

图 3.2:OXM Binding

现在介绍 OXM 以及如何使用 XML 片段来表示记录,让我们仔细看看 readers 和 writers。

6.7.1 StaxEventItemReader

StaxEventItemReader configuration 提供了从 XML 输入流处理记录的典型设置。首先,让我们检查一下StaxEventItemReader可以处理的一组 XML 记录。

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="http://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

为了能够处理 XML 记录,需要以下内容:

  • 根元素 Name - 构成要映射的 object 的片段的根元素的 Name。 example configuration 使用 value of trade 来演示这一点。

  • 资源 - 表示要读取的文件的 Spring 资源。

  • Unmarshaller - Spring OXM 提供的解组工具,用于将 XML 片段映射到 object。

<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
    <property name="fragmentRootElementName" value="trade" />
    <property name="resource" value="data/iosample/input/input.xml" />
    <property name="unmarshaller" ref="tradeMarshaller" />
</bean>

请注意,在此 example 中,我们选择使用XStreamMarshaller接受作为 map 传入的别名,第一个 key 和 value 是片段的 name(i.e.根元素)和要绑定的 object 类型。然后,类似于FieldSet,map 类型中 map 的其他元素的名称在 map 中被描述为 key/value 对。在 configuration 文件中,我们可以使用 Spring configuration 实用程序来描述所需的别名,如下所示:

<bean id="tradeMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="trade"
                   value="org.springframework.batch.sample.domain.Trade" />
            <entry key="price" value="java.math.BigDecimal" />
            <entry key="name" value="java.lang.String" />
        </util:map>
    </property>
</bean>

在输入时,reader 读取 XML 资源,直到它识别出新片段即将开始(默认情况下通过匹配标记 name)。 reader 从片段创建一个独立的 XML 文档(或者至少使它看起来如此),并将文档传递给反序列化器(通常是 Spring OXM Unmarshaller周围的 wrapper),以将 XML 映射到 Java object。

总之,此过程类似于以下脚本化的 Java code,它使用 Spring configuration 提供的注入:

StaxEventItemReader xmlStaxEventItemReader = new StaxEventItemReader()
Resource resource = new ByteArrayResource(xmlResource.getBytes())

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true

CustomerCredit credit = null;

while (hasNext) {
    credit = xmlStaxEventItemReader.read();
    if (credit == null) {
        hasNext = false;
    }
    else {
        System.out.println(credit);
    }
}

6.7.2 StaxEventItemWriter

输出与输入对称。 StaxEventItemWriter需要Resource,marshaller 和rootTagName。 Java object 被传递给 marshaller(通常是标准 Spring OXM Marshaller),它使用自定义 event writer 写入Resource,过滤由 OXM 工具为每个片段生成的StartDocumentEndDocumentevents。我们将使用MarshallingEventWriterSerializer在 example 中显示此内容。此设置的 Spring configuration 如下所示:

<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="marshaller" ref="customerCreditMarshaller" />
    <property name="rootTagName" value="customers" />
    <property name="overwriteOutput" value="true" />
</bean>

configuration 设置了三个必需的 properties,并且可选地设置 overwriteOutput=true,在本章前面提到过,用于指定是否可以覆盖现有文件。应该注意的是,用于 writer 的 marshaller 与本章前面的阅读 example 中使用的 marshaller 完全相同:

<bean id="customerCreditMarshaller"
      class="org.springframework.oxm.xstream.XStreamMarshaller">
    <property name="aliases">
        <util:map id="aliases">
            <entry key="customer"
                   value="org.springframework.batch.sample.domain.CustomerCredit" />
            <entry key="credit" value="java.math.BigDecimal" />
            <entry key="name" value="java.lang.String" />
        </util:map>
    </property>
</bean>

总结一下 Java example,下面的 code 说明了所讨论的所有要点,展示了所需 properties 的编程设置:

StaxEventItemWriter staxItemWriter = new StaxEventItemWriter()
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("customer","org.springframework.batch.sample.domain.CustomerCredit");
aliases.put("credit","java.math.BigDecimal");
aliases.put("name","java.lang.String");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

staxItemWriter.setResource(resource);
staxItemWriter.setMarshaller(marshaller);
staxItemWriter.setRootTagName("trades");
staxItemWriter.setOverwriteOutput(true);

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
CustomerCredit Credit = new CustomerCredit();
trade.setPrice(11.39);
credit.setName("Customer1");
staxItemWriter.write(trade);

6.8 Multi-File 输入

在单个Step中处理多个 files 是一个常见的要求。假设 files 都具有相同的格式,MultiResourceItemReader支持 XML 和平面文件处理的这种类型的输入。考虑目录中的以下 files:

file-1.txt  file-2.txt  ignored.txt

file-1.txt 和 file-2.txt 的格式相同,并且出于商业原因应该一起处理。 MuliResourceItemReader可用于通过使用通配符读取两个 files:

<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
    <property name="resources" value="classpath:data/input/file-*.txt" />
    <property name="delegate" ref="flatFileItemReader" />
</bean>

引用的委托是一个简单的FlatFileItemReader。上述 configuration 将从 files 读取输入,处理回滚和重启方案。应该注意的是,与任何ItemReader一样,添加额外输入(在这种情况下是文件)可能会在重新启动时导致潜在问题。建议批处理作业使用各自的目录,直到成功完成。

6.9 数据库

与大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,由于系统必须使用的数据集的大小,批处理与其他 application 样式不同。如果 SQL 语句返回 100 万行,则结果集可能会将所有返回的结果保存在 memory 中,直到读取完所有行为止。 Spring Batch 为此问题提供了两种类型的解决方案:Cursor 和 Paging 数据库 ItemReaders。

6.9.1 基于游标的 ItemReaders

使用数据库游标通常是大多数批处理开发人员的默认方法,因为它是数据库解决“流”关系数据问题的方法。 Java ResultSet class 本质上是一个用于操作游标的 object 定向机制。 ResultSet将光标维持到当前数据行。在ResultSet上调用next会将此光标移动到下一行。 Spring Batch 基于游标的 ItemReaders 在初始化时打开一个游标,并在每次调用read时向前移动一行光标,返回一个可用于处理的映射对象。然后将调用close方法以确保释放所有资源。 Spring 核心JdbcTemplate通过使用回调 pattern 完全_map ResultSet中的所有行并关闭然后将控制权返回给方法调用者来解决此问题。但是,在批处理中,必须等到 step 完成。下面是一个基于ItemReader的游标如何工作的通用图,虽然 SQL 语句被用作 example,因为它广为人知,任何技术都可以实现基本方法:

这个 example 说明了基本的 pattern。给定'FOO'table,它有三列:ID,NAME 和 BAR,选择 ID 大于 1 但小于 7 的所有行。这将光标(第 1 行)的开头放在 ID 2 上。结果这一行应该是一个完全映射的 Foo object。再次调用read()会将光标移动到下一行,即 ID 为 3 的 Foo。这些读取的结果将在每个read之后写出,从而允许 objects 被垃圾收集(假设没有实例变量是保持 references 他们)。

JdbcCursorItemReader

JdbcCursorItemReader是基于游标的技术的 Jdbc implementation。它直接与ResultSet一起工作,并且需要一个 SQL 语句来运行从DataSource获得的连接。以下数据库 schema 将用作 example:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人更喜欢为每一行使用 domain object,因此我们将使用RowMapper接口的 implementation 来 map object:

public class CustomerCreditRowMapper implements RowMapper {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

因为JdbcTemplate对于 Spring 的用户来说是如此熟悉,并且JdbcCursorItemReader与它共享 key 接口,所以查看如何使用JdbcTemplate读取此数据的 example 非常有用,在 order 中将其与ItemReader进行对比。出于本示例的目的,我们假设 CUSTOMER 数据库中有 1,000 行。第一个 example 将使用JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

在 running 此 code 片段后,customerCredits 列表将包含 1,000 个CustomerCredit objects。在查询方法中,将从DataSource获取连接,提供的 SQL 将对其进行 run,并且将为ResultSet中的每一行调用mapRow方法。让我们将其与JdbcCursorItemReader的方法进行对比:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close(executionContext);

在 running 此 code 片段后,计数器将等于 1,000。如果上面的 code 将返回的 customerCredit 放入列表中,则结果与JdbcTemplate example 完全相同。然而,ItemReader的最大优点是它允许项目“流式传输”。 read方法可以调用一次,item 通过ItemWriter写出,然后通过read获得下一个 item。这允许 item 读取和写入以“块”完成并定期提交,这是高性能批处理的本质。此外,它非常容易配置为注入 Spring Batch Step

<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>
其他 Properties

因为在 Java 中打开游标有很多不同的选项,所以JdbcCustorItemReader上有许多 properties 可以设置:

表格 1_.JdbcCursorItemReader Properties

ignoreWarnings确定是否记录 SQLWarning 或导致 exception - 默认为 true
FETCHSIZEItemReader使用的ResultSet object 需要更多行时,为 Jdbc 驱动程序提供有关应从数据库中提取的行数的提示。默认情况下,不提供任何提示。
maxRows 进行设置底层ResultSet在任何一个 time 时可以容纳的最大行数限制。
QueryTimeout设置驱动程序等待Statement object 执行到指定秒数的秒数。如果超出限制,则抛出DataAccessEception。 (有关详细信息,请参阅驱动程序供应商文档
verifyCursorPosition因为ItemReader持有的ResultSet被传递给RowMapper,所以用户可以自己调用ResultSet.next(),这可能会导致 reader 的内部计数出现问题。如果在RowMapper调用之后光标位置与以前不同,则将此 value 设置为 true 将导致抛出 exception。
saveState 和指示 reader 的 state 是否应保存在由ItemStream#update(ExecutionContext)提供的ExecutionContext中。默认 value 为 true。
driverSupportsAbsolute默认为 false。指示 Jdbc 驱动程序是否支持在ResultSet上设置绝对行。对于支持ResultSet.absolute()的 Jdbc 驱动程序,建议将其设置为 true,因为它可能会提高 performance,尤其是在处理大型数据集时 step 失败时。
setUseSharedExtendedConnection默认为 false。指示是否应该由所有其他处理使用用于游标的连接,从而共享相同的 transaction。如果将此设置为 false(默认值),则将使用自己的连接打开游标,并且不会参与为 step 处理的 rest 启动的任何 transactions。如果将此 flag 设置为 true,则必须将DataSource包装在ExtendedConnectionDataSourceProxy中,以防止在每次提交后关闭和释放连接。将此选项设置为 true 时,将使用“READ_ONLY”和“HOLD_CUSORS_OVER_COMMIT”选项创建用于打开游标的语句。这允许在 transaction start 上保持游标打开,并在 step 处理中执行提交。要使用此 feature,您需要一个支持此功能的数据库和一个支持 Jdbc 3.0 或更高版本的 Jdbc 驱动程序。

HibernateCursorItemReader

正如正常的 Spring 用户做出关于是否使用 ORM 解决方案的重要决定,这会影响他们是否使用JdbcTemplateHibernateTemplate,Spring Batch 用户具有相同的选项。 HibernateCursorItemReader是游标技术的 Hibernate implementation。 Hibernate 在批处理中的使用一直存在争议。这主要是因为 Hibernate 最初是为支持在线 application 样式而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession而不是标准 session。这将删除 hibernate 使用的所有缓存和脏检查,这可能会导致批处理方案中出现问题。有关 stateless 和常规 hibernate 会话之间差异的更多信息,请参阅特定 hibernate 发行版的文档。 HibernateCursorItemReader允许您声明一个 HQL 语句并传入SessionFactory,它将以与JdbcCursorItemReader相同的基本方式将每次调用一个 item 传回read。下面是一个使用与 JDBC reader 相同的'customer credit'example 的 example configuration:

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close(executionContext);

这个配置ItemReader将以JdbcCursorItemReader所描述的完全相同的方式 return CustomerCredit objects,假设已经为 Customer table 正确创建了 hibernate 映射 files。 'useStatelessSession'property 默认为 true,但这里已添加以引起注意打开或关闭它的能力。值得注意的是,可以通过 setFetchSize property 设置底层游标的 fetchSize。与JdbcCursorItemReader一样,configuration 很简单:

<bean id="itemReader"
      class="org.springframework.batch.item.database.HibernateCursorItemReader">
    <property name="sessionFactory" ref="sessionFactory" />
    <property name="queryString" value="from CustomerCredit" />
</bean>

StoredProcedureItemReader

有时需要使用存储过程获取游标数据。 StoredProcedureItemReader的工作方式与JdbcCursorItemReader类似,不同之处在于我们不执行查询来获取游标,而是执行返回游标的存储过程。存储过程可以以三种不同的方式返回光标:

  • 作为返回的 ResultSet(由 SQL Server,Sybase,DB2,Derby 和 MySQL 使用)

  • 作为 out 参数返回 ref-cursor(由 Oracle 和 PostgreSQL 使用)

  • 作为存储的 function 调用的 return value

以下是使用与之前相同的“客户信用”example 的基本 example configuration:

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

此 example 依赖于存储过程来提供 ResultSet 作为返回结果(上面的选项 1)。

如果存储过程返回 ref-cursor(选项 2),那么我们需要提供 out 参数的位置,即返回的 ref-cursor。这是一个 example,其中第一个参数是返回的 ref-cursor:

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

如果光标是从存储的 function(选项 3)返回的,我们需要将 property“function”设置为true。它默认为false。这是什么样子:

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

在所有这些情况下,我们需要定义一个RowMapper以及一个DataSource和实际的过程 name。

如果存储过程或 function 接受参数,则必须通过参数 property 声明和设置它们。这是 Oracle 的一个 example,它声明了三个参数。第一个是返回 ref-cursor 的 out 参数,第二个和第三个是参数,它采用 INTEGER 类型的 value:

<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

除了参数声明之外,我们还需要指定一个PreparedStatementSetter implementation 来设置调用的参数值。这与上面的JdbcCursorItemReader相同。 名为“Additional Properties”的部分中列出的所有其他 properties 也适用于StoredProcedureItemReader

6.9.2 Paging ItemReaders

使用数据库游标的替代方法是执行多个查询,其中每个查询都返回一部分结果。我们将此部分称为页面。执行的每个查询都必须指定起始行号和我们希望为页面返回的行数。

JdbcPagingItemReader

一个实现分页ItemReaderJdbcPagingItemReaderJdbcPagingItemReader需要PagingQueryProvider负责提供用于检索构成页面的行的 SQL 查询。由于每个数据库都有自己的提供分页支持的策略,因此我们需要为每个受支持的数据库类型使用不同的PagingQueryProvider。还有SqlPagingQueryProviderFactoryBean将 auto-detect 正在使用的数据库并确定适当的PagingQueryProvider 实现。这简化了 configuration,是推荐的最佳实践。

SqlPagingQueryProviderFactoryBean要求您指定 select 子句和 from 子句。您还可以提供可选的 where 子句。这些子句将用于构建与所需 sortKey 结合的 SQL 语句。

sortKey上设置唯一的 key 约束非常重要,以确保执行之间不会丢失任何数据。

打开 reader 后,它会以与任何其他ItemReader相同的基本方式将每次调用一个 item 传回read。当需要额外的行时,分页发生在幕后。

下面是一个 example configuration,它使用与上面基于光标的 ItemReaders 类似的'customer credit'example:

<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

这个配置ItemReader将使用必须指定的RowMapper return CustomerCredit objects。 'pageSize'property 确定每次查询执行时从数据库读取的实体数。

'parameterValues'property 可用于为查询指定参数值的 Map。如果在 where 子句中使用命名参数,则每个条目的 key 应_匹配命名参数的 name。如果你使用传统的'?'占位符然后每个条目的 key 应该是占位符的编号,从 1 开始。

JpaPagingItemReader

另一个实现分页ItemReaderJpaPagingItemReader。 JPA 没有类似于 Hibernate StatelessSession的概念,所以我们必须使用 JPA 规范提供的其他 features。由于 JPA 支持分页,因此在使用 JPA 进行批处理时这是一个自然的选择。读取每个页面后,实体将分离,并且 order 中的 persistence context 将被清除,以允许在处理页面后对实体进行垃圾回收。

JpaPagingItemReader允许您声明 JPQL 语句并传入EntityManagerFactory。然后它会以与任何其他ItemReader相同的基本方式将每次调用一个 item 传回read。当需要其他实体时,分页发生在幕后。下面是一个 example configuration,使用与上面的 JDBC reader 相同的'customer credit'example:

<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

假设 Customer object 具有正确的 JPA annotations 或 ORM 映射文件,此配置的ItemReader将以与上述JdbcPagingItemReader所描述的完全相同的方式 return CustomerCredit objects。 'pageSize'property 确定每次查询执行时从数据库读取的实体数。

IbatisPagingItemReader

从 Spring Batch 3.0 开始,不推荐使用此 reader。

如果您使用 IBATIS 进行数据访问,那么您可以使用IbatisPagingItemReader,正如 name 所指示的那样,IbatisPagingItemReaderItemReader的实现。 IBATIS 没有直接支持读取页面中的行,但通过提供几个标准变量,您可以为 IBATIS 查询添加分页支持。

以下是IbatisPagingItemReader读取 CustomerCredits 的 configuration 示例,如上例所示:

<bean id="itemReader" class="org.spr...IbatisPagingItemReader">
    <property name="sqlMapClient" ref="sqlMapClient"/>
    <property name="queryId" value="getPagedCustomerCredits"/>
    <property name="pageSize" value="1000"/>
</bean>

上面的IbatisPagingItemReader configuration references 一个名为“getPagedCustomerCredits”的 IBATIS 查询。这是 MySQL 的查询应该是什么样子的示例。

<select id="getPagedCustomerCredits" resultMap="customerCreditResult">
    select id, name, credit from customer order by id asc LIMIT #_skiprows#, #_pagesize#
</select>

_skiprows_pagesize变量由IbatisPagingItemReader提供,并且还有一个_page变量,可以在必要时使用。分页查询的语法因使用的数据库而异。这是 Oracle 的一个示例(遗憾的是,我们需要为某些 operators 使用 CDATA,因为它属于 XML 文档):

<select id="getPagedCustomerCredits" resultMap="customerCreditResult">
    select * from (
      select * from (
        select t.id, t.name, t.credit, ROWNUM ROWNUM_ from customer t order by id
       )) where ROWNUM_ <![CDATA[ > ]]> ( #_page# * #_pagesize# )
    ) where ROWNUM <![CDATA[ <= ]]> #_pagesize#
</select>

6.9.3 数据库 ItemWriters

虽然 Flat Files 和 XML 都有特定的 ItemWriters,但数据库世界中没有完全等价的东西。这是因为 transactions 提供了所需的所有功能。对于 files,ItemWriters 是必需的,因为它们必须表现为 transactional,跟踪书面项目并在适当的时间刷新或清除。数据库不需要此功能,因为写入已包含在 transaction 中。用户可以创建自己的 DAO 来实现ItemWriter接口,也可以使用自定义ItemWriter中的一个来编写通用处理问题,无论哪种方式,它们应该没有任何问题。需要注意的一件事是通过批量输出提供的 performance 和错误处理功能。当 hibernate 用作ItemWriter时,这是最常见的,但在使用 Jdbc 批处理模式时可能会遇到同样的问题。批处理数据库输出没有任何固有的缺陷,假设我们小心刷新并且数据中没有错误。但是,写出时的任何错误都可能导致混淆,因为无法知道哪个 item 导致 exception,或者即使任何个人 item 负责,如下所示:

如果在写出之前缓冲了项,则在提交之前刷新缓冲区之前不会抛出遇到的任何错误。对于 example,假设每个块将写入 20 个项目,第 15 个 item 将抛出 DataIntegrityViolationException。就 Step 而言,所有 20 个 item 都将被成功写出,因为在实际写出错误之前无法知道错误。一旦Session# flush()被调用,缓冲区将被清空并且 exception 将被命中。此时,Step无法做任何事情,transaction 必须回滚。通常,此 exception 可能会导致 Item 被跳过(取决于 skip/retry policies),然后它将不再被写出。但是,在批处理方案中,没有办法知道哪个 item 导致了问题,整个缓冲区在故障发生时被写出来。解决此问题的唯一方法是在每个 item 之后刷新:

这是一个 common 用例,特别是在使用 Hibernate 时,ItemWriter的_implement 的简单指南是在每次调用write()时刷新。这样做可以可靠地跳过项目,Spring Batch 在发生错误后在内部注意 calls 的粒度为ItemWriter

6.10 重用现有服务

批处理系统通常与其他 application 样式一起使用。最常见的是一个在线系统,但它也可以通过移动每个 application 样式使用的必要批量数据来支持 integration 甚至厚 client application。因此,很多用户希望在批处理作业中重用现有的 DAO 或其他服务。 Spring 容器本身通过允许注入任何必要的 class 来使这相当容易。但是,可能存在这样的情况:现有服务需要充当ItemReaderItemWriter,以满足另一个 Spring Batch class 的依赖关系,或者因为它确实是 step 的主ItemReader。为每个需要包装的服务编写一个适配器 class 是相当简单的,但由于它是一个常见的问题,Spring Batch 提供 implementations:ItemReaderAdapterItemWriterAdapter。这两个 classes 都实现了调用委托 pattern 的标准 Spring 方法,并且设置起来相当简单。下面是 reader 的示例:

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="generateFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

需要注意的一点是,targetMethod 的 contract 必须与read的 contract 相同:当用尽时,它将 return null,否则为Object。其他任何东西都会阻止 framework 知道何时应该结束处理,导致无限循环或不正确的失败,这取决于ItemWriter的 implementation。 ItemWriter implementation 同样简单:

<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <property name="targetObject" ref="fooService" />
    <property name="targetMethod" value="processFoo" />
</bean>

<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />

6.11 验证输入

在本章的过程中,讨论了多种解析输入的方法。如果不是'well-formed',每个主要的 implementation 将抛出一个 exception。如果缺少一系列数据,FixedLengthTokenizer将抛出 exception。类似地,尝试访问FieldSetMapper FieldSetMapper中不存在的索引或格式与预期格式不同将导致抛出 exception。在read返回之前,将抛出所有这些类型的 exceptions。但是,它们没有解决返回的 item 是否有效的问题。例如,如果其中一个字段是年龄,则显然不能为负数。它将正确解析,因为它存在并且是一个数字,但它不会导致 exception。由于已经有大量的 Validation 框架,Spring Batch 不会尝试提供另一个,而是提供一个非常简单的接口,可以由任意数量的框架实现:

public interface Validator {

    void validate(Object value) throws ValidationException;

}

contract 是validate方法如果 object 无效则抛出 exception,如果有效则 return 正常返回。 Spring Batch 提供开箱即用的ItemProcessor:

<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator"
      class="org.springframework.batch.item.validator.SpringValidator">
    <property name="validator">
        <bean id="orderValidator"
              class="org.springmodules.validation.valang.ValangValidator">
            <property name="valang">
                <value>
                    <![CDATA[
           { orderId : ? > 0 AND ? <= 9999999999 : 'Incorrect order ID' : 'error.order.id' }
           { totalLines : ? = size(lineItems) : 'Bad count of order lines'
                                              : 'error.order.lines.badcount'}
           { customer.registered : customer.businessCustomer = FALSE OR ? = TRUE
                                 : 'Business customer must be registered'
                                 : 'error.customer.registration'}
           { customer.companyName : customer.businessCustomer = FALSE OR ? HAS TEXT
                                  : 'Company name for business customer is mandatory'
                                  :'error.customer.companyname'}
                    ]]>
                </value>
            </property>
        </bean>
    </property>
</bean>

这个简单的 example 显示了一个简单的ValangValidator,用于验证 order object。目的不是要显示 Valang 功能,以显示如何添加验证器。

6.12 防止 State 持久性

默认情况下,所有ItemReaderItemWriter _mplempleations _在提交之前将当前的 state 存储在ExecutionContext中。但是,这可能并不总是理想的行为。例如,许多开发人员选择使用 process 指标使其数据库 readers'可重新运行'。将一个额外的列添加到输入数据以指示它是否已被处理。当正在读取(或写出)特定的 record 时,处理后的 flag 将从 false 翻转为 true。然后,SQL 语句可以在 where 子句中包含一个额外的语句,例如“where PROCESSED_IND = false”,从而确保在重新启动时只返回未处理的记录。在这种情况下,最好不要 store 任何 state,例如当前行号,因为它在重新启动时将无关紧要。因此,所有 readers 和 writers 都包含'saveState'property:

<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="rowMapper">
        <bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
    </property>
    <property name="saveState" value="false" />
    <property name="sql">
        <value>
            SELECT games.player_id, games.year_no, SUM(COMPLETES),
            SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
            SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
            SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
            from games, players where players.player_id =
            games.player_id group by games.player_id, games.year_no
        </value>
    </property>
</bean>

上面配置的ItemReader将不会在ExecutionContext中为其参与的任何执行创建任何条目。

6.13 Creating Custom ItemReaders 和 ItemWriters

到目前为止,在本章中已经讨论了 Spring Batch 中存在的用于读写的基本 contracts 以及一些 common implementations。但是,这些都是相当通用的,并且有很多潜在的场景可能无法通过开箱即用的实现来实现。本节将使用一个简单的 example 显示如何创建自定义ItemReaderItemWriter implementation 并正确实现其 contracts。 ItemReader还将在 order 中实现ItemStream,以说明如何使 reader 或 writer 可重启。

6.13.1 Custom ItemReader Example

出于此 example 的目的,将创建从提供的列表中读取的简单ItemReader implementation。我们首先要实现ItemReaderread的最基本的 contract:

public class CustomItemReader<T> implements ItemReader<T>{

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NoWorkFoundException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

这个非常简单的 class 获取一个项目列表,并在 time 返回一个项目,从列表中删除每个项目。当列表为空时,它返回 null,从而满足ItemReader的最基本要求,如下所示:

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<String>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使 ItemReader 可重新启动

现在的最后一个挑战是使ItemReader可重启。目前,如果电源耗尽,处理再次开始,ItemReader必须从头开始。这在许多情况下实际上都是有效的,但有时最好是批量 job 从它停止的地方开始。 key 判别式通常是 reader 是有状态的还是 stateless 的。 stateless reader 不需要担心可重启性,但有状态的 reader 必须尝试在重启时重新构建其最后一个已知的 state。因此,我们建议您尽可能保留自定义 readers stateless,这样您就不必担心可重启性。

如果确实需要 store state,则应使用ItemStream接口:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey(CURRENT_INDEX)){
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else{
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

在每次调用ItemStream update方法时,ItemReader的当前索引将以_current.index 的 key 存储在提供的ExecutionContext中。调用ItemStream open方法时,将检查ExecutionContext以查看它是否包含带有该 key 的条目。如果找到 key,则将当前索引移动到该位置。这是一个相当简单的例子,但它仍然符合一般的 contract:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<String>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<String>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数 ItemReaders 都有更复杂的重启逻辑。 JdbcCursorItemReader,对于 example,存储 Cursor 中最后一个处理行的行 id。

值得注意的是中使用的 key 不应该是微不足道的。那是因为Step用于Step中的所有ItemStream。在大多数情况下,只需在 class name 之前加上 key 就足以保证唯一性。但是,在极少数情况下,同一个 step 中使用了两个相同类型的ItemStream(如果需要输出两个 files 会发生这种情况),那么将需要一个更独特的 name。出于这个原因,许多 Spring Batch ItemReaderItemWriter __mplement 都有setName()属性,允许重写此 key name。

6.13.2 自定义 ItemWriter Example

实现自定义ItemWriter在很多方面类似于上面的ItemReader example,但在足够的方面有所不同,以保证自己的 example。但是,添加可重启性基本相同,因此本示例中不会介绍它。与ItemReader example 一样,order 将在 order 中使用,以使 example 尽可能简单:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(List<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使 ItemWriter 可重新启动

为了使 ItemWriter 可以重新启动,我们将遵循与ItemReader相同的 process,添加和实现ItemStream接口以同步执行 context。在 example 中,我们可能需要计算处理的项目数,并将其添加为页脚 record。如果我们需要这样做,我们可以在ItemWriter中实现ItemStream,这样如果流是 re-opened,则从执行 context 重构计数器。

在许多实际情况中,自定义 ItemWriters 还委托另一个 writer 本身可以重新启动(e.g. 写入文件时),或者它写入 transactional 资源,因此不需要重新启动,因为它是 stateless。当你有一个有状态的 writer 时,你也应该确保实现ItemStreamItemWriter。还要记住 writer 的 client 需要知道ItemStream,因此您可能需要将其注册为 configuration xml 中的流。

Updated at: 9 months ago
5.4.2. 工作范围Table of content7. 缩放和 Parallel 处理