/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.batch.core.step.item;
018
019import org.springframework.classify.Classifier;
020import org.springframework.retry.ExhaustedRetryException;
021import org.springframework.retry.RecoveryCallback;
022import org.springframework.retry.RetryCallback;
023import org.springframework.retry.RetryContext;
024import org.springframework.retry.RetryListener;
025import org.springframework.retry.RetryOperations;
026import org.springframework.retry.RetryPolicy;
027import org.springframework.retry.RetryState;
028import org.springframework.retry.backoff.BackOffPolicy;
029import org.springframework.retry.context.RetryContextSupport;
030import org.springframework.retry.policy.RetryContextCache;
031import org.springframework.retry.support.DefaultRetryState;
032import org.springframework.retry.support.RetrySynchronizationManager;
033import org.springframework.retry.support.RetryTemplate;
034
035import java.util.ArrayList;
036import java.util.Collection;
037import java.util.Iterator;
038import java.util.List;
039
/**
 * A special purpose retry template that deals specifically with multi-valued
 * stateful retry. This is useful in the case where the operation to be retried
 * operates on multiple items, and when it fails there is no way to decide which
 * (if any) of the items was responsible. The {@link RetryState} used in the
 * execute methods is composite, and when a failure occurs, all of the keys in
 * the composite are "tarred with the same brush". Subsequent attempts to
 * execute with any of the keys that have failed previously result in a new
 * attempt, and the previous state is used to check the {@link RetryPolicy}. If
 * one of the failed items eventually succeeds then the others in the current
 * composite for that attempt will be cleared from the context cache (as
 * normal), but there may still be entries in the cache for the original failed
 * items. This might mean that an item that did not cause a failure is never
 * retried because other items in the same batch fail fatally first.
 *
 * @author Dave Syer
 *
 */
