/*
 * Copyright 2006-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016package org.springframework.batch.core.step.tasklet;
017
018import org.apache.commons.logging.Log;
019import org.apache.commons.logging.LogFactory;
020import org.springframework.batch.core.BatchStatus;
021import org.springframework.batch.core.ChunkListener;
022import org.springframework.batch.core.JobInterruptedException;
023import org.springframework.batch.core.StepContribution;
024import org.springframework.batch.core.StepExecution;
025import org.springframework.batch.core.StepExecutionListener;
026import org.springframework.batch.core.listener.CompositeChunkListener;
027import org.springframework.batch.core.repository.JobRepository;
028import org.springframework.batch.core.scope.context.ChunkContext;
029import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
030import org.springframework.batch.core.step.AbstractStep;
031import org.springframework.batch.core.step.FatalStepExecutionException;
032import org.springframework.batch.core.step.StepInterruptionPolicy;
033import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
034import org.springframework.batch.item.ExecutionContext;
035import org.springframework.batch.item.ItemReader;
036import org.springframework.batch.item.ItemStream;
037import org.springframework.batch.item.ItemWriter;
038import org.springframework.batch.item.support.CompositeItemStream;
039import org.springframework.batch.repeat.RepeatContext;
040import org.springframework.batch.repeat.RepeatOperations;
041import org.springframework.batch.repeat.RepeatStatus;
042import org.springframework.batch.repeat.support.RepeatTemplate;
043import org.springframework.transaction.PlatformTransactionManager;
044import org.springframework.transaction.TransactionStatus;
045import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
046import org.springframework.transaction.interceptor.TransactionAttribute;
047import org.springframework.transaction.support.TransactionCallback;
048import org.springframework.transaction.support.TransactionSynchronization;
049import org.springframework.transaction.support.TransactionSynchronizationAdapter;
050import org.springframework.transaction.support.TransactionSynchronizationManager;
051import org.springframework.transaction.support.TransactionTemplate;
052import org.springframework.util.Assert;
053
054import java.util.concurrent.Semaphore;
055
056/**
057 * Simple implementation of executing the step as a call to a {@link Tasklet},
058 * possibly repeated, and each call surrounded by a transaction. The structure
059 * is therefore that of a loop with transaction boundary inside the loop. The
060 * loop is controlled by the step operations (
061 * {@link #setStepOperations(RepeatOperations)}).<br>
062 * <br>
063 *
064 * Clients can use interceptors in the step operations to intercept or listen to
065 * the iteration on a step-wide basis, for instance to get a callback when the
066 * step is complete. Those that want callbacks at the level of an individual
067 * tasks, can specify interceptors for the chunk operations.
068 *
069 * @author Dave Syer
070 * @author Lucas Ward
071 * @author Ben Hale
072 * @author Robert Kasanicky
073 * @author Michael Minella
074 * @author Will Schipp
075 * @author Mahmoud Ben Hassine
076 */
077@SuppressWarnings("serial")
078public class TaskletStep extends AbstractStep {
079
080        private static final Log logger = LogFactory.getLog(TaskletStep.class);
081
082        private RepeatOperations stepOperations = new RepeatTemplate();
083
084        private CompositeChunkListener chunkListener = new CompositeChunkListener();
085
086        // default to checking current thread for interruption.
087        private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
088
089        private CompositeItemStream stream = new CompositeItemStream();
090
091        private PlatformTransactionManager transactionManager;
092
093        private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
094
095                @Override
096                public boolean rollbackOn(Throwable ex) {
097                        return true;
098                }
099
100        };
101
102        private Tasklet tasklet;
103
104        public static final String TASKLET_TYPE_KEY = "batch.taskletType";
105
106        /**
107         * Default constructor.
108         */
109        public TaskletStep() {
110                this(null);
111        }
112
113        /**
114         * @param name the name for the {@link TaskletStep}
115         */
116        public TaskletStep(String name) {
117                super(name);
118        }
119
120        /*
121         * (non-Javadoc)
122         *
123         * @see
124         * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet()
125         */
126        @Override
127        public void afterPropertiesSet() throws Exception {
128                super.afterPropertiesSet();
129                Assert.state(transactionManager != null, "A transaction manager must be provided");
130        }
131
132        /**
133         * Public setter for the {@link PlatformTransactionManager}.
134         *
135         * @param transactionManager the transaction manager to set
136         */
137        public void setTransactionManager(PlatformTransactionManager transactionManager) {
138                this.transactionManager = transactionManager;
139        }
140
141        /**
142         * Public setter for the {@link TransactionAttribute}.
143         *
144         * @param transactionAttribute the {@link TransactionAttribute} to set
145         */
146        public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
147                this.transactionAttribute = transactionAttribute;
148        }
149
150        /**
151         * Public setter for the {@link Tasklet}.
152         *
153         * @param tasklet the {@link Tasklet} to set
154         */
155        public void setTasklet(Tasklet tasklet) {
156                this.tasklet = tasklet;
157                if (tasklet instanceof StepExecutionListener) {
158                        registerStepExecutionListener((StepExecutionListener) tasklet);
159                }
160        }
161
162        /**
163         * Register a chunk listener for callbacks at the appropriate stages in a
164         * step execution.
165         *
166         * @param listener a {@link ChunkListener}
167         */
168        public void registerChunkListener(ChunkListener listener) {
169                this.chunkListener.register(listener);
170        }
171
172        /**
173         * Register each of the objects as listeners.
174         *
175         * @param listeners an array of listener objects of known types.
176         */
177        public void setChunkListeners(ChunkListener[] listeners) {
178                for (int i = 0; i < listeners.length; i++) {
179                        registerChunkListener(listeners[i]);
180                }
181        }
182
183        /**
184         * Register each of the streams for callbacks at the appropriate time in the
185         * step. The {@link ItemReader} and {@link ItemWriter} are automatically
186         * registered, but it doesn't hurt to also register them here. Injected
187         * dependencies of the reader and writer are not automatically registered,
188         * so if you implement {@link ItemWriter} using delegation to another object
189         * which itself is a {@link ItemStream}, you need to register the delegate
190         * here.
191         *
192         * @param streams an array of {@link ItemStream} objects.
193         */
194        public void setStreams(ItemStream[] streams) {
195                for (int i = 0; i < streams.length; i++) {
196                        registerStream(streams[i]);
197                }
198        }
199
200        /**
201         * Register a single {@link ItemStream} for callbacks to the stream
202         * interface.
203         *
204         * @param stream instance of {@link ItemStream}
205         */
206        public void registerStream(ItemStream stream) {
207                this.stream.register(stream);
208        }
209
210        /**
211         * The {@link RepeatOperations} to use for the outer loop of the batch
212         * processing. Should be set up by the caller through a factory. Defaults to
213         * a plain {@link RepeatTemplate}.
214         *
215         * @param stepOperations a {@link RepeatOperations} instance.
216         */
217        public void setStepOperations(RepeatOperations stepOperations) {
218                this.stepOperations = stepOperations;
219        }
220
221        /**
222         * Setter for the {@link StepInterruptionPolicy}. The policy is used to
223         * check whether an external request has been made to interrupt the job
224         * execution.
225         *
226         * @param interruptionPolicy a {@link StepInterruptionPolicy}
227         */
228        public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
229                this.interruptionPolicy = interruptionPolicy;
230        }
231
232        /**
233         * Process the step and update its context so that progress can be monitored
234         * by the caller. The step is broken down into chunks, each one executing in
235         * a transaction. The step and its execution and execution context are all
236         * given an up to date {@link BatchStatus}, and the {@link JobRepository} is
237         * used to store the result. Various reporting information are also added to
238         * the current context governing the step execution, which would normally be
239         * available to the caller through the step's {@link ExecutionContext}.<br>
240         *
241         * @throws JobInterruptedException if the step or a chunk is interrupted
242         * @throws RuntimeException if there is an exception during a chunk
243         * execution
244         *
245         */
246        @Override
247        protected void doExecute(StepExecution stepExecution) throws Exception {
248                stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
249                stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());
250
251                stream.update(stepExecution.getExecutionContext());
252                getJobRepository().updateExecutionContext(stepExecution);
253
254                // Shared semaphore per step execution, so other step executions can run
255                // in parallel without needing the lock
256                final Semaphore semaphore = createSemaphore();
257
258                stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
259
260                        @Override
261                        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
262                                        throws Exception {
263
264                                StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
265
266                                // Before starting a new transaction, check for
267                                // interruption.
268                                interruptionPolicy.checkInterrupted(stepExecution);
269
270                                RepeatStatus result;
271                                try {
272                                        result = new TransactionTemplate(transactionManager, transactionAttribute)
273                                        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
274                                }
275                                catch (UncheckedTransactionException e) {
276                                        // Allow checked exceptions to be thrown inside callback
277                                        throw (Exception) e.getCause();
278                                }
279
280                                chunkListener.afterChunk(chunkContext);
281
282                                // Check for interruption after transaction as well, so that
283                                // the interrupted exception is correctly propagated up to
284                                // caller
285                                interruptionPolicy.checkInterrupted(stepExecution);
286
287                                return result == null ? RepeatStatus.FINISHED : result;
288                        }
289
290                });
291
292        }
293
294        /**
295         * Extension point mainly for test purposes so that the behaviour of the
296         * lock can be manipulated to simulate various pathologies.
297         *
298         * @return a semaphore for locking access to the JobRepository
299         */
300        protected Semaphore createSemaphore() {
301                return new Semaphore(1);
302        }
303
304        @Override
305        protected void close(ExecutionContext ctx) throws Exception {
306                stream.close();
307        }
308
309        @Override
310        protected void open(ExecutionContext ctx) throws Exception {
311                stream.open(ctx);
312        }
313
314        /**
315         * retrieve the tasklet - helper method for JobOperator
316         * @return the {@link Tasklet} instance being executed within this step
317         */
318        public Tasklet getTasklet() {
319                return tasklet;
320        }
321
322        /**
323         * A callback for the transactional work inside a chunk. Also detects
324         * failures in the transaction commit and rollback, only panicking if the
325         * transaction status is unknown (i.e. if a commit failure leads to a clean
326         * rollback then we assume the state is consistent).
327         *
328         * @author Dave Syer
329         *
330         */
331        private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback<RepeatStatus> {
332
333                private final StepExecution stepExecution;
334
335                private final ChunkContext chunkContext;
336
337                private boolean rolledBack = false;
338
339                private boolean stepExecutionUpdated = false;
340
341                private StepExecution oldVersion;
342
343                private boolean locked = false;
344
345                private final Semaphore semaphore;
346
347                public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
348                        this.chunkContext = chunkContext;
349                        this.stepExecution = chunkContext.getStepContext().getStepExecution();
350                        this.semaphore = semaphore;
351                }
352
353                @Override
354                public void afterCompletion(int status) {
355                        try {
356                                if (status != TransactionSynchronization.STATUS_COMMITTED) {
357                                        if (stepExecutionUpdated) {
358                                                // Wah! the commit failed. We need to rescue the step
359                                                // execution data.
360                                                logger.info("Commit failed while step execution data was already updated. "
361                                                                + "Reverting to old version.");
362                                                copy(oldVersion, stepExecution);
363                                                if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
364                                                        rollback(stepExecution);
365                                                }
366                                        }
367                                        chunkListener.afterChunkError(chunkContext);
368                                }
369
370                                if (status == TransactionSynchronization.STATUS_UNKNOWN) {
371                                        logger.error("Rolling back with transaction in unknown state");
372                                        rollback(stepExecution);
373                                        stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
374                                        stepExecution.setTerminateOnly();
375                                }
376                        }
377                        finally {
378                                // Only release the lock if we acquired it, and release as late
379                                // as possible
380                                if (locked) {
381                                        semaphore.release();
382                                }
383
384                                locked = false;
385                        }
386                }
387
388                @Override
389                public RepeatStatus doInTransaction(TransactionStatus status) {
390                        TransactionSynchronizationManager.registerSynchronization(this);
391
392                        RepeatStatus result = RepeatStatus.CONTINUABLE;
393
394                        StepContribution contribution = stepExecution.createStepContribution();
395
396                        chunkListener.beforeChunk(chunkContext);
397
398                        // In case we need to push it back to its old value
399                        // after a commit fails...
400                        oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
401                        copy(stepExecution, oldVersion);
402
403                        try {
404
405                                try {
406                                        try {
407                                                result = tasklet.execute(contribution, chunkContext);
408                                                if (result == null) {
409                                                        result = RepeatStatus.FINISHED;
410                                                }
411                                        }
412                                        catch (Exception e) {
413                                                if (transactionAttribute.rollbackOn(e)) {
414                                                        chunkContext.setAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY, e);
415                                                        throw e;
416                                                }
417                                        }
418                                }
419                                finally {
420
421                                        // If the step operations are asynchronous then we need
422                                        // to synchronize changes to the step execution (at a
423                                        // minimum). Take the lock *before* changing the step
424                                        // execution.
425                                        try {
426                                                semaphore.acquire();
427                                                locked = true;
428                                        }
429                                        catch (InterruptedException e) {
430                                                logger.error("Thread interrupted while locking for repository update");
431                                                stepExecution.setStatus(BatchStatus.STOPPED);
432                                                stepExecution.setTerminateOnly();
433                                                Thread.currentThread().interrupt();
434                                        }
435
436                                        // Apply the contribution to the step
437                                        // even if unsuccessful
438                                        if (logger.isDebugEnabled()) {
439                                                logger.debug("Applying contribution: " + contribution);
440                                        }
441                                        stepExecution.apply(contribution);
442
443                                }
444
445                                stepExecutionUpdated = true;
446
447                                stream.update(stepExecution.getExecutionContext());
448
449                                try {
450                                        // Going to attempt a commit. If it fails this flag will
451                                        // stay false and we can use that later.
452                                        getJobRepository().updateExecutionContext(stepExecution);
453                                        stepExecution.incrementCommitCount();
454                                        if (logger.isDebugEnabled()) {
455                                                logger.debug("Saving step execution before commit: " + stepExecution);
456                                        }
457                                        getJobRepository().update(stepExecution);
458                                }
459                                catch (Exception e) {
460                                        // If we get to here there was a problem saving the step
461                                        // execution and we have to fail.
462                                        String msg = "JobRepository failure forcing rollback";
463                                        logger.error(msg, e);
464                                        throw new FatalStepExecutionException(msg, e);
465                                }
466                        }
467                        catch (Error e) {
468                                if (logger.isDebugEnabled()) {
469                                        logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
470                                }
471                                rollback(stepExecution);
472                                throw e;
473                        }
474                        catch (RuntimeException e) {
475                                if (logger.isDebugEnabled()) {
476                                        logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
477                                }
478                                rollback(stepExecution);
479                                throw e;
480                        }
481                        catch (Exception e) {
482                                if (logger.isDebugEnabled()) {
483                                        logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
484                                }
485                                rollback(stepExecution);
486                                // Allow checked exceptions
487                                throw new UncheckedTransactionException(e);
488                        }
489
490                        return result;
491
492                }
493
494                private void rollback(StepExecution stepExecution) {
495                        if (!rolledBack) {
496                                stepExecution.incrementRollbackCount();
497                                rolledBack = true;
498                        }
499                }
500
501                private void copy(final StepExecution source, final StepExecution target) {
502                        target.setVersion(source.getVersion());
503                        target.setWriteCount(source.getWriteCount());
504                        target.setFilterCount(source.getFilterCount());
505                        target.setCommitCount(source.getCommitCount());
506                        target.setExecutionContext(new ExecutionContext(source.getExecutionContext()));
507                }
508
509        }
510}