001/* 002 * Copyright 2012-2016 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.boot.autoconfigure.batch; 018 019import java.util.Arrays; 020import java.util.Collection; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Properties; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029 030import org.springframework.batch.core.BatchStatus; 031import org.springframework.batch.core.Job; 032import org.springframework.batch.core.JobExecution; 033import org.springframework.batch.core.JobExecutionException; 034import org.springframework.batch.core.JobInstance; 035import org.springframework.batch.core.JobParameter; 036import org.springframework.batch.core.JobParameters; 037import org.springframework.batch.core.JobParametersIncrementer; 038import org.springframework.batch.core.JobParametersInvalidException; 039import org.springframework.batch.core.configuration.JobRegistry; 040import org.springframework.batch.core.converter.DefaultJobParametersConverter; 041import org.springframework.batch.core.converter.JobParametersConverter; 042import org.springframework.batch.core.explore.JobExplorer; 043import org.springframework.batch.core.launch.JobLauncher; 044import org.springframework.batch.core.launch.JobParametersNotFoundException; 045import org.springframework.batch.core.launch.NoSuchJobException; 046import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; 047import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; 048import org.springframework.batch.core.repository.JobRestartException; 049import org.springframework.beans.factory.annotation.Autowired; 050import org.springframework.boot.CommandLineRunner; 051import org.springframework.context.ApplicationEventPublisher; 052import org.springframework.context.ApplicationEventPublisherAware; 053import org.springframework.util.PatternMatchUtils; 054import org.springframework.util.StringUtils; 055 056/** 057 * {@link CommandLineRunner} to {@link JobLauncher launch} Spring Batch jobs. Runs all 058 * jobs in the surrounding context by default. Can also be used to launch a specific job 059 * by providing a jobName 060 * 061 * @author Dave Syer 062 * @author Jean-Pierre Bergamin 063 */ 064public class JobLauncherCommandLineRunner 065 implements CommandLineRunner, ApplicationEventPublisherAware { 066 067 private static final Log logger = LogFactory 068 .getLog(JobLauncherCommandLineRunner.class); 069 070 private JobParametersConverter converter = new DefaultJobParametersConverter(); 071 072 private JobLauncher jobLauncher; 073 074 private JobRegistry jobRegistry; 075 076 private JobExplorer jobExplorer; 077 078 private String jobNames; 079 080 private Collection<Job> jobs = Collections.emptySet(); 081 082 private ApplicationEventPublisher publisher; 083 084 public JobLauncherCommandLineRunner(JobLauncher jobLauncher, 085 JobExplorer jobExplorer) { 086 this.jobLauncher = jobLauncher; 087 this.jobExplorer = jobExplorer; 088 } 089 090 public void setJobNames(String jobNames) { 091 this.jobNames = jobNames; 092 } 093 094 @Override 095 public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { 096 this.publisher = publisher; 097 } 098 099 @Autowired(required = false) 100 public void setJobRegistry(JobRegistry jobRegistry) { 101 this.jobRegistry = jobRegistry; 102 } 103 104 @Autowired(required = false) 105 public void setJobParametersConverter(JobParametersConverter converter) { 106 this.converter = converter; 107 } 108 109 @Autowired(required = false) 110 public void setJobs(Collection<Job> jobs) { 111 this.jobs = jobs; 112 } 113 114 @Override 115 public void run(String... args) throws JobExecutionException { 116 logger.info("Running default command line with: " + Arrays.asList(args)); 117 launchJobFromProperties(StringUtils.splitArrayElementsIntoProperties(args, "=")); 118 } 119 120 protected void launchJobFromProperties(Properties properties) 121 throws JobExecutionException { 122 JobParameters jobParameters = this.converter.getJobParameters(properties); 123 executeLocalJobs(jobParameters); 124 executeRegisteredJobs(jobParameters); 125 } 126 127 private JobParameters getNextJobParameters(Job job, 128 JobParameters additionalParameters) { 129 String name = job.getName(); 130 JobParameters parameters = new JobParameters(); 131 List<JobInstance> lastInstances = this.jobExplorer.getJobInstances(name, 0, 1); 132 JobParametersIncrementer incrementer = job.getJobParametersIncrementer(); 133 Map<String, JobParameter> additionals = additionalParameters.getParameters(); 134 if (lastInstances.isEmpty()) { 135 // Start from a completely clean sheet 136 if (incrementer != null) { 137 parameters = incrementer.getNext(new JobParameters()); 138 } 139 } 140 else { 141 List<JobExecution> previousExecutions = this.jobExplorer 142 .getJobExecutions(lastInstances.get(0)); 143 JobExecution previousExecution = previousExecutions.get(0); 144 if (previousExecution == null) { 145 // Normally this will not happen - an instance exists with no executions 146 if (incrementer != null) { 147 parameters = incrementer.getNext(new JobParameters()); 148 } 149 } 150 else if (isStoppedOrFailed(previousExecution) && job.isRestartable()) { 151 // Retry a failed or stopped execution 152 parameters = previousExecution.getJobParameters(); 153 // Non-identifying additional parameters can be removed to a retry 154 removeNonIdentifying(additionals); 155 } 156 else if (incrementer != null) { 157 // New instance so increment the parameters if we can 158 parameters = incrementer.getNext(previousExecution.getJobParameters()); 159 } 160 } 161 return merge(parameters, additionals); 162 } 163 164 private boolean isStoppedOrFailed(JobExecution execution) { 165 BatchStatus status = execution.getStatus(); 166 return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED); 167 } 168 169 private void removeNonIdentifying(Map<String, JobParameter> parameters) { 170 HashMap<String, JobParameter> copy = new HashMap<String, JobParameter>( 171 parameters); 172 for (Map.Entry<String, JobParameter> parameter : copy.entrySet()) { 173 if (!parameter.getValue().isIdentifying()) { 174 parameters.remove(parameter.getKey()); 175 } 176 } 177 } 178 179 private JobParameters merge(JobParameters parameters, 180 Map<String, JobParameter> additionals) { 181 Map<String, JobParameter> merged = new HashMap<String, JobParameter>(); 182 merged.putAll(parameters.getParameters()); 183 merged.putAll(additionals); 184 return new JobParameters(merged); 185 } 186 187 private void executeRegisteredJobs(JobParameters jobParameters) 188 throws JobExecutionException { 189 if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) { 190 String[] jobsToRun = this.jobNames.split(","); 191 for (String jobName : jobsToRun) { 192 try { 193 Job job = this.jobRegistry.getJob(jobName); 194 if (this.jobs.contains(job)) { 195 continue; 196 } 197 execute(job, jobParameters); 198 } 199 catch (NoSuchJobException ex) { 200 logger.debug("No job found in registry for job name: " + jobName); 201 } 202 } 203 } 204 } 205 206 protected void execute(Job job, JobParameters jobParameters) 207 throws JobExecutionAlreadyRunningException, JobRestartException, 208 JobInstanceAlreadyCompleteException, JobParametersInvalidException, 209 JobParametersNotFoundException { 210 JobParameters nextParameters = getNextJobParameters(job, jobParameters); 211 JobExecution execution = this.jobLauncher.run(job, nextParameters); 212 if (this.publisher != null) { 213 this.publisher.publishEvent(new JobExecutionEvent(execution)); 214 } 215 } 216 217 private void executeLocalJobs(JobParameters jobParameters) 218 throws JobExecutionException { 219 for (Job job : this.jobs) { 220 if (StringUtils.hasText(this.jobNames)) { 221 String[] jobsToRun = this.jobNames.split(","); 222 if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) { 223 logger.debug("Skipped job: " + job.getName()); 224 continue; 225 } 226 } 227 execute(job, jobParameters); 228 } 229 } 230 231}