/*
 * Copyright 2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.jsr.step.item;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.listener.MulticasterBatchListener;
import org.springframework.batch.core.step.item.BatchRetryTemplate;
import org.springframework.batch.core.step.item.Chunk;
import org.springframework.batch.core.step.item.ChunkMonitor;
import org.springframework.batch.core.step.item.ForceRollbackForWriteSkipException;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.classify.Classifier;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryException;
import org.springframework.util.Assert;

import javax.batch.operations.BatchRuntimeException;
import java.util.List;

/**
 * Extension of the {@link JsrChunkProcessor} that adds skip and retry functionality.
 *
 * <p>Each phase of the chunk loop (read, process, write) is executed through the
 * configured {@link BatchRetryTemplate}. Every phase supplies a retry callback that
 * performs the work and a recovery callback that consults the {@link SkipPolicy}
 * and the rollback classifier using the last exception recorded in the retry context.
 *
 * @author Michael Minella
 * @author Chris Schaefer
 *
 * @param <I> input type for the step
 * @param <O> output type for the step
 */
public class JsrFaultTolerantChunkProcessor<I,O> extends JsrChunkProcessor<I, O> {
    protected final Log logger = LogFactory.getLog(getClass());
    private SkipPolicy skipPolicy = new LimitCheckingItemSkipPolicy();
    private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
    private final BatchRetryTemplate batchRetryTemplate;
    private ChunkMonitor chunkMonitor = new ChunkMonitor();
    // True when a non-null processor was supplied; when false, transform() passes items through.
    private final boolean hasProcessor;

    /**
     * Creates a fault tolerant chunk processor.
     *
     * @param reader the {@link ItemReader} handed to the parent class
     * @param processor the {@link ItemProcessor} handed to the parent class; may be
     * {@code null}, in which case {@link #transform(StepContribution, Object)} returns
     * the item unchanged
     * @param writer the {@link ItemWriter} handed to the parent class
     * @param repeatTemplate the {@link RepeatOperations} handed to the parent class
     * @param batchRetryTemplate the {@link BatchRetryTemplate} used to execute the read,
     * process and write phases
     */
    public JsrFaultTolerantChunkProcessor(ItemReader<? extends I> reader, ItemProcessor<? super I, ? extends O> processor, ItemWriter<? super O> writer, RepeatOperations repeatTemplate, BatchRetryTemplate batchRetryTemplate) {
        super(reader, processor, writer, repeatTemplate);
        this.hasProcessor = processor != null;
        this.batchRetryTemplate = batchRetryTemplate;
    }

    /**
     * @param skipPolicy a {@link SkipPolicy}; must not be {@code null}
     */
    public void setSkipPolicy(SkipPolicy skipPolicy) {
        Assert.notNull(skipPolicy, "A skip policy is required");

        this.skipPolicy = skipPolicy;
    }

    /**
     * @param rollbackClassifier a {@link Classifier}; must not be {@code null}
     */
    public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
        Assert.notNull(rollbackClassifier, "A rollbackClassifier is required");

