/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.core.step.item;

import org.springframework.classify.Classifier;
import org.springframework.retry.ExhaustedRetryException;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.RetryState;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.context.RetryContextSupport;
import org.springframework.retry.policy.RetryContextCache;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
 * A special purpose retry template that deals specifically with multi-valued
 * stateful retry. This is useful in the case where the operation to be retried
 * operates on multiple items, and when it fails there is no way to decide which
 * (if any) of the items was responsible. The {@link RetryState} used in the
 * execute methods is composite, and when a failure occurs, all of the keys in
 * the composite are "tarred with the same brush". Subsequent attempts to
 * execute with any of the keys that have failed previously result in a new
 * attempt and the previous state is used to check the {@link RetryPolicy}. If
 * one of the failed items eventually succeeds then the others in the current
 * composite for that attempt will be cleared from the context cache (as
 * normal), but there may still be entries in the cache for the original failed
 * items. This might mean that an item that did not cause a failure is never
 * retried because other items in the same batch fail fatally first.
 *
 * @author Dave Syer
 *
 */
public class BatchRetryTemplate implements RetryOperations {

    /**
     * Composite retry state. The collection handed to the constructor is used
     * as the key of this state, and a private copy of it is kept so that the
     * individual item states can be iterated in a stable order.
     * <p>
     * Declared {@code static} because it uses nothing from the enclosing
     * template instance.
     */
    private static final class BatchRetryState extends DefaultRetryState {

        private final Collection<RetryState> keys;

        /**
         * @param keys the per-item states making up the batch
         */
        public BatchRetryState(Collection<RetryState> keys) {
            super(keys);
            this.keys = new ArrayList<RetryState>(keys);
        }

    }

    /**
     * Composite retry context wrapping one {@link RetryContext} per item.
     */
    @SuppressWarnings("serial")
    private static class BatchRetryContext extends RetryContextSupport {

        private final Collection<RetryContext> contexts;

        /**
         * Walks the given contexts in iteration order and, each time one is
         * found whose retry count exceeds the highest count seen so far,
         * registers that context's last throwable on this composite.
         *
         * @param parent the parent context passed to the superclass
         * @param contexts the per-item contexts (stored as given, not copied)
         */
        public BatchRetryContext(RetryContext parent, Collection<RetryContext> contexts) {

            super(parent);

            this.contexts = contexts;
            int count = 0;

            for (RetryContext context : contexts) {
                int retryCount = context.getRetryCount();
                if (retryCount > count) {
                    count = retryCount;
                    registerThrowable(context.getLastThrowable());
                }
            }

        }

    }

    /**
     * Retry template that fans every lifecycle callback out to the per-item
     * state/context pairs held by a {@link BatchRetryState} and a
     * {@link BatchRetryContext}. The state and context arguments of every
     * overridden method are cast to those types unconditionally.
     */
    private static class InnerRetryTemplate extends RetryTemplate {

        /**
         * @return {@code false} as soon as any per-item context cannot be
         * retried according to the superclass, {@code true} otherwise
         * (including when there are no per-item contexts)
         */
        @Override
        protected boolean canRetry(RetryPolicy retryPolicy, RetryContext context) {

            BatchRetryContext batchContext = (BatchRetryContext) context;

            for (RetryContext nextContext : batchContext.contexts) {
                if (!super.canRetry(retryPolicy, nextContext)) {
                    return false;
                }
            }

            return true;

        }

        /**
         * Opens one context per item state through the superclass and wraps
         * them in a {@link BatchRetryContext} whose parent is the context
         * currently reported by {@link RetrySynchronizationManager}.
         */
        @Override
        protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {

            BatchRetryState batchState = (BatchRetryState) state;

            Collection<RetryContext> contexts = new ArrayList<RetryContext>();
            for (RetryState retryState : batchState.keys) {
                contexts.add(super.open(retryPolicy, retryState));
            }

            return new BatchRetryContext(RetrySynchronizationManager.getContext(), contexts);

        }

