001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.launch.support; 017 018import java.io.BufferedReader; 019import java.io.IOException; 020import java.io.InputStreamReader; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.LinkedHashSet; 025import java.util.List; 026import java.util.Properties; 027import java.util.Set; 028 029import org.apache.commons.logging.Log; 030import org.apache.commons.logging.LogFactory; 031 032import org.springframework.batch.core.BatchStatus; 033import org.springframework.batch.core.ExitStatus; 034import org.springframework.batch.core.Job; 035import org.springframework.batch.core.JobExecution; 036import org.springframework.batch.core.JobInstance; 037import org.springframework.batch.core.JobParameters; 038import org.springframework.batch.core.JobParametersBuilder; 039import org.springframework.batch.core.JobParametersIncrementer; 040import org.springframework.batch.core.configuration.JobLocator; 041import org.springframework.batch.core.converter.DefaultJobParametersConverter; 042import org.springframework.batch.core.converter.JobParametersConverter; 043import org.springframework.batch.core.explore.JobExplorer; 044import org.springframework.batch.core.launch.JobExecutionNotFailedException; 045import org.springframework.batch.core.launch.JobExecutionNotRunningException; 046import org.springframework.batch.core.launch.JobExecutionNotStoppedException; 047import org.springframework.batch.core.launch.JobLauncher; 048import org.springframework.batch.core.launch.NoSuchJobException; 049import org.springframework.batch.core.repository.JobRepository; 050import org.springframework.beans.factory.BeanDefinitionStoreException; 051import org.springframework.beans.factory.config.AutowireCapableBeanFactory; 052import org.springframework.context.ConfigurableApplicationContext; 053import org.springframework.context.annotation.AnnotationConfigApplicationContext; 054import org.springframework.context.support.ClassPathXmlApplicationContext; 055import org.springframework.util.Assert; 056import org.springframework.util.StringUtils; 057 058/** 059 * <p> 060 * Basic launcher for starting jobs from the command line. In general, it is 061 * assumed that this launcher will primarily be used to start a job via a script 062 * from an Enterprise Scheduler. Therefore, exit codes are mapped to integers so 063 * that schedulers can use the returned values to determine the next course of 064 * action. The returned values can also be useful to operations teams in 065 * determining what should happen upon failure. For example, a returned code of 066 * 5 might mean that some resource wasn't available and the job should be 067 * restarted. However, a code of 10 might mean that something critical has 068 * happened and the issue should be escalated. 069 * </p> 070 * 071 * <p> 072 * With any launch of a batch job within Spring Batch, a Spring context 073 * containing the {@link Job} and some execution context has to be created. This 074 * command line launcher can be used to load the job and its context from a 075 * single location. All dependencies of the launcher will then be satisfied by 076 * autowiring by type from the combined application context. Default values are 077 * provided for all fields except the {@link JobLauncher} and {@link JobLocator} 078 * . Therefore, if autowiring fails to set it (it should be noted that 079 * dependency checking is disabled because most of the fields have default 080 * values and thus don't require dependencies to be fulfilled via autowiring) 081 * then an exception will be thrown. It should also be noted that even if an 082 * exception is thrown by this class, it will be mapped to an integer and 083 * returned. 084 * </p> 085 * 086 * <p> 087 * Notice a property is available to set the {@link SystemExiter}. This class is 088 * used to exit from the main method, rather than calling System.exit() 089 * directly. This is because unit testing a class the calls System.exit() is 090 * impossible without kicking off the test within a new JVM, which it is 091 * possible to do, however it is a complex solution, much more so than 092 * strategizing the exiter. 093 * </p> 094 * 095 * <p> 096 * The arguments to this class can be provided on the command line (separated by 097 * spaces), or through stdin (separated by new line). They are as follows: 098 * </p> 099 * 100 * <code> 101 * jobPath <options> jobIdentifier (jobParameters)* 102 * </code> 103 * 104 * <p> 105 * The command line options are as follows 106 * </p> 107 * <ul> 108 * <li>jobPath: the xml application context containing a {@link Job} 109 * <li>-restart: (optional) to restart the last failed execution</li> 110 * <li>-stop: (optional) to stop a running execution</li> 111 * <li>-abandon: (optional) to abandon a stopped execution</li> 112 * <li>-next: (optional) to start the next in a sequence according to the 113 * {@link JobParametersIncrementer} in the {@link Job}</li> 114 * <li>jobIdentifier: the name of the job or the id of a job execution (for 115 * -stop, -abandon or -restart). 116 * <li>jobParameters: 0 to many parameters that will be used to launch a job 117 * specified in the form of <code>key=value</code> pairs. 118 * </ul> 119 * 120 * <p> 121 * If the <code>-next</code> option is used the parameters on the command line 122 * (if any) are appended to those retrieved from the incrementer, overriding any 123 * with the same key. 124 * </p> 125 * 126 * <p> 127 * The combined application context must contain only one instance of 128 * {@link JobLauncher}. The job parameters passed in to the command line will be 129 * converted to {@link Properties} by assuming that each individual element is 130 * one parameter that is separated by an equals sign. For example, 131 * "vendor.id=290232". The resulting properties instance is converted to 132 * {@link JobParameters} using a {@link JobParametersConverter} from the 133 * application context (if there is one, or a 134 * {@link DefaultJobParametersConverter} otherwise). Below is an example 135 * arguments list: "</p> 136 * 137 * <p> 138 * <code> 139 * java org.springframework.batch.core.launch.support.CommandLineJobRunner testJob.xml 140 * testJob schedule.date=2008/01/24 vendor.id=3902483920 141 * </code> 142 * </p> 143 * 144 * <p> 145 * Once arguments have been successfully parsed, autowiring will be used to set 146 * various dependencies. The {@link JobLauncher} for example, will be 147 * loaded this way. If none is contained in the bean factory (it searches by 148 * type) then a {@link BeanDefinitionStoreException} will be thrown. The same 149 * exception will also be thrown if there is more than one present. Assuming the 150 * JobLauncher has been set correctly, the jobIdentifier argument will be used 151 * to obtain an actual {@link Job}. If a {@link JobLocator} has been set, then 152 * it will be used, if not the beanFactory will be asked, using the 153 * jobIdentifier as the bean id. 154 * </p> 155 * 156 * @author Dave Syer 157 * @author Lucas Ward 158 * @author Mahmoud Ben Hassine 159 * @since 1.0 160 */ 161public class CommandLineJobRunner { 162 163 protected static final Log logger = LogFactory.getLog(CommandLineJobRunner.class); 164 165 private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper(); 166 167 private JobLauncher launcher; 168 169 private JobLocator jobLocator; 170 171 // Package private for unit test 172 private static SystemExiter systemExiter = new JvmSystemExiter(); 173 174 private static String message = ""; 175 176 private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter(); 177 178 private JobExplorer jobExplorer; 179 180 private JobRepository jobRepository; 181 182 private final static List<String> VALID_OPTS = Arrays.asList(new String [] {"-restart", "-next", "-stop", "-abandon"}); 183 184 /** 185 * Injection setter for the {@link JobLauncher}. 186 * 187 * @param launcher the launcher to set 188 */ 189 public void setLauncher(JobLauncher launcher) { 190 this.launcher = launcher; 191 } 192 193 /** 194 * @param jobRepository the jobRepository to set 195 */ 196 public void setJobRepository(JobRepository jobRepository) { 197 this.jobRepository = jobRepository; 198 } 199 200 /** 201 * Injection setter for {@link JobExplorer}. 202 * 203 * @param jobExplorer the {@link JobExplorer} to set 204 */ 205 public void setJobExplorer(JobExplorer jobExplorer) { 206 this.jobExplorer = jobExplorer; 207 } 208 209 /** 210 * Injection setter for the {@link ExitCodeMapper}. 211 * 212 * @param exitCodeMapper the exitCodeMapper to set 213 */ 214 public void setExitCodeMapper(ExitCodeMapper exitCodeMapper) { 215 this.exitCodeMapper = exitCodeMapper; 216 } 217 218 /** 219 * Static setter for the {@link SystemExiter} so it can be adjusted before 220 * dependency injection. Typically overridden by 221 * {@link #setSystemExiter(SystemExiter)}. 222 * 223 * @param systemExiter {@link SystemExiter} instance to be used by CommandLineJobRunner instance. 224 */ 225 public static void presetSystemExiter(SystemExiter systemExiter) { 226 CommandLineJobRunner.systemExiter = systemExiter; 227 } 228 229 /** 230 * Retrieve the error message set by an instance of 231 * {@link CommandLineJobRunner} as it exits. Empty if the last job launched 232 * was successful. 233 * 234 * @return the error message 235 */ 236 public static String getErrorMessage() { 237 return message; 238 } 239 240 /** 241 * Injection setter for the {@link SystemExiter}. 242 * 243 * @param systemExiter {@link SystemExiter} instance to be used by CommandLineJobRunner instance. 244 */ 245 public void setSystemExiter(SystemExiter systemExiter) { 246 CommandLineJobRunner.systemExiter = systemExiter; 247 } 248 249 /** 250 * Injection setter for {@link JobParametersConverter}. 251 * 252 * @param jobParametersConverter instance of {@link JobParametersConverter} 253 * to be used by the CommandLineJobRunner instance. 254 */ 255 public void setJobParametersConverter(JobParametersConverter jobParametersConverter) { 256 this.jobParametersConverter = jobParametersConverter; 257 } 258 259 /** 260 * Delegate to the exiter to (possibly) exit the VM gracefully. 261 * 262 * @param status int exit code that should be reported. 263 */ 264 public void exit(int status) { 265 systemExiter.exit(status); 266 } 267 268 /** 269 * {@link JobLocator} to find a job to run. 270 * @param jobLocator a {@link JobLocator} 271 */ 272 public void setJobLocator(JobLocator jobLocator) { 273 this.jobLocator = jobLocator; 274 } 275 276 /* 277 * Start a job by obtaining a combined classpath using the job launcher and 278 * job paths. If a JobLocator has been set, then use it to obtain an actual 279 * job, if not ask the context for it. 280 */ 281 @SuppressWarnings("resource") 282 int start(String jobPath, String jobIdentifier, String[] parameters, Set<String> opts) { 283 284 ConfigurableApplicationContext context = null; 285 286 try { 287 try { 288 context = new AnnotationConfigApplicationContext(Class.forName(jobPath)); 289 } catch (ClassNotFoundException cnfe) { 290 context = new ClassPathXmlApplicationContext(jobPath); 291 } 292 293 context.getAutowireCapableBeanFactory().autowireBeanProperties(this, 294 AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false); 295 296 Assert.state(launcher != null, "A JobLauncher must be provided. Please add one to the configuration."); 297 if (opts.contains("-restart") || opts.contains("-next")) { 298 Assert.state(jobExplorer != null, 299 "A JobExplorer must be provided for a restart or start next operation. Please add one to the configuration."); 300 } 301 302 String jobName = jobIdentifier; 303 304 JobParameters jobParameters = jobParametersConverter.getJobParameters(StringUtils 305 .splitArrayElementsIntoProperties(parameters, "=")); 306 Assert.isTrue(parameters == null || parameters.length == 0 || !jobParameters.isEmpty(), 307 "Invalid JobParameters " + Arrays.asList(parameters) 308 + ". If parameters are provided they should be in the form name=value (no whitespace)."); 309 310 if (opts.contains("-stop")) { 311 List<JobExecution> jobExecutions = getRunningJobExecutions(jobIdentifier); 312 if (jobExecutions == null) { 313 throw new JobExecutionNotRunningException("No running execution found for job=" + jobIdentifier); 314 } 315 for (JobExecution jobExecution : jobExecutions) { 316 jobExecution.setStatus(BatchStatus.STOPPING); 317 jobRepository.update(jobExecution); 318 } 319 return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode()); 320 } 321 322 if (opts.contains("-abandon")) { 323 List<JobExecution> jobExecutions = getStoppedJobExecutions(jobIdentifier); 324 if (jobExecutions == null) { 325 throw new JobExecutionNotStoppedException("No stopped execution found for job=" + jobIdentifier); 326 } 327 for (JobExecution jobExecution : jobExecutions) { 328 jobExecution.setStatus(BatchStatus.ABANDONED); 329 jobRepository.update(jobExecution); 330 } 331 return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode()); 332 } 333 334 if (opts.contains("-restart")) { 335 JobExecution jobExecution = getLastFailedJobExecution(jobIdentifier); 336 if (jobExecution == null) { 337 throw new JobExecutionNotFailedException("No failed or stopped execution found for job=" 338 + jobIdentifier); 339 } 340 jobParameters = jobExecution.getJobParameters(); 341 jobName = jobExecution.getJobInstance().getJobName(); 342 } 343 344 Job job = null; 345 if (jobLocator != null) { 346 try { 347 job = jobLocator.getJob(jobName); 348 } catch (NoSuchJobException e) { 349 } 350 } 351 if (job == null) { 352 job = (Job) context.getBean(jobName); 353 } 354 355 if (opts.contains("-next")) { 356 jobParameters = new JobParametersBuilder(jobParameters, jobExplorer) 357 .getNextJobParameters(job) 358 .toJobParameters(); 359 } 360 361 JobExecution jobExecution = launcher.run(job, jobParameters); 362 return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode()); 363 364 } 365 catch (Throwable e) { 366 String message = "Job Terminated in error: " + e.getMessage(); 367 logger.error(message, e); 368 CommandLineJobRunner.message = message; 369 return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode()); 370 } 371 finally { 372 if (context != null) { 373 context.close(); 374 } 375 } 376 } 377 378 /** 379 * @param jobIdentifier a job execution id or job name 380 * @param minStatus the highest status to exclude from the result 381 * @return 382 */ 383 private List<JobExecution> getJobExecutionsWithStatusGreaterThan(String jobIdentifier, BatchStatus minStatus) { 384 385 Long executionId = getLongIdentifier(jobIdentifier); 386 if (executionId != null) { 387 JobExecution jobExecution = jobExplorer.getJobExecution(executionId); 388 if (jobExecution.getStatus().isGreaterThan(minStatus)) { 389 return Arrays.asList(jobExecution); 390 } 391 return Collections.emptyList(); 392 } 393 394 int start = 0; 395 int count = 100; 396 List<JobExecution> executions = new ArrayList<JobExecution>(); 397 List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count); 398 399 while (!lastInstances.isEmpty()) { 400 401 for (JobInstance jobInstance : lastInstances) { 402 List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance); 403 if (jobExecutions == null || jobExecutions.isEmpty()) { 404 continue; 405 } 406 for (JobExecution jobExecution : jobExecutions) { 407 if (jobExecution.getStatus().isGreaterThan(minStatus)) { 408 executions.add(jobExecution); 409 } 410 } 411 } 412 413 start += count; 414 lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count); 415 416 } 417 418 return executions; 419 420 } 421 422 private JobExecution getLastFailedJobExecution(String jobIdentifier) { 423 List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STOPPING); 424 if (jobExecutions.isEmpty()) { 425 return null; 426 } 427 return jobExecutions.get(0); 428 } 429 430 private List<JobExecution> getStoppedJobExecutions(String jobIdentifier) { 431 List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STARTED); 432 if (jobExecutions.isEmpty()) { 433 return null; 434 } 435 List<JobExecution> result = new ArrayList<JobExecution>(); 436 for (JobExecution jobExecution : jobExecutions) { 437 if (jobExecution.getStatus() != BatchStatus.ABANDONED) { 438 result.add(jobExecution); 439 } 440 } 441 return result.isEmpty() ? null : result; 442 } 443 444 private List<JobExecution> getRunningJobExecutions(String jobIdentifier) { 445 List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.COMPLETED); 446 if (jobExecutions.isEmpty()) { 447 return null; 448 } 449 List<JobExecution> result = new ArrayList<JobExecution>(); 450 for (JobExecution jobExecution : jobExecutions) { 451 if (jobExecution.isRunning()) { 452 result.add(jobExecution); 453 } 454 } 455 return result.isEmpty() ? null : result; 456 } 457 458 private Long getLongIdentifier(String jobIdentifier) { 459 try { 460 return new Long(jobIdentifier); 461 } 462 catch (NumberFormatException e) { 463 // Not an ID - must be a name 464 return null; 465 } 466 } 467 468 /** 469 * Launch a batch job using a {@link CommandLineJobRunner}. Creates a new 470 * Spring context for the job execution, and uses a common parent for all 471 * such contexts. No exception are thrown from this method, rather 472 * exceptions are logged and an integer returned through the exit status in 473 * a {@link JvmSystemExiter} (which can be overridden by defining one in the 474 * Spring context).<br> 475 * Parameters can be provided in the form key=value, and will be converted 476 * using the injected {@link JobParametersConverter}. 477 * 478 * @param args 479 * <ul> 480 * <li>-restart: (optional) if the job has failed or stopped and the most 481 * should be restarted. If specified then the jobIdentifier parameter can be 482 * interpreted either as the name of the job or the id of the job execution 483 * that failed.</li> 484 * <li>-next: (optional) if the job has a {@link JobParametersIncrementer} 485 * that can be used to launch the next instance in a sequence</li> 486 * <li>jobPath: the xml application context containing a {@link Job} 487 * <li>jobIdentifier: the bean id of the job or id of the failed execution 488 * in the case of a restart. 489 * <li>jobParameters: 0 to many parameters that will be used to launch a 490 * job. 491 * </ul> 492 * <p> 493 * The options (<code>-restart, -next</code>) can occur anywhere in the 494 * command line. 495 * </p> 496 * 497 * @throws Exception is thrown if error occurs. 498 */ 499 public static void main(String[] args) throws Exception { 500 501 CommandLineJobRunner command = new CommandLineJobRunner(); 502 503 List<String> newargs = new ArrayList<String>(Arrays.asList(args)); 504 505 try { 506 if (System.in.available() > 0) { 507 BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); 508 String line = " "; 509 while (line != null) { 510 if (!line.startsWith("#") && StringUtils.hasText(line)) { 511 if (logger.isDebugEnabled()) { 512 logger.debug("Stdin arg: " + line); 513 } 514 newargs.add(line); 515 } 516 line = reader.readLine(); 517 } 518 } 519 } 520 catch (IOException e) { 521 logger.warn("Could not access stdin (maybe a platform limitation)"); 522 if (logger.isDebugEnabled()) { 523 logger.debug("Exception details", e); 524 } 525 } 526 527 Set<String> opts = new LinkedHashSet<String>(); 528 List<String> params = new ArrayList<String>(); 529 530 int count = 0; 531 String jobPath = null; 532 String jobIdentifier = null; 533 534 for (String arg : newargs) { 535 if (VALID_OPTS.contains(arg)) { 536 opts.add(arg); 537 } 538 else { 539 switch (count) { 540 case 0: 541 jobPath = arg; 542 break; 543 case 1: 544 jobIdentifier = arg; 545 break; 546 default: 547 params.add(arg); 548 break; 549 } 550 count++; 551 } 552 } 553 554 if (jobPath == null || jobIdentifier == null) { 555 String message = "At least 2 arguments are required: JobPath/JobClass and jobIdentifier."; 556 logger.error(message); 557 CommandLineJobRunner.message = message; 558 command.exit(1); 559 return; 560 } 561 562 String[] parameters = params.toArray(new String[params.size()]); 563 564 int result = command.start(jobPath, jobIdentifier, parameters, opts); 565 command.exit(result); 566 } 567 568}