001/* 002 * Copyright 2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.jsr.step.builder; 017 018import java.util.ArrayList; 019 020import org.springframework.batch.core.ChunkListener; 021import org.springframework.batch.core.Step; 022import org.springframework.batch.core.StepListener; 023import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext; 024import org.springframework.batch.core.jsr.step.BatchletStep; 025import org.springframework.batch.core.jsr.step.item.JsrChunkProcessor; 026import org.springframework.batch.core.jsr.step.item.JsrChunkProvider; 027import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; 028import org.springframework.batch.core.step.builder.SimpleStepBuilder; 029import org.springframework.batch.core.step.builder.StepBuilder; 030import org.springframework.batch.core.step.builder.StepBuilderException; 031import org.springframework.batch.core.step.item.ChunkOrientedTasklet; 032import org.springframework.batch.core.step.item.ChunkProcessor; 033import org.springframework.batch.core.step.item.ChunkProvider; 034import org.springframework.batch.core.step.tasklet.Tasklet; 035import org.springframework.batch.core.step.tasklet.TaskletStep; 036import org.springframework.batch.item.ItemStream; 037import org.springframework.batch.repeat.RepeatOperations; 038import org.springframework.batch.repeat.support.RepeatTemplate; 039import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; 040import org.springframework.util.Assert; 041 042/** 043 * A step builder that extends the {@link FaultTolerantStepBuilder} to create JSR-352 044 * specific {@link ChunkProvider} and {@link ChunkProcessor} supporting the chunking 045 * pattern defined by the spec. 046 * 047 * @author Michael Minella 048 * 049 * @param <I> The input type for the step 050 * @param <O> The output type for the step 051 */ 052public class JsrSimpleStepBuilder<I, O> extends SimpleStepBuilder<I, O> { 053 054 private BatchPropertyContext batchPropertyContext; 055 056 public JsrSimpleStepBuilder(StepBuilder parent) { 057 super(parent); 058 } 059 060 public JsrPartitionStepBuilder partitioner(Step step) { 061 return new JsrPartitionStepBuilder(this).step(step); 062 } 063 064 public void setBatchPropertyContext(BatchPropertyContext batchPropertyContext) { 065 this.batchPropertyContext = batchPropertyContext; 066 } 067 068 /** 069 * Build the step from the components collected by the fluent setters. Delegates first to {@link #enhance(Step)} and 070 * then to {@link #createTasklet()} in subclasses to create the actual tasklet. 071 * 072 * @return a tasklet step fully configured and read to execute 073 */ 074 @Override 075 public TaskletStep build() { 076 registerStepListenerAsItemListener(); 077 registerAsStreamsAndListeners(getReader(), getProcessor(), getWriter()); 078 registerStepListenerAsChunkListener(); 079 080 BatchletStep step = new BatchletStep(getName(), batchPropertyContext); 081 082 super.enhance(step); 083 084 step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0])); 085 086 if (getTransactionAttribute() != null) { 087 step.setTransactionAttribute(getTransactionAttribute()); 088 } 089 090 if (getStepOperations() == null) { 091 092 stepOperations(new RepeatTemplate()); 093 094 if (getTaskExecutor() != null) { 095 TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate(); 096 repeatTemplate.setTaskExecutor(getTaskExecutor()); 097 repeatTemplate.setThrottleLimit(getThrottleLimit()); 098 stepOperations(repeatTemplate); 099 } 100 101 ((RepeatTemplate) getStepOperations()).setExceptionHandler(getExceptionHandler()); 102 103 } 104 step.setStepOperations(getStepOperations()); 105 step.setTasklet(createTasklet()); 106 107 ItemStream[] streams = getStreams().toArray(new ItemStream[0]); 108 step.setStreams(streams); 109 110 try { 111 step.afterPropertiesSet(); 112 } 113 catch (Exception e) { 114 throw new StepBuilderException(e); 115 } 116 117 return step; 118 119 } 120 121 @Override 122 protected Tasklet createTasklet() { 123 Assert.state(getReader() != null, "ItemReader must be provided"); 124 Assert.state(getProcessor() != null || getWriter() != null, "ItemWriter or ItemProcessor must be provided"); 125 RepeatOperations repeatOperations = createRepeatOperations(); 126 ChunkProvider<I> chunkProvider = new JsrChunkProvider<I>(); 127 JsrChunkProcessor<I, O> chunkProcessor = new JsrChunkProcessor<I, O>(getReader(), getProcessor(), getWriter(), repeatOperations); 128 chunkProcessor.setListeners(new ArrayList<StepListener>(getItemListeners())); 129 ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor); 130 tasklet.setBuffering(!isReaderTransactionalQueue()); 131 return tasklet; 132 } 133 134 private RepeatOperations createRepeatOperations() { 135 RepeatTemplate repeatOperations = new RepeatTemplate(); 136 repeatOperations.setCompletionPolicy(getChunkCompletionPolicy()); 137 repeatOperations.setExceptionHandler(getExceptionHandler()); 138 return repeatOperations; 139 } 140}