001/* 002 * Copyright 2012-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.boot.autoconfigure.batch; 018 019import java.util.Arrays; 020import java.util.Collection; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.LinkedHashMap; 024import java.util.Map; 025import java.util.Properties; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029 030import org.springframework.batch.core.BatchStatus; 031import org.springframework.batch.core.Job; 032import org.springframework.batch.core.JobExecution; 033import org.springframework.batch.core.JobExecutionException; 034import org.springframework.batch.core.JobParameter; 035import org.springframework.batch.core.JobParameters; 036import org.springframework.batch.core.JobParametersBuilder; 037import org.springframework.batch.core.JobParametersInvalidException; 038import org.springframework.batch.core.configuration.JobRegistry; 039import org.springframework.batch.core.converter.DefaultJobParametersConverter; 040import org.springframework.batch.core.converter.JobParametersConverter; 041import org.springframework.batch.core.explore.JobExplorer; 042import org.springframework.batch.core.launch.JobLauncher; 043import org.springframework.batch.core.launch.JobParametersNotFoundException; 044import org.springframework.batch.core.launch.NoSuchJobException; 045import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; 046import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; 047import org.springframework.batch.core.repository.JobRepository; 048import org.springframework.batch.core.repository.JobRestartException; 049import org.springframework.beans.factory.annotation.Autowired; 050import org.springframework.boot.CommandLineRunner; 051import org.springframework.context.ApplicationEventPublisher; 052import org.springframework.context.ApplicationEventPublisherAware; 053import org.springframework.core.Ordered; 054import org.springframework.util.Assert; 055import org.springframework.util.PatternMatchUtils; 056import org.springframework.util.StringUtils; 057 058/** 059 * {@link CommandLineRunner} to {@link JobLauncher launch} Spring Batch jobs. Runs all 060 * jobs in the surrounding context by default. Can also be used to launch a specific job 061 * by providing a jobName 062 * 063 * @author Dave Syer 064 * @author Jean-Pierre Bergamin 065 * @author Mahmoud Ben Hassine 066 */ 067public class JobLauncherCommandLineRunner 068 implements CommandLineRunner, Ordered, ApplicationEventPublisherAware { 069 070 /** 071 * The default order for the command line runner. 072 */ 073 public static final int DEFAULT_ORDER = 0; 074 075 private static final Log logger = LogFactory 076 .getLog(JobLauncherCommandLineRunner.class); 077 078 private JobParametersConverter converter = new DefaultJobParametersConverter(); 079 080 private final JobLauncher jobLauncher; 081 082 private final JobExplorer jobExplorer; 083 084 private final JobRepository jobRepository; 085 086 private JobRegistry jobRegistry; 087 088 private String jobNames; 089 090 private Collection<Job> jobs = Collections.emptySet(); 091 092 private int order = DEFAULT_ORDER; 093 094 private ApplicationEventPublisher publisher; 095 096 /** 097 * Create a new {@link JobLauncherCommandLineRunner}. 098 * @param jobLauncher to launch jobs 099 * @param jobExplorer to check the job repository for previous executions 100 * @deprecated since 2.0.7 in favor of 101 * {@link #JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}. A 102 * job repository is required to check if a job instance exists with the given 103 * parameters when running a job (which is not possible with the job explorer). 104 */ 105 @Deprecated 106 public JobLauncherCommandLineRunner(JobLauncher jobLauncher, 107 JobExplorer jobExplorer) { 108 this.jobLauncher = jobLauncher; 109 this.jobExplorer = jobExplorer; 110 this.jobRepository = null; 111 } 112 113 /** 114 * Create a new {@link JobLauncherCommandLineRunner}. 115 * @param jobLauncher to launch jobs 116 * @param jobExplorer to check the job repository for previous executions 117 * @param jobRepository to check if a job instance exists with the given parameters 118 * when running a job 119 */ 120 public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer, 121 JobRepository jobRepository) { 122 Assert.notNull(jobLauncher, "JobLauncher must not be null"); 123 Assert.notNull(jobExplorer, "JobExplorer must not be null"); 124 Assert.notNull(jobRepository, "JobRepository must not be null"); 125 this.jobLauncher = jobLauncher; 126 this.jobExplorer = jobExplorer; 127 this.jobRepository = jobRepository; 128 } 129 130 public void setOrder(int order) { 131 this.order = order; 132 } 133 134 @Override 135 public int getOrder() { 136 return this.order; 137 } 138 139 @Override 140 public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { 141 this.publisher = publisher; 142 } 143 144 @Autowired(required = false) 145 public void setJobRegistry(JobRegistry jobRegistry) { 146 this.jobRegistry = jobRegistry; 147 } 148 149 public void setJobNames(String jobNames) { 150 this.jobNames = jobNames; 151 } 152 153 @Autowired(required = false) 154 public void setJobParametersConverter(JobParametersConverter converter) { 155 this.converter = converter; 156 } 157 158 @Autowired(required = false) 159 public void setJobs(Collection<Job> jobs) { 160 this.jobs = jobs; 161 } 162 163 @Override 164 public void run(String... args) throws JobExecutionException { 165 logger.info("Running default command line with: " + Arrays.asList(args)); 166 launchJobFromProperties(StringUtils.splitArrayElementsIntoProperties(args, "=")); 167 } 168 169 protected void launchJobFromProperties(Properties properties) 170 throws JobExecutionException { 171 JobParameters jobParameters = this.converter.getJobParameters(properties); 172 executeLocalJobs(jobParameters); 173 executeRegisteredJobs(jobParameters); 174 } 175 176 private void executeLocalJobs(JobParameters jobParameters) 177 throws JobExecutionException { 178 for (Job job : this.jobs) { 179 if (StringUtils.hasText(this.jobNames)) { 180 String[] jobsToRun = this.jobNames.split(","); 181 if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) { 182 logger.debug("Skipped job: " + job.getName()); 183 continue; 184 } 185 } 186 execute(job, jobParameters); 187 } 188 } 189 190 private void executeRegisteredJobs(JobParameters jobParameters) 191 throws JobExecutionException { 192 if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) { 193 String[] jobsToRun = this.jobNames.split(","); 194 for (String jobName : jobsToRun) { 195 try { 196 Job job = this.jobRegistry.getJob(jobName); 197 if (this.jobs.contains(job)) { 198 continue; 199 } 200 execute(job, jobParameters); 201 } 202 catch (NoSuchJobException ex) { 203 logger.debug("No job found in registry for job name: " + jobName); 204 } 205 } 206 } 207 } 208 209 protected void execute(Job job, JobParameters jobParameters) 210 throws JobExecutionAlreadyRunningException, JobRestartException, 211 JobInstanceAlreadyCompleteException, JobParametersInvalidException, 212 JobParametersNotFoundException { 213 JobParameters parameters = getNextJobParameters(job, jobParameters); 214 JobExecution execution = this.jobLauncher.run(job, parameters); 215 if (this.publisher != null) { 216 this.publisher.publishEvent(new JobExecutionEvent(execution)); 217 } 218 } 219 220 private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) { 221 if (this.jobRepository != null 222 && this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) { 223 return getNextJobParametersForExisting(job, jobParameters); 224 } 225 if (job.getJobParametersIncrementer() == null) { 226 return jobParameters; 227 } 228 JobParameters nextParameters = new JobParametersBuilder(jobParameters, 229 this.jobExplorer).getNextJobParameters(job).toJobParameters(); 230 return merge(nextParameters, jobParameters); 231 } 232 233 private JobParameters getNextJobParametersForExisting(Job job, 234 JobParameters jobParameters) { 235 JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(), 236 jobParameters); 237 if (isStoppedOrFailed(lastExecution) && job.isRestartable()) { 238 JobParameters previousIdentifyingParameters = getGetIdentifying( 239 lastExecution.getJobParameters()); 240 return merge(previousIdentifyingParameters, jobParameters); 241 } 242 return jobParameters; 243 } 244 245 private boolean isStoppedOrFailed(JobExecution execution) { 246 BatchStatus status = (execution != null) ? execution.getStatus() : null; 247 return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED); 248 } 249 250 private JobParameters getGetIdentifying(JobParameters parameters) { 251 HashMap<String, JobParameter> nonIdentifying = new LinkedHashMap<>( 252 parameters.getParameters().size()); 253 parameters.getParameters().forEach((key, value) -> { 254 if (value.isIdentifying()) { 255 nonIdentifying.put(key, value); 256 } 257 }); 258 return new JobParameters(nonIdentifying); 259 } 260 261 private JobParameters merge(JobParameters parameters, JobParameters additionals) { 262 Map<String, JobParameter> merged = new LinkedHashMap<>(); 263 merged.putAll(parameters.getParameters()); 264 merged.putAll(additionals.getParameters()); 265 return new JobParameters(merged); 266 } 267 268}