001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.launch.support;
017
018import org.apache.commons.logging.Log;
019import org.apache.commons.logging.LogFactory;
020import org.springframework.batch.core.BatchStatus;
021import org.springframework.batch.core.ExitStatus;
022import org.springframework.batch.core.Job;
023import org.springframework.batch.core.JobExecution;
024import org.springframework.batch.core.JobInstance;
025import org.springframework.batch.core.JobParameters;
026import org.springframework.batch.core.JobParametersInvalidException;
027import org.springframework.batch.core.StepExecution;
028import org.springframework.batch.core.launch.JobLauncher;
029import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
030import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
031import org.springframework.batch.core.repository.JobRepository;
032import org.springframework.batch.core.repository.JobRestartException;
033import org.springframework.beans.factory.InitializingBean;
034import org.springframework.core.task.SyncTaskExecutor;
035import org.springframework.core.task.TaskExecutor;
036import org.springframework.core.task.TaskRejectedException;
037import org.springframework.util.Assert;
038
039/**
040 * Simple implementation of the {@link JobLauncher} interface. The Spring Core
041 * {@link TaskExecutor} interface is used to launch a {@link Job}. This means
042 * that the type of executor set is very important. If a
043 * {@link SyncTaskExecutor} is used, then the job will be processed
044 * <strong>within the same thread that called the launcher.</strong> Care should
045 * be taken to ensure any users of this class understand fully whether or not
046 * the implementation of TaskExecutor used will start tasks synchronously or
047 * asynchronously. The default setting uses a synchronous task executor.
048 *
049 * There is only one required dependency of this Launcher, a
050 * {@link JobRepository}. The JobRepository is used to obtain a valid
051 * JobExecution. The Repository must be used because the provided {@link Job}
052 * could be a restart of an existing {@link JobInstance}, and only the
053 * Repository can reliably recreate it.
054 *
055 * @author Lucas Ward
056 * @author Dave Syer
057 * @author Will Schipp
058 * @author Michael Minella
059 * @author Mahmoud Ben Hassine
060 *
061 * @since 1.0
062 *
063 * @see JobRepository
064 * @see TaskExecutor
065 */
066public class SimpleJobLauncher implements JobLauncher, InitializingBean {
067
068        protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class);
069
070        private JobRepository jobRepository;
071
072        private TaskExecutor taskExecutor;
073
074        /**
075         * Run the provided job with the given {@link JobParameters}. The
076         * {@link JobParameters} will be used to determine if this is an execution
077         * of an existing job instance, or if a new one should be created.
078         *
079         * @param job the job to be run.
080         * @param jobParameters the {@link JobParameters} for this particular
081         * execution.
082         * @return the {@link JobExecution} if it returns synchronously. If the
083         * implementation is asynchronous, the status might well be unknown.
084         * @throws JobExecutionAlreadyRunningException if the JobInstance already
085         * exists and has an execution already running.
086         * @throws JobRestartException if the execution would be a re-start, but a
087         * re-start is either not allowed or not needed.
088         * @throws JobInstanceAlreadyCompleteException if this instance has already
089         * completed successfully
090         * @throws JobParametersInvalidException thrown if jobParameters is invalid.
091         */
092        @Override
093        public JobExecution run(final Job job, final JobParameters jobParameters)
094                        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
095                        JobParametersInvalidException {
096
097                Assert.notNull(job, "The Job must not be null.");
098                Assert.notNull(jobParameters, "The JobParameters must not be null.");
099
100                final JobExecution jobExecution;
101                JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters);
102                if (lastExecution != null) {
103                        if (!job.isRestartable()) {
104                                throw new JobRestartException("JobInstance already exists and is not restartable");
105                        }
106                        /*
107                         * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING
108                         * retrieve the previous execution and check
109                         */
110                        for (StepExecution execution : lastExecution.getStepExecutions()) {
111                                BatchStatus status = execution.getStatus();
112                                if (status.isRunning() || status == BatchStatus.STOPPING) {
113                                        throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
114                                                        + lastExecution);
115                                } else if (status == BatchStatus.UNKNOWN) {
116                                        throw new JobRestartException(
117                                                        "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. "
118                                                                + "The last execution ended with a failure that could not be rolled back, "
119                                                                + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
120                                }
121                        }
122                }
123
124                // Check the validity of the parameters before doing creating anything
125                // in the repository...
126                job.getJobParametersValidator().validate(jobParameters);
127
128                /*
129                 * There is a very small probability that a non-restartable job can be
130                 * restarted, but only if another process or thread manages to launch
131                 * <i>and</i> fail a job execution for this instance between the last
132                 * assertion and the next method returning successfully.
133                 */
134                jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
135
136                try {
137                        taskExecutor.execute(new Runnable() {
138
139                                @Override
140                                public void run() {
141                                        try {
142                                                logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
143                                                                + "]");
144                                                job.execute(jobExecution);
145                                                logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters
146                                                                + "] and the following status: [" + jobExecution.getStatus() + "]");
147                                        }
148                                        catch (Throwable t) {
149                                                logger.info("Job: [" + job
150                                                                + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters
151                                                                + "]", t);
152                                                rethrow(t);
153                                        }
154                                }
155
156                                private void rethrow(Throwable t) {
157                                        if (t instanceof RuntimeException) {
158                                                throw (RuntimeException) t;
159                                        }
160                                        else if (t instanceof Error) {
161                                                throw (Error) t;
162                                        }
163                                        throw new IllegalStateException(t);
164                                }
165                        });
166                }
167                catch (TaskRejectedException e) {
168                        jobExecution.upgradeStatus(BatchStatus.FAILED);
169                        if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) {
170                                jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e));
171                        }
172                        jobRepository.update(jobExecution);
173                }
174
175                return jobExecution;
176        }
177
178        /**
179         * Set the JobRepository.
180         *
181         * @param jobRepository instance of {@link JobRepository}.
182         */
183        public void setJobRepository(JobRepository jobRepository) {
184                this.jobRepository = jobRepository;
185        }
186
187        /**
188         * Set the TaskExecutor. (Optional)
189         *
190         * @param taskExecutor instance of {@link TaskExecutor}.
191         */
192        public void setTaskExecutor(TaskExecutor taskExecutor) {
193                this.taskExecutor = taskExecutor;
194        }
195
196        /**
197         * Ensure the required dependencies of a {@link JobRepository} have been
198         * set.
199         */
200        @Override
201        public void afterPropertiesSet() throws Exception {
202                Assert.state(jobRepository != null, "A JobRepository has not been set.");
203                if (taskExecutor == null) {
204                        logger.info("No TaskExecutor has been set, defaulting to synchronous executor.");
205                        taskExecutor = new SyncTaskExecutor();
206                }
207        }
208}