Class FaultTolerantChunkProcessor<I,O>
- java.lang.Object
- org.springframework.batch.core.step.item.SimpleChunkProcessor<I,O>
- org.springframework.batch.core.step.item.FaultTolerantChunkProcessor<I,O>
- All Implemented Interfaces:
ChunkProcessor<I>, org.springframework.beans.factory.InitializingBean
public class FaultTolerantChunkProcessor<I,O> extends SimpleChunkProcessor<I,O>
Fault-tolerant implementation of the ChunkProcessor interface that allows items that cause exceptions during writing to be skipped or retried.
Constructor Summary
FaultTolerantChunkProcessor(ItemProcessor<? super I,? extends O> itemProcessor, ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate)
Method Summary
Modifier and Type, Method, Description
protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs)
Extension point for subclasses that want to adjust the outputs based on additional saved data in the inputs.
protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs)
Extension point for subclasses to calculate the filter count.
protected void initializeUserData(Chunk<I> inputs)
Extension point for subclasses to allow them to memorise the contents of the inputs, in case they are needed for accounting purposes later.
protected boolean isComplete(Chunk<I> inputs)
Extension point for subclasses that want to store additional data in the inputs.
void setBuffering(boolean buffering)
A flag to indicate that items have been buffered and therefore will always come back as a chunk after a rollback.
void setChunkMonitor(ChunkMonitor chunkMonitor)
void setKeyGenerator(KeyGenerator keyGenerator)
The KeyGenerator to use to identify failed items across rollback.
void setProcessorTransactional(boolean processorTransactional)
Flag to say that the ItemProcessor is transactional (defaults to true).
void setProcessSkipPolicy(SkipPolicy SkipPolicy)
void setRollbackClassifier(org.springframework.classify.Classifier<java.lang.Throwable,java.lang.Boolean> rollbackClassifier)
A classifier that can distinguish between exceptions that cause rollback (return true) or not (return false).
void setWriteSkipPolicy(SkipPolicy SkipPolicy)
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs)
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs)
Simple implementation delegates to the SimpleChunkProcessor.doWrite(List) method and increments the write count in the contribution.
Methods inherited from class org.springframework.batch.core.step.item.SimpleChunkProcessor
afterPropertiesSet, doAfterWrite, doOnWriteError, doProcess, doWrite, getListener, process, registerListener, setItemProcessor, setItemWriter, setListeners, writeItems
Constructor Detail
FaultTolerantChunkProcessor
public FaultTolerantChunkProcessor(ItemProcessor<? super I,? extends O> itemProcessor, ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate)
Method Detail
setKeyGenerator
public void setKeyGenerator(KeyGenerator keyGenerator)
The KeyGenerator to use to identify failed items across rollback. Not used when the buffering flag is true (the default).
- Parameters:
keyGenerator - the KeyGenerator to set
setProcessSkipPolicy
public void setProcessSkipPolicy(SkipPolicy SkipPolicy)
- Parameters:
SkipPolicy - the SkipPolicy for item processing
setWriteSkipPolicy
public void setWriteSkipPolicy(SkipPolicy SkipPolicy)
- Parameters:
SkipPolicy - the SkipPolicy for item writing
setRollbackClassifier
public void setRollbackClassifier(org.springframework.classify.Classifier<java.lang.Throwable,java.lang.Boolean> rollbackClassifier)
A classifier that can distinguish between exceptions that cause rollback (return true) or not (return false).
- Parameters:
rollbackClassifier - classifier
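The true/false contract described above can be illustrated with a small stand-in. This is only a sketch under assumptions: a plain java.util.function.Function takes the place of org.springframework.classify.Classifier, and the class name RollbackClassifierSketch and the rule it applies (IllegalArgumentException does not force a rollback) are hypothetical, chosen purely for illustration.

```java
import java.util.function.Function;

// Stand-in sketch: a Function<Throwable, Boolean> plays the role of
// Classifier<Throwable, Boolean>. Names and the rule itself are hypothetical.
final class RollbackClassifierSketch {

    // Returns true when the exception should cause a rollback, false otherwise.
    // Assumed rule: IllegalArgumentException (and subclasses) does not roll back.
    static final Function<Throwable, Boolean> ROLLBACK_CLASSIFIER =
            t -> !(t instanceof IllegalArgumentException);

    static boolean causesRollback(Throwable t) {
        return ROLLBACK_CLASSIFIER.apply(t);
    }

    public static void main(String[] args) {
        System.out.println(causesRollback(new IllegalArgumentException("bad item")));
        System.out.println(causesRollback(new IllegalStateException("writer failed")));
    }
}
```

In a real step the equivalent object would be passed to setRollbackClassifier; the sketch only shows the shape of the decision.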
setChunkMonitor
public void setChunkMonitor(ChunkMonitor chunkMonitor)
- Parameters:
chunkMonitor - monitor
setBuffering
public void setBuffering(boolean buffering)
A flag to indicate that items have been buffered and therefore will always come back as a chunk after a rollback. Otherwise things are more complicated because after a rollback the new chunk might or might not contain items from the previous failed chunk.
- Parameters:
buffering - true if items will be buffered
setProcessorTransactional
public void setProcessorTransactional(boolean processorTransactional)
Flag to say that the ItemProcessor is transactional (defaults to true). If false then the processor is only called once per item per chunk, even if there are rollbacks with retries and skips.
- Parameters:
processorTransactional - the flag value to set
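One way to picture the "called once per item per chunk" behaviour is a cache of processed outputs that survives a retry of the same chunk. The sketch below illustrates that idea only and makes no claim about how this class implements it: OncePerItemSketch, its fields, and the use of distinct String items as their own cache keys are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration of a non-transactional processor:
// each item is processed at most once, and retries reuse the cached output.
final class OncePerItemSketch {

    private final Function<String, String> processor;
    // Assumption: items are distinct and can serve as their own keys.
    private final Map<String, String> cache = new HashMap<>();
    int calls = 0; // number of real processor invocations

    OncePerItemSketch(Function<String, String> processor) {
        this.processor = processor;
    }

    // Transforms a chunk; calling it again for the same items (as after a
    // rollback and retry) does not invoke the processor a second time.
    List<String> transform(List<String> inputs) {
        List<String> outputs = new ArrayList<>();
        for (String item : inputs) {
            String output = cache.computeIfAbsent(item, key -> {
                calls++;
                return processor.apply(key);
            });
            outputs.add(output);
        }
        return outputs;
    }
}
```

Calling transform twice with the same inputs, as would happen when a chunk is retried, leaves calls equal to the number of distinct items.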
initializeUserData
protected void initializeUserData(Chunk<I> inputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses to allow them to memorise the contents of the inputs, in case they are needed for accounting purposes later. The default implementation sets up some user data to remember the original size of the inputs. If this method is overridden then some or all of SimpleChunkProcessor.isComplete(Chunk), SimpleChunkProcessor.getFilterCount(Chunk, Chunk) and SimpleChunkProcessor.getAdjustedOutputs(Chunk, Chunk) might also need to be overridden, to ensure that the user data is handled consistently.
- Overrides:
initializeUserData in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the inputs for the process
getFilterCount
protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses to calculate the filter count. Defaults to the difference between input size and output size.
- Overrides:
getFilterCount in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the inputs after transformation
outputs - the outputs after transformation
- Returns:
the difference in sizes
- See Also:
SimpleChunkProcessor.initializeUserData(Chunk)
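The default described above (input size minus output size) is simple arithmetic. The following sketch shows only that stated default, using plain lists as stand-ins for Chunk; FilterCountSketch and filterCount are hypothetical names, and the calculation used by this class's override may differ.

```java
import java.util.List;

// Stand-in for the documented default: filter count = inputs - outputs.
// Plain lists replace Chunk<I> and Chunk<O>; names are hypothetical.
final class FilterCountSketch {

    static int filterCount(List<?> inputs, List<?> outputs) {
        return inputs.size() - outputs.size();
    }

    public static void main(String[] args) {
        // Five items read, three survived processing.
        System.out.println(filterCount(List.of(1, 2, 3, 4, 5), List.of(1, 2, 3)));
    }
}
```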
isComplete
protected boolean isComplete(Chunk<I> inputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses that want to store additional data in the inputs. The default implementation just checks whether the inputs are empty.
- Overrides:
isComplete in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the input chunk
- Returns:
true if it is empty
- See Also:
SimpleChunkProcessor.initializeUserData(Chunk)
getAdjustedOutputs
protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs)
Description copied from class: SimpleChunkProcessor
Extension point for subclasses that want to adjust the outputs based on additional saved data in the inputs. The default implementation just returns the outputs unchanged.
- Overrides:
getAdjustedOutputs in class SimpleChunkProcessor<I,O>
- Parameters:
inputs - the inputs for the transformation
outputs - the result of the transformation
- Returns:
the outputs unchanged
- See Also:
SimpleChunkProcessor.initializeUserData(Chunk)
transform
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws java.lang.Exception
- Overrides:
transform in class SimpleChunkProcessor<I,O>
- Throws:
java.lang.Exception
write
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws java.lang.Exception
Description copied from class: SimpleChunkProcessor
Simple implementation delegates to the SimpleChunkProcessor.doWrite(List) method and increments the write count in the contribution. Subclasses can handle more complicated scenarios, e.g. with fault tolerance. If output items are skipped they should be removed from the inputs as well.
- Overrides:
write in class SimpleChunkProcessor<I,O>
- Parameters:
contribution - the current step contribution
inputs - the inputs that gave rise to the outputs
outputs - the outputs to write
- Throws:
java.lang.Exception - if there is a problem
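The rule that skipped output items should also be removed from the inputs can be sketched with plain lists. This is an illustration under assumptions, not this class's write logic: SkipOnWriteSketch and writeWithSkips are hypothetical names, inputs and outputs are assumed to line up one-to-one by position (no filtering), every RuntimeException from the writer is treated as skippable, and items are written one at a time.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: write outputs one by one and, when an item is skipped,
// remove it from the outputs and remove its source item from the inputs.
final class SkipOnWriteSketch {

    // Returns the number of items written successfully.
    // Both lists must be mutable and aligned by position (an assumption).
    static int writeWithSkips(List<String> inputs, List<String> outputs, Consumer<String> writer) {
        int written = 0;
        Iterator<String> in = inputs.iterator();
        Iterator<String> out = outputs.iterator();
        while (in.hasNext() && out.hasNext()) {
            in.next();
            String item = out.next();
            try {
                writer.accept(item);
                written++;
            } catch (RuntimeException e) {
                // Skip: drop the failed output and the input that produced it.
                out.remove();
                in.remove();
            }
        }
        return written;
    }
}
```

After the call, both lists contain only the items that were written, so later accounting over the inputs stays consistent with the outputs.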