001/*
002 * Copyright 2006-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.step.item;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Iterator;
022import java.util.List;
023import java.util.concurrent.atomic.AtomicInteger;
024import java.util.concurrent.atomic.AtomicReference;
025
026import org.apache.commons.logging.Log;
027import org.apache.commons.logging.LogFactory;
028
029import org.springframework.batch.core.StepContribution;
030import org.springframework.batch.core.listener.StepListenerFailedException;
031import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
032import org.springframework.batch.core.step.skip.NonSkippableProcessException;
033import org.springframework.batch.core.step.skip.SkipLimitExceededException;
034import org.springframework.batch.core.step.skip.SkipListenerFailedException;
035import org.springframework.batch.core.step.skip.SkipPolicy;
036import org.springframework.batch.item.ItemProcessor;
037import org.springframework.batch.item.ItemWriter;
038import org.springframework.classify.BinaryExceptionClassifier;
039import org.springframework.classify.Classifier;
040import org.springframework.retry.ExhaustedRetryException;
041import org.springframework.retry.RecoveryCallback;
042import org.springframework.retry.RetryCallback;
043import org.springframework.retry.RetryContext;
044import org.springframework.retry.RetryException;
045import org.springframework.retry.support.DefaultRetryState;
046
/**
 * Fault-tolerant implementation of the {@link ChunkProcessor} interface that
 * allows for skipping or retrying items that cause exceptions during
 * processing or writing.
 */
052public class FaultTolerantChunkProcessor<I, O> extends SimpleChunkProcessor<I, O> {
053
054        private SkipPolicy itemProcessSkipPolicy = new LimitCheckingItemSkipPolicy();
055
056        private SkipPolicy itemWriteSkipPolicy = new LimitCheckingItemSkipPolicy();
057
058        private final BatchRetryTemplate batchRetryTemplate;
059
060        private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
061
062        private Log logger = LogFactory.getLog(getClass());
063
064        private boolean buffering = true;
065
066        private KeyGenerator keyGenerator;
067
068        private ChunkMonitor chunkMonitor = new ChunkMonitor();
069
070        private boolean processorTransactional = true;
071
072        /**
073         * The {@link KeyGenerator} to use to identify failed items across rollback.
074         * Not used in the case of the {@link #setBuffering(boolean) buffering flag}
075         * being true (the default).
076         *
077         * @param keyGenerator the {@link KeyGenerator} to set
078         */
079        public void setKeyGenerator(KeyGenerator keyGenerator) {
080                this.keyGenerator = keyGenerator;
081        }
082
083        /**
084         * @param SkipPolicy the {@link SkipPolicy} for item processing
085         */
086        public void setProcessSkipPolicy(SkipPolicy SkipPolicy) {
087                this.itemProcessSkipPolicy = SkipPolicy;
088        }
089
090        /**
091         * @param SkipPolicy the {@link SkipPolicy} for item writing
092         */
093        public void setWriteSkipPolicy(SkipPolicy SkipPolicy) {
094                this.itemWriteSkipPolicy = SkipPolicy;
095        }
096
097        /**
098         * A classifier that can distinguish between exceptions that cause rollback
099         * (return true) or not (return false).
100         *
101         * @param rollbackClassifier classifier
102         */
103        public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
104                this.rollbackClassifier = rollbackClassifier;
105        }
106
107        /**
108         * @param chunkMonitor monitor
109         */
110        public void setChunkMonitor(ChunkMonitor chunkMonitor) {
111                this.chunkMonitor = chunkMonitor;
112        }
113
114        /**
115         * A flag to indicate that items have been buffered and therefore will
116         * always come back as a chunk after a rollback. Otherwise things are more
117         * complicated because after a rollback the new chunk might or might not
118         * contain items from the previous failed chunk.
119         *
120         * @param buffering true if items will be buffered
121         */
122        public void setBuffering(boolean buffering) {
123                this.buffering = buffering;
124        }
125
126        /**
127         * Flag to say that the {@link ItemProcessor} is transactional (defaults to
128         * true). If false then the processor is only called once per item per
129         * chunk, even if there are rollbacks with retries and skips.
130         *
131         * @param processorTransactional the flag value to set
132         */
133        public void setProcessorTransactional(boolean processorTransactional) {
134                this.processorTransactional = processorTransactional;
135        }
136
/**
 * Creates a processor that delegates to the given item processor and item
 * writer and executes both through the given retry template.
 *
 * @param itemProcessor the processor handed to the superclass
 * @param itemWriter the writer handed to the superclass
 * @param batchRetryTemplate the template used to execute process and write callbacks
 */
public FaultTolerantChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor,
        ItemWriter<? super O> itemWriter, BatchRetryTemplate batchRetryTemplate) {
    super(itemProcessor, itemWriter);
    this.batchRetryTemplate = batchRetryTemplate;
}
142
143        @Override
144        protected void initializeUserData(Chunk<I> inputs) {
145                @SuppressWarnings("unchecked")
146                UserData<O> data = (UserData<O>) inputs.getUserData();
147                if (data == null) {
148                        data = new UserData<O>();
149                        inputs.setUserData(data);
150                        data.setOutputs(new Chunk<O>());
151                }
152                else {
153                        // BATCH-2663: re-initialize filter count when scanning the chunk
154                        if (data.scanning()) {
155                                data.filterCount = 0;
156                        }
157                }
158        }
159
160        @Override
161        protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
162                @SuppressWarnings("unchecked")
163                UserData<O> data = (UserData<O>) inputs.getUserData();
164                return data.filterCount;
165        }
166
167        @Override
168        protected boolean isComplete(Chunk<I> inputs) {
169
170                /*
171                 * Need to remember the write skips across transactions, otherwise they
172                 * keep coming back. Since we register skips with the inputs they will
173                 * not be processed again but the output skips need to be saved for
174                 * registration later with the listeners. The inputs are going to be the
175                 * same for all transactions processing the same chunk, but the outputs
176                 * are not, so we stash them in user data on the inputs.
177                 */
178
179                @SuppressWarnings("unchecked")
180                UserData<O> data = (UserData<O>) inputs.getUserData();
181                Chunk<O> previous = data.getOutputs();
182
183                return inputs.isEmpty() && previous.getSkips().isEmpty();
184
185        }
186
187        @Override
188        protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
189
190                @SuppressWarnings("unchecked")
191                UserData<O> data = (UserData<O>) inputs.getUserData();
192                Chunk<O> previous = data.getOutputs();
193
194                Chunk<O> next = new Chunk<O>(outputs.getItems(), previous.getSkips());
195                next.setBusy(previous.isBusy());
196
197                // Remember for next time if there are skips accumulating
198                data.setOutputs(next);
199
200                return next;
201
202        }
203
204        @Override
205        protected Chunk<O> transform(final StepContribution contribution, Chunk<I> inputs) throws Exception {
206
207                Chunk<O> outputs = new Chunk<O>();
208                @SuppressWarnings("unchecked")
209                final UserData<O> data = (UserData<O>) inputs.getUserData();
210                final Chunk<O> cache = data.getOutputs();
211                final Iterator<O> cacheIterator = cache.isEmpty() ? null : new ArrayList<O>(cache.getItems()).iterator();
212                final AtomicInteger count = new AtomicInteger(0);
213
214                // final int scanLimit = processorTransactional && data.scanning() ? 1 :
215                // 0;
216
217                for (final Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
218
219                        final I item = iterator.next();
220
221                        RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {
222
223                                @Override
224                                public O doWithRetry(RetryContext context) throws Exception {
225                                        O output = null;
226                                        try {
227                                                count.incrementAndGet();
228                                                O cached = (cacheIterator != null && cacheIterator.hasNext()) ? cacheIterator.next() : null;
229                                                if (cached != null && !processorTransactional) {
230                                                        output = cached;
231                                                }
232                                                else {
233                                                        output = doProcess(item);
234                                                        if (output == null) {
235                                                                data.incrementFilterCount();
236                                                        } else if (!processorTransactional && !data.scanning()) {
237                                                                cache.add(output);
238                                                        }
239                                                }
240                                        }
241                                        catch (Exception e) {
242                                                if (rollbackClassifier.classify(e)) {
243                                                        // Default is to rollback unless the classifier
244                                                        // allows us to continue
245                                                        throw e;
246                                                }
247                                                else if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
248                                                        // If we are not re-throwing then we should check if
249                                                        // this is skippable
250                                                        contribution.incrementProcessSkipCount();
251                                                        logger.debug("Skipping after failed process with no rollback", e);
252                                                        // If not re-throwing then the listener will not be
253                                                        // called in next chunk.
254                                                        callProcessSkipListener(item, e);
255                                                }
256                                                else {
257                                                        // If it's not skippable that's an error in
258                                                        // configuration - it doesn't make sense to not roll
259                                                        // back if we are also not allowed to skip
260                                                        throw new NonSkippableProcessException(
261                                                                        "Non-skippable exception in processor.  Make sure any exceptions that do not cause a rollback are skippable.",
262                                                                        e);
263                                                }
264                                        }
265                                        if (output == null) {
266                                                // No need to re-process filtered items
267                                                iterator.remove();
268                                        }
269                                        return output;
270                                }
271
272                        };
273
274                        RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
275
276                                @Override
277                                public O recover(RetryContext context) throws Exception {
278                                        Throwable e = context.getLastThrowable();
279                                        if (shouldSkip(itemProcessSkipPolicy, e, contribution.getStepSkipCount())) {
280                                                iterator.remove(e);
281                                                contribution.incrementProcessSkipCount();
282                                                logger.debug("Skipping after failed process", e);
283                                                return null;
284                                        }
285                                        else {
286                                                if (rollbackClassifier.classify(e)) {
287                                                        // Default is to rollback unless the classifier
288                                                        // allows us to continue
289                                                        throw new RetryException("Non-skippable exception in recoverer while processing", e);
290                                                }
291                                                iterator.remove(e);
292                                                return null;
293                                        }
294                                }
295
296                        };
297
298                        O output = batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(
299                                        getInputKey(item), rollbackClassifier));
300                        if (output != null) {
301                                outputs.add(output);
302                        }
303
304                        /*
305                         * We only want to process the first item if there is a scan for a
306                         * failed item.
307                         */
308                        if (data.scanning()) {
309                                while (cacheIterator != null && cacheIterator.hasNext()) {
310                                        outputs.add(cacheIterator.next());
311                                }
312                                // Only process the first item if scanning
313                                break;
314                        }
315                }
316
317                return outputs;
318
319        }
320
321        @Override
322        protected void write(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs)
323                        throws Exception {
324                @SuppressWarnings("unchecked")
325                final UserData<O> data = (UserData<O>) inputs.getUserData();
326                final AtomicReference<RetryContext> contextHolder = new AtomicReference<RetryContext>();
327
328                RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
329                        @Override
330                        public Object doWithRetry(RetryContext context) throws Exception {
331                                contextHolder.set(context);
332
333                                if (!data.scanning()) {
334                                        chunkMonitor.setChunkSize(inputs.size());
335                                        try {
336                                                doWrite(outputs.getItems());
337                                        }
338                                        catch (Exception e) {
339                                                if (rollbackClassifier.classify(e)) {
340                                                        throw e;
341                                                }
342                                                /*
343                                                 * If the exception is marked as no-rollback, we need to
344                                                 * override that, otherwise there's no way to write the
345                                                 * rest of the chunk or to honour the skip listener
346                                                 * contract.
347                                                 */
348                                                throw new ForceRollbackForWriteSkipException(
349                                                                "Force rollback on skippable exception so that skipped item can be located.", e);
350                                        }
351                                        contribution.incrementWriteCount(outputs.size());
352                                }
353                                else {
354                                        scan(contribution, inputs, outputs, chunkMonitor, false);
355                                }
356                                return null;
357
358                        }
359                };
360
361                if (!buffering) {
362
363                        RecoveryCallback<Object> batchRecoveryCallback = new RecoveryCallback<Object>() {
364
365                                @Override
366                                public Object recover(RetryContext context) throws Exception {
367
368                                        Throwable e = context.getLastThrowable();
369                                        if (outputs.size() > 1 && !rollbackClassifier.classify(e)) {
370                                                throw new RetryException("Invalid retry state during write caused by "
371                                                                + "exception that does not classify for rollback: ", e);
372                                        }
373
374                                        Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
375                                        for (Chunk<O>.ChunkIterator outputIterator = outputs.iterator(); outputIterator.hasNext();) {
376
377                                                inputIterator.next();
378                                                outputIterator.next();
379
380                                                checkSkipPolicy(inputIterator, outputIterator, e, contribution, true);
381                                                if (!rollbackClassifier.classify(e)) {
382                                                        throw new RetryException(
383                                                                        "Invalid retry state during recovery caused by exception that does not classify for rollback: ",
384                                                                        e);
385                                                }
386
387                                        }
388
389                                        return null;
390
391                                }
392
393                        };
394
395                        batchRetryTemplate.execute(retryCallback, batchRecoveryCallback,
396                                        BatchRetryTemplate.createState(getInputKeys(inputs), rollbackClassifier));
397
398                }
399                else {
400
401                        RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
402
403                                @Override
404                                public Object recover(RetryContext context) throws Exception {
405                                        /*
406                                         * If the last exception was not skippable we don't need to
407                                         * do any scanning. We can just bomb out with a retry
408                                         * exhausted.
409                                         */
410                                        if (!shouldSkip(itemWriteSkipPolicy, context.getLastThrowable(), -1)) {
411                                                throw new ExhaustedRetryException(
412                                                                "Retry exhausted after last attempt in recovery path, but exception is not skippable.",
413                                                                context.getLastThrowable());
414                                        }
415
416                                        inputs.setBusy(true);
417                                        data.scanning(true);
418                                        scan(contribution, inputs, outputs, chunkMonitor, true);
419                                        return null;
420                                }
421
422                        };
423
424                        if (logger.isDebugEnabled()) {
425                                logger.debug("Attempting to write: " + inputs);
426                        }
427                        try {
428                                batchRetryTemplate.execute(retryCallback, recoveryCallback, new DefaultRetryState(inputs,
429                                                rollbackClassifier));
430                        }
431                        catch (Exception e) {
432                                RetryContext context = contextHolder.get();
433                                if (!batchRetryTemplate.canRetry(context)) {
434                                        /*
435                                         * BATCH-1761: we need advance warning of the scan about to
436                                         * start in the next transaction, so we can change the
437                                         * processing behaviour.
438                                         */
439                                        data.scanning(true);
440                                }
441                                throw e;
442                        }
443
444                }
445
446                callSkipListeners(inputs, outputs);
447
448        }
449
450        private void callSkipListeners(final Chunk<I> inputs, final Chunk<O> outputs) {
451
452                for (SkipWrapper<I> wrapper : inputs.getSkips()) {
453                        I item = wrapper.getItem();
454                        if (item == null) {
455                                continue;
456                        }
457                        Throwable e = wrapper.getException();
458                        callProcessSkipListener(item, e);
459                }
460
461                for (SkipWrapper<O> wrapper : outputs.getSkips()) {
462                        Throwable e = wrapper.getException();
463                        try {
464                                getListener().onSkipInWrite(wrapper.getItem(), e);
465                        }
466                        catch (RuntimeException ex) {
467                                throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
468                        }
469                }
470
471                // Clear skips if we are possibly going to process this chunk again
472                outputs.clearSkips();
473                inputs.clearSkips();
474
475        }
476
477        /**
478         * Convenience method for calling process skip listener, so that it can be
479         * called from multiple places.
480         *
481         * @param item the item that is skipped
482         * @param e the cause of the skip
483         */
484        private void callProcessSkipListener(I item, Throwable e) {
485                try {
486                        getListener().onSkipInProcess(item, e);
487                }
488                catch (RuntimeException ex) {
489                        throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
490                }
491        }
492
493        /**
494         * Convenience method for calling process skip policy, so that it can be
495         * called from multiple places.
496         *
497         * @param policy the skip policy
498         * @param e the cause of the skip
499         * @param skipCount the current skip count
500         */
501        private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
502                try {
503                        return policy.shouldSkip(e, skipCount);
504                }
505                catch (SkipLimitExceededException ex) {
506                        throw ex;
507                }
508                catch (RuntimeException ex) {
509                        throw new SkipListenerFailedException("Fatal exception in SkipPolicy.", ex, e);
510                }
511        }
512
513        private Object getInputKey(I item) {
514                if (keyGenerator == null) {
515                        return item;
516                }
517                return keyGenerator.getKey(item);
518        }
519
520        private List<?> getInputKeys(final Chunk<I> inputs) {
521                if (keyGenerator == null) {
522                        return inputs.getItems();
523                }
524                List<Object> keys = new ArrayList<Object>();
525                for (I item : inputs.getItems()) {
526                        keys.add(keyGenerator.getKey(item));
527                }
528                return keys;
529        }
530
531        private void checkSkipPolicy(Chunk<I>.ChunkIterator inputIterator, Chunk<O>.ChunkIterator outputIterator,
532                        Throwable e, StepContribution contribution, boolean recovery) throws Exception {
533                logger.debug("Checking skip policy after failed write");
534                if (shouldSkip(itemWriteSkipPolicy, e, contribution.getStepSkipCount())) {
535                        contribution.incrementWriteSkipCount();
536                        inputIterator.remove();
537                        outputIterator.remove(e);
538                        logger.debug("Skipping after failed write", e);
539                }
540                else {
541                        if (recovery) {
542                                // Only if already recovering should we check skip policy
543                                throw new RetryException("Non-skippable exception in recoverer", e);
544                        }
545                        else {
546                                if (e instanceof Exception) {
547                                        throw (Exception) e;
548                                }
549                                else if (e instanceof Error) {
550                                        throw (Error) e;
551                                }
552                                else {
553                                        throw new RetryException("Non-skippable throwable in recoverer", e);
554                                }
555                        }
556                }
557        }
558
559        private void scan(final StepContribution contribution, final Chunk<I> inputs, final Chunk<O> outputs,
560                        ChunkMonitor chunkMonitor, boolean recovery) throws Exception {
561
562                @SuppressWarnings("unchecked")
563                final UserData<O> data = (UserData<O>) inputs.getUserData();
564
565                if (logger.isDebugEnabled()) {
566                        if (recovery) {
567                                logger.debug("Scanning for failed item on recovery from write: " + inputs);
568                        }
569                        else {
570                                logger.debug("Scanning for failed item on write: " + inputs);
571                        }
572                }
573                if (outputs.isEmpty() || inputs.isEmpty()) {
574                        data.scanning(false);
575                        inputs.setBusy(false);
576                        chunkMonitor.resetOffset();
577                        return;
578                }
579
580                Chunk<I>.ChunkIterator inputIterator = inputs.iterator();
581                Chunk<O>.ChunkIterator outputIterator = outputs.iterator();
582
583                if (!inputs.getSkips().isEmpty() && inputs.getItems().size() != outputs.getItems().size()) {
584                        if (outputIterator.hasNext()) {
585                                outputIterator.remove();
586                                return;
587                        }
588                }
589
590                List<O> items = Collections.singletonList(outputIterator.next());
591                inputIterator.next();
592                try {
593                        writeItems(items);
594                        // If successful we are going to return and allow
595                        // the driver to commit...
596                        doAfterWrite(items);
597                        contribution.incrementWriteCount(1);
598                        inputIterator.remove();
599                        outputIterator.remove();
600                }
601                catch (Exception e) {
602                        try {
603                                doOnWriteError(e, items);
604                        }
605                        finally {
606                                Throwable cause = e;
607                                if(e instanceof StepListenerFailedException) {
608                                        cause = e.getCause();
609                                }
610
611                                if (!shouldSkip(itemWriteSkipPolicy, cause, -1) && !rollbackClassifier.classify(cause)) {
612                                        inputIterator.remove();
613                                        outputIterator.remove();
614                                }
615                                else {
616                                        checkSkipPolicy(inputIterator, outputIterator, cause, contribution, recovery);
617                                }
618                                if (rollbackClassifier.classify(cause)) {
619                                        throw (Exception) cause;
620                                }
621                        }
622                }
623                chunkMonitor.incrementOffset();
624                if (outputs.isEmpty()) {
625                        data.scanning(false);
626                        inputs.setBusy(false);
627                        chunkMonitor.resetOffset();
628                }
629        }
630
631        private static class UserData<O> {
632
633                private Chunk<O> outputs;
634
635                private int filterCount = 0;
636
637                private boolean scanning;
638
639                public boolean scanning() {
640                        return scanning;
641                }
642
643                public void scanning(boolean scanning) {
644                        this.scanning = scanning;
645                }
646
647                public void incrementFilterCount() {
648                        filterCount++;
649                }
650
651                public Chunk<O> getOutputs() {
652                        return outputs;
653                }
654
655                public void setOutputs(Chunk<O> outputs) {
656                        this.outputs = outputs;
657                }
658
659        }
660
661}