001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.launch.support;
017
018import java.util.ArrayList;
019import java.util.Date;
020import java.util.LinkedHashMap;
021import java.util.LinkedHashSet;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.TreeSet;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029import org.springframework.batch.core.BatchStatus;
030import org.springframework.batch.core.Job;
031import org.springframework.batch.core.JobExecution;
032import org.springframework.batch.core.JobInstance;
033import org.springframework.batch.core.JobParameters;
034import org.springframework.batch.core.JobParametersBuilder;
035import org.springframework.batch.core.JobParametersInvalidException;
036import org.springframework.batch.core.Step;
037import org.springframework.batch.core.StepExecution;
038import org.springframework.batch.core.UnexpectedJobExecutionException;
039import org.springframework.batch.core.configuration.JobRegistry;
040import org.springframework.batch.core.configuration.ListableJobLocator;
041import org.springframework.batch.core.converter.DefaultJobParametersConverter;
042import org.springframework.batch.core.converter.JobParametersConverter;
043import org.springframework.batch.core.explore.JobExplorer;
044import org.springframework.batch.core.launch.JobExecutionNotRunningException;
045import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException;
046import org.springframework.batch.core.launch.JobLauncher;
047import org.springframework.batch.core.launch.JobOperator;
048import org.springframework.batch.core.launch.NoSuchJobException;
049import org.springframework.batch.core.launch.NoSuchJobExecutionException;
050import org.springframework.batch.core.launch.NoSuchJobInstanceException;
051import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
052import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
053import org.springframework.batch.core.repository.JobRepository;
054import org.springframework.batch.core.repository.JobRestartException;
055import org.springframework.batch.core.scope.context.StepSynchronizationManager;
056import org.springframework.batch.core.step.NoSuchStepException;
057import org.springframework.batch.core.step.StepLocator;
058import org.springframework.batch.core.step.tasklet.StoppableTasklet;
059import org.springframework.batch.core.step.tasklet.Tasklet;
060import org.springframework.batch.core.step.tasklet.TaskletStep;
061import org.springframework.batch.support.PropertiesConverter;
062import org.springframework.beans.factory.InitializingBean;
063import org.springframework.transaction.annotation.Transactional;
064import org.springframework.util.Assert;
065
066/**
067 * Simple implementation of the JobOperator interface.  Due to the amount of
068 * functionality the implementation is combining, the following dependencies
069 * are required:
070 *
071 * <ul>
072 *      <li> {@link JobLauncher}
073 *  <li> {@link JobExplorer}
074 *  <li> {@link JobRepository}
075 *  <li> {@link JobRegistry}
076 * </ul>
077 *
078 * @author Dave Syer
079 * @author Lucas Ward
080 * @author Will Schipp
081 * @author Mahmoud Ben Hassine
082 * @since 2.0
083 */
084public class SimpleJobOperator implements JobOperator, InitializingBean {
085
086        private static final String ILLEGAL_STATE_MSG = "Illegal state (only happens on a race condition): "
087                        + "%s with name=%s and parameters=%s";
088
089        private ListableJobLocator jobRegistry;
090
091        private JobExplorer jobExplorer;
092
093        private JobLauncher jobLauncher;
094
095        private JobRepository jobRepository;
096
097        private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();
098
099        private final Log logger = LogFactory.getLog(getClass());
100
101        /**
102         * Check mandatory properties.
103         *
104         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
105         */
106        @Override
107        public void afterPropertiesSet() throws Exception {
108                Assert.notNull(jobLauncher, "JobLauncher must be provided");
109                Assert.notNull(jobRegistry, "JobLocator must be provided");
110                Assert.notNull(jobExplorer, "JobExplorer must be provided");
111                Assert.notNull(jobRepository, "JobRepository must be provided");
112        }
113
114        /**
115         * Public setter for the {@link JobParametersConverter}.
116         * @param jobParametersConverter the {@link JobParametersConverter} to set
117         */
118        public void setJobParametersConverter(JobParametersConverter jobParametersConverter) {
119                this.jobParametersConverter = jobParametersConverter;
120        }
121
122        /**
123         * Public setter for the {@link ListableJobLocator}.
124         * @param jobRegistry the {@link ListableJobLocator} to set
125         */
126        public void setJobRegistry(ListableJobLocator jobRegistry) {
127                this.jobRegistry = jobRegistry;
128        }
129
130        /**
131         * Public setter for the {@link JobExplorer}.
132         * @param jobExplorer the {@link JobExplorer} to set
133         */
134        public void setJobExplorer(JobExplorer jobExplorer) {
135                this.jobExplorer = jobExplorer;
136        }
137
138        public void setJobRepository(JobRepository jobRepository) {
139                this.jobRepository = jobRepository;
140        }
141
142        /**
143         * Public setter for the {@link JobLauncher}.
144         * @param jobLauncher the {@link JobLauncher} to set
145         */
146        public void setJobLauncher(JobLauncher jobLauncher) {
147                this.jobLauncher = jobLauncher;
148        }
149
150        /*
151         * (non-Javadoc)
152         *
153         * @see org.springframework.batch.core.launch.JobOperator#getExecutions(java.lang.Long)
154         */
155        @Override
156        public List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException {
157                JobInstance jobInstance = jobExplorer.getJobInstance(instanceId);
158                if (jobInstance == null) {
159                        throw new NoSuchJobInstanceException(String.format("No job instance with id=%d", instanceId));
160                }
161                List<Long> list = new ArrayList<Long>();
162                for (JobExecution jobExecution : jobExplorer.getJobExecutions(jobInstance)) {
163                        list.add(jobExecution.getId());
164                }
165                return list;
166        }
167
168        /*
169         * (non-Javadoc)
170         *
171         * @see org.springframework.batch.core.launch.JobOperator#getJobNames()
172         */
173        @Override
174        public Set<String> getJobNames() {
175                return new TreeSet<String>(jobRegistry.getJobNames());
176        }
177
178        /*
179         * (non-Javadoc)
180         *
181         * @see JobOperator#getLastInstances(String, int, int)
182         */
183        @Override
184        public List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException {
185                List<Long> list = new ArrayList<Long>();
186                List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName, start, count);
187                for (JobInstance jobInstance : jobInstances) {
188                        list.add(jobInstance.getId());
189                }
190                if (list.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) {
191                        throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName);
192                }
193                return list;
194        }
195
196        /*
197         * (non-Javadoc)
198         *
199         * @see
200         * org.springframework.batch.core.launch.JobOperator#getParameters(java.
201         * lang.Long)
202         */
203        @Override
204        public String getParameters(long executionId) throws NoSuchJobExecutionException {
205                JobExecution jobExecution = findExecutionById(executionId);
206
207                return PropertiesConverter.propertiesToString(jobParametersConverter.getProperties(jobExecution
208                                .getJobParameters()));
209        }
210
211        /*
212         * (non-Javadoc)
213         *
214         * @see
215         * org.springframework.batch.core.launch.JobOperator#getRunningExecutions
216         * (java.lang.String)
217         */
218        @Override
219        public Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException {
220                Set<Long> set = new LinkedHashSet<Long>();
221                for (JobExecution jobExecution : jobExplorer.findRunningJobExecutions(jobName)) {
222                        set.add(jobExecution.getId());
223                }
224                if (set.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) {
225                        throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName);
226                }
227                return set;
228        }
229
230        /*
231         * (non-Javadoc)
232         *
233         * @see
234         * org.springframework.batch.core.launch.JobOperator#getStepExecutionSummaries
235         * (java.lang.Long)
236         */
237        @Override
238        public Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException {
239                JobExecution jobExecution = findExecutionById(executionId);
240
241                Map<Long, String> map = new LinkedHashMap<Long, String>();
242                for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
243                        map.put(stepExecution.getId(), stepExecution.toString());
244                }
245                return map;
246        }
247
248        /*
249         * (non-Javadoc)
250         *
251         * @see
252         * org.springframework.batch.core.launch.JobOperator#getSummary(java.lang
253         * .Long)
254         */
255        @Override
256        public String getSummary(long executionId) throws NoSuchJobExecutionException {
257                JobExecution jobExecution = findExecutionById(executionId);
258                return jobExecution.toString();
259        }
260
261        /*
262         * (non-Javadoc)
263         *
264         * @see
265         * org.springframework.batch.core.launch.JobOperator#resume(java.lang.Long)
266         */
267        @Override
268        public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException {
269
270                logger.info("Checking status of job execution with id=" + executionId);
271
272                JobExecution jobExecution = findExecutionById(executionId);
273
274                String jobName = jobExecution.getJobInstance().getJobName();
275                Job job = jobRegistry.getJob(jobName);
276                JobParameters parameters = jobExecution.getJobParameters();
277
278                logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters));
279                try {
280                        return jobLauncher.run(job, parameters).getId();
281                }
282                catch (JobExecutionAlreadyRunningException e) {
283                        throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
284                                        jobName, parameters), e);
285                }
286
287        }
288
289        /*
290         * (non-Javadoc)
291         *
292         * @see
293         * org.springframework.batch.core.launch.JobOperator#start(java.lang.String,
294         * java.lang.String)
295         */
296        @Override
297        public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException {
298
299                logger.info("Checking status of job with name=" + jobName);
300
301                JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter
302                                .stringToProperties(parameters));
303
304                if (jobRepository.isJobInstanceExists(jobName, jobParameters)) {
305                        throw new JobInstanceAlreadyExistsException(String.format(
306                                        "Cannot start a job instance that already exists with name=%s and parameters=%s", jobName,
307                                        parameters));
308                }
309
310                Job job = jobRegistry.getJob(jobName);
311
312                logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
313                try {
314                        return jobLauncher.run(job, jobParameters).getId();
315                }
316                catch (JobExecutionAlreadyRunningException e) {
317                        throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running",
318                                        jobName, parameters), e);
319                }
320                catch (JobRestartException e) {
321                        throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
322                                        parameters), e);
323                }
324                catch (JobInstanceAlreadyCompleteException e) {
325                        throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName,
326                                        parameters), e);
327                }
328
329        }
330
331        /*
332         * (non-Javadoc)
333         *
334         * @see JobOperator#startNextInstance(String )
335         */
336        @Override
337        public Long startNextInstance(String jobName) throws NoSuchJobException,
338        UnexpectedJobExecutionException, JobParametersInvalidException {
339
340                logger.info("Locating parameters for next instance of job with name=" + jobName);
341
342                Job job = jobRegistry.getJob(jobName);
343                JobParameters parameters = new JobParametersBuilder(jobExplorer)
344                                .getNextJobParameters(job)
345                                .toJobParameters();
346
347                logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters));
348                try {
349                        return jobLauncher.run(job, parameters).getId();
350                }
351                catch (JobExecutionAlreadyRunningException e) {
352                        throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already running", jobName,
353                                        parameters), e);
354                }
355                catch (JobRestartException e) {
356                        throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName,
357                                        parameters), e);
358                }
359                catch (JobInstanceAlreadyCompleteException e) {
360                        throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job instance already complete",
361                                        jobName, parameters), e);
362                }
363
364        }
365
366        /*
367         * (non-Javadoc)
368         *
369         * @see
370         * org.springframework.batch.core.launch.JobOperator#stop(java.lang.Long)
371         */
372        @Override
373        @Transactional
374        public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
375
376                JobExecution jobExecution = findExecutionById(executionId);
377                // Indicate the execution should be stopped by setting it's status to
378                // 'STOPPING'. It is assumed that
379                // the step implementation will check this status at chunk boundaries.
380                BatchStatus status = jobExecution.getStatus();
381                if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) {
382                        throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution);
383                }
384                jobExecution.setStatus(BatchStatus.STOPPING);
385                jobRepository.update(jobExecution);
386
387                try {
388                        Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName());
389                        if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object
390                                //get the current stepExecution
391                                for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
392                                        if (stepExecution.getStatus().isRunning()) {
393                                                try {
394                                                        //have the step execution that's running -> need to 'stop' it
395                                                        Step step = ((StepLocator)job).getStep(stepExecution.getStepName());
396                                                        if (step instanceof TaskletStep) {
397                                                                Tasklet tasklet = ((TaskletStep)step).getTasklet();
398                                                                if (tasklet instanceof StoppableTasklet) {
399                                                                        StepSynchronizationManager.register(stepExecution);
400                                                                        ((StoppableTasklet)tasklet).stop();
401                                                                        StepSynchronizationManager.release();
402                                                                }
403                                                        }
404                                                }
405                                                catch (NoSuchStepException e) {
406                                                        logger.warn("Step not found",e);
407                                                }
408                                        }
409                                }
410                        }
411                }
412                catch (NoSuchJobException e) {
413                        logger.warn("Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called",e);
414                }
415
416                return true;
417        }
418
419        @Override
420        public JobExecution abandon(long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException {
421                JobExecution jobExecution = findExecutionById(jobExecutionId);
422
423                if (jobExecution.getStatus().isLessThan(BatchStatus.STOPPING)) {
424                        throw new JobExecutionAlreadyRunningException(
425                                        "JobExecution is running or complete and therefore cannot be aborted");
426                }
427
428                logger.info("Aborting job execution: " + jobExecution);
429                jobExecution.upgradeStatus(BatchStatus.ABANDONED);
430                jobExecution.setEndTime(new Date());
431                jobRepository.update(jobExecution);
432
433                return jobExecution;
434        }
435
436        private JobExecution findExecutionById(long executionId) throws NoSuchJobExecutionException {
437                JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
438
439                if (jobExecution == null) {
440                        throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]");
441                }
442                return jobExecution;
443
444        }
445}