001/*
002 * Copyright 2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.step.item;
017
018import org.apache.commons.logging.Log;
019import org.apache.commons.logging.LogFactory;
020import org.springframework.batch.core.StepContribution;
021import org.springframework.batch.core.StepListener;
022import org.springframework.batch.core.listener.MulticasterBatchListener;
023import org.springframework.batch.core.step.item.BatchRetryTemplate;
024import org.springframework.batch.core.step.item.Chunk;
025import org.springframework.batch.core.step.item.ChunkMonitor;
026import org.springframework.batch.core.step.item.ForceRollbackForWriteSkipException;
027import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
028import org.springframework.batch.core.step.skip.SkipException;
029import org.springframework.batch.core.step.skip.SkipPolicy;
030import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
031import org.springframework.batch.item.ItemProcessor;
032import org.springframework.batch.item.ItemReader;
033import org.springframework.batch.item.ItemWriter;
034import org.springframework.batch.repeat.RepeatOperations;
035import org.springframework.classify.BinaryExceptionClassifier;
036import org.springframework.classify.Classifier;
037import org.springframework.retry.RecoveryCallback;
038import org.springframework.retry.RetryCallback;
039import org.springframework.retry.RetryContext;
040import org.springframework.retry.RetryException;
041import org.springframework.util.Assert;
042
043import javax.batch.operations.BatchRuntimeException;
044import java.util.List;
045
046/**
047 * Extension of the {@link JsrChunkProcessor} that adds skip and retry functionality.
048 *
049 * @author Michael Minella
050 * @author Chris Schaefer
051 *
052 * @param <I> input type for the step
053 * @param <O> output type for the step
054 */
055public class JsrFaultTolerantChunkProcessor<I,O> extends JsrChunkProcessor<I, O> {
056        protected final Log logger = LogFactory.getLog(getClass());
057        private SkipPolicy skipPolicy = new LimitCheckingItemSkipPolicy();
058        private Classifier<Throwable, Boolean> rollbackClassifier = new BinaryExceptionClassifier(true);
059        private final BatchRetryTemplate batchRetryTemplate;
060        private ChunkMonitor chunkMonitor = new ChunkMonitor();
061        private boolean hasProcessor = false;
062
063        public JsrFaultTolerantChunkProcessor(ItemReader<? extends I> reader, ItemProcessor<? super I, ? extends O> processor, ItemWriter<? super O> writer, RepeatOperations repeatTemplate, BatchRetryTemplate batchRetryTemplate) {
064                super(reader, processor, writer, repeatTemplate);
065                hasProcessor = processor != null;
066                this.batchRetryTemplate = batchRetryTemplate;
067        }
068
069        /**
070         * @param skipPolicy a {@link SkipPolicy}
071         */
072        public void setSkipPolicy(SkipPolicy skipPolicy) {
073                Assert.notNull(skipPolicy, "A skip policy is required");
074
075                this.skipPolicy = skipPolicy;
076        }
077
078        /**
079         * @param rollbackClassifier a {@link Classifier}
080         */
081        public void setRollbackClassifier(Classifier<Throwable, Boolean> rollbackClassifier) {
082                Assert.notNull(rollbackClassifier, "A rollbackClassifier is required");
083
084                this.rollbackClassifier = rollbackClassifier;
085        }
086
087        /**
088         * @param chunkMonitor a {@link ChunkMonitor}
089         */
090        public void setChunkMonitor(ChunkMonitor chunkMonitor) {
091                Assert.notNull(chunkMonitor, "A chunkMonitor is required");
092
093                this.chunkMonitor = chunkMonitor;
094        }
095
096        /**
097         * Register some {@link StepListener}s with the handler. Each will get the
098         * callbacks in the order specified at the correct stage.
099         *
100         * @param listeners listeners to be registered
101         */
102        @Override
103        public void setListeners(List<? extends StepListener> listeners) {
104                for (StepListener listener : listeners) {
105                        registerListener(listener);
106                }
107        }
108
109        /**
110         * Register a listener for callbacks at the appropriate stages in a process.
111         *
112         * @param listener a {@link StepListener}
113         */
114        @Override
115        public void registerListener(StepListener listener) {
116                getListener().register(listener);
117        }
118
119        /**
120         * Adds retry and skip logic to the reading phase of the chunk loop.
121         *
122         * @param contribution a {@link StepContribution}
123         * @param chunk a {@link Chunk}
124         * @return I an item
125         * @throws Exception thrown if error occurs.
126         */
127        @Override
128        protected I provide(final StepContribution contribution, final Chunk<I> chunk) throws Exception {
129                RetryCallback<I, Exception> retryCallback = new RetryCallback<I, Exception>() {
130
131                        @Override
132                        public I doWithRetry(RetryContext arg0) throws Exception {
133                                while (true) {
134                                        try {
135                                                return doProvide(contribution, chunk);
136                                        }
137                                        catch (Exception e) {
138                                                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
139
140                                                        // increment skip count and try again
141                                                        contribution.incrementReadSkipCount();
142                                                        chunk.skip(e);
143
144                                                        getListener().onSkipInRead(e);
145
146                                                        logger.debug("Skipping failed input", e);
147                                                }
148                                                else {
149                                                        getListener().onRetryReadException(e);
150
151                                                        if(rollbackClassifier.classify(e)) {
152                                                                throw e;
153                                                        }
154                                                        else {
155                                                                throw e;
156                                                        }
157                                                }
158                                        }
159                                }
160                        }
161                };
162
163                RecoveryCallback<I> recoveryCallback = new RecoveryCallback<I>() {
164
165                        @Override
166                        public I recover(RetryContext context) throws Exception {
167                                Throwable e = context.getLastThrowable();
168                                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
169                                        contribution.incrementReadSkipCount();
170                                        logger.debug("Skipping after failed process", e);
171                                        return null;
172                                }
173                                else {
174                                        if (rollbackClassifier.classify(e)) {
175                                                // Default is to rollback unless the classifier
176                                                // allows us to continue
177                                                throw new RetryException("Non-skippable exception in recoverer while reading", e);
178                                        }
179
180                                        throw new BatchRuntimeException(e);
181                                }
182                        }
183
184                };
185
186                return batchRetryTemplate.execute(retryCallback, recoveryCallback);
187        }
188
189        /**
190         * Convenience method for calling process skip policy.
191         *
192         * @param policy the skip policy
193         * @param e the cause of the skip
194         * @param skipCount the current skip count
195         */
196        private boolean shouldSkip(SkipPolicy policy, Throwable e, int skipCount) {
197                try {
198                        return policy.shouldSkip(e, skipCount);
199                }
200                catch (SkipException ex) {
201                        throw ex;
202                }
203                catch (RuntimeException ex) {
204                        throw new SkipPolicyFailedException("Fatal exception in SkipPolicy.", ex, e);
205                }
206        }
207
208        /**
209         * Adds retry and skip logic to the process phase of the chunk loop.
210         *
211         * @param contribution a {@link StepContribution}
212         * @param item an item to be processed
213         * @return O an item that has been processed if a processor is available
214         * @throws Exception thrown if error occurs.
215         */
216        @Override
217        @SuppressWarnings("unchecked")
218        protected O transform(final StepContribution contribution, final I item) throws Exception {
219                if (!hasProcessor) {
220                        return (O) item;
221                }
222
223                RetryCallback<O, Exception> retryCallback = new RetryCallback<O, Exception>() {
224
225                        @Override
226                        public O doWithRetry(RetryContext context) throws Exception {
227                                try {
228                                        return doTransform(item);
229                                }
230                                catch (Exception e) {
231                                        if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
232                                                // If we are not re-throwing then we should check if
233                                                // this is skippable
234                                                contribution.incrementProcessSkipCount();
235                                                logger.debug("Skipping after failed process with no rollback", e);
236                                                // If not re-throwing then the listener will not be
237                                                // called in next chunk.
238                                                getListener().onSkipInProcess(item, e);
239                                        } else {
240                                                getListener().onRetryProcessException(item, e);
241
242                                                if (rollbackClassifier.classify(e)) {
243                                                        // Default is to rollback unless the classifier
244                                                        // allows us to continue
245                                                        throw e;
246                                                }
247                                                else {
248                                                        throw e;
249                                                }
250                                        }
251                                }
252                                return null;
253                        }
254
255                };
256
257                RecoveryCallback<O> recoveryCallback = new RecoveryCallback<O>() {
258                        @Override
259                        public O recover(RetryContext context) throws Exception {
260                                Throwable e = context.getLastThrowable();
261                                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
262                                        contribution.incrementProcessSkipCount();
263                                        logger.debug("Skipping after failed process", e);
264                                        return null;
265                                }
266                                else {
267                                        if (rollbackClassifier.classify(e)) {
268                                                // Default is to rollback unless the classifier
269                                                // allows us to continue
270                                                throw new RetryException("Non-skippable exception in recoverer while processing", e);
271                                        }
272
273                                        throw new BatchRuntimeException(e);
274                                }
275                        }
276                };
277
278                return batchRetryTemplate.execute(retryCallback, recoveryCallback);
279        }
280
281        /**
282         * Adds retry and skip logic to the write phase of the chunk loop.
283         *
284         * @param contribution a {@link StepContribution}
285         * @param chunk a {@link Chunk}
286         * @throws Exception thrown if error occurs.
287         */
288        @Override
289        protected void persist(final StepContribution contribution, final Chunk<O> chunk) throws Exception {
290
291                RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() {
292                        @Override
293                        @SuppressWarnings({ "unchecked", "rawtypes" })
294                        public Object doWithRetry(RetryContext context) throws Exception {
295
296                                chunkMonitor.setChunkSize(chunk.size());
297                                try {
298                                        doPersist(contribution, chunk);
299                                }
300                                catch (Exception e) {
301                                        if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
302                                                // Per section 9.2.7 of JSR-352, the SkipListener receives all the items within the chunk                                                
303                                                ((MulticasterBatchListener) getListener()).onSkipInWrite(chunk.getItems(), e);
304                                        } else {
305                                                getListener().onRetryWriteException((List<Object>) chunk.getItems(), e);
306
307                                                if (rollbackClassifier.classify(e)) {
308                                                        throw e;
309                                                }
310                                        }
311                                        /*
312                                         * If the exception is marked as no-rollback, we need to
313                                         * override that, otherwise there's no way to write the
314                                         * rest of the chunk or to honour the skip listener
315                                         * contract.
316                                         */
317                                        throw new ForceRollbackForWriteSkipException(
318                                                        "Force rollback on skippable exception so that skipped item can be located.", e);
319                                }
320                                contribution.incrementWriteCount(chunk.size());
321                                return null;
322
323                        }
324                };
325
326                RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() {
327
328                        @Override
329                        public O recover(RetryContext context) throws Exception {
330                                Throwable e = context.getLastThrowable();
331                                if (shouldSkip(skipPolicy, e, contribution.getStepSkipCount())) {
332                                        contribution.incrementWriteSkipCount();
333                                        logger.debug("Skipping after failed write", e);
334                                        return null;
335                                }
336                                else {
337                                        if (rollbackClassifier.classify(e)) {
338                                                // Default is to rollback unless the classifier
339                                                // allows us to continue
340                                                throw new RetryException("Non-skippable exception in recoverer while write", e);
341                                        }
342                                        return null;
343                                }
344                        }
345
346                };
347
348                batchRetryTemplate.execute(retryCallback, recoveryCallback);
349        }
350}