001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.launch.support; 017 018import java.util.ArrayList; 019import java.util.Date; 020import java.util.LinkedHashMap; 021import java.util.LinkedHashSet; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.TreeSet; 026 027import org.apache.commons.logging.Log; 028import org.apache.commons.logging.LogFactory; 029import org.springframework.batch.core.BatchStatus; 030import org.springframework.batch.core.Job; 031import org.springframework.batch.core.JobExecution; 032import org.springframework.batch.core.JobInstance; 033import org.springframework.batch.core.JobParameters; 034import org.springframework.batch.core.JobParametersBuilder; 035import org.springframework.batch.core.JobParametersInvalidException; 036import org.springframework.batch.core.Step; 037import org.springframework.batch.core.StepExecution; 038import org.springframework.batch.core.UnexpectedJobExecutionException; 039import org.springframework.batch.core.configuration.JobRegistry; 040import org.springframework.batch.core.configuration.ListableJobLocator; 041import org.springframework.batch.core.converter.DefaultJobParametersConverter; 042import org.springframework.batch.core.converter.JobParametersConverter; 043import org.springframework.batch.core.explore.JobExplorer; 044import org.springframework.batch.core.launch.JobExecutionNotRunningException; 045import org.springframework.batch.core.launch.JobInstanceAlreadyExistsException; 046import org.springframework.batch.core.launch.JobLauncher; 047import org.springframework.batch.core.launch.JobOperator; 048import org.springframework.batch.core.launch.NoSuchJobException; 049import org.springframework.batch.core.launch.NoSuchJobExecutionException; 050import org.springframework.batch.core.launch.NoSuchJobInstanceException; 051import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; 052import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; 053import org.springframework.batch.core.repository.JobRepository; 054import org.springframework.batch.core.repository.JobRestartException; 055import org.springframework.batch.core.scope.context.StepSynchronizationManager; 056import org.springframework.batch.core.step.NoSuchStepException; 057import org.springframework.batch.core.step.StepLocator; 058import org.springframework.batch.core.step.tasklet.StoppableTasklet; 059import org.springframework.batch.core.step.tasklet.Tasklet; 060import org.springframework.batch.core.step.tasklet.TaskletStep; 061import org.springframework.batch.support.PropertiesConverter; 062import org.springframework.beans.factory.InitializingBean; 063import org.springframework.transaction.annotation.Transactional; 064import org.springframework.util.Assert; 065 066/** 067 * Simple implementation of the JobOperator interface. Due to the amount of 068 * functionality the implementation is combining, the following dependencies 069 * are required: 070 * 071 * <ul> 072 * <li> {@link JobLauncher} 073 * <li> {@link JobExplorer} 074 * <li> {@link JobRepository} 075 * <li> {@link JobRegistry} 076 * </ul> 077 * 078 * @author Dave Syer 079 * @author Lucas Ward 080 * @author Will Schipp 081 * @author Mahmoud Ben Hassine 082 * @since 2.0 083 */ 084public class SimpleJobOperator implements JobOperator, InitializingBean { 085 086 private static final String ILLEGAL_STATE_MSG = "Illegal state (only happens on a race condition): " 087 + "%s with name=%s and parameters=%s"; 088 089 private ListableJobLocator jobRegistry; 090 091 private JobExplorer jobExplorer; 092 093 private JobLauncher jobLauncher; 094 095 private JobRepository jobRepository; 096 097 private JobParametersConverter jobParametersConverter = new DefaultJobParametersConverter(); 098 099 private final Log logger = LogFactory.getLog(getClass()); 100 101 /** 102 * Check mandatory properties. 103 * 104 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 105 */ 106 @Override 107 public void afterPropertiesSet() throws Exception { 108 Assert.notNull(jobLauncher, "JobLauncher must be provided"); 109 Assert.notNull(jobRegistry, "JobLocator must be provided"); 110 Assert.notNull(jobExplorer, "JobExplorer must be provided"); 111 Assert.notNull(jobRepository, "JobRepository must be provided"); 112 } 113 114 /** 115 * Public setter for the {@link JobParametersConverter}. 116 * @param jobParametersConverter the {@link JobParametersConverter} to set 117 */ 118 public void setJobParametersConverter(JobParametersConverter jobParametersConverter) { 119 this.jobParametersConverter = jobParametersConverter; 120 } 121 122 /** 123 * Public setter for the {@link ListableJobLocator}. 124 * @param jobRegistry the {@link ListableJobLocator} to set 125 */ 126 public void setJobRegistry(ListableJobLocator jobRegistry) { 127 this.jobRegistry = jobRegistry; 128 } 129 130 /** 131 * Public setter for the {@link JobExplorer}. 132 * @param jobExplorer the {@link JobExplorer} to set 133 */ 134 public void setJobExplorer(JobExplorer jobExplorer) { 135 this.jobExplorer = jobExplorer; 136 } 137 138 public void setJobRepository(JobRepository jobRepository) { 139 this.jobRepository = jobRepository; 140 } 141 142 /** 143 * Public setter for the {@link JobLauncher}. 144 * @param jobLauncher the {@link JobLauncher} to set 145 */ 146 public void setJobLauncher(JobLauncher jobLauncher) { 147 this.jobLauncher = jobLauncher; 148 } 149 150 /* 151 * (non-Javadoc) 152 * 153 * @see org.springframework.batch.core.launch.JobOperator#getExecutions(java.lang.Long) 154 */ 155 @Override 156 public List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException { 157 JobInstance jobInstance = jobExplorer.getJobInstance(instanceId); 158 if (jobInstance == null) { 159 throw new NoSuchJobInstanceException(String.format("No job instance with id=%d", instanceId)); 160 } 161 List<Long> list = new ArrayList<Long>(); 162 for (JobExecution jobExecution : jobExplorer.getJobExecutions(jobInstance)) { 163 list.add(jobExecution.getId()); 164 } 165 return list; 166 } 167 168 /* 169 * (non-Javadoc) 170 * 171 * @see org.springframework.batch.core.launch.JobOperator#getJobNames() 172 */ 173 @Override 174 public Set<String> getJobNames() { 175 return new TreeSet<String>(jobRegistry.getJobNames()); 176 } 177 178 /* 179 * (non-Javadoc) 180 * 181 * @see JobOperator#getLastInstances(String, int, int) 182 */ 183 @Override 184 public List<Long> getJobInstances(String jobName, int start, int count) throws NoSuchJobException { 185 List<Long> list = new ArrayList<Long>(); 186 List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName, start, count); 187 for (JobInstance jobInstance : jobInstances) { 188 list.add(jobInstance.getId()); 189 } 190 if (list.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { 191 throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); 192 } 193 return list; 194 } 195 196 /* 197 * (non-Javadoc) 198 * 199 * @see 200 * org.springframework.batch.core.launch.JobOperator#getParameters(java. 201 * lang.Long) 202 */ 203 @Override 204 public String getParameters(long executionId) throws NoSuchJobExecutionException { 205 JobExecution jobExecution = findExecutionById(executionId); 206 207 return PropertiesConverter.propertiesToString(jobParametersConverter.getProperties(jobExecution 208 .getJobParameters())); 209 } 210 211 /* 212 * (non-Javadoc) 213 * 214 * @see 215 * org.springframework.batch.core.launch.JobOperator#getRunningExecutions 216 * (java.lang.String) 217 */ 218 @Override 219 public Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException { 220 Set<Long> set = new LinkedHashSet<Long>(); 221 for (JobExecution jobExecution : jobExplorer.findRunningJobExecutions(jobName)) { 222 set.add(jobExecution.getId()); 223 } 224 if (set.isEmpty() && !jobRegistry.getJobNames().contains(jobName)) { 225 throw new NoSuchJobException("No such job (either in registry or in historical data): " + jobName); 226 } 227 return set; 228 } 229 230 /* 231 * (non-Javadoc) 232 * 233 * @see 234 * org.springframework.batch.core.launch.JobOperator#getStepExecutionSummaries 235 * (java.lang.Long) 236 */ 237 @Override 238 public Map<Long, String> getStepExecutionSummaries(long executionId) throws NoSuchJobExecutionException { 239 JobExecution jobExecution = findExecutionById(executionId); 240 241 Map<Long, String> map = new LinkedHashMap<Long, String>(); 242 for (StepExecution stepExecution : jobExecution.getStepExecutions()) { 243 map.put(stepExecution.getId(), stepExecution.toString()); 244 } 245 return map; 246 } 247 248 /* 249 * (non-Javadoc) 250 * 251 * @see 252 * org.springframework.batch.core.launch.JobOperator#getSummary(java.lang 253 * .Long) 254 */ 255 @Override 256 public String getSummary(long executionId) throws NoSuchJobExecutionException { 257 JobExecution jobExecution = findExecutionById(executionId); 258 return jobExecution.toString(); 259 } 260 261 /* 262 * (non-Javadoc) 263 * 264 * @see 265 * org.springframework.batch.core.launch.JobOperator#resume(java.lang.Long) 266 */ 267 @Override 268 public Long restart(long executionId) throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException, NoSuchJobException, JobRestartException, JobParametersInvalidException { 269 270 logger.info("Checking status of job execution with id=" + executionId); 271 272 JobExecution jobExecution = findExecutionById(executionId); 273 274 String jobName = jobExecution.getJobInstance().getJobName(); 275 Job job = jobRegistry.getJob(jobName); 276 JobParameters parameters = jobExecution.getJobParameters(); 277 278 logger.info(String.format("Attempting to resume job with name=%s and parameters=%s", jobName, parameters)); 279 try { 280 return jobLauncher.run(job, parameters).getId(); 281 } 282 catch (JobExecutionAlreadyRunningException e) { 283 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", 284 jobName, parameters), e); 285 } 286 287 } 288 289 /* 290 * (non-Javadoc) 291 * 292 * @see 293 * org.springframework.batch.core.launch.JobOperator#start(java.lang.String, 294 * java.lang.String) 295 */ 296 @Override 297 public Long start(String jobName, String parameters) throws NoSuchJobException, JobInstanceAlreadyExistsException, JobParametersInvalidException { 298 299 logger.info("Checking status of job with name=" + jobName); 300 301 JobParameters jobParameters = jobParametersConverter.getJobParameters(PropertiesConverter 302 .stringToProperties(parameters)); 303 304 if (jobRepository.isJobInstanceExists(jobName, jobParameters)) { 305 throw new JobInstanceAlreadyExistsException(String.format( 306 "Cannot start a job instance that already exists with name=%s and parameters=%s", jobName, 307 parameters)); 308 } 309 310 Job job = jobRegistry.getJob(jobName); 311 312 logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); 313 try { 314 return jobLauncher.run(job, jobParameters).getId(); 315 } 316 catch (JobExecutionAlreadyRunningException e) { 317 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job execution already running", 318 jobName, parameters), e); 319 } 320 catch (JobRestartException e) { 321 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, 322 parameters), e); 323 } 324 catch (JobInstanceAlreadyCompleteException e) { 325 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already complete", jobName, 326 parameters), e); 327 } 328 329 } 330 331 /* 332 * (non-Javadoc) 333 * 334 * @see JobOperator#startNextInstance(String ) 335 */ 336 @Override 337 public Long startNextInstance(String jobName) throws NoSuchJobException, 338 UnexpectedJobExecutionException, JobParametersInvalidException { 339 340 logger.info("Locating parameters for next instance of job with name=" + jobName); 341 342 Job job = jobRegistry.getJob(jobName); 343 JobParameters parameters = new JobParametersBuilder(jobExplorer) 344 .getNextJobParameters(job) 345 .toJobParameters(); 346 347 logger.info(String.format("Attempting to launch job with name=%s and parameters=%s", jobName, parameters)); 348 try { 349 return jobLauncher.run(job, parameters).getId(); 350 } 351 catch (JobExecutionAlreadyRunningException e) { 352 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job already running", jobName, 353 parameters), e); 354 } 355 catch (JobRestartException e) { 356 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job not restartable", jobName, 357 parameters), e); 358 } 359 catch (JobInstanceAlreadyCompleteException e) { 360 throw new UnexpectedJobExecutionException(String.format(ILLEGAL_STATE_MSG, "job instance already complete", 361 jobName, parameters), e); 362 } 363 364 } 365 366 /* 367 * (non-Javadoc) 368 * 369 * @see 370 * org.springframework.batch.core.launch.JobOperator#stop(java.lang.Long) 371 */ 372 @Override 373 @Transactional 374 public boolean stop(long executionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException { 375 376 JobExecution jobExecution = findExecutionById(executionId); 377 // Indicate the execution should be stopped by setting it's status to 378 // 'STOPPING'. It is assumed that 379 // the step implementation will check this status at chunk boundaries. 380 BatchStatus status = jobExecution.getStatus(); 381 if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) { 382 throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution); 383 } 384 jobExecution.setStatus(BatchStatus.STOPPING); 385 jobRepository.update(jobExecution); 386 387 try { 388 Job job = jobRegistry.getJob(jobExecution.getJobInstance().getJobName()); 389 if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object 390 //get the current stepExecution 391 for (StepExecution stepExecution : jobExecution.getStepExecutions()) { 392 if (stepExecution.getStatus().isRunning()) { 393 try { 394 //have the step execution that's running -> need to 'stop' it 395 Step step = ((StepLocator)job).getStep(stepExecution.getStepName()); 396 if (step instanceof TaskletStep) { 397 Tasklet tasklet = ((TaskletStep)step).getTasklet(); 398 if (tasklet instanceof StoppableTasklet) { 399 StepSynchronizationManager.register(stepExecution); 400 ((StoppableTasklet)tasklet).stop(); 401 StepSynchronizationManager.release(); 402 } 403 } 404 } 405 catch (NoSuchStepException e) { 406 logger.warn("Step not found",e); 407 } 408 } 409 } 410 } 411 } 412 catch (NoSuchJobException e) { 413 logger.warn("Cannot find Job object in the job registry. StoppableTasklet#stop() will not be called",e); 414 } 415 416 return true; 417 } 418 419 @Override 420 public JobExecution abandon(long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionAlreadyRunningException { 421 JobExecution jobExecution = findExecutionById(jobExecutionId); 422 423 if (jobExecution.getStatus().isLessThan(BatchStatus.STOPPING)) { 424 throw new JobExecutionAlreadyRunningException( 425 "JobExecution is running or complete and therefore cannot be aborted"); 426 } 427 428 logger.info("Aborting job execution: " + jobExecution); 429 jobExecution.upgradeStatus(BatchStatus.ABANDONED); 430 jobExecution.setEndTime(new Date()); 431 jobRepository.update(jobExecution); 432 433 return jobExecution; 434 } 435 436 private JobExecution findExecutionById(long executionId) throws NoSuchJobExecutionException { 437 JobExecution jobExecution = jobExplorer.getJobExecution(executionId); 438 439 if (jobExecution == null) { 440 throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]"); 441 } 442 return jobExecution; 443 444 } 445}