001/* 002 * Copyright 2006-2007 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.repeat.support; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.Collection; 022import java.util.List; 023 024import org.apache.commons.logging.Log; 025import org.apache.commons.logging.LogFactory; 026 027import org.springframework.batch.repeat.CompletionPolicy; 028import org.springframework.batch.repeat.RepeatCallback; 029import org.springframework.batch.repeat.RepeatContext; 030import org.springframework.batch.repeat.RepeatException; 031import org.springframework.batch.repeat.RepeatListener; 032import org.springframework.batch.repeat.RepeatOperations; 033import org.springframework.batch.repeat.RepeatStatus; 034import org.springframework.batch.repeat.exception.DefaultExceptionHandler; 035import org.springframework.batch.repeat.exception.ExceptionHandler; 036import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy; 037import org.springframework.util.Assert; 038 039/** 040 * Simple implementation and base class for batch templates implementing 041 * {@link RepeatOperations}. Provides a framework including interceptors and 042 * policies. Subclasses just need to provide a method that gets the next result 043 * and one that waits for all the results to be returned from concurrent 044 * processes or threads.<br> 045 * 046 * N.B. the template accumulates thrown exceptions during the iteration, and 047 * they are all processed together when the main loop ends (i.e. finished 048 * processing the items). Clients that do not want to stop execution when an 049 * exception is thrown can use a specific {@link CompletionPolicy} that does not 050 * finish when exceptions are received. This is not the default behaviour.<br> 051 * 052 * Clients that want to take some business action when an exception is thrown by 053 * the {@link RepeatCallback} can consider using a custom {@link RepeatListener} 054 * instead of trying to customise the {@link CompletionPolicy}. This is 055 * generally a friendlier interface to implement, and the 056 * {@link RepeatListener#after(RepeatContext, RepeatStatus)} method is passed in 057 * the result of the callback, which would be an instance of {@link Throwable} 058 * if the business processing had thrown an exception. If the exception is not 059 * to be propagated to the caller, then a non-default {@link CompletionPolicy} 060 * needs to be provided as well, but that could be off the shelf, with the 061 * business action implemented only in the interceptor. 062 * 063 * @author Dave Syer 064 * 065 */ 066public class RepeatTemplate implements RepeatOperations { 067 068 protected Log logger = LogFactory.getLog(getClass()); 069 070 private RepeatListener[] listeners = new RepeatListener[] {}; 071 072 private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy(); 073 074 private ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); 075 076 /** 077 * Set the listeners for this template, registering them for callbacks at 078 * appropriate times in the iteration. 079 * 080 * @param listeners listeners to be used 081 */ 082 public void setListeners(RepeatListener[] listeners) { 083 this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]); 084 } 085 086 /** 087 * Register an additional listener. 088 * 089 * @param listener a single listener to be added to the list 090 */ 091 public void registerListener(RepeatListener listener) { 092 List<RepeatListener> list = new ArrayList<>(Arrays.asList(listeners)); 093 list.add(listener); 094 listeners = list.toArray(new RepeatListener[list.size()]); 095 } 096 097 /** 098 * Setter for exception handler strategy. The exception handler is called at 099 * the end of a batch, after the {@link CompletionPolicy} has determined 100 * that the batch is complete. By default all exceptions are re-thrown. 101 * 102 * @see ExceptionHandler 103 * @see DefaultExceptionHandler 104 * @see #setCompletionPolicy(CompletionPolicy) 105 * 106 * @param exceptionHandler the {@link ExceptionHandler} to use. 107 */ 108 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 109 this.exceptionHandler = exceptionHandler; 110 } 111 112 /** 113 * Setter for policy to decide when the batch is complete. The default is to 114 * complete normally when the callback returns a {@link RepeatStatus} which 115 * is not marked as continuable, and abnormally when the callback throws an 116 * exception (but the decision to re-throw the exception is deferred to the 117 * {@link ExceptionHandler}). 118 * 119 * @see #setExceptionHandler(ExceptionHandler) 120 * 121 * @param terminationPolicy a TerminationPolicy. 122 * @throws IllegalArgumentException if the argument is null 123 */ 124 public void setCompletionPolicy(CompletionPolicy terminationPolicy) { 125 Assert.notNull(terminationPolicy, "CompletionPolicy is required"); 126 this.completionPolicy = terminationPolicy; 127 } 128 129 /** 130 * Execute the batch callback until the completion policy decides that we 131 * are finished. Wait for the whole batch to finish before returning even if 132 * the task executor is asynchronous. 133 * 134 * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback) 135 */ 136 @Override 137 public RepeatStatus iterate(RepeatCallback callback) { 138 139 RepeatContext outer = RepeatSynchronizationManager.getContext(); 140 141 RepeatStatus result = RepeatStatus.CONTINUABLE; 142 try { 143 // This works with an asynchronous TaskExecutor: the 144 // interceptors have to wait for the child processes. 145 result = executeInternal(callback); 146 } 147 finally { 148 RepeatSynchronizationManager.clear(); 149 if (outer != null) { 150 RepeatSynchronizationManager.register(outer); 151 } 152 } 153 154 return result; 155 } 156 157 /** 158 * Internal convenience method to loop over interceptors and batch 159 * callbacks. 160 * 161 * @param callback the callback to process each element of the loop. 162 * 163 * @return the aggregate of {@link RepeatTemplate#canContinue(RepeatStatus)} 164 * for all the results from the callback. 165 * 166 */ 167 private RepeatStatus executeInternal(final RepeatCallback callback) { 168 169 // Reset the termination policy if there is one... 170 RepeatContext context = start(); 171 172 // Make sure if we are already marked complete before we start then no 173 // processing takes place. 174 boolean running = !isMarkedComplete(context); 175 176 for (RepeatListener interceptor : listeners) { 177 interceptor.open(context); 178 running = running && !isMarkedComplete(context); 179 if (!running) 180 break; 181 } 182 183 // Return value, default is to allow continued processing. 184 RepeatStatus result = RepeatStatus.CONTINUABLE; 185 186 RepeatInternalState state = createInternalState(context); 187 // This is the list of exceptions thrown by all active callbacks 188 Collection<Throwable> throwables = state.getThrowables(); 189 // Keep a separate list of exceptions we handled that need to be 190 // rethrown 191 Collection<Throwable> deferred = new ArrayList<>(); 192 193 try { 194 195 while (running) { 196 197 /* 198 * Run the before interceptors here, not in the task executor so 199 * that they all happen in the same thread - it's easier for 200 * tracking batch status, amongst other things. 201 */ 202 for (int i = 0; i < listeners.length; i++) { 203 RepeatListener interceptor = listeners[i]; 204 interceptor.before(context); 205 // Allow before interceptors to veto the batch by setting 206 // flag. 207 running = running && !isMarkedComplete(context); 208 } 209 210 // Check that we are still running (should always be true) ... 211 if (running) { 212 213 try { 214 215 result = getNextResult(context, callback, state); 216 executeAfterInterceptors(context, result); 217 218 } 219 catch (Throwable throwable) { 220 doHandle(throwable, context, deferred); 221 } 222 223 // N.B. the order may be important here: 224 if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) { 225 running = false; 226 } 227 228 } 229 230 } 231 232 result = result.and(waitForResults(state)); 233 for (Throwable throwable : throwables) { 234 doHandle(throwable, context, deferred); 235 } 236 237 // Explicitly drop any references to internal state... 238 state = null; 239 240 } 241 /* 242 * No need for explicit catch here - if the business processing threw an 243 * exception it was already handled by the helper methods. An exception 244 * here is necessarily fatal. 245 */ 246 finally { 247 248 try { 249 250 if (!deferred.isEmpty()) { 251 Throwable throwable = deferred.iterator().next(); 252 if (logger.isDebugEnabled()) { 253 logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): " 254 + throwable.getClass().getName() + ": " + throwable.getMessage()); 255 } 256 rethrow(throwable); 257 } 258 259 } 260 finally { 261 262 try { 263 for (int i = listeners.length; i-- > 0;) { 264 RepeatListener interceptor = listeners[i]; 265 interceptor.close(context); 266 } 267 } 268 finally { 269 context.close(); 270 } 271 272 } 273 274 } 275 276 return result; 277 278 } 279 280 private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) { 281 // An exception alone is not sufficient grounds for not 282 // continuing 283 Throwable unwrappedThrowable = unwrapIfRethrown(throwable); 284 try { 285 286 for (int i = listeners.length; i-- > 0;) { 287 RepeatListener interceptor = listeners[i]; 288 // This is not an error - only log at debug 289 // level. 290 if (logger.isDebugEnabled()) { 291 logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable); 292 } 293 interceptor.onError(context, unwrappedThrowable); 294 } 295 296 if (logger.isDebugEnabled()) { 297 logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: " 298 + unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage()); 299 } 300 exceptionHandler.handleException(context, unwrappedThrowable); 301 302 } 303 catch (Throwable handled) { 304 deferred.add(handled); 305 } 306 } 307 308 /** 309 * Re-throws the original throwable if it is unchecked, wraps checked 310 * exceptions into {@link RepeatException}. 311 */ 312 private static void rethrow(Throwable throwable) throws RuntimeException { 313 if (throwable instanceof Error) { 314 throw (Error) throwable; 315 } 316 else if (throwable instanceof RuntimeException) { 317 throw (RuntimeException) throwable; 318 } 319 else { 320 throw new RepeatException("Exception in batch process", throwable); 321 } 322 } 323 324 /** 325 * Unwraps the throwable if it has been wrapped by 326 * {@link #rethrow(Throwable)}. 327 */ 328 private static Throwable unwrapIfRethrown(Throwable throwable) { 329 if (throwable instanceof RepeatException) { 330 return throwable.getCause(); 331 } 332 else { 333 return throwable; 334 } 335 } 336 337 /** 338 * Create an internal state object that is used to store data needed 339 * internally in the scope of an iteration. Used by subclasses to manage the 340 * queueing and retrieval of asynchronous results. The default just provides 341 * an accumulation of Throwable instances for processing at the end of the 342 * batch. 343 * 344 * @param context the current {@link RepeatContext} 345 * @return a {@link RepeatInternalState} instance. 346 * 347 * @see RepeatTemplate#waitForResults(RepeatInternalState) 348 */ 349 protected RepeatInternalState createInternalState(RepeatContext context) { 350 return new RepeatInternalStateSupport(); 351 } 352 353 /** 354 * Get the next completed result, possibly executing several callbacks until 355 * one finally finishes. Normally a subclass would have to override both 356 * this method and {@link #createInternalState(RepeatContext)} because the 357 * implementation of this method would rely on the details of the internal 358 * state. 359 * 360 * @param context current BatchContext. 361 * @param callback the callback to execute. 362 * @param state maintained by the implementation. 363 * @return a finished result. 364 * @throws Throwable any Throwable emitted during the iteration 365 * 366 * @see #isComplete(RepeatContext) 367 * @see #createInternalState(RepeatContext) 368 */ 369 protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state) 370 throws Throwable { 371 update(context); 372 if (logger.isDebugEnabled()) { 373 logger.debug("Repeat operation about to start at count=" + context.getStartedCount()); 374 } 375 return callback.doInIteration(context); 376 377 } 378 379 /** 380 * If necessary, wait for results to come back from remote or concurrent 381 * processes. By default does nothing and returns true. 382 * 383 * @param state the internal state. 384 * @return true if {@link #canContinue(RepeatStatus)} is true for all 385 * results retrieved. 386 */ 387 protected boolean waitForResults(RepeatInternalState state) { 388 // no-op by default 389 return true; 390 } 391 392 /** 393 * Check return value from batch operation. 394 * 395 * @param value the last callback result. 396 * @return true if the value is {@link RepeatStatus#CONTINUABLE}. 397 */ 398 protected final boolean canContinue(RepeatStatus value) { 399 return value.isContinuable(); 400 } 401 402 private boolean isMarkedComplete(RepeatContext context) { 403 boolean complete = context.isCompleteOnly(); 404 if (context.getParent() != null) { 405 complete = complete || isMarkedComplete(context.getParent()); 406 } 407 if (complete) { 408 logger.debug("Repeat is complete according to context alone."); 409 } 410 return complete; 411 412 } 413 414 /** 415 * Convenience method to execute after interceptors on a callback result. 416 * 417 * @param context the current batch context. 418 * @param value the result of the callback to process. 419 */ 420 protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) { 421 422 // Don't re-throw exceptions here: let the exception handler deal with 423 // that... 424 425 if (value != null && value.isContinuable()) { 426 for (int i = listeners.length; i-- > 0;) { 427 RepeatListener interceptor = listeners[i]; 428 interceptor.after(context, value); 429 } 430 431 } 432 433 } 434 435 /** 436 * Delegate to the {@link CompletionPolicy}. 437 * @param context the current batch context. 438 * @param result the result of the latest batch item processing. 439 * @return true if complete according to policy and result value, else false. 440 * 441 * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext, 442 * RepeatStatus) 443 */ 444 protected boolean isComplete(RepeatContext context, RepeatStatus result) { 445 boolean complete = completionPolicy.isComplete(context, result); 446 if (complete) { 447 logger.debug("Repeat is complete according to policy and result value."); 448 } 449 return complete; 450 } 451 452 /** 453 * Delegate to {@link CompletionPolicy}. 454 * @param context the current batch context. 455 * @return true if complete according to policy alone not including result value, else false. 456 * 457 * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext) 458 */ 459 protected boolean isComplete(RepeatContext context) { 460 boolean complete = completionPolicy.isComplete(context); 461 if (complete) { 462 logger.debug("Repeat is complete according to policy alone not including result."); 463 } 464 return complete; 465 } 466 467 /** 468 * Delegate to the {@link CompletionPolicy}. 469 * 470 * @return a {@link RepeatContext} object that can be used by the implementation to store 471 * internal state for a batch step. 472 * 473 * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext) 474 */ 475 protected RepeatContext start() { 476 RepeatContext parent = RepeatSynchronizationManager.getContext(); 477 RepeatContext context = completionPolicy.start(parent); 478 RepeatSynchronizationManager.register(context); 479 logger.debug("Starting repeat context."); 480 return context; 481 } 482 483 /** 484 * Delegate to the {@link CompletionPolicy}. 485 * @param context the value returned by start. 486 * 487 * @see org.springframework.batch.repeat.CompletionPolicy#update(RepeatContext) 488 */ 489 protected void update(RepeatContext context) { 490 completionPolicy.update(context); 491 } 492 493}