        this.rollbackClassifier = rollbackClassifier;
    }

    /**
     * @param chunkMonitor a {@link ChunkMonitor}; must not be {@code null}
     */
    public void setChunkMonitor(ChunkMonitor chunkMonitor) {
        Assert.notNull(chunkMonitor, "A chunkMonitor is required");

        this.chunkMonitor = chunkMonitor;
    }

    /**
     * Register some {@link StepListener}s with the handler. Each will get the
     * callbacks in the order specified at the correct stage.
     *
     * @param listeners listeners to be registered
     */
    @Override
    public void setListeners(List<? extends StepListener> listeners) {
        for (StepListener listener : listeners) {
            registerListener(listener);
        }
    }

    /**
     * Register a listener for callbacks at the appropriate stages in a process.
     *
     * @param listener a {@link StepListener}
     */
    @Override
    public void registerListener(StepListener listener) {
        getListener().register(listener);
    }

    /**
     * Adds retry and skip logic to the reading phase of the chunk loop.
     *
     * <p>Inside a single retry attempt, a read failure that the skip policy accepts is
     * counted, recorded on the chunk, reported to the listener, and the read is
     * attempted again. Any other failure is reported through
     * {@code onRetryReadException} and rethrown to the retry template.
     *
     * @param contribution a {@link StepContribution}
     * @param chunk a {@link Chunk}
     * @return I an item; {@code null} is returned by the recovery callback when the
     * last failure is skippable
     * @throws Exception thrown if error occurs.
     */
    @Override
    protected I provide(final StepContribution contribution, final Chunk<I> chunk) throws Exception {
        RetryCallback<I, Exception> retryCallback = new RetryCallback<I, Exception>() {

            @Override
            public I doWithRetry(RetryContext context) throws Exception {
                // Loop until an item is read or a non-skippable failure is rethrown.
                while (true) {
                    try {
                        return doProvide(contribution, chunk);
                    }
                    catch (Exception e) {
                        if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {

                            // increment skip count and try again
                            contribution.incrementReadSkipCount();
                            chunk.skip(e);

                            getListener().onSkipInRead(e);

                            logger.debug("Skipping failed input", e);
                        }
                        else {
                            getListener().onRetryReadException(e);

                            // The exception is propagated to the retry template regardless of
                            // its rollback classification; the recovery callback below is the
                            // place where the classification is honoured.
                            throw e;
                        }
                    }
                }
            }
        };

        RecoveryCallback<I> recoveryCallback = new RecoveryCallback<I>() {

            @Override
            public I recover(RetryContext context) throws Exception {
                Throwable e = context.getLastThrowable();
                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
                    contribution.incrementReadSkipCount();
                    logger.debug("Skipping after failed read", e);
                    return null;
                }
                else {
                    if (rollbackClassifier.classify(e)) {
                        // Default is to rollback unless the classifier
                        // allows us to continue
                        throw new RetryException("Non-skippable exception in recoverer while reading", e);
                    }

                    throw new BatchRuntimeException(e);
                }
            }

        };

        return batchRetryTemplate.execute(retryCallback, recoveryCallback);
    }

    /**
     * Convenience method for calling the skip policy.
     *
     * <p>A {@link SkipException} raised by the policy is rethrown unchanged; any other
     * {@link RuntimeException} is wrapped in a {@link SkipPolicyFailedException} that
     * also carries the exception being evaluated.
     *
     * @param policy the skip policy
     * @param e the cause of the skip
     * @param skipCount the current skip count
     * @return the decision returned by {@link SkipPolicy#shouldSkip(Throwable, int)}
     */
    private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
        try {
            return policy.shouldSkip(e, skipCount);
        }
        catch (SkipException ex) {
            throw ex;
        }
        catch (RuntimeException ex) {
            throw new SkipPolicyFailedException("Fatal exception in SkipPolicy.", ex, e);
        }
    }

    /**
     * Adds retry and skip logic to the process phase of the chunk loop.
     *
     * <p>When no processor was supplied at construction time the item is returned as is.
     * Otherwise a processing failure that the skip policy accepts is counted, reported
     * through {@code onSkipInProcess}, and {@code null} is returned for that item. Any
     * other failure is reported through {@code onRetryProcessException} and rethrown to
     * the retry template.
     *
     * @param contribution a {@link StepContribution}
     * @param item an item to be processed
     * @return O an item that has been processed if a processor is available
     * @throws Exception thrown if error occurs.
     */
    @Override
    @SuppressWarnings("unchecked")
    protected O transform(final StepContribution contribution, final I item) throws Exception {
        if (!hasProcessor) {
            return (O) item;
        }

        RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {

            @Override
            public O doWithRetry(RetryContext context) throws Exception {
                try {
                    return doTransform(item);
                }
                catch (Exception e) {
                    if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
                        // If we are not re-throwing then we should check if
                        // this is skippable
                        contribution.incrementProcessSkipCount();
                        logger.debug("Skipping after failed process with no rollback", e);
                        // If not re-throwing then the listener will not be
                        // called in next chunk.
                        getListener().onSkipInProcess(item, e);
                    }
                    else {
                        getListener().onRetryProcessException(item, e);

                        // The exception is propagated to the retry template regardless of
                        // its rollback classification; the recovery callback below is the
                        // place where the classification is honoured.
                        throw e;
                    }
                }
                // Reached only when the failure was skipped.
                return null;
            }

        };

        RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
            @Override
            public O recover(RetryContext context) throws Exception {
                Throwable e = context.getLastThrowable();
                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
                    contribution.incrementProcessSkipCount();
                    logger.debug("Skipping after failed process", e);
                    return null;
                }
                else {
                    if (rollbackClassifier.classify(e)) {
                        // Default is to rollback unless the classifier
                        // allows us to continue
                        throw new RetryException("Non-skippable exception in recoverer while processing", e);
                    }

                    throw new BatchRuntimeException(e);
                }
            }
        };

        return batchRetryTemplate.execute(retryCallback, recoveryCallback);
    }

    /**
     * Adds retry and skip logic to the write phase of the chunk loop.
     *
     * <p>On a successful write the contribution's write count is incremented by the
     * chunk size. On a write failure the listener is notified (skip or retry callback,
     * depending on the skip policy). A non-skippable failure classified for rollback is
     * rethrown as is; every other failure is rethrown wrapped in a
     * {@link ForceRollbackForWriteSkipException}.
     *
     * @param contribution a {@link StepContribution}
     * @param chunk a {@link Chunk}
     * @throws Exception thrown if error occurs.
     */
    @Override
    protected void persist(final StepContribution contribution, final Chunk<O> chunk) throws Exception {

        RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
            @Override
            @SuppressWarnings({ "unchecked", "rawtypes" })
            public Object doWithRetry(RetryContext context) throws Exception {

                chunkMonitor.setChunkSize(chunk.size());
                try {
                    doPersist(contribution, chunk);
                }
                catch (Exception e) {
                    if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
                        // Per section 9.2.7 of JSR-352, the SkipListener receives all the items within the chunk
                        ((MulticasterBatchListener) getListener()).onSkipInWrite(chunk.getItems(), e);
                    }
                    else {
                        getListener().onRetryWriteException((List<Object>) chunk.getItems(), e);

                        if (rollbackClassifier.classify(e)) {
                            throw e;
                        }
                    }
                    /*
                     * If the exception is marked as no-rollback, we need to
                     * override that, otherwise there's no way to write the
                     * rest of the chunk or to honour the skip listener
                     * contract.
                     */
                    throw new ForceRollbackForWriteSkipException(
                            "Force rollback on skippable exception so that skipped item can be located.", e);
                }
                contribution.incrementWriteCount(chunk.size());
                return null;

            }
        };

        RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {

            @Override
            public Object recover(RetryContext context) throws Exception {
                Throwable e = context.getLastThrowable();
                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
                    contribution.incrementWriteSkipCount();
                    logger.debug("Skipping after failed write", e);
                    return null;
                }
                else {
                    if (rollbackClassifier.classify(e)) {
                        // Default is to rollback unless the classifier
                        // allows us to continue
                        throw new RetryException("Non-skippable exception in recoverer while write", e);
                    }
                    // Neither skippable nor classified for rollback: recovery completes
                    // without rethrowing.
                    return null;
                }
            }

        };

        batchRetryTemplate.execute(retryCallback, recoveryCallback);
    }
}