001/*
002 * Copyright 2006-2007 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.repeat.support;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.Collection;
022import java.util.List;
023
024import org.apache.commons.logging.Log;
025import org.apache.commons.logging.LogFactory;
026
027import org.springframework.batch.repeat.CompletionPolicy;
028import org.springframework.batch.repeat.RepeatCallback;
029import org.springframework.batch.repeat.RepeatContext;
030import org.springframework.batch.repeat.RepeatException;
031import org.springframework.batch.repeat.RepeatListener;
032import org.springframework.batch.repeat.RepeatOperations;
033import org.springframework.batch.repeat.RepeatStatus;
034import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
035import org.springframework.batch.repeat.exception.ExceptionHandler;
036import org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy;
037import org.springframework.util.Assert;
038
039/**
040 * Simple implementation and base class for batch templates implementing
041 * {@link RepeatOperations}. Provides a framework including interceptors and
042 * policies. Subclasses just need to provide a method that gets the next result
043 * and one that waits for all the results to be returned from concurrent
044 * processes or threads.<br>
045 * 
046 * N.B. the template accumulates thrown exceptions during the iteration, and
047 * they are all processed together when the main loop ends (i.e. finished
048 * processing the items). Clients that do not want to stop execution when an
049 * exception is thrown can use a specific {@link CompletionPolicy} that does not
050 * finish when exceptions are received. This is not the default behaviour.<br>
051 * 
 * Clients that want to take some business action when an exception is thrown by
 * the {@link RepeatCallback} can consider using a custom {@link RepeatListener}
 * instead of trying to customise the {@link CompletionPolicy}. This is
 * generally a friendlier interface to implement: the
 * {@link RepeatListener#onError(RepeatContext, Throwable)} method is passed the
 * exception thrown by the business processing, while
 * {@link RepeatListener#after(RepeatContext, RepeatStatus)} is passed the
 * result of a callback that returned normally with a continuable status. If the
 * exception is not to be propagated to the caller, then a non-default
 * {@link ExceptionHandler} needs to be provided as well (the default one
 * re-throws), but that could be off the shelf, with the business action
 * implemented only in the listener.
062 * 
063 * @author Dave Syer
064 * 
065 */
066public class RepeatTemplate implements RepeatOperations {
067
068        protected Log logger = LogFactory.getLog(getClass());
069
070        private RepeatListener[] listeners = new RepeatListener[] {};
071
072        private CompletionPolicy completionPolicy = new DefaultResultCompletionPolicy();
073
074        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
075
076        /**
077         * Set the listeners for this template, registering them for callbacks at
078         * appropriate times in the iteration.
079         * 
080         * @param listeners listeners to be used
081         */
082        public void setListeners(RepeatListener[] listeners) {
083                this.listeners = Arrays.asList(listeners).toArray(new RepeatListener[listeners.length]);
084        }
085
086        /**
087         * Register an additional listener.
088         * 
089         * @param listener a single listener to be added to the list
090         */
091        public void registerListener(RepeatListener listener) {
092                List<RepeatListener> list = new ArrayList<>(Arrays.asList(listeners));
093                list.add(listener);
094                listeners = list.toArray(new RepeatListener[list.size()]);
095        }
096
097        /**
098         * Setter for exception handler strategy. The exception handler is called at
099         * the end of a batch, after the {@link CompletionPolicy} has determined
100         * that the batch is complete. By default all exceptions are re-thrown.
101         * 
102         * @see ExceptionHandler
103         * @see DefaultExceptionHandler
104         * @see #setCompletionPolicy(CompletionPolicy)
105         * 
106         * @param exceptionHandler the {@link ExceptionHandler} to use.
107         */
108        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
109                this.exceptionHandler = exceptionHandler;
110        }
111
112        /**
113         * Setter for policy to decide when the batch is complete. The default is to
114         * complete normally when the callback returns a {@link RepeatStatus} which
115         * is not marked as continuable, and abnormally when the callback throws an
116         * exception (but the decision to re-throw the exception is deferred to the
117         * {@link ExceptionHandler}).
118         * 
119         * @see #setExceptionHandler(ExceptionHandler)
120         * 
121         * @param terminationPolicy a TerminationPolicy.
122         * @throws IllegalArgumentException if the argument is null
123         */
124        public void setCompletionPolicy(CompletionPolicy terminationPolicy) {
125                Assert.notNull(terminationPolicy, "CompletionPolicy is required");
126                this.completionPolicy = terminationPolicy;
127        }
128
129        /**
130         * Execute the batch callback until the completion policy decides that we
131         * are finished. Wait for the whole batch to finish before returning even if
132         * the task executor is asynchronous.
133         * 
134         * @see org.springframework.batch.repeat.RepeatOperations#iterate(org.springframework.batch.repeat.RepeatCallback)
135         */
136    @Override
137        public RepeatStatus iterate(RepeatCallback callback) {
138
139                RepeatContext outer = RepeatSynchronizationManager.getContext();
140
141                RepeatStatus result = RepeatStatus.CONTINUABLE;
142                try {
143                        // This works with an asynchronous TaskExecutor: the
144                        // interceptors have to wait for the child processes.
145                        result = executeInternal(callback);
146                }
147                finally {
148                        RepeatSynchronizationManager.clear();
149                        if (outer != null) {
150                                RepeatSynchronizationManager.register(outer);
151                        }
152                }
153
154                return result;
155        }
156
157        /**
158         * Internal convenience method to loop over interceptors and batch
159         * callbacks.
160         * 
161         * @param callback the callback to process each element of the loop.
162         * 
163         * @return the aggregate of {@link RepeatTemplate#canContinue(RepeatStatus)}
164         * for all the results from the callback.
165         * 
166         */
167        private RepeatStatus executeInternal(final RepeatCallback callback) {
168
169                // Reset the termination policy if there is one...
170                RepeatContext context = start();
171
172                // Make sure if we are already marked complete before we start then no
173                // processing takes place.
174                boolean running = !isMarkedComplete(context);
175
176                for (RepeatListener interceptor : listeners) {
177                        interceptor.open(context);
178                        running = running && !isMarkedComplete(context);
179                        if (!running)
180                                break;
181                }
182
183                // Return value, default is to allow continued processing.
184                RepeatStatus result = RepeatStatus.CONTINUABLE;
185
186                RepeatInternalState state = createInternalState(context);
187                // This is the list of exceptions thrown by all active callbacks
188                Collection<Throwable> throwables = state.getThrowables();
189                // Keep a separate list of exceptions we handled that need to be
190                // rethrown
191                Collection<Throwable> deferred = new ArrayList<>();
192
193                try {
194
195                        while (running) {
196
197                                /*
198                                 * Run the before interceptors here, not in the task executor so
199                                 * that they all happen in the same thread - it's easier for
200                                 * tracking batch status, amongst other things.
201                                 */
202                                for (int i = 0; i < listeners.length; i++) {
203                                        RepeatListener interceptor = listeners[i];
204                                        interceptor.before(context);
205                                        // Allow before interceptors to veto the batch by setting
206                                        // flag.
207                                        running = running && !isMarkedComplete(context);
208                                }
209
210                                // Check that we are still running (should always be true) ...
211                                if (running) {
212
213                                        try {
214
215                                                result = getNextResult(context, callback, state);
216                                                executeAfterInterceptors(context, result);
217
218                                        }
219                                        catch (Throwable throwable) {
220                                                doHandle(throwable, context, deferred);
221                                        }
222
223                                        // N.B. the order may be important here:
224                                        if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
225                                                running = false;
226                                        }
227
228                                }
229
230                        }
231
232                        result = result.and(waitForResults(state));
233                        for (Throwable throwable : throwables) {
234                                doHandle(throwable, context, deferred);
235                        }
236
237                        // Explicitly drop any references to internal state...
238                        state = null;
239
240                }
241                /*
242                 * No need for explicit catch here - if the business processing threw an
243                 * exception it was already handled by the helper methods. An exception
244                 * here is necessarily fatal.
245                 */
246                finally {
247
248                        try {
249
250                                if (!deferred.isEmpty()) {
251                                        Throwable throwable = deferred.iterator().next();
252                                        if (logger.isDebugEnabled()) {
253                                                logger.debug("Handling fatal exception explicitly (rethrowing first of " + deferred.size() + "): "
254                                                                + throwable.getClass().getName() + ": " + throwable.getMessage());
255                                        }
256                                        rethrow(throwable);
257                                }
258
259                        }
260                        finally {
261
262                                try {
263                                        for (int i = listeners.length; i-- > 0;) {
264                                                RepeatListener interceptor = listeners[i];
265                                                interceptor.close(context);
266                                        }
267                                }
268                                finally {
269                                        context.close();
270                                }
271
272                        }
273
274                }
275
276                return result;
277
278        }
279
280        private void doHandle(Throwable throwable, RepeatContext context, Collection<Throwable> deferred) {
281                // An exception alone is not sufficient grounds for not
282                // continuing
283                Throwable unwrappedThrowable = unwrapIfRethrown(throwable);
284                try {
285
286                        for (int i = listeners.length; i-- > 0;) {
287                                RepeatListener interceptor = listeners[i];
288                                // This is not an error - only log at debug
289                                // level.
290                                if (logger.isDebugEnabled()) {
291                                        logger.debug("Exception intercepted (" + (i + 1) + " of " + listeners.length + ")", unwrappedThrowable);
292                                }
293                                interceptor.onError(context, unwrappedThrowable);
294                        }
295
296                        if (logger.isDebugEnabled()) {
297                                logger.debug("Handling exception: " + throwable.getClass().getName() + ", caused by: "
298                                                + unwrappedThrowable.getClass().getName() + ": " + unwrappedThrowable.getMessage());
299                        }
300                        exceptionHandler.handleException(context, unwrappedThrowable);
301
302                }
303                catch (Throwable handled) {
304                        deferred.add(handled);
305                }
306        }
307
308        /**
309         * Re-throws the original throwable if it is unchecked, wraps checked
310         * exceptions into {@link RepeatException}.
311         */
312        private static void rethrow(Throwable throwable) throws RuntimeException {
313                if (throwable instanceof Error) {
314                        throw (Error) throwable;
315                }
316                else if (throwable instanceof RuntimeException) {
317                        throw (RuntimeException) throwable;
318                }
319                else {
320                        throw new RepeatException("Exception in batch process", throwable);
321                }
322        }
323
324        /**
325         * Unwraps the throwable if it has been wrapped by
326         * {@link #rethrow(Throwable)}.
327         */
328        private static Throwable unwrapIfRethrown(Throwable throwable) {
329                if (throwable instanceof RepeatException) {
330                        return throwable.getCause();
331                }
332                else {
333                        return throwable;
334                }
335        }
336
337        /**
338         * Create an internal state object that is used to store data needed
339         * internally in the scope of an iteration. Used by subclasses to manage the
340         * queueing and retrieval of asynchronous results. The default just provides
341         * an accumulation of Throwable instances for processing at the end of the
342         * batch.
343         * 
344         * @param context the current {@link RepeatContext}
345         * @return a {@link RepeatInternalState} instance.
346         * 
347         * @see RepeatTemplate#waitForResults(RepeatInternalState)
348         */
349        protected RepeatInternalState createInternalState(RepeatContext context) {
350                return new RepeatInternalStateSupport();
351        }
352
353        /**
354         * Get the next completed result, possibly executing several callbacks until
355         * one finally finishes. Normally a subclass would have to override both
356         * this method and {@link #createInternalState(RepeatContext)} because the
357         * implementation of this method would rely on the details of the internal
358         * state.
359         * 
360         * @param context current BatchContext.
361         * @param callback the callback to execute.
362         * @param state maintained by the implementation.
363         * @return a finished result.
364         * @throws Throwable any Throwable emitted during the iteration
365         * 
366         * @see #isComplete(RepeatContext)
367         * @see #createInternalState(RepeatContext)
368         */
369        protected RepeatStatus getNextResult(RepeatContext context, RepeatCallback callback, RepeatInternalState state)
370                        throws Throwable {
371                update(context);
372                if (logger.isDebugEnabled()) {
373                        logger.debug("Repeat operation about to start at count=" + context.getStartedCount());
374                }
375                return callback.doInIteration(context);
376
377        }
378
379        /**
380         * If necessary, wait for results to come back from remote or concurrent
381         * processes. By default does nothing and returns true.
382         * 
383         * @param state the internal state.
384         * @return true if {@link #canContinue(RepeatStatus)} is true for all
385         * results retrieved.
386         */
387        protected boolean waitForResults(RepeatInternalState state) {
388                // no-op by default
389                return true;
390        }
391
392        /**
393         * Check return value from batch operation.
394         * 
395         * @param value the last callback result.
396         * @return true if the value is {@link RepeatStatus#CONTINUABLE}.
397         */
398        protected final boolean canContinue(RepeatStatus value) {
399                return value.isContinuable();
400        }
401
402        private boolean isMarkedComplete(RepeatContext context) {
403                boolean complete = context.isCompleteOnly();
404                if (context.getParent() != null) {
405                        complete = complete || isMarkedComplete(context.getParent());
406                }
407                if (complete) {
408                        logger.debug("Repeat is complete according to context alone.");
409                }
410                return complete;
411
412        }
413
414        /**
415         * Convenience method to execute after interceptors on a callback result.
416         * 
417         * @param context the current batch context.
418         * @param value the result of the callback to process.
419         */
420        protected void executeAfterInterceptors(final RepeatContext context, RepeatStatus value) {
421
422                // Don't re-throw exceptions here: let the exception handler deal with
423                // that...
424
425                if (value != null && value.isContinuable()) {
426                        for (int i = listeners.length; i-- > 0;) {
427                                RepeatListener interceptor = listeners[i];
428                                interceptor.after(context, value);
429                        }
430
431                }
432
433        }
434
435        /**
436         * Delegate to the {@link CompletionPolicy}.
437         * @param context the current batch context.
438         * @param result the result of the latest batch item processing.
439         * @return true if complete according to policy and result value, else false.
440         *
441         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext,
442         * RepeatStatus)
443         */
444        protected boolean isComplete(RepeatContext context, RepeatStatus result) {
445                boolean complete = completionPolicy.isComplete(context, result);
446                if (complete) {
447                        logger.debug("Repeat is complete according to policy and result value.");
448                }
449                return complete;
450        }
451
452        /**
453         * Delegate to {@link CompletionPolicy}.
454         * @param context the current batch context.
455         * @return true if complete according to policy alone not including result value, else false.
456         * 
457         * @see org.springframework.batch.repeat.CompletionPolicy#isComplete(RepeatContext)
458         */
459        protected boolean isComplete(RepeatContext context) {
460                boolean complete = completionPolicy.isComplete(context);
461                if (complete) {
462                        logger.debug("Repeat is complete according to policy alone not including result.");
463                }
464                return complete;
465        }
466
467        /**
468         * Delegate to the {@link CompletionPolicy}.
469         *
470         * @return a {@link RepeatContext} object that can be used by the implementation to store
471         * internal state for a batch step.
472         * 
473         * @see org.springframework.batch.repeat.CompletionPolicy#start(RepeatContext)
474         */
475        protected RepeatContext start() {
476                RepeatContext parent = RepeatSynchronizationManager.getContext();
477                RepeatContext context = completionPolicy.start(parent);
478                RepeatSynchronizationManager.register(context);
479                logger.debug("Starting repeat context.");
480                return context;
481        }
482
483        /**
484         * Delegate to the {@link CompletionPolicy}.
485         * @param context the value returned by start.
486         * 
487         * @see org.springframework.batch.repeat.CompletionPolicy#update(RepeatContext)
488         */
489        protected void update(RepeatContext context) {
490                completionPolicy.update(context);
491        }
492
493}