        /**
         * Registers the throwable against every item state and its matching
         * context. States and contexts are paired by iteration order.
         */
        @Override
        protected void registerThrowable(RetryPolicy retryPolicy, RetryState state, RetryContext context, Throwable e) {

            BatchRetryState batchState = (BatchRetryState) state;
            BatchRetryContext batchContext = (BatchRetryContext) context;

            Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
            for (RetryState retryState : batchState.keys) {
                RetryContext nextContext = contextIterator.next();
                super.registerThrowable(retryPolicy, retryState, nextContext, e);
            }

        }

        /**
         * Closes every item state and its matching context. States and
         * contexts are paired by iteration order.
         */
        @Override
        protected void close(RetryPolicy retryPolicy, RetryContext context, RetryState state, boolean succeeded) {

            BatchRetryState batchState = (BatchRetryState) state;
            BatchRetryContext batchContext = (BatchRetryContext) context;

            Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
            for (RetryState retryState : batchState.keys) {
                RetryContext nextContext = contextIterator.next();
                super.close(retryPolicy, nextContext, retryState, succeeded);
            }

        }

        /**
         * Gives every item state a chance to handle exhaustion by delegating
         * to the superclass without a recovery callback, remembering what each
         * delegate call throws. Afterwards the supplied recovery callback (if
         * any) is invoked once with the composite context; otherwise the last
         * {@link ExhaustedRetryException} seen is thrown in preference to the
         * last other throwable seen.
         *
         * @throws ExhaustedRetryException if there is no recovery callback and
         * either a delegate call threw one, or no delegate call threw anything
         * at all (for instance when the batch has no item states)
         * @throws Throwable the last non-exhausted throwable raised by a
         * delegate call, or whatever the recovery callback throws
         */
        @Override
        protected <T> T handleRetryExhausted(RecoveryCallback<T> recoveryCallback, RetryContext context,
                RetryState state) throws Throwable {

            BatchRetryState batchState = (BatchRetryState) state;
            BatchRetryContext batchContext = (BatchRetryContext) context;

            // Accumulate exceptions to be thrown so all the keys get a crack
            Throwable rethrowable = null;
            ExhaustedRetryException exhausted = null;

            Iterator<RetryContext> contextIterator = batchContext.contexts.iterator();
            for (RetryState retryState : batchState.keys) {

                RetryContext nextContext = contextIterator.next();

                try {
                    super.handleRetryExhausted(null, nextContext, retryState);
                }
                catch (ExhaustedRetryException e) {
                    exhausted = e;
                }
                catch (Throwable e) {
                    rethrowable = e;
                }

            }

            if (recoveryCallback != null) {
                return recoveryCallback.recover(context);
            }

            if (exhausted != null) {
                throw exhausted;
            }

            if (rethrowable == null) {
                // Nothing was captured above; "throw null" would surface as a
                // bare NullPointerException, so report the exhaustion explicitly.
                throw new ExhaustedRetryException(
                        "Retry exhausted but no item state produced an exception to rethrow");
            }

            throw rethrowable;

        }

    }

    /** Template used for the batch (collection of states) execute methods. */
    private final InnerRetryTemplate delegate = new InnerRetryTemplate();

    /** Template used for the plain {@link RetryOperations} methods. */
    private final RetryTemplate regular = new RetryTemplate();

    /** Policy consulted by {@link #canRetry(RetryContext)}; unset until {@link #setRetryPolicy(RetryPolicy)}. */
    private RetryPolicy retryPolicy;

