001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.launch.support; 017 018import org.apache.commons.logging.Log; 019import org.apache.commons.logging.LogFactory; 020import org.springframework.batch.core.BatchStatus; 021import org.springframework.batch.core.ExitStatus; 022import org.springframework.batch.core.Job; 023import org.springframework.batch.core.JobExecution; 024import org.springframework.batch.core.JobInstance; 025import org.springframework.batch.core.JobParameters; 026import org.springframework.batch.core.JobParametersInvalidException; 027import org.springframework.batch.core.StepExecution; 028import org.springframework.batch.core.launch.JobLauncher; 029import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; 030import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; 031import org.springframework.batch.core.repository.JobRepository; 032import org.springframework.batch.core.repository.JobRestartException; 033import org.springframework.beans.factory.InitializingBean; 034import org.springframework.core.task.SyncTaskExecutor; 035import org.springframework.core.task.TaskExecutor; 036import org.springframework.core.task.TaskRejectedException; 037import org.springframework.util.Assert; 038 039/** 040 * Simple implementation of the {@link JobLauncher} interface. The Spring Core 041 * {@link TaskExecutor} interface is used to launch a {@link Job}. This means 042 * that the type of executor set is very important. If a 043 * {@link SyncTaskExecutor} is used, then the job will be processed 044 * <strong>within the same thread that called the launcher.</strong> Care should 045 * be taken to ensure any users of this class understand fully whether or not 046 * the implementation of TaskExecutor used will start tasks synchronously or 047 * asynchronously. The default setting uses a synchronous task executor. 048 * 049 * There is only one required dependency of this Launcher, a 050 * {@link JobRepository}. The JobRepository is used to obtain a valid 051 * JobExecution. The Repository must be used because the provided {@link Job} 052 * could be a restart of an existing {@link JobInstance}, and only the 053 * Repository can reliably recreate it. 054 * 055 * @author Lucas Ward 056 * @author Dave Syer 057 * @author Will Schipp 058 * @author Michael Minella 059 * @author Mahmoud Ben Hassine 060 * 061 * @since 1.0 062 * 063 * @see JobRepository 064 * @see TaskExecutor 065 */ 066public class SimpleJobLauncher implements JobLauncher, InitializingBean { 067 068 protected static final Log logger = LogFactory.getLog(SimpleJobLauncher.class); 069 070 private JobRepository jobRepository; 071 072 private TaskExecutor taskExecutor; 073 074 /** 075 * Run the provided job with the given {@link JobParameters}. The 076 * {@link JobParameters} will be used to determine if this is an execution 077 * of an existing job instance, or if a new one should be created. 078 * 079 * @param job the job to be run. 080 * @param jobParameters the {@link JobParameters} for this particular 081 * execution. 082 * @return the {@link JobExecution} if it returns synchronously. If the 083 * implementation is asynchronous, the status might well be unknown. 084 * @throws JobExecutionAlreadyRunningException if the JobInstance already 085 * exists and has an execution already running. 086 * @throws JobRestartException if the execution would be a re-start, but a 087 * re-start is either not allowed or not needed. 088 * @throws JobInstanceAlreadyCompleteException if this instance has already 089 * completed successfully 090 * @throws JobParametersInvalidException thrown if jobParameters is invalid. 091 */ 092 @Override 093 public JobExecution run(final Job job, final JobParameters jobParameters) 094 throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, 095 JobParametersInvalidException { 096 097 Assert.notNull(job, "The Job must not be null."); 098 Assert.notNull(jobParameters, "The JobParameters must not be null."); 099 100 final JobExecution jobExecution; 101 JobExecution lastExecution = jobRepository.getLastJobExecution(job.getName(), jobParameters); 102 if (lastExecution != null) { 103 if (!job.isRestartable()) { 104 throw new JobRestartException("JobInstance already exists and is not restartable"); 105 } 106 /* 107 * validate here if it has stepExecutions that are UNKNOWN, STARTING, STARTED and STOPPING 108 * retrieve the previous execution and check 109 */ 110 for (StepExecution execution : lastExecution.getStepExecutions()) { 111 BatchStatus status = execution.getStatus(); 112 if (status.isRunning() || status == BatchStatus.STOPPING) { 113 throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " 114 + lastExecution); 115 } else if (status == BatchStatus.UNKNOWN) { 116 throw new JobRestartException( 117 "Cannot restart step [" + execution.getStepName() + "] from UNKNOWN status. " 118 + "The last execution ended with a failure that could not be rolled back, " 119 + "so it may be dangerous to proceed. Manual intervention is probably necessary."); 120 } 121 } 122 } 123 124 // Check the validity of the parameters before doing creating anything 125 // in the repository... 126 job.getJobParametersValidator().validate(jobParameters); 127 128 /* 129 * There is a very small probability that a non-restartable job can be 130 * restarted, but only if another process or thread manages to launch 131 * <i>and</i> fail a job execution for this instance between the last 132 * assertion and the next method returning successfully. 133 */ 134 jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters); 135 136 try { 137 taskExecutor.execute(new Runnable() { 138 139 @Override 140 public void run() { 141 try { 142 logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters 143 + "]"); 144 job.execute(jobExecution); 145 logger.info("Job: [" + job + "] completed with the following parameters: [" + jobParameters 146 + "] and the following status: [" + jobExecution.getStatus() + "]"); 147 } 148 catch (Throwable t) { 149 logger.info("Job: [" + job 150 + "] failed unexpectedly and fatally with the following parameters: [" + jobParameters 151 + "]", t); 152 rethrow(t); 153 } 154 } 155 156 private void rethrow(Throwable t) { 157 if (t instanceof RuntimeException) { 158 throw (RuntimeException) t; 159 } 160 else if (t instanceof Error) { 161 throw (Error) t; 162 } 163 throw new IllegalStateException(t); 164 } 165 }); 166 } 167 catch (TaskRejectedException e) { 168 jobExecution.upgradeStatus(BatchStatus.FAILED); 169 if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { 170 jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); 171 } 172 jobRepository.update(jobExecution); 173 } 174 175 return jobExecution; 176 } 177 178 /** 179 * Set the JobRepository. 180 * 181 * @param jobRepository instance of {@link JobRepository}. 182 */ 183 public void setJobRepository(JobRepository jobRepository) { 184 this.jobRepository = jobRepository; 185 } 186 187 /** 188 * Set the TaskExecutor. (Optional) 189 * 190 * @param taskExecutor instance of {@link TaskExecutor}. 191 */ 192 public void setTaskExecutor(TaskExecutor taskExecutor) { 193 this.taskExecutor = taskExecutor; 194 } 195 196 /** 197 * Ensure the required dependencies of a {@link JobRepository} have been 198 * set. 199 */ 200 @Override 201 public void afterPropertiesSet() throws Exception { 202 Assert.state(jobRepository != null, "A JobRepository has not been set."); 203 if (taskExecutor == null) { 204 logger.info("No TaskExecutor has been set, defaulting to synchronous executor."); 205 taskExecutor = new SyncTaskExecutor(); 206 } 207 } 208}