001/* 002 * Copyright 2006-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.step.item; 018 019import org.springframework.batch.core.StepContribution; 020import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; 021import org.springframework.batch.core.step.skip.NonSkippableReadException; 022import org.springframework.batch.core.step.skip.SkipException; 023import org.springframework.batch.core.step.skip.SkipListenerFailedException; 024import org.springframework.batch.core.step.skip.SkipPolicy; 025import org.springframework.batch.core.step.skip.SkipPolicyFailedException; 026import org.springframework.batch.item.ItemReader; 027import org.springframework.batch.repeat.RepeatOperations; 028import org.springframework.classify.BinaryExceptionClassifier; 029import org.springframework.classify.Classifier; 030 031/** 032 * FaultTolerant implementation of the {@link ChunkProcessor} interface, that 033 * allows for skipping or retry of items that cause exceptions during reading or 034 * processing. 035 * 036 */ 037public class FaultTolerantChunkProvider<I> extends SimpleChunkProvider<I> { 038 039 /** 040 * Hard limit for number of read skips in the same chunk. Should be 041 * sufficiently high that it is only encountered in a runaway step where all 042 * items are skipped before the chunk can complete (leading to a potential 043 * heap memory problem). 044 */ 045 public static final int DEFAULT_MAX_SKIPS_ON_READ = 100; 046 047 private SkipPolicy skipPolicy = new LimitCheckingItemSkipPolicy(); 048 049 private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true); 050 051 private int maxSkipsOnRead = DEFAULT_MAX_SKIPS_ON_READ; 052 053 public FaultTolerantChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) { 054 super(itemReader, repeatOperations); 055 } 056 057 /** 058 * @param maxSkipsOnRead the maximum number of skips on read 059 */ 060 public void setMaxSkipsOnRead(int maxSkipsOnRead) { 061 this.maxSkipsOnRead = maxSkipsOnRead; 062 } 063 064 /** 065 * The policy that determines whether exceptions can be skipped on read. 066 * @param skipPolicy instance of {@link SkipPolicy} to be used by FaultTolerantChunkProvider. 067 */ 068 public void setSkipPolicy(SkipPolicy skipPolicy) { 069 this.skipPolicy = skipPolicy; 070 } 071 072 /** 073 * Classifier to determine whether exceptions have been marked as 074 * no-rollback (as opposed to skippable). If encountered they are simply 075 * ignored, unless also skippable. 076 * 077 * @param rollbackClassifier the rollback classifier to set 078 */ 079 public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) { 080 this.rollbackClassifier = rollbackClassifier; 081 } 082 083 @Override 084 protected I read(StepContribution contribution, Chunk<I> chunk) throws Exception { 085 while (true) { 086 try { 087 return doRead(); 088 } 089 catch (Exception e) { 090 091 if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) { 092 093 // increment skip count and try again 094 contribution.incrementReadSkipCount(); 095 chunk.skip(e); 096 097 if (chunk.getErrors().size() >= maxSkipsOnRead) { 098 throw new SkipOverflowException("Too many skips on read"); 099 } 100 101 logger.debug("Skipping failed input", e); 102 } 103 else { 104 if (rollbackClassifier.classify(e)) { 105 throw new NonSkippableReadException("Non-skippable exception during read", e); 106 } 107 logger.debug("No-rollback for non-skippable exception (ignored)", e); 108 } 109 110 } 111 } 112 } 113 114 @Override 115 public void postProcess(StepContribution contribution, Chunk<I> chunk) { 116 for (Exception e : chunk.getErrors()) { 117 try { 118 getListener().onSkipInRead(e); 119 } 120 catch (RuntimeException ex) { 121 throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); 122 } 123 } 124 } 125 126 /** 127 * Convenience method for calling process skip policy. 128 * 129 * @param policy the skip policy 130 * @param e the cause of the skip 131 * @param skipCount the current skip count 132 */ 133 private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) { 134 try { 135 return policy.shouldSkip(e, skipCount); 136 } 137 catch (SkipException ex) { 138 throw ex; 139 } 140 catch (RuntimeException ex) { 141 throw new SkipPolicyFailedException("Fatal exception in SkipPolicy.", ex, e); 142 } 143 } 144 145}