    /**
     * Executes the callback with a composite state built from the given
     * per-item states.
     *
     * @param retryCallback the operation to execute
     * @param states the per-item retry states
     * @return the result of the batch template's execute call
     */
    public <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, Collection<RetryState> states) throws E,
            Exception {
        RetryState batchState = new BatchRetryState(states);
        return delegate.execute(retryCallback, batchState);
    }

    /**
     * Executes the callback with a composite state built from the given
     * per-item states, passing the recovery callback through to the batch
     * template.
     *
     * @param retryCallback the operation to execute
     * @param recoveryCallback the recovery callback handed to the batch template
     * @param states the per-item retry states
     * @return the result of the batch template's execute call
     */
    public <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback,
            Collection<RetryState> states) throws E, Exception {
        RetryState batchState = new BatchRetryState(states);
        return delegate.execute(retryCallback, recoveryCallback, batchState);
    }

    /**
     * Delegates to the regular (non-batch) template.
     */
    @Override
    public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback,
            RetryState retryState) throws E {
        return regular.execute(retryCallback, recoveryCallback, retryState);
    }

    /**
     * Delegates to the regular (non-batch) template.
     */
    @Override
    public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E {
        return regular.execute(retryCallback, recoveryCallback);
    }

    /**
     * Delegates to the regular (non-batch) template.
     */
    @Override
    public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState) throws E,
            ExhaustedRetryException {
        return regular.execute(retryCallback, retryState);
    }

    /**
     * Delegates to the regular (non-batch) template.
     */
    @Override
    public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E {
        return regular.execute(retryCallback);
    }

    /**
     * Creates one {@link DefaultRetryState} per key, in the order of the
     * given list.
     *
     * @param keys the item keys
     * @return a new mutable list of states, one per key
     */
    public static List<RetryState> createState(List<?> keys) {
        List<RetryState> states = new ArrayList<RetryState>();
        for (Object key : keys) {
            states.add(new DefaultRetryState(key));
        }
        return states;
    }

    /**
     * Creates one {@link DefaultRetryState} per key, in the order of the
     * given list, each constructed with the supplied classifier.
     *
     * @param keys the item keys
     * @param classifier the classifier passed to every created state
     * @return a new mutable list of states, one per key
     */
    public static List<RetryState> createState(List<?> keys, Classifier<? super Throwable, Boolean> classifier) {
        List<RetryState> states = new ArrayList<RetryState>();
        for (Object key : keys) {
            states.add(new DefaultRetryState(key, classifier));
        }
        return states;
    }

    /**
     * Registers the listener on both underlying templates.
     */
    public void registerListener(RetryListener listener) {
        delegate.registerListener(listener);
        regular.registerListener(listener);
    }

    /**
     * Sets the back-off policy on both underlying templates.
     */
    public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
        delegate.setBackOffPolicy(backOffPolicy);
        regular.setBackOffPolicy(backOffPolicy);
    }

    /**
     * Sets the listeners on both underlying templates.
     */
    public void setListeners(RetryListener[] listeners) {
        delegate.setListeners(listeners);
        regular.setListeners(listeners);
    }

    /**
     * Sets the retry context cache on both underlying templates.
     */
    public void setRetryContextCache(RetryContextCache retryContextCache) {
        delegate.setRetryContextCache(retryContextCache);
        regular.setRetryContextCache(retryContextCache);
    }

    /**
     * Remembers the policy for {@link #canRetry(RetryContext)} and sets it on
     * both underlying templates.
     */
    public void setRetryPolicy(RetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;
        delegate.setRetryPolicy(retryPolicy);
        regular.setRetryPolicy(retryPolicy);
    }

    /**
     * Asks the configured retry policy whether the given context may be
     * retried.
     *
     * @param context the context to check; may be {@code null}
     * @return {@code true} if the context is {@code null}, otherwise the
     * answer of the configured policy
     * @throws IllegalStateException if the context is non-null and no policy
     * has been set through {@link #setRetryPolicy(RetryPolicy)}
     */
    public boolean canRetry(RetryContext context) {
        if (context == null) {
            return true;
        }
        if (retryPolicy == null) {
            // The field has no default; fail with a clear message instead of an opaque NullPointerException.
            throw new IllegalStateException(
                    "No RetryPolicy has been set; call setRetryPolicy(...) before canRetry(...)");
        }
        return retryPolicy.canRetry(context);
    }

}