001/* 002 * Copyright 2006-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.step.item; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.Iterator; 022import java.util.List; 023import java.util.concurrent.atomic.AtomicInteger; 024import java.util.concurrent.atomic.AtomicReference; 025 026import org.apache.commons.logging.Log; 027import org.apache.commons.logging.LogFactory; 028 029import org.springframework.batch.core.StepContribution; 030import org.springframework.batch.core.listener.StepListenerFailedException; 031import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy; 032import org.springframework.batch.core.step.skip.NonSkippableProcessException; 033import org.springframework.batch.core.step.skip.SkipLimitExceededException; 034import org.springframework.batch.core.step.skip.SkipListenerFailedException; 035import org.springframework.batch.core.step.skip.SkipPolicy; 036import org.springframework.batch.item.ItemProcessor; 037import org.springframework.batch.item.ItemWriter; 038import org.springframework.classify.BinaryExceptionClassifier; 039import org.springframework.classify.Classifier; 040import org.springframework.retry.ExhaustedRetryException; 041import org.springframework.retry.RecoveryCallback; 042import org.springframework.retry.RetryCallback; 043import org.springframework.retry.RetryContext; 044import org.springframework.retry.RetryException; 045import org.springframework.retry.support.DefaultRetryState; 046 047/** 048 * FaultTolerant implementation of the {@link ChunkProcessor} interface, that 049 * allows for skipping or retry of items that cause exceptions during writing. 050 * 051 */ 052public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> { 053 054 private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy(); 055 056 private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy(); 057 058 private final BatchRetryTemplate batchRetryTemplate; 059 060 private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true); 061 062 private Log logger = LogFactory.getLog(getClass()); 063 064 private boolean buffering = true; 065 066 private KeyGenerator keyGenerator; 067 068 private ChunkMonitor chunkMonitor = new ChunkMonitor(); 069 070 private boolean processorTransactional = true; 071 072 /** 073 * The {@link KeyGenerator} to use to identify failed items across rollback. 074 * Not used in the case of the {@link #setBuffering(boolean) buffering flag} 075 * being true (the default). 076 * 077 * @param keyGenerator the {@link KeyGenerator} to set 078 */ 079 public void setKeyGenerator(KeyGenerator keyGenerator) { 080 this.keyGenerator = keyGenerator; 081 } 082 083 /** 084 * @param SkipPolicy the {@link SkipPolicy} for item processing 085 */ 086 public void setProcessSkipPolicy(SkipPolicy SkipPolicy) { 087 this.itemProcessSkipPolicy = SkipPolicy; 088 } 089 090 /** 091 * @param SkipPolicy the {@link SkipPolicy} for item writing 092 */ 093 public void setWriteSkipPolicy(SkipPolicy SkipPolicy) { 094 this.itemWriteSkipPolicy = SkipPolicy; 095 } 096 097 /** 098 * A classifier that can distinguish between exceptions that cause rollback 099 * (return true) or not (return false). 100 * 101 * @param rollbackClassifier classifier 102 */ 103 public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) { 104 this.rollbackClassifier = rollbackClassifier; 105 } 106 107 /** 108 * @param chunkMonitor monitor 109 */ 110 public void setChunkMonitor(ChunkMonitor chunkMonitor) { 111 this.chunkMonitor = chunkMonitor; 112 } 113 114 /** 115 * A flag to indicate that items have been buffered and therefore will 116 * always come back as a chunk after a rollback. Otherwise things are more 117 * complicated because after a rollback the new chunk might or might not 118 * contain items from the previous failed chunk. 119 * 120 * @param buffering true if items will be buffered 121 */ 122 public void setBuffering(boolean buffering) { 123 this.buffering = buffering; 124 } 125 126 /** 127 * Flag to say that the {@link ItemProcessor} is transactional (defaults to 128 * true). If false then the processor is only called once per item per 129 * chunk, even if there are rollbacks with retries and skips. 130 * 131 * @param processorTransactional the flag value to set 132 */ 133 public void setProcessorTransactional(boolean processorTransactional) { 134 this.processorTransactional = processorTransactional; 135 } 136 137 public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, 138 ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) { 139 super(itemProcessor, itemWriter); 140 this.batchRetryTemplate = batchRetryTemplate; 141 } 142 143 @Override 144 protected void initializeUserData(Chunk<I> inputs) { 145 @SuppressWarnings("unchecked") 146 UserData<O> data = (UserData<O>) inputs.getUserData(); 147 if (data == null) { 148 data = new UserData<O>(); 149 inputs.setUserData(data); 150 data.setOutputs(new Chunk<O>()); 151 } 152 else { 153 // BATCH-2663: re-initialize filter count when scanning the chunk 154 if (data.scanning()) { 155 data.filterCount = 0; 156 } 157 } 158 } 159 160 @Override 161 protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { 162 @SuppressWarnings("unchecked") 163 UserData<O> data = (UserData<O>) inputs.getUserData(); 164 return data.filterCount; 165 } 166 167 @Override 168 protected boolean isComplete(Chunk<I> inputs) { 169 170 /* 171 * Need to remember the write skips across transactions, otherwise they 172 * keep coming back. Since we register skips with the inputs they will 173 * not be processed again but the output skips need to be saved for 174 * registration later with the listeners. The inputs are going to be the 175 * same for all transactions processing the same chunk, but the outputs 176 * are not, so we stash them in user data on the inputs. 177 */ 178 179 @SuppressWarnings("unchecked") 180 UserData<O> data = (UserData<O>) inputs.getUserData(); 181 Chunk<O> previous = data.getOutputs(); 182 183 return inputs.isEmpty() && previous.getSkips().isEmpty(); 184 185 } 186 187 @Override 188 protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { 189 190 @SuppressWarnings("unchecked") 191 UserData<O> data = (UserData<O>) inputs.getUserData(); 192 Chunk<O> previous = data.getOutputs(); 193 194 Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips()); 195 next.setBusy(previous.isBusy()); 196 197 // Remember for next time if there are skips accumulating 198 data.setOutputs(next); 199 200 return next; 201 202 } 203 204 @Override 205 protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception { 206 207 Chunk<O> outputs = new Chunk<O>(); 208 @SuppressWarnings("unchecked") 209 final UserData<O> data = (UserData<O>) inputs.getUserData(); 210 final Chunk<O> cache = data.getOutputs(); 211 final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator(); 212 final AtomicInteger count = new AtomicInteger(0); 213 214 // final int scanLimit = processorTransactional && data.scanning() ? 1 : 215 // 0; 216 217 for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { 218 219 final I item = iterator.next(); 220 221 RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() { 222 223 @Override 224 public O doWithRetry(RetryContext context) throws Exception { 225 O output = null; 226 try { 227 count.incrementAndGet(); 228 O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null; 229 if (cached != null && !processorTransactional) { 230 output = cached; 231 } 232 else { 233 output = doProcess(item); 234 if (output == null) { 235 data.incrementFilterCount(); 236 } else if (!processorTransactional && !data.scanning()) { 237 cache.add(output); 238 } 239 } 240 } 241 catch (Exception e) { 242 if (rollbackClassifier.classify(e)) { 243 // Default is to rollback unless the classifier 244 // allows us to continue 245 throw e; 246 } 247 else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) { 248 // If we are not re-throwing then we should check if 249 // this is skippable 250 contribution.incrementProcessSkipCount(); 251 logger.debug("Skipping after failed process with no rollback", e); 252 // If not re-throwing then the listener will not be 253 // called in next chunk. 254 callProcessSkipListener(item, e); 255 } 256 else { 257 // If it's not skippable that's an error in 258 // configuration - it doesn't make sense to not roll 259 // back if we are also not allowed to skip 260 throw new NonSkippableProcessException( 261 "Non-skippable exception in processor. Make sure any exceptions that do not cause a rollback are skippable.", 262 e); 263 } 264 } 265 if (output == null) { 266 // No need to re-process filtered items 267 iterator.remove(); 268 } 269 return output; 270 } 271 272 }; 273 274 RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() { 275 276 @Override 277 public O recover(RetryContext context) throws Exception { 278 Throwable e = context.getLastThrowable(); 279 if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) { 280 iterator.remove(e); 281 contribution.incrementProcessSkipCount(); 282 logger.debug("Skipping after failed process", e); 283 return null; 284 } 285 else { 286 if (rollbackClassifier.classify(e)) { 287 // Default is to rollback unless the classifier 288 // allows us to continue 289 throw new RetryException("Non-skippable exception in recoverer while processing", e); 290 } 291 iterator.remove(e); 292 return null; 293 } 294 } 295 296 }; 297 298 O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState( 299 getInputKey(item), rollbackClassifier)); 300 if (output != null) { 301 outputs.add(output); 302 } 303 304 /* 305 * We only want to process the first item if there is a scan for a 306 * failed item. 307 */ 308 if (data.scanning()) { 309 while (cacheIterator != null && cacheIterator.hasNext()) { 310 outputs.add(cacheIterator.next()); 311 } 312 // Only process the first item if scanning 313 break; 314 } 315 } 316 317 return outputs; 318 319 } 320 321 @Override 322 protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs) 323 throws Exception { 324 @SuppressWarnings("unchecked") 325 final UserData<O> data = (UserData<O>) inputs.getUserData(); 326 final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>(); 327 328 RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() { 329 @Override 330 public Object doWithRetry(RetryContext context) throws Exception { 331 contextHolder.set(context); 332 333 if (!data.scanning()) { 334 chunkMonitor.setChunkSize(inputs.size()); 335 try { 336 doWrite(outputs.getItems()); 337 } 338 catch (Exception e) { 339 if (rollbackClassifier.classify(e)) { 340 throw e; 341 } 342 /* 343 * If the exception is marked as no-rollback, we need to 344 * override that, otherwise there's no way to write the 345 * rest of the chunk or to honour the skip listener 346 * contract. 347 */ 348 throw new ForceRollbackForWriteSkipException( 349 "Force rollback on skippable exception so that skipped item can be located.", e); 350 } 351 contribution.incrementWriteCount(outputs.size()); 352 } 353 else { 354 scan(contribution, inputs, outputs, chunkMonitor, false); 355 } 356 return null; 357 358 } 359 }; 360 361 if (!buffering) { 362 363 RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() { 364 365 @Override 366 public Object recover(RetryContext context) throws Exception { 367 368 Throwable e = context.getLastThrowable(); 369 if (outputs.size() > 1 && !rollbackClassifier.classify(e)) { 370 throw new RetryException("Invalid retry state during write caused by " 371 + "exception that does not classify for rollback: ", e); 372 } 373 374 Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); 375 for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) { 376 377 inputIterator.next(); 378 outputIterator.next(); 379 380 checkSkipPolicy(inputIterator, outputIterator, e, contribution, true); 381 if (!rollbackClassifier.classify(e)) { 382 throw new RetryException( 383 "Invalid retry state during recovery caused by exception that does not classify for rollback: ", 384 e); 385 } 386 387 } 388 389 return null; 390 391 } 392 393 }; 394 395 batchRetryTemplate.execute(retryCallback, batchRecoveryCallback, 396 BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier)); 397 398 } 399 else { 400 401 RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() { 402 403 @Override 404 public Object recover(RetryContext context) throws Exception { 405 /* 406 * If the last exception was not skippable we don't need to 407 * do any scanning. We can just bomb out with a retry 408 * exhausted. 409 */ 410 if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) { 411 throw new ExhaustedRetryException( 412 "Retry exhausted after last attempt in recovery path, but exception is not skippable.", 413 context.getLastThrowable()); 414 } 415 416 inputs.setBusy(true); 417 data.scanning(true); 418 scan(contribution, inputs, outputs, chunkMonitor, true); 419 return null; 420 } 421 422 }; 423 424 if (logger.isDebugEnabled()) { 425 logger.debug("Attempting to write: " + inputs); 426 } 427 try { 428 batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs, 429 rollbackClassifier)); 430 } 431 catch (Exception e) { 432 RetryContext context = contextHolder.get(); 433 if (!batchRetryTemplate.canRetry(context)) { 434 /* 435 * BATCH-1761: we need advance warning of the scan about to 436 * start in the next transaction, so we can change the 437 * processing behaviour. 438 */ 439 data.scanning(true); 440 } 441 throw e; 442 } 443 444 } 445 446 callSkipListeners(inputs, outputs); 447 448 } 449 450 private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) { 451 452 for (SkipWrapper<I> wrapper : inputs.getSkips()) { 453 I item = wrapper.getItem(); 454 if (item == null) { 455 continue; 456 } 457 Throwable e = wrapper.getException(); 458 callProcessSkipListener(item, e); 459 } 460 461 for (SkipWrapper<O> wrapper : outputs.getSkips()) { 462 Throwable e = wrapper.getException(); 463 try { 464 getListener().onSkipInWrite(wrapper.getItem(), e); 465 } 466 catch (RuntimeException ex) { 467 throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); 468 } 469 } 470 471 // Clear skips if we are possibly going to process this chunk again 472 outputs.clearSkips(); 473 inputs.clearSkips(); 474 475 } 476 477 /** 478 * Convenience method for calling process skip listener, so that it can be 479 * called from multiple places. 480 * 481 * @param item the item that is skipped 482 * @param e the cause of the skip 483 */ 484 private void callProcessSkipListener(I item, Throwable e) { 485 try { 486 getListener().onSkipInProcess(item, e); 487 } 488 catch (RuntimeException ex) { 489 throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e); 490 } 491 } 492 493 /** 494 * Convenience method for calling process skip policy, so that it can be 495 * called from multiple places. 496 * 497 * @param policy the skip policy 498 * @param e the cause of the skip 499 * @param skipCount the current skip count 500 */ 501 private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) { 502 try { 503 return policy.shouldSkip(e, skipCount); 504 } 505 catch (SkipLimitExceededException ex) { 506 throw ex; 507 } 508 catch (RuntimeException ex) { 509 throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e); 510 } 511 } 512 513 private Object getInputKey(I item) { 514 if (keyGenerator == null) { 515 return item; 516 } 517 return keyGenerator.getKey(item); 518 } 519 520 private List<?> getInputKeys(final Chunk<I> inputs) { 521 if (keyGenerator == null) { 522 return inputs.getItems(); 523 } 524 List<Object> keys = new ArrayList<Object>(); 525 for (I item : inputs.getItems()) { 526 keys.add(keyGenerator.getKey(item)); 527 } 528 return keys; 529 } 530 531 private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator, 532 Throwable e, StepContribution contribution, boolean recovery) throws Exception { 533 logger.debug("Checking skip policy after failed write"); 534 if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) { 535 contribution.incrementWriteSkipCount(); 536 inputIterator.remove(); 537 outputIterator.remove(e); 538 logger.debug("Skipping after failed write", e); 539 } 540 else { 541 if (recovery) { 542 // Only if already recovering should we check skip policy 543 throw new RetryException("Non-skippable exception in recoverer", e); 544 } 545 else { 546 if (e instanceof Exception) { 547 throw (Exception) e; 548 } 549 else if (e instanceof Error) { 550 throw (Error) e; 551 } 552 else { 553 throw new RetryException("Non-skippable throwable in recoverer", e); 554 } 555 } 556 } 557 } 558 559 private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs, 560 ChunkMonitor chunkMonitor, boolean recovery) throws Exception { 561 562 @SuppressWarnings("unchecked") 563 final UserData<O> data = (UserData<O>) inputs.getUserData(); 564 565 if (logger.isDebugEnabled()) { 566 if (recovery) { 567 logger.debug("Scanning for failed item on recovery from write: " + inputs); 568 } 569 else { 570 logger.debug("Scanning for failed item on write: " + inputs); 571 } 572 } 573 if (outputs.isEmpty() || inputs.isEmpty()) { 574 data.scanning(false); 575 inputs.setBusy(false); 576 chunkMonitor.resetOffset(); 577 return; 578 } 579 580 Chunk<I>.ChunkIterator inputIterator = inputs.iterator(); 581 Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); 582 583 if (!inputs.getSkips().isEmpty() && inputs.getItems().size() != outputs.getItems().size()) { 584 if (outputIterator.hasNext()) { 585 outputIterator.remove(); 586 return; 587 } 588 } 589 590 List<O> items = Collections.singletonList(outputIterator.next()); 591 inputIterator.next(); 592 try { 593 writeItems(items); 594 // If successful we are going to return and allow 595 // the driver to commit... 596 doAfterWrite(items); 597 contribution.incrementWriteCount(1); 598 inputIterator.remove(); 599 outputIterator.remove(); 600 } 601 catch (Exception e) { 602 try { 603 doOnWriteError(e, items); 604 } 605 finally { 606 Throwable cause = e; 607 if(e instanceof StepListenerFailedException) { 608 cause = e.getCause(); 609 } 610 611 if (!shouldSkip(itemWriteSkipPolicy, cause, -1) && !rollbackClassifier.classify(cause)) { 612 inputIterator.remove(); 613 outputIterator.remove(); 614 } 615 else { 616 checkSkipPolicy(inputIterator, outputIterator, cause, contribution, recovery); 617 } 618 if (rollbackClassifier.classify(cause)) { 619 throw (Exception) cause; 620 } 621 } 622 } 623 chunkMonitor.incrementOffset(); 624 if (outputs.isEmpty()) { 625 data.scanning(false); 626 inputs.setBusy(false); 627 chunkMonitor.resetOffset(); 628 } 629 } 630 631 private static class UserData<O> { 632 633 private Chunk<O> outputs; 634 635 private int filterCount = 0; 636 637 private boolean scanning; 638 639 public boolean scanning() { 640 return scanning; 641 } 642 643 public void scanning(boolean scanning) { 644 this.scanning = scanning; 645 } 646 647 public void incrementFilterCount() { 648 filterCount++; 649 } 650 651 public Chunk<O> getOutputs() { 652 return outputs; 653 } 654 655 public void setOutputs(Chunk<O> outputs) { 656 this.outputs = outputs; 657 } 658 659 } 660 661}