001/*
002 * Copyright 2012-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.autoconfigure.batch;
018
019import java.util.Arrays;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.LinkedHashMap;
024import java.util.Map;
025import java.util.Properties;
026
027import org.apache.commons.logging.Log;
028import org.apache.commons.logging.LogFactory;
029
030import org.springframework.batch.core.BatchStatus;
031import org.springframework.batch.core.Job;
032import org.springframework.batch.core.JobExecution;
033import org.springframework.batch.core.JobExecutionException;
034import org.springframework.batch.core.JobParameter;
035import org.springframework.batch.core.JobParameters;
036import org.springframework.batch.core.JobParametersBuilder;
037import org.springframework.batch.core.JobParametersInvalidException;
038import org.springframework.batch.core.configuration.JobRegistry;
039import org.springframework.batch.core.converter.DefaultJobParametersConverter;
040import org.springframework.batch.core.converter.JobParametersConverter;
041import org.springframework.batch.core.explore.JobExplorer;
042import org.springframework.batch.core.launch.JobLauncher;
043import org.springframework.batch.core.launch.JobParametersNotFoundException;
044import org.springframework.batch.core.launch.NoSuchJobException;
045import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
046import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
047import org.springframework.batch.core.repository.JobRepository;
048import org.springframework.batch.core.repository.JobRestartException;
049import org.springframework.beans.factory.annotation.Autowired;
050import org.springframework.boot.CommandLineRunner;
051import org.springframework.context.ApplicationEventPublisher;
052import org.springframework.context.ApplicationEventPublisherAware;
053import org.springframework.core.Ordered;
054import org.springframework.util.Assert;
055import org.springframework.util.PatternMatchUtils;
056import org.springframework.util.StringUtils;
057
058/**
059 * {@link CommandLineRunner} to {@link JobLauncher launch} Spring Batch jobs. Runs all
060 * jobs in the surrounding context by default. Can also be used to launch a specific job
061 * by providing a jobName
062 *
063 * @author Dave Syer
064 * @author Jean-Pierre Bergamin
065 * @author Mahmoud Ben Hassine
066 */
067public class JobLauncherCommandLineRunner
068                implements CommandLineRunner, Ordered, ApplicationEventPublisherAware {
069
070        /**
071         * The default order for the command line runner.
072         */
073        public static final int DEFAULT_ORDER = 0;
074
075        private static final Log logger = LogFactory
076                        .getLog(JobLauncherCommandLineRunner.class);
077
078        private JobParametersConverter converter = new DefaultJobParametersConverter();
079
080        private final JobLauncher jobLauncher;
081
082        private final JobExplorer jobExplorer;
083
084        private final JobRepository jobRepository;
085
086        private JobRegistry jobRegistry;
087
088        private String jobNames;
089
090        private Collection<Job> jobs = Collections.emptySet();
091
092        private int order = DEFAULT_ORDER;
093
094        private ApplicationEventPublisher publisher;
095
096        /**
097         * Create a new {@link JobLauncherCommandLineRunner}.
098         * @param jobLauncher to launch jobs
099         * @param jobExplorer to check the job repository for previous executions
100         * @deprecated since 2.0.7 in favor of
101         * {@link #JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}. A
102         * job repository is required to check if a job instance exists with the given
103         * parameters when running a job (which is not possible with the job explorer).
104         */
105        @Deprecated
106        public JobLauncherCommandLineRunner(JobLauncher jobLauncher,
107                        JobExplorer jobExplorer) {
108                this.jobLauncher = jobLauncher;
109                this.jobExplorer = jobExplorer;
110                this.jobRepository = null;
111        }
112
113        /**
114         * Create a new {@link JobLauncherCommandLineRunner}.
115         * @param jobLauncher to launch jobs
116         * @param jobExplorer to check the job repository for previous executions
117         * @param jobRepository to check if a job instance exists with the given parameters
118         * when running a job
119         */
120        public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
121                        JobRepository jobRepository) {
122                Assert.notNull(jobLauncher, "JobLauncher must not be null");
123                Assert.notNull(jobExplorer, "JobExplorer must not be null");
124                Assert.notNull(jobRepository, "JobRepository must not be null");
125                this.jobLauncher = jobLauncher;
126                this.jobExplorer = jobExplorer;
127                this.jobRepository = jobRepository;
128        }
129
130        public void setOrder(int order) {
131                this.order = order;
132        }
133
134        @Override
135        public int getOrder() {
136                return this.order;
137        }
138
139        @Override
140        public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
141                this.publisher = publisher;
142        }
143
144        @Autowired(required = false)
145        public void setJobRegistry(JobRegistry jobRegistry) {
146                this.jobRegistry = jobRegistry;
147        }
148
149        public void setJobNames(String jobNames) {
150                this.jobNames = jobNames;
151        }
152
153        @Autowired(required = false)
154        public void setJobParametersConverter(JobParametersConverter converter) {
155                this.converter = converter;
156        }
157
158        @Autowired(required = false)
159        public void setJobs(Collection<Job> jobs) {
160                this.jobs = jobs;
161        }
162
163        @Override
164        public void run(String... args) throws JobExecutionException {
165                logger.info("Running default command line with: " + Arrays.asList(args));
166                launchJobFromProperties(StringUtils.splitArrayElementsIntoProperties(args, "="));
167        }
168
169        protected void launchJobFromProperties(Properties properties)
170                        throws JobExecutionException {
171                JobParameters jobParameters = this.converter.getJobParameters(properties);
172                executeLocalJobs(jobParameters);
173                executeRegisteredJobs(jobParameters);
174        }
175
176        private void executeLocalJobs(JobParameters jobParameters)
177                        throws JobExecutionException {
178                for (Job job : this.jobs) {
179                        if (StringUtils.hasText(this.jobNames)) {
180                                String[] jobsToRun = this.jobNames.split(",");
181                                if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
182                                        logger.debug("Skipped job: " + job.getName());
183                                        continue;
184                                }
185                        }
186                        execute(job, jobParameters);
187                }
188        }
189
190        private void executeRegisteredJobs(JobParameters jobParameters)
191                        throws JobExecutionException {
192                if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) {
193                        String[] jobsToRun = this.jobNames.split(",");
194                        for (String jobName : jobsToRun) {
195                                try {
196                                        Job job = this.jobRegistry.getJob(jobName);
197                                        if (this.jobs.contains(job)) {
198                                                continue;
199                                        }
200                                        execute(job, jobParameters);
201                                }
202                                catch (NoSuchJobException ex) {
203                                        logger.debug("No job found in registry for job name: " + jobName);
204                                }
205                        }
206                }
207        }
208
209        protected void execute(Job job, JobParameters jobParameters)
210                        throws JobExecutionAlreadyRunningException, JobRestartException,
211                        JobInstanceAlreadyCompleteException, JobParametersInvalidException,
212                        JobParametersNotFoundException {
213                JobParameters parameters = getNextJobParameters(job, jobParameters);
214                JobExecution execution = this.jobLauncher.run(job, parameters);
215                if (this.publisher != null) {
216                        this.publisher.publishEvent(new JobExecutionEvent(execution));
217                }
218        }
219
220        private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
221                if (this.jobRepository != null
222                                && this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) {
223                        return getNextJobParametersForExisting(job, jobParameters);
224                }
225                if (job.getJobParametersIncrementer() == null) {
226                        return jobParameters;
227                }
228                JobParameters nextParameters = new JobParametersBuilder(jobParameters,
229                                this.jobExplorer).getNextJobParameters(job).toJobParameters();
230                return merge(nextParameters, jobParameters);
231        }
232
233        private JobParameters getNextJobParametersForExisting(Job job,
234                        JobParameters jobParameters) {
235                JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(),
236                                jobParameters);
237                if (isStoppedOrFailed(lastExecution) && job.isRestartable()) {
238                        JobParameters previousIdentifyingParameters = getGetIdentifying(
239                                        lastExecution.getJobParameters());
240                        return merge(previousIdentifyingParameters, jobParameters);
241                }
242                return jobParameters;
243        }
244
245        private boolean isStoppedOrFailed(JobExecution execution) {
246                BatchStatus status = (execution != null) ? execution.getStatus() : null;
247                return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
248        }
249
250        private JobParameters getGetIdentifying(JobParameters parameters) {
251                HashMap<String, JobParameter> nonIdentifying = new LinkedHashMap<>(
252                                parameters.getParameters().size());
253                parameters.getParameters().forEach((key, value) -> {
254                        if (value.isIdentifying()) {
255                                nonIdentifying.put(key, value);
256                        }
257                });
258                return new JobParameters(nonIdentifying);
259        }
260
261        private JobParameters merge(JobParameters parameters, JobParameters additionals) {
262                Map<String, JobParameter> merged = new LinkedHashMap<>();
263                merged.putAll(parameters.getParameters());
264                merged.putAll(additionals.getParameters());
265                return new JobParameters(merged);
266        }
267
268}