001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.launch.support;
017
018import java.io.BufferedReader;
019import java.io.IOException;
020import java.io.InputStreamReader;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.LinkedHashSet;
025import java.util.List;
026import java.util.Properties;
027import java.util.Set;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import org.springframework.batch.core.BatchStatus;
033import org.springframework.batch.core.ExitStatus;
034import org.springframework.batch.core.Job;
035import org.springframework.batch.core.JobExecution;
036import org.springframework.batch.core.JobInstance;
037import org.springframework.batch.core.JobParameters;
038import org.springframework.batch.core.JobParametersBuilder;
039import org.springframework.batch.core.JobParametersIncrementer;
040import org.springframework.batch.core.configuration.JobLocator;
041import org.springframework.batch.core.converter.DefaultJobParametersConverter;
042import org.springframework.batch.core.converter.JobParametersConverter;
043import org.springframework.batch.core.explore.JobExplorer;
044import org.springframework.batch.core.launch.JobExecutionNotFailedException;
045import org.springframework.batch.core.launch.JobExecutionNotRunningException;
046import org.springframework.batch.core.launch.JobExecutionNotStoppedException;
047import org.springframework.batch.core.launch.JobLauncher;
048import org.springframework.batch.core.launch.NoSuchJobException;
049import org.springframework.batch.core.repository.JobRepository;
050import org.springframework.beans.factory.BeanDefinitionStoreException;
051import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
052import org.springframework.context.ConfigurableApplicationContext;
053import org.springframework.context.annotation.AnnotationConfigApplicationContext;
054import org.springframework.context.support.ClassPathXmlApplicationContext;
055import org.springframework.util.Assert;
056import org.springframework.util.StringUtils;
057
058/**
059 * <p>
060 * Basic launcher for starting jobs from the command line. In general, it is
061 * assumed that this launcher will primarily be used to start a job via a script
062 * from an Enterprise Scheduler. Therefore, exit codes are mapped to integers so
063 * that schedulers can use the returned values to determine the next course of
064 * action. The returned values can also be useful to operations teams in
065 * determining what should happen upon failure. For example, a returned code of
066 * 5 might mean that some resource wasn't available and the job should be
067 * restarted. However, a code of 10 might mean that something critical has
068 * happened and the issue should be escalated.
069 * </p>
070 *
 * <p>
 * With any launch of a batch job within Spring Batch, a Spring context
 * containing the {@link Job} and some execution context has to be created. This
 * command line launcher can be used to load the job and its context from a
 * single location. All dependencies of the launcher will then be satisfied by
 * autowiring by type from the combined application context. Default values are
 * provided for all fields except the {@link JobLauncher} and {@link JobLocator}.
 * Therefore, if autowiring fails to set the launcher (it should be noted that
 * dependency checking is disabled because most of the fields have default
 * values and thus don't require dependencies to be fulfilled via autowiring)
 * then an exception will be thrown. It should also be noted that even if an
 * exception is thrown by this class, it will be mapped to an integer and
 * returned.
 * </p>
085 *
 * <p>
 * Notice a property is available to set the {@link SystemExiter}. This class is
 * used to exit from the main method, rather than calling System.exit()
 * directly. This is because unit testing a class that calls System.exit() is
 * impossible without kicking off the test within a new JVM. That is possible,
 * but it is a much more complex solution than strategizing the exiter.
 * </p>
094 *
095 * <p>
096 * The arguments to this class can be provided on the command line (separated by
097 * spaces), or through stdin (separated by new line). They are as follows:
098 * </p>
099 *
100 * <code>
101 * jobPath &lt;options&gt; jobIdentifier (jobParameters)*
102 * </code>
103 *
104 * <p>
105 * The command line options are as follows
106 * </p>
 * <ul>
 * <li>jobPath: the xml application context (or the fully qualified name of a
 * configuration class) containing a {@link Job}</li>
 * <li>-restart: (optional) to restart the last failed execution</li>
 * <li>-stop: (optional) to stop a running execution</li>
 * <li>-abandon: (optional) to abandon a stopped execution</li>
 * <li>-next: (optional) to start the next in a sequence according to the
 * {@link JobParametersIncrementer} in the {@link Job}</li>
 * <li>jobIdentifier: the name of the job or the id of a job execution (for
 * -stop, -abandon or -restart).</li>
 * <li>jobParameters: 0 to many parameters that will be used to launch a job
 * specified in the form of <code>key=value</code> pairs.</li>
 * </ul>
119 *
120 * <p>
121 * If the <code>-next</code> option is used the parameters on the command line
122 * (if any) are appended to those retrieved from the incrementer, overriding any
123 * with the same key.
124 * </p>
125 *
 * <p>
 * The combined application context must contain only one instance of
 * {@link JobLauncher}. The job parameters passed in to the command line will be
 * converted to {@link Properties} by assuming that each individual element is
 * one parameter that is separated by an equals sign. For example,
 * "vendor.id=290232". The resulting properties instance is converted to
 * {@link JobParameters} using a {@link JobParametersConverter} from the
 * application context (if there is one, or a
 * {@link DefaultJobParametersConverter} otherwise). Below is an example
 * arguments list:
 * </p>
136 *
137 * <p>
138 * <code>
139 * java org.springframework.batch.core.launch.support.CommandLineJobRunner testJob.xml
140 * testJob schedule.date=2008/01/24 vendor.id=3902483920
141 * </code>
142 * </p>
143 *
144 * <p>
145 * Once arguments have been successfully parsed, autowiring will be used to set
146 * various dependencies. The {@link JobLauncher} for example, will be
147 * loaded this way. If none is contained in the bean factory (it searches by
148 * type) then a {@link BeanDefinitionStoreException} will be thrown. The same
149 * exception will also be thrown if there is more than one present. Assuming the
150 * JobLauncher has been set correctly, the jobIdentifier argument will be used
151 * to obtain an actual {@link Job}. If a {@link JobLocator} has been set, then
152 * it will be used, if not the beanFactory will be asked, using the
153 * jobIdentifier as the bean id.
154 * </p>
155 *
156 * @author Dave Syer
157 * @author Lucas Ward
158 * @author Mahmoud Ben Hassine
159 * @since 1.0
160 */
161public class CommandLineJobRunner {
162
163        protected static final Log logger = LogFactory.getLog(CommandLineJobRunner.class);
164
165        private ExitCodeMapper exitCodeMapper = new SimpleJvmExitCodeMapper();
166
167        private JobLauncher launcher;
168
169        private JobLocator jobLocator;
170
171        // Package private for unit test
172        private static SystemExiter systemExiter = new JvmSystemExiter();
173
174        private static String message = "";
175
176        private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter();
177
178        private JobExplorer jobExplorer;
179
180        private JobRepository jobRepository;
181
182        private final static List<String> VALID_OPTS = Arrays.asList(new String [] {"-restart", "-next", "-stop", "-abandon"});
183
184        /**
185         * Injection setter for the {@link JobLauncher}.
186         *
187         * @param launcher the launcher to set
188         */
189        public void setLauncher(JobLauncher launcher) {
190                this.launcher = launcher;
191        }
192
193        /**
194         * @param jobRepository the jobRepository to set
195         */
196        public void setJobRepository(JobRepository jobRepository) {
197                this.jobRepository = jobRepository;
198        }
199
200        /**
201         * Injection setter for {@link JobExplorer}.
202         *
203         * @param jobExplorer the {@link JobExplorer} to set
204         */
205        public void setJobExplorer(JobExplorer jobExplorer) {
206                this.jobExplorer = jobExplorer;
207        }
208
209        /**
210         * Injection setter for the {@link ExitCodeMapper}.
211         *
212         * @param exitCodeMapper the exitCodeMapper to set
213         */
214        public void setExitCodeMapper(ExitCodeMapper exitCodeMapper) {
215                this.exitCodeMapper = exitCodeMapper;
216        }
217
218        /**
219         * Static setter for the {@link SystemExiter} so it can be adjusted before
220         * dependency injection. Typically overridden by
221         * {@link #setSystemExiter(SystemExiter)}.
222         *
223         * @param systemExiter {@link SystemExiter} instance to be used by CommandLineJobRunner instance.
224         */
225        public static void presetSystemExiter(SystemExiter systemExiter) {
226                CommandLineJobRunner.systemExiter = systemExiter;
227        }
228
229        /**
230         * Retrieve the error message set by an instance of
231         * {@link CommandLineJobRunner} as it exits. Empty if the last job launched
232         * was successful.
233         *
234         * @return the error message
235         */
236        public static String getErrorMessage() {
237                return message;
238        }
239
240        /**
241         * Injection setter for the {@link SystemExiter}.
242         *
243         * @param systemExiter {@link SystemExiter} instance to be used by CommandLineJobRunner instance.
244         */
245        public void setSystemExiter(SystemExiter systemExiter) {
246                CommandLineJobRunner.systemExiter = systemExiter;
247        }
248
249        /**
250         * Injection setter for {@link JobParametersConverter}.
251         *
252         * @param jobParametersConverter instance of {@link JobParametersConverter}
253         * to be used by the CommandLineJobRunner instance.
254         */
255        public void setJobParametersConverter(JobParametersConverter jobParametersConverter) {
256                this.jobParametersConverter = jobParametersConverter;
257        }
258
259        /**
260         * Delegate to the exiter to (possibly) exit the VM gracefully.
261         *
262         * @param status int exit code that should be reported.
263         */
264        public void exit(int status) {
265                systemExiter.exit(status);
266        }
267
268        /**
269         * {@link JobLocator} to find a job to run.
270         * @param jobLocator a {@link JobLocator}
271         */
272        public void setJobLocator(JobLocator jobLocator) {
273                this.jobLocator = jobLocator;
274        }
275
276        /*
277         * Start a job by obtaining a combined classpath using the job launcher and
278         * job paths. If a JobLocator has been set, then use it to obtain an actual
279         * job, if not ask the context for it.
280         */
281        @SuppressWarnings("resource")
282        int start(String jobPath, String jobIdentifier, String[] parameters, Set<String> opts) {
283
284                ConfigurableApplicationContext context = null;
285
286                try {
287                        try {
288                                context = new AnnotationConfigApplicationContext(Class.forName(jobPath));
289                        } catch (ClassNotFoundException cnfe) {
290                                context = new ClassPathXmlApplicationContext(jobPath);
291                        }
292
293                        context.getAutowireCapableBeanFactory().autowireBeanProperties(this,
294                                        AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false);
295
296                        Assert.state(launcher != null, "A JobLauncher must be provided.  Please add one to the configuration.");
297                        if (opts.contains("-restart") || opts.contains("-next")) {
298                                Assert.state(jobExplorer != null,
299                                                "A JobExplorer must be provided for a restart or start next operation.  Please add one to the configuration.");
300                        }
301
302                        String jobName = jobIdentifier;
303                        
304                        JobParameters jobParameters = jobParametersConverter.getJobParameters(StringUtils
305                                        .splitArrayElementsIntoProperties(parameters, "="));
306                        Assert.isTrue(parameters == null || parameters.length == 0 || !jobParameters.isEmpty(),
307                                        "Invalid JobParameters " + Arrays.asList(parameters)
308                                        + ". If parameters are provided they should be in the form name=value (no whitespace).");
309
310                        if (opts.contains("-stop")) {
311                                List<JobExecution> jobExecutions = getRunningJobExecutions(jobIdentifier);
312                                if (jobExecutions == null) {
313                                        throw new JobExecutionNotRunningException("No running execution found for job=" + jobIdentifier);
314                                }
315                                for (JobExecution jobExecution : jobExecutions) {
316                                        jobExecution.setStatus(BatchStatus.STOPPING);
317                                        jobRepository.update(jobExecution);
318                                }
319                                return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
320                        }
321
322                        if (opts.contains("-abandon")) {
323                                List<JobExecution> jobExecutions = getStoppedJobExecutions(jobIdentifier);
324                                if (jobExecutions == null) {
325                                        throw new JobExecutionNotStoppedException("No stopped execution found for job=" + jobIdentifier);
326                                }
327                                for (JobExecution jobExecution : jobExecutions) {
328                                        jobExecution.setStatus(BatchStatus.ABANDONED);
329                                        jobRepository.update(jobExecution);
330                                }
331                                return exitCodeMapper.intValue(ExitStatus.COMPLETED.getExitCode());
332                        }
333
334                        if (opts.contains("-restart")) {
335                                JobExecution jobExecution = getLastFailedJobExecution(jobIdentifier);
336                                if (jobExecution == null) {
337                                        throw new JobExecutionNotFailedException("No failed or stopped execution found for job="
338                                                        + jobIdentifier);
339                                }
340                                jobParameters = jobExecution.getJobParameters();
341                                jobName = jobExecution.getJobInstance().getJobName();
342                        }
343
344                        Job job = null;
345                        if (jobLocator != null) {
346                                try {
347                                        job = jobLocator.getJob(jobName);
348                                } catch (NoSuchJobException e) {
349                                }
350                        }
351                        if (job == null) {
352                                job = (Job) context.getBean(jobName);
353                        }
354
355                        if (opts.contains("-next")) {
356                                jobParameters = new JobParametersBuilder(jobParameters, jobExplorer)
357                                                .getNextJobParameters(job)
358                                                .toJobParameters();
359                        }
360
361                        JobExecution jobExecution = launcher.run(job, jobParameters);
362                        return exitCodeMapper.intValue(jobExecution.getExitStatus().getExitCode());
363
364                }
365                catch (Throwable e) {
366                        String message = "Job Terminated in error: " + e.getMessage();
367                        logger.error(message, e);
368                        CommandLineJobRunner.message = message;
369                        return exitCodeMapper.intValue(ExitStatus.FAILED.getExitCode());
370                }
371                finally {
372                        if (context != null) {
373                                context.close();
374                        }
375                }
376        }
377
378        /**
379         * @param jobIdentifier a job execution id or job name
380         * @param minStatus the highest status to exclude from the result
381         * @return
382         */
383        private List<JobExecution> getJobExecutionsWithStatusGreaterThan(String jobIdentifier, BatchStatus minStatus) {
384
385                Long executionId = getLongIdentifier(jobIdentifier);
386                if (executionId != null) {
387                        JobExecution jobExecution = jobExplorer.getJobExecution(executionId);
388                        if (jobExecution.getStatus().isGreaterThan(minStatus)) {
389                                return Arrays.asList(jobExecution);
390                        }
391                        return Collections.emptyList();
392                }
393
394                int start = 0;
395                int count = 100;
396                List<JobExecution> executions = new ArrayList<JobExecution>();
397                List<JobInstance> lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
398
399                while (!lastInstances.isEmpty()) {
400
401                        for (JobInstance jobInstance : lastInstances) {
402                                List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
403                                if (jobExecutions == null || jobExecutions.isEmpty()) {
404                                        continue;
405                                }
406                                for (JobExecution jobExecution : jobExecutions) {
407                                        if (jobExecution.getStatus().isGreaterThan(minStatus)) {
408                                                executions.add(jobExecution);
409                                        }
410                                }
411                        }
412
413                        start += count;
414                        lastInstances = jobExplorer.getJobInstances(jobIdentifier, start, count);
415
416                }
417
418                return executions;
419
420        }
421
422        private JobExecution getLastFailedJobExecution(String jobIdentifier) {
423                List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STOPPING);
424                if (jobExecutions.isEmpty()) {
425                        return null;
426                }
427                return jobExecutions.get(0);
428        }
429
430        private List<JobExecution> getStoppedJobExecutions(String jobIdentifier) {
431                List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.STARTED);
432                if (jobExecutions.isEmpty()) {
433                        return null;
434                }
435                List<JobExecution> result = new ArrayList<JobExecution>();
436                for (JobExecution jobExecution : jobExecutions) {
437                        if (jobExecution.getStatus() != BatchStatus.ABANDONED) {
438                                result.add(jobExecution);
439                        }
440                }
441                return result.isEmpty() ? null : result;
442        }
443
444        private List<JobExecution> getRunningJobExecutions(String jobIdentifier) {
445                List<JobExecution> jobExecutions = getJobExecutionsWithStatusGreaterThan(jobIdentifier, BatchStatus.COMPLETED);
446                if (jobExecutions.isEmpty()) {
447                        return null;
448                }
449                List<JobExecution> result = new ArrayList<JobExecution>();
450                for (JobExecution jobExecution : jobExecutions) {
451                        if (jobExecution.isRunning()) {
452                                result.add(jobExecution);
453                        }
454                }
455                return result.isEmpty() ? null : result;
456        }
457
458        private Long getLongIdentifier(String jobIdentifier) {
459                try {
460                        return new Long(jobIdentifier);
461                }
462                catch (NumberFormatException e) {
463                        // Not an ID - must be a name
464                        return null;
465                }
466        }
467
468        /**
469         * Launch a batch job using a {@link CommandLineJobRunner}. Creates a new
470         * Spring context for the job execution, and uses a common parent for all
471         * such contexts. No exception are thrown from this method, rather
472         * exceptions are logged and an integer returned through the exit status in
473         * a {@link JvmSystemExiter} (which can be overridden by defining one in the
474         * Spring context).<br>
475         * Parameters can be provided in the form key=value, and will be converted
476         * using the injected {@link JobParametersConverter}.
477         *
478         * @param args
479         * <ul>
480         * <li>-restart: (optional) if the job has failed or stopped and the most
481         * should be restarted. If specified then the jobIdentifier parameter can be
482         * interpreted either as the name of the job or the id of the job execution
483         * that failed.</li>
484         * <li>-next: (optional) if the job has a {@link JobParametersIncrementer}
485         * that can be used to launch the next instance in a sequence</li>
486         * <li>jobPath: the xml application context containing a {@link Job}
487         * <li>jobIdentifier: the bean id of the job or id of the failed execution
488         * in the case of a restart.
489         * <li>jobParameters: 0 to many parameters that will be used to launch a
490         * job.
491         * </ul>
492         * <p>
493         * The options (<code>-restart, -next</code>) can occur anywhere in the
494         * command line.
495         * </p>
496         *
497         * @throws Exception is thrown if error occurs.
498         */
499        public static void main(String[] args) throws Exception {
500
501                CommandLineJobRunner command = new CommandLineJobRunner();
502
503                List<String> newargs = new ArrayList<String>(Arrays.asList(args));
504
505                try {
506                        if (System.in.available() > 0) {
507                                BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
508                                String line = " ";
509                                while (line != null) {
510                                        if (!line.startsWith("#") && StringUtils.hasText(line)) {
511                                                if (logger.isDebugEnabled()) {
512                                                        logger.debug("Stdin arg: " + line);
513                                                }
514                                                newargs.add(line);
515                                        }
516                                        line = reader.readLine();
517                                }
518                        }
519                }
520                catch (IOException e) {
521                        logger.warn("Could not access stdin (maybe a platform limitation)");
522                        if (logger.isDebugEnabled()) {
523                                logger.debug("Exception details", e);
524                        }
525                }
526
527                Set<String> opts = new LinkedHashSet<String>();
528                List<String> params = new ArrayList<String>();
529
530                int count = 0;
531                String jobPath = null;
532                String jobIdentifier = null;
533
534                for (String arg : newargs) {
535                        if (VALID_OPTS.contains(arg)) {
536                                opts.add(arg);
537                        }
538                        else {
539                                switch (count) {
540                                case 0:
541                                        jobPath = arg;
542                                        break;
543                                case 1:
544                                        jobIdentifier = arg;
545                                        break;
546                                default:
547                                        params.add(arg);
548                                        break;
549                                }
550                                count++;
551                        }
552                }
553
554                if (jobPath == null || jobIdentifier == null) {
555                        String message = "At least 2 arguments are required: JobPath/JobClass and jobIdentifier.";
556                        logger.error(message);
557                        CommandLineJobRunner.message = message;
558                        command.exit(1);
559                        return;
560                }
561
562                String[] parameters = params.toArray(new String[params.size()]);
563
564                int result = command.start(jobPath, jobIdentifier, parameters, opts);
565                command.exit(result);
566        }
567
568}