001/*
002 * Copyright 2012-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.autoconfigure.batch;
018
019import java.util.Arrays;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Properties;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029
030import org.springframework.batch.core.BatchStatus;
031import org.springframework.batch.core.Job;
032import org.springframework.batch.core.JobExecution;
033import org.springframework.batch.core.JobExecutionException;
034import org.springframework.batch.core.JobInstance;
035import org.springframework.batch.core.JobParameter;
036import org.springframework.batch.core.JobParameters;
037import org.springframework.batch.core.JobParametersIncrementer;
038import org.springframework.batch.core.JobParametersInvalidException;
039import org.springframework.batch.core.configuration.JobRegistry;
040import org.springframework.batch.core.converter.DefaultJobParametersConverter;
041import org.springframework.batch.core.converter.JobParametersConverter;
042import org.springframework.batch.core.explore.JobExplorer;
043import org.springframework.batch.core.launch.JobLauncher;
044import org.springframework.batch.core.launch.JobParametersNotFoundException;
045import org.springframework.batch.core.launch.NoSuchJobException;
046import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
047import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
048import org.springframework.batch.core.repository.JobRestartException;
049import org.springframework.beans.factory.annotation.Autowired;
050import org.springframework.boot.CommandLineRunner;
051import org.springframework.context.ApplicationEventPublisher;
052import org.springframework.context.ApplicationEventPublisherAware;
053import org.springframework.util.PatternMatchUtils;
054import org.springframework.util.StringUtils;
055
056/**
057 * {@link CommandLineRunner} to {@link JobLauncher launch} Spring Batch jobs. Runs all
058 * jobs in the surrounding context by default. Can also be used to launch a specific job
059 * by providing a jobName
060 *
061 * @author Dave Syer
062 * @author Jean-Pierre Bergamin
063 */
064public class JobLauncherCommandLineRunner
065                implements CommandLineRunner, ApplicationEventPublisherAware {
066
067        private static final Log logger = LogFactory
068                        .getLog(JobLauncherCommandLineRunner.class);
069
070        private JobParametersConverter converter = new DefaultJobParametersConverter();
071
072        private JobLauncher jobLauncher;
073
074        private JobRegistry jobRegistry;
075
076        private JobExplorer jobExplorer;
077
078        private String jobNames;
079
080        private Collection<Job> jobs = Collections.emptySet();
081
082        private ApplicationEventPublisher publisher;
083
084        public JobLauncherCommandLineRunner(JobLauncher jobLauncher,
085                        JobExplorer jobExplorer) {
086                this.jobLauncher = jobLauncher;
087                this.jobExplorer = jobExplorer;
088        }
089
090        public void setJobNames(String jobNames) {
091                this.jobNames = jobNames;
092        }
093
094        @Override
095        public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
096                this.publisher = publisher;
097        }
098
099        @Autowired(required = false)
100        public void setJobRegistry(JobRegistry jobRegistry) {
101                this.jobRegistry = jobRegistry;
102        }
103
104        @Autowired(required = false)
105        public void setJobParametersConverter(JobParametersConverter converter) {
106                this.converter = converter;
107        }
108
109        @Autowired(required = false)
110        public void setJobs(Collection<Job> jobs) {
111                this.jobs = jobs;
112        }
113
114        @Override
115        public void run(String... args) throws JobExecutionException {
116                logger.info("Running default command line with: " + Arrays.asList(args));
117                launchJobFromProperties(StringUtils.splitArrayElementsIntoProperties(args, "="));
118        }
119
120        protected void launchJobFromProperties(Properties properties)
121                        throws JobExecutionException {
122                JobParameters jobParameters = this.converter.getJobParameters(properties);
123                executeLocalJobs(jobParameters);
124                executeRegisteredJobs(jobParameters);
125        }
126
127        private JobParameters getNextJobParameters(Job job,
128                        JobParameters additionalParameters) {
129                String name = job.getName();
130                JobParameters parameters = new JobParameters();
131                List<JobInstance> lastInstances = this.jobExplorer.getJobInstances(name, 0, 1);
132                JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
133                Map<String, JobParameter> additionals = additionalParameters.getParameters();
134                if (lastInstances.isEmpty()) {
135                        // Start from a completely clean sheet
136                        if (incrementer != null) {
137                                parameters = incrementer.getNext(new JobParameters());
138                        }
139                }
140                else {
141                        List<JobExecution> previousExecutions = this.jobExplorer
142                                        .getJobExecutions(lastInstances.get(0));
143                        JobExecution previousExecution = previousExecutions.get(0);
144                        if (previousExecution == null) {
145                                // Normally this will not happen - an instance exists with no executions
146                                if (incrementer != null) {
147                                        parameters = incrementer.getNext(new JobParameters());
148                                }
149                        }
150                        else if (isStoppedOrFailed(previousExecution) && job.isRestartable()) {
151                                // Retry a failed or stopped execution
152                                parameters = previousExecution.getJobParameters();
153                                // Non-identifying additional parameters can be removed to a retry
154                                removeNonIdentifying(additionals);
155                        }
156                        else if (incrementer != null) {
157                                // New instance so increment the parameters if we can
158                                parameters = incrementer.getNext(previousExecution.getJobParameters());
159                        }
160                }
161                return merge(parameters, additionals);
162        }
163
164        private boolean isStoppedOrFailed(JobExecution execution) {
165                BatchStatus status = execution.getStatus();
166                return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
167        }
168
169        private void removeNonIdentifying(Map<String, JobParameter> parameters) {
170                HashMap<String, JobParameter> copy = new HashMap<String, JobParameter>(
171                                parameters);
172                for (Map.Entry<String, JobParameter> parameter : copy.entrySet()) {
173                        if (!parameter.getValue().isIdentifying()) {
174                                parameters.remove(parameter.getKey());
175                        }
176                }
177        }
178
179        private JobParameters merge(JobParameters parameters,
180                        Map<String, JobParameter> additionals) {
181                Map<String, JobParameter> merged = new HashMap<String, JobParameter>();
182                merged.putAll(parameters.getParameters());
183                merged.putAll(additionals);
184                return new JobParameters(merged);
185        }
186
187        private void executeRegisteredJobs(JobParameters jobParameters)
188                        throws JobExecutionException {
189                if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) {
190                        String[] jobsToRun = this.jobNames.split(",");
191                        for (String jobName : jobsToRun) {
192                                try {
193                                        Job job = this.jobRegistry.getJob(jobName);
194                                        if (this.jobs.contains(job)) {
195                                                continue;
196                                        }
197                                        execute(job, jobParameters);
198                                }
199                                catch (NoSuchJobException ex) {
200                                        logger.debug("No job found in registry for job name: " + jobName);
201                                }
202                        }
203                }
204        }
205
206        protected void execute(Job job, JobParameters jobParameters)
207                        throws JobExecutionAlreadyRunningException, JobRestartException,
208                        JobInstanceAlreadyCompleteException, JobParametersInvalidException,
209                        JobParametersNotFoundException {
210                JobParameters nextParameters = getNextJobParameters(job, jobParameters);
211                JobExecution execution = this.jobLauncher.run(job, nextParameters);
212                if (this.publisher != null) {
213                        this.publisher.publishEvent(new JobExecutionEvent(execution));
214                }
215        }
216
217        private void executeLocalJobs(JobParameters jobParameters)
218                        throws JobExecutionException {
219                for (Job job : this.jobs) {
220                        if (StringUtils.hasText(this.jobNames)) {
221                                String[] jobsToRun = this.jobNames.split(",");
222                                if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
223                                        logger.debug("Skipped job: " + job.getName());
224                                        continue;
225                                }
226                        }
227                        execute(job, jobParameters);
228                }
229        }
230
231}