058public class BatchRetryTemplate implements RetryOperations {
059
060        private class BatchRetryState extends DefaultRetryState {
061
062                private final Collection<RetryState> keys;
063
064                public BatchRetryState(Collection<RetryState> keys) {
065                        super(keys);
066                        this.keys = new ArrayList<RetryState>(keys);
067                }
068
069        }
070
071        @SuppressWarnings("serial")
072        private static class BatchRetryContext extends RetryContextSupport {
073
074                private final Collection<RetryContext> contexts;
075
076                public BatchRetryContext(RetryContext parent, Collection<RetryContext> contexts) {
077
078                        super(parent);
079
080                        this.contexts = contexts;
081                        int count = 0;
082
083                        for (RetryContext context : contexts) {
084                                int retryCount = context.getRetryCount();
085                                if (retryCount > count) {
086                                        count = retryCount;
087                                        registerThrowable(context.getLastThrowable());
088                                }
089                        }
090
091                }
092
093        }
094
095        private static class InnerRetryTemplate extends RetryTemplate {
096
097                @Override
098                protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {
099
100                        BatchRetryContext batchContext = (BatchRetryContext) context;
101
102                        for (RetryContext nextContext : batchContext.contexts) {
103                                if (!super.canRetry(retryPolicy, nextContext)) {
104                                        return false;
105                                }
106                        }
107
108                        return true;
109
110                }
111
112                @Override
113                protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {
114
115                        BatchRetryState batchState = (BatchRetryState) state;
116
117                        Collection<RetryContext> contexts = new ArrayList<RetryContext>();
118                        for (RetryState retryState : batchState.keys) {
119                                contexts.add(super.open(retryPolicy, retryState));
120                        }
121
122                        return new BatchRetryContext(RetrySynchronizationManager.getContext(), contexts);
123
124                }
125
126                @Override
127                protected void registerThrowable(RetryPolicy retryPolicy, RetryState state, RetryContext context, Throwable e) {
128
129                        BatchRetryState batchState = (BatchRetryState) state;
130                        BatchRetryContext batchContext = (BatchRetryContext) context;
131
132                        Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
133                        for (RetryState retryState : batchState.keys) {
134                                RetryContext nextContext = contextIterator.next();
135                                super.registerThrowable(retryPolicy, retryState, nextContext, e);
136                        }
137
138                }
139
140                @Override
141                protected void close(RetryPolicy retryPolicy, RetryContext context, RetryState state, boolean succeeded) {
142
143                        BatchRetryState batchState = (BatchRetryState) state;
144                        BatchRetryContext batchContext = (BatchRetryContext) context;
145
146                        Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
147                        for (RetryState retryState : batchState.keys) {
148                                RetryContext nextContext = contextIterator.next();
149                                super.close(retryPolicy, nextContext, retryState, succeeded);
150                        }
151
152                }
153
154                @Override
155                protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback, RetryContext context,
156                                RetryState state) throws Throwable {
157
158                        BatchRetryState batchState = (BatchRetryState) state;
159                        BatchRetryContext batchContext = (BatchRetryContext) context;
160
161                        // Accumulate exceptions to be thrown so all the keys get a crack
162                        Throwable rethrowable = null;
163                        ExhaustedRetryException exhausted = null;
164
165                        Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
166                        for (RetryState retryState : batchState.keys) {
167
168                                RetryContext nextContext = contextIterator.next();
169
170                                try {
171                                        super.handleRetryExhausted(null, nextContext, retryState);
172                                }
173                                catch (ExhaustedRetryException e) {
174                                        exhausted = e;
175                                }
176                                catch (Throwable e) {
177                                        rethrowable = e;
178                                }
179
180                        }
181
182                        if (recoveryCallback != null) {
183                                return recoveryCallback.recover(context);
184                        }
185
186                        if (exhausted != null) {
187                                throw exhausted;
188                        }
189
190                        throw rethrowable;
191
192                }
193
194        }
195
196        private final InnerRetryTemplate delegate = new InnerRetryTemplate();
197
198        private final RetryTemplate regular = new RetryTemplate();
199
200        private RetryPolicy retryPolicy;
201
202        public <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, Collection<RetryState> states) throws E,
203        Exception {
204                RetryState batchState = new BatchRetryState(states);
205                return delegate.execute(retryCallback, batchState);
206        }
207
208        public <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback,
209                        Collection<RetryState> states) throws E, Exception {
210                RetryState batchState = new BatchRetryState(states);
211                return delegate.execute(retryCallback, recoveryCallback, batchState);
212        }
213
214        @Override
215        public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback,
216                        RetryState retryState) throws E {
217                return regular.execute(retryCallback, recoveryCallback, retryState);
218        }
219
220        @Override
221        public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E {
222                return regular.execute(retryCallback, recoveryCallback);
223        }
224
225        @Override
226        public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState) throws E,
227        ExhaustedRetryException {
228                return regular.execute(retryCallback, retryState);
229        }
230
231        @Override
232        public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E {
233                return regular.execute(retryCallback);
234        }
235
236        public static List<RetryState> createState(List<?> keys) {
237                List<RetryState> states = new ArrayList<RetryState>();
238                for (Object key : keys) {
239                        states.add(new DefaultRetryState(key));
240                }
241                return states;
242        }
243
244        public static List<RetryState> createState(List<?> keys, Classifier<? super Throwable, Boolean> classifier) {
245                List<RetryState> states = new ArrayList<RetryState>();
246                for (Object key : keys) {
247                        states.add(new DefaultRetryState(key, classifier));
248                }
249                return states;
250        }
251
252        public void registerListener(RetryListener listener) {
253                delegate.registerListener(listener);
254                regular.registerListener(listener);
255        }
256
257        public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
258                delegate.setBackOffPolicy(backOffPolicy);
259                regular.setBackOffPolicy(backOffPolicy);
260        }
261
262        public void setListeners(RetryListener[] listeners) {
263                delegate.setListeners(listeners);
264                regular.setListeners(listeners);
265        }
266
267        public void setRetryContextCache(RetryContextCache retryContextCache) {
268                delegate.setRetryContextCache(retryContextCache);
269                regular.setRetryContextCache(retryContextCache);
270        }
271
272        public void setRetryPolicy(RetryPolicy retryPolicy) {
273                this.retryPolicy = retryPolicy;
274                delegate.setRetryPolicy(retryPolicy);
275                regular.setRetryPolicy(retryPolicy);
276        }
277
278        public boolean canRetry(RetryContext context) {
279                return context==null ? true : retryPolicy.canRetry(context);
280        }
281
282}