001/*
002 * Copyright 2006-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.step.item;
018
019import org.springframework.batch.core.StepContribution;
020import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
021import org.springframework.batch.core.step.skip.NonSkippableReadException;
022import org.springframework.batch.core.step.skip.SkipException;
023import org.springframework.batch.core.step.skip.SkipListenerFailedException;
024import org.springframework.batch.core.step.skip.SkipPolicy;
025import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
026import org.springframework.batch.item.ItemReader;
027import org.springframework.batch.repeat.RepeatOperations;
028import org.springframework.classify.BinaryExceptionClassifier;
029import org.springframework.classify.Classifier;
030
031/**
032 * FaultTolerant implementation of the {@link ChunkProcessor} interface, that
033 * allows for skipping or retry of items that cause exceptions during reading or
034 * processing.
035 * 
036 */
037public class FaultTolerantChunkProvider<I> extends SimpleChunkProvider<I> {
038
039        /**
040         * Hard limit for number of read skips in the same chunk. Should be
041         * sufficiently high that it is only encountered in a runaway step where all
042         * items are skipped before the chunk can complete (leading to a potential
043         * heap memory problem).
044         */
045        public static final int DEFAULT_MAX_SKIPS_ON_READ = 100;
046
047        private SkipPolicy skipPolicy = new LimitCheckingItemSkipPolicy();
048
049        private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
050
051        private int maxSkipsOnRead = DEFAULT_MAX_SKIPS_ON_READ;
052
053        public FaultTolerantChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
054                super(itemReader, repeatOperations);
055        }
056
057        /**
058         * @param maxSkipsOnRead the maximum number of skips on read
059         */
060        public void setMaxSkipsOnRead(int maxSkipsOnRead) {
061                this.maxSkipsOnRead = maxSkipsOnRead;
062        }
063
064        /**
065         * The policy that determines whether exceptions can be skipped on read.
066         * @param skipPolicy instance of {@link SkipPolicy} to be used by FaultTolerantChunkProvider.
067         */
068        public void setSkipPolicy(SkipPolicy skipPolicy) {
069                this.skipPolicy = skipPolicy;
070        }
071
072        /**
073         * Classifier to determine whether exceptions have been marked as
074         * no-rollback (as opposed to skippable). If encountered they are simply
075         * ignored, unless also skippable.
076         * 
077         * @param rollbackClassifier the rollback classifier to set
078         */
079        public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
080                this.rollbackClassifier = rollbackClassifier;
081        }
082
083        @Override
084        protected I read(StepContribution contribution, Chunk<I> chunk) throws Exception {
085                while (true) {
086                        try {
087                                return doRead();
088                        }
089                        catch (Exception e) {
090
091                                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
092
093                                        // increment skip count and try again
094                                        contribution.incrementReadSkipCount();
095                                        chunk.skip(e);
096
097                                        if (chunk.getErrors().size() >= maxSkipsOnRead) {
098                                                throw new SkipOverflowException("Too many skips on read");
099                                        }
100
101                                        logger.debug("Skipping failed input", e);
102                                }
103                                else {
104                                        if (rollbackClassifier.classify(e)) {
105                                                throw new NonSkippableReadException("Non-skippable exception during read", e);
106                                        }
107                                        logger.debug("No-rollback for non-skippable exception (ignored)", e);
108                                }
109
110                        }
111                }
112        }
113
114        @Override
115        public void postProcess(StepContribution contribution, Chunk<I> chunk) {
116                for (Exception e : chunk.getErrors()) {
117                        try {
118                                getListener().onSkipInRead(e);
119                        }
120                        catch (RuntimeException ex) {
121                                throw new SkipListenerFailedException("Fatal exception in SkipListener.", ex, e);
122                        }
123                }
124        }
125
126        /**
127         * Convenience method for calling process skip policy.
128         * 
129         * @param policy the skip policy
130         * @param e the cause of the skip
131         * @param skipCount the current skip count
132         */
133        private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
134                try {
135                        return policy.shouldSkip(e, skipCount);
136                }
137                catch (SkipException ex) {
138                        throw ex;
139                }
140                catch (RuntimeException ex) {
141                        throw new SkipPolicyFailedException("Fatal exception in SkipPolicy.", ex, e);
142                }
143        }
144
145}