001/* 002 * Copyright 2013-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.jsr.launch; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Enumeration; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Properties; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.Semaphore; 029import javax.batch.operations.BatchRuntimeException; 030import javax.batch.operations.JobExecutionAlreadyCompleteException; 031import javax.batch.operations.JobExecutionIsRunningException; 032import javax.batch.operations.JobExecutionNotMostRecentException; 033import javax.batch.operations.JobExecutionNotRunningException; 034import javax.batch.operations.JobOperator; 035import javax.batch.operations.JobRestartException; 036import javax.batch.operations.JobSecurityException; 037import javax.batch.operations.JobStartException; 038import javax.batch.operations.NoSuchJobException; 039import javax.batch.operations.NoSuchJobExecutionException; 040import javax.batch.operations.NoSuchJobInstanceException; 041import javax.batch.runtime.BatchRuntime; 042import javax.batch.runtime.JobExecution; 043import javax.batch.runtime.JobInstance; 044import javax.batch.runtime.StepExecution; 045 046import org.apache.commons.logging.Log; 047import org.apache.commons.logging.LogFactory; 048 049import org.springframework.batch.core.BatchStatus; 050import org.springframework.batch.core.ExitStatus; 051import org.springframework.batch.core.Job; 052import org.springframework.batch.core.JobParameters; 053import org.springframework.batch.core.Step; 054import org.springframework.batch.core.configuration.DuplicateJobException; 055import org.springframework.batch.core.converter.JobParametersConverter; 056import org.springframework.batch.core.explore.JobExplorer; 057import org.springframework.batch.core.jsr.JsrJobContextFactoryBean; 058import org.springframework.batch.core.jsr.JsrJobExecution; 059import org.springframework.batch.core.jsr.JsrJobParametersConverter; 060import org.springframework.batch.core.jsr.JsrStepExecution; 061import org.springframework.batch.core.jsr.configuration.xml.JsrXmlApplicationContext; 062import org.springframework.batch.core.repository.JobRepository; 063import org.springframework.batch.core.scope.context.StepSynchronizationManager; 064import org.springframework.batch.core.step.NoSuchStepException; 065import org.springframework.batch.core.step.StepLocator; 066import org.springframework.batch.core.step.tasklet.StoppableTasklet; 067import org.springframework.batch.core.step.tasklet.Tasklet; 068import org.springframework.batch.core.step.tasklet.TaskletStep; 069import org.springframework.beans.BeansException; 070import org.springframework.beans.factory.BeanCreationException; 071import org.springframework.beans.factory.InitializingBean; 072import org.springframework.beans.factory.config.AutowireCapableBeanFactory; 073import org.springframework.beans.factory.config.BeanDefinition; 074import org.springframework.beans.factory.support.AbstractBeanDefinition; 075import org.springframework.beans.factory.support.BeanDefinitionBuilder; 076import org.springframework.context.ApplicationContext; 077import org.springframework.context.ApplicationContextAware; 078import org.springframework.context.support.GenericXmlApplicationContext; 079import org.springframework.core.convert.converter.Converter; 080import org.springframework.core.io.ClassPathResource; 081import org.springframework.core.io.Resource; 082import org.springframework.core.task.SimpleAsyncTaskExecutor; 083import org.springframework.core.task.TaskExecutor; 084import org.springframework.transaction.PlatformTransactionManager; 085import org.springframework.util.Assert; 086 087/** 088 * The entrance for executing batch jobs as defined by JSR-352. This class provides 089 * a single base {@link ApplicationContext} that is the equivalent to the following: 090 * 091 * <beans> 092 * <batch:job-repository id="jobRepository" ... /> 093 * 094 * <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> 095 * ... 096 * </bean> 097 * 098 * <bean id="batchJobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator"> 099 * ... 100 * </bean> 101 * 102 * <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean"> 103 * ... 104 * </bean> 105 * 106 * <bean id="dataSource" 107 * class="org.apache.commons.dbcp2.BasicDataSource"> 108 * ... 109 * </bean> 110 * 111 * <bean id="transactionManager" 112 * class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> 113 * ... 114 * </bean> 115 * 116 * <bean id="jobParametersConverter" class="org.springframework.batch.core.jsr.JsrJobParametersConverter"/> 117 * 118 * <bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry"/> 119 * 120 * <bean id="placeholderProperties" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 121 * ... 122 * </bean> 123 * </beans> 124 * 125 * A custom configuration of the above components can be specified by providing a system property JSR-352-BASE-CONTEXT. 126 * The location that is provided by this system property will override any beans as defined in baseContext.xml. 127 * 128 * Calls to {@link JobOperator#start(String, Properties)} will provide a child context to the above context 129 * using the job definition and batch.xml if provided. 130 * 131 * By default, calls to start/restart will result in asynchronous execution of the batch job (via an asynchronous {@link TaskExecutor}. 132 * For synchronous behavior or customization of thread behavior, a different {@link TaskExecutor} implementation is required to 133 * be provided. 134 * 135 * <em>Note</em>: This class is intended to only be used for JSR-352 configured jobs. Use of 136 * this {@link JobOperator} to start/stop/restart Spring Batch jobs may result in unexpected behaviors due to 137 * how job instances are identified differently. 138 * 139 * @author Michael Minella 140 * @author Chris Schaefer 141 * @since 3.0 142 */ 143public class JsrJobOperator implements JobOperator, ApplicationContextAware, InitializingBean { 144 private static final String JSR_JOB_CONTEXT_BEAN_NAME = "jsr_jobContext"; 145 private final Log logger = LogFactory.getLog(getClass()); 146 147 private JobExplorer jobExplorer; 148 private JobRepository jobRepository; 149 private TaskExecutor taskExecutor; 150 private JobParametersConverter jobParametersConverter; 151 private ApplicationContext baseContext; 152 private PlatformTransactionManager transactionManager; 153 private static ExecutingJobRegistry jobRegistry = new ExecutingJobRegistry(); 154 155 /** 156 * Public constructor used by {@link BatchRuntime#getJobOperator()}. This will bootstrap a 157 * singleton ApplicationContext if one has not already been created (and will utilize the existing 158 * one if it has) to populate itself. 159 */ 160 public JsrJobOperator() { 161 162 this.baseContext = BaseContextHolder.getInstance().getContext(); 163 164 baseContext.getAutowireCapableBeanFactory().autowireBeanProperties(this, 165 AutowireCapableBeanFactory.AUTOWIRE_BY_TYPE, false); 166 167 if(taskExecutor == null) { 168 taskExecutor = new SimpleAsyncTaskExecutor(); 169 } 170 } 171 172 /** 173 * The no-arg constructor is used by the {@link BatchRuntime#getJobOperator()} and so bootstraps 174 * an {@link ApplicationContext}. This constructor does not and is therefore dependency injection 175 * friendly. Also useful for unit testing. 176 * 177 * @param jobExplorer an instance of Spring Batch's {@link JobExplorer}. 178 * @param jobRepository an instance of Spring Batch's {@link JobOperator}. 179 * @param jobParametersConverter an instance of Spring Batch's {@link JobParametersConverter}. 180 * @param transactionManager a {@link javax.transaction.TransactionManager}. 181 */ 182 public JsrJobOperator(JobExplorer jobExplorer, JobRepository jobRepository, JobParametersConverter jobParametersConverter, PlatformTransactionManager transactionManager) { 183 Assert.notNull(jobExplorer, "A JobExplorer is required"); 184 Assert.notNull(jobRepository, "A JobRepository is required"); 185 Assert.notNull(jobParametersConverter, "A ParametersConverter is required"); 186 Assert.notNull(transactionManager, "A PlatformTransactionManager is required"); 187 188 this.jobExplorer = jobExplorer; 189 this.jobRepository = jobRepository; 190 this.jobParametersConverter = jobParametersConverter; 191 this.transactionManager = transactionManager; 192 } 193 194 public void setJobExplorer(JobExplorer jobExplorer) { 195 Assert.notNull(jobExplorer, "A JobExplorer is required"); 196 197 this.jobExplorer = jobExplorer; 198 } 199 200 public void setJobRepository(JobRepository jobRepository) { 201 Assert.notNull(jobRepository, "A JobRepository is required"); 202 203 this.jobRepository = jobRepository; 204 } 205 206 public void setTransactionManager(PlatformTransactionManager transactionManager) { 207 Assert.notNull(transactionManager, "A PlatformTransactionManager is required"); 208 209 this.transactionManager = transactionManager; 210 } 211 212 public void setTaskExecutor(TaskExecutor taskExecutor) { 213 this.taskExecutor = taskExecutor; 214 } 215 216 protected TaskExecutor getTaskExecutor() { 217 return taskExecutor; 218 } 219 220 @Override 221 public void afterPropertiesSet() throws Exception { 222 if (this.taskExecutor == null) { 223 this.taskExecutor = new SimpleAsyncTaskExecutor(); 224 } 225 } 226 227 /** 228 * Used to convert the {@link Properties} objects used by JSR-352 to the {@link JobParameters} 229 * objects used in Spring Batch. The default implementation used will configure all parameters 230 * to be non-identifying (per the JSR). 231 * 232 * @param converter A {@link Converter} implementation used to convert {@link Properties} to 233 * {@link JobParameters} 234 */ 235 public void setJobParametersConverter(JobParametersConverter converter) { 236 Assert.notNull(converter, "A Converter is required"); 237 238 this.jobParametersConverter = converter; 239 } 240 241 /* (non-Javadoc) 242 * @see javax.batch.operations.JobOperator#abandon(long) 243 */ 244 @Override 245 public void abandon(long jobExecutionId) throws NoSuchJobExecutionException, 246 JobExecutionIsRunningException, JobSecurityException { 247 org.springframework.batch.core.JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId); 248 249 if(jobExecution == null) { 250 throw new NoSuchJobExecutionException("Unable to retrieve JobExecution for id " + jobExecutionId); 251 } 252 253 if(jobExecution.isRunning()) { 254 throw new JobExecutionIsRunningException("Unable to abandon a job that is currently running"); 255 } 256 257 jobExecution.upgradeStatus(BatchStatus.ABANDONED); 258 jobRepository.update(jobExecution); 259 } 260 261 /* (non-Javadoc) 262 * @see javax.batch.operations.JobOperator#getJobExecution(long) 263 */ 264 @Override 265 public JobExecution getJobExecution(long executionId) 266 throws NoSuchJobExecutionException, JobSecurityException { 267 org.springframework.batch.core.JobExecution jobExecution = jobExplorer.getJobExecution(executionId); 268 269 if(jobExecution == null) { 270 throw new NoSuchJobExecutionException("No execution was found for executionId " + executionId); 271 } 272 273 return new JsrJobExecution(jobExecution, jobParametersConverter); 274 } 275 276 /* (non-Javadoc) 277 * @see javax.batch.operations.JobOperator#getJobExecutions(javax.batch.runtime.JobInstance) 278 */ 279 @Override 280 public List<JobExecution> getJobExecutions(JobInstance jobInstance) 281 throws NoSuchJobInstanceException, JobSecurityException { 282 if(jobInstance == null) { 283 throw new NoSuchJobInstanceException("A null JobInstance was provided"); 284 } 285 286 org.springframework.batch.core.JobInstance instance = (org.springframework.batch.core.JobInstance) jobInstance; 287 List<org.springframework.batch.core.JobExecution> batchExecutions = jobExplorer.getJobExecutions(instance); 288 289 if(batchExecutions == null || batchExecutions.size() == 0) { 290 throw new NoSuchJobInstanceException("Unable to find JobInstance " + jobInstance.getInstanceId()); 291 } 292 293 List<JobExecution> results = new ArrayList<JobExecution>(batchExecutions.size()); 294 for (org.springframework.batch.core.JobExecution jobExecution : batchExecutions) { 295 results.add(new JsrJobExecution(jobExecution, jobParametersConverter)); 296 } 297 298 return results; 299 } 300 301 /* (non-Javadoc) 302 * @see javax.batch.operations.JobOperator#getJobInstance(long) 303 */ 304 @Override 305 public JobInstance getJobInstance(long executionId) 306 throws NoSuchJobExecutionException, JobSecurityException { 307 org.springframework.batch.core.JobExecution execution = jobExplorer.getJobExecution(executionId); 308 309 if(execution == null) { 310 throw new NoSuchJobExecutionException("The JobExecution was not found"); 311 } 312 313 return jobExplorer.getJobInstance(execution.getJobInstance().getId()); 314 } 315 316 /* (non-Javadoc) 317 * @see javax.batch.operations.JobOperator#getJobInstanceCount(java.lang.String) 318 */ 319 @Override 320 public int getJobInstanceCount(String jobName) throws NoSuchJobException, 321 JobSecurityException { 322 try { 323 int count = jobExplorer.getJobInstanceCount(jobName); 324 325 if(count <= 0) { 326 throw new NoSuchJobException("No job instances were found for job name " + jobName); 327 } else { 328 return count; 329 } 330 } catch (org.springframework.batch.core.launch.NoSuchJobException e) { 331 throw new NoSuchJobException("No job instances were found for job name " + jobName); 332 } 333 } 334 335 /* (non-Javadoc) 336 * @see javax.batch.operations.JobOperator#getJobInstances(java.lang.String, int, int) 337 */ 338 @Override 339 public List<JobInstance> getJobInstances(String jobName, int start, int count) 340 throws NoSuchJobException, JobSecurityException { 341 List<org.springframework.batch.core.JobInstance> jobInstances = jobExplorer.getJobInstances(jobName, start, count); 342 343 if(jobInstances == null || jobInstances.size() == 0) { 344 throw new NoSuchJobException("The job was not found"); 345 } 346 347 return new ArrayList<JobInstance>(jobInstances); 348 } 349 350 /* (non-Javadoc) 351 * @see javax.batch.operations.JobOperator#getJobNames() 352 */ 353 @Override 354 public Set<String> getJobNames() throws JobSecurityException { 355 return new HashSet<String>(jobExplorer.getJobNames()); 356 } 357 358 /* (non-Javadoc) 359 * @see javax.batch.operations.JobOperator#getParameters(long) 360 */ 361 @Override 362 public Properties getParameters(long executionId) 363 throws NoSuchJobExecutionException, JobSecurityException { 364 org.springframework.batch.core.JobExecution execution = jobExplorer.getJobExecution(executionId); 365 366 if(execution == null) { 367 throw new NoSuchJobExecutionException("Unable to find the JobExecution for id " + executionId); 368 } 369 370 Properties properties = jobParametersConverter.getProperties(execution.getJobParameters()); 371 properties.remove(JsrJobParametersConverter.JOB_RUN_ID); 372 373 return properties; 374 } 375 376 /* (non-Javadoc) 377 * @see javax.batch.operations.JobOperator#getRunningExecutions(java.lang.String) 378 */ 379 @Override 380 public List<Long> getRunningExecutions(String name) 381 throws NoSuchJobException, JobSecurityException { 382 Set<org.springframework.batch.core.JobExecution> findRunningJobExecutions = jobExplorer.findRunningJobExecutions(name); 383 384 if(findRunningJobExecutions.isEmpty()) { 385 throw new NoSuchJobException("Job name: " + name + " not found."); 386 } 387 388 List<Long> results = new ArrayList<Long>(findRunningJobExecutions.size()); 389 390 for (org.springframework.batch.core.JobExecution jobExecution : findRunningJobExecutions) { 391 results.add(jobExecution.getId()); 392 } 393 394 return results; 395 } 396 397 /* (non-Javadoc) 398 * @see javax.batch.operations.JobOperator#getStepExecutions(long) 399 */ 400 @Override 401 public List<StepExecution> getStepExecutions(long executionId) 402 throws NoSuchJobExecutionException, JobSecurityException { 403 org.springframework.batch.core.JobExecution execution = jobExplorer.getJobExecution(executionId); 404 405 if(execution == null) { 406 throw new NoSuchJobException("JobExecution with the id " + executionId + " was not found"); 407 } 408 409 Collection<org.springframework.batch.core.StepExecution> executions = execution.getStepExecutions(); 410 411 List<StepExecution> batchExecutions = new ArrayList<StepExecution>(); 412 413 if(executions != null) { 414 for (org.springframework.batch.core.StepExecution stepExecution : executions) { 415 if(!stepExecution.getStepName().contains(":partition")) { 416 batchExecutions.add(new JsrStepExecution(jobExplorer.getStepExecution(executionId, stepExecution.getId()))); 417 } 418 } 419 } 420 421 return batchExecutions; 422 } 423 424 /** 425 * Creates a child {@link ApplicationContext} for the job being requested based upon 426 * the /META-INF/batch.xml (if exists) and the /META-INF/batch-jobs/<jobName>.xml 427 * configuration and restart the job. 428 * 429 * @param executionId the database id of the job execution to be restarted. 430 * @param params any job parameters to be used during the execution of this job. 431 * @throws JobExecutionAlreadyCompleteException thrown if the requested job execution has 432 * a status of COMPLETE 433 * @throws NoSuchJobExecutionException throw if the requested job execution does not exist 434 * in the repository 435 * @throws JobExecutionNotMostRecentException thrown if the requested job execution is not 436 * the most recent attempt for the job instance it's related to. 437 * @throws JobRestartException thrown for any general errors during the job restart process 438 */ 439 @Override 440 public long restart(long executionId, Properties params) 441 throws JobExecutionAlreadyCompleteException, 442 NoSuchJobExecutionException, JobExecutionNotMostRecentException, 443 JobRestartException, JobSecurityException { 444 org.springframework.batch.core.JobExecution previousJobExecution = jobExplorer.getJobExecution(executionId); 445 446 if (previousJobExecution == null) { 447 throw new NoSuchJobExecutionException("No JobExecution found for id: [" + executionId + "]"); 448 } else if(previousJobExecution.getStatus().equals(BatchStatus.COMPLETED)) { 449 throw new JobExecutionAlreadyCompleteException("The requested job has already completed"); 450 } 451 452 List<org.springframework.batch.core.JobExecution> previousExecutions = jobExplorer.getJobExecutions(previousJobExecution.getJobInstance()); 453 454 for (org.springframework.batch.core.JobExecution jobExecution : previousExecutions) { 455 if(jobExecution.getCreateTime().compareTo(previousJobExecution.getCreateTime()) > 0) { 456 throw new JobExecutionNotMostRecentException("The requested JobExecution to restart was not the most recently run"); 457 } 458 459 if(jobExecution.getStatus().equals(BatchStatus.ABANDONED)) { 460 throw new JobRestartException("JobExecution ID: " + jobExecution.getId() + " is abandoned and attempted to be restarted."); 461 } 462 } 463 464 final String jobName = previousJobExecution.getJobInstance().getJobName(); 465 466 Properties jobRestartProperties = getJobRestartProperties(params, previousJobExecution); 467 468 final JsrXmlApplicationContext batchContext = new JsrXmlApplicationContext(jobRestartProperties); 469 batchContext.setValidating(false); 470 471 Resource batchXml = new ClassPathResource("/META-INF/batch.xml"); 472 Resource jobXml = new ClassPathResource(previousJobExecution.getJobConfigurationName()); 473 474 if(batchXml.exists()) { 475 batchContext.load(batchXml); 476 } 477 478 if(jobXml.exists()) { 479 batchContext.load(jobXml); 480 } 481 482 AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition("org.springframework.batch.core.jsr.JsrJobContextFactoryBean").getBeanDefinition(); 483 beanDefinition.setScope(BeanDefinition.SCOPE_SINGLETON); 484 batchContext.registerBeanDefinition(JSR_JOB_CONTEXT_BEAN_NAME, beanDefinition); 485 486 batchContext.setParent(baseContext); 487 488 try { 489 batchContext.refresh(); 490 } catch (BeanCreationException e) { 491 throw new JobRestartException(e); 492 } 493 494 final org.springframework.batch.core.JobExecution jobExecution; 495 496 try { 497 JobParameters jobParameters = jobParametersConverter.getJobParameters(jobRestartProperties); 498 jobExecution = jobRepository.createJobExecution(previousJobExecution.getJobInstance(), jobParameters, previousJobExecution.getJobConfigurationName()); 499 } catch (Exception e) { 500 throw new JobRestartException(e); 501 } 502 503 try { 504 final Semaphore semaphore = new Semaphore(1); 505 final List<Exception> exceptionHolder = Collections.synchronizedList(new ArrayList<Exception>()); 506 semaphore.acquire(); 507 508 taskExecutor.execute(new Runnable() { 509 510 @Override 511 public void run() { 512 JsrJobContextFactoryBean factoryBean = null; 513 try { 514 factoryBean = (JsrJobContextFactoryBean) batchContext.getBean("&" + JSR_JOB_CONTEXT_BEAN_NAME); 515 factoryBean.setJobExecution(jobExecution); 516 final Job job = batchContext.getBean(Job.class); 517 518 if(!job.isRestartable()) { 519 throw new JobRestartException("Job " + jobName + " is not restartable"); 520 } 521 522 semaphore.release(); 523 // Initialization of the JobExecution for job level dependencies 524 jobRegistry.register(job, jobExecution); 525 job.execute(jobExecution); 526 jobRegistry.remove(jobExecution); 527 } 528 catch (Exception e) { 529 exceptionHolder.add(e); 530 } finally { 531 if(factoryBean != null) { 532 factoryBean.close(); 533 } 534 535 batchContext.close(); 536 537 if(semaphore.availablePermits() == 0) { 538 semaphore.release(); 539 } 540 } 541 } 542 }); 543 544 semaphore.acquire(); 545 if(exceptionHolder.size() > 0) { 546 semaphore.release(); 547 throw new JobRestartException(exceptionHolder.get(0)); 548 } 549 } 550 catch (Exception e) { 551 jobExecution.upgradeStatus(BatchStatus.FAILED); 552 if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { 553 jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); 554 } 555 556 jobRepository.update(jobExecution); 557 558 if(batchContext.isActive()) { 559 batchContext.close(); 560 } 561 562 throw new JobRestartException(e); 563 } 564 565 return jobExecution.getId(); 566 } 567 568 protected Properties getJobRestartProperties(Properties params, org.springframework.batch.core.JobExecution previousJobExecution) { 569 Properties jobRestartProperties = new Properties(); 570 571 if (previousJobExecution != null) { 572 JobParameters previousJobParameters = previousJobExecution.getJobParameters(); 573 574 if (previousJobParameters != null && !previousJobParameters.isEmpty()) { 575 jobRestartProperties.putAll(previousJobParameters.toProperties()); 576 } 577 } 578 579 if (params != null) { 580 Enumeration<?> propertyNames = params.propertyNames(); 581 582 while(propertyNames.hasMoreElements()) { 583 String curName = (String) propertyNames.nextElement(); 584 jobRestartProperties.setProperty(curName, params.getProperty(curName)); 585 } 586 } 587 588 return jobRestartProperties; 589 } 590 591 /** 592 * Creates a child {@link ApplicationContext} for the job being requested based upon 593 * the /META-INF/batch.xml (if exists) and the /META-INF/batch-jobs/<jobName>.xml 594 * configuration and launches the job. Per JSR-352, calls to this method will always 595 * create a new {@link JobInstance} (and related {@link JobExecution}). 596 * 597 * @param jobName the name of the job XML file without the .xml that is located within the 598 * /META-INF/batch-jobs directory. 599 * @param params any job parameters to be used during the execution of this job. 600 */ 601 @Override 602 public long start(String jobName, Properties params) throws JobStartException, 603 JobSecurityException { 604 final JsrXmlApplicationContext batchContext = new JsrXmlApplicationContext(params); 605 batchContext.setValidating(false); 606 607 Resource batchXml = new ClassPathResource("/META-INF/batch.xml"); 608 String jobConfigurationLocation = "/META-INF/batch-jobs/" + jobName + ".xml"; 609 Resource jobXml = new ClassPathResource(jobConfigurationLocation); 610 611 if(batchXml.exists()) { 612 batchContext.load(batchXml); 613 } 614 615 if(jobXml.exists()) { 616 batchContext.load(jobXml); 617 } 618 619 AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition("org.springframework.batch.core.jsr.JsrJobContextFactoryBean").getBeanDefinition(); 620 beanDefinition.setScope(BeanDefinition.SCOPE_SINGLETON); 621 batchContext.registerBeanDefinition(JSR_JOB_CONTEXT_BEAN_NAME, beanDefinition); 622 623 if(baseContext != null) { 624 batchContext.setParent(baseContext); 625 } else { 626 batchContext.getBeanFactory().registerSingleton("jobExplorer", jobExplorer); 627 batchContext.getBeanFactory().registerSingleton("jobRepository", jobRepository); 628 batchContext.getBeanFactory().registerSingleton("jobParametersConverter", jobParametersConverter); 629 batchContext.getBeanFactory().registerSingleton("transactionManager", transactionManager); 630 } 631 632 try { 633 batchContext.refresh(); 634 } catch (BeanCreationException e) { 635 throw new JobStartException(e); 636 } 637 638 Assert.notNull(jobName, "The job name must not be null."); 639 640 final org.springframework.batch.core.JobExecution jobExecution; 641 642 try { 643 JobParameters jobParameters = jobParametersConverter.getJobParameters(params); 644 String [] jobNames = batchContext.getBeanNamesForType(Job.class); 645 646 if(jobNames == null || jobNames.length <= 0) { 647 throw new BatchRuntimeException("No Job defined in current context"); 648 } 649 650 org.springframework.batch.core.JobInstance jobInstance = jobRepository.createJobInstance(jobNames[0], jobParameters); 651 jobExecution = jobRepository.createJobExecution(jobInstance, jobParameters, jobConfigurationLocation); 652 } catch (Exception e) { 653 throw new JobStartException(e); 654 } 655 656 try { 657 final Semaphore semaphore = new Semaphore(1); 658 final List<Exception> exceptionHolder = Collections.synchronizedList(new ArrayList<Exception>()); 659 semaphore.acquire(); 660 661 taskExecutor.execute(new Runnable() { 662 663 @Override 664 public void run() { 665 JsrJobContextFactoryBean factoryBean = null; 666 try { 667 factoryBean = (JsrJobContextFactoryBean) batchContext.getBean("&" + JSR_JOB_CONTEXT_BEAN_NAME); 668 factoryBean.setJobExecution(jobExecution); 669 final Job job = batchContext.getBean(Job.class); 670 semaphore.release(); 671 // Initialization of the JobExecution for job level dependencies 672 jobRegistry.register(job, jobExecution); 673 job.execute(jobExecution); 674 jobRegistry.remove(jobExecution); 675 } 676 catch (Exception e) { 677 exceptionHolder.add(e); 678 } finally { 679 if(factoryBean != null) { 680 factoryBean.close(); 681 } 682 683 batchContext.close(); 684 685 if(semaphore.availablePermits() == 0) { 686 semaphore.release(); 687 } 688 } 689 } 690 }); 691 692 semaphore.acquire(); 693 if(exceptionHolder.size() > 0) { 694 semaphore.release(); 695 throw new JobStartException(exceptionHolder.get(0)); 696 } 697 } 698 catch (Exception e) { 699 if(jobRegistry.exists(jobExecution.getId())) { 700 jobRegistry.remove(jobExecution); 701 } 702 jobExecution.upgradeStatus(BatchStatus.FAILED); 703 if (jobExecution.getExitStatus().equals(ExitStatus.UNKNOWN)) { 704 jobExecution.setExitStatus(ExitStatus.FAILED.addExitDescription(e)); 705 } 706 jobRepository.update(jobExecution); 707 708 if(batchContext.isActive()) { 709 batchContext.close(); 710 } 711 712 throw new JobStartException(e); 713 } 714 return jobExecution.getId(); 715 } 716 717 /** 718 * Stops the running job execution if it is currently running. 719 * 720 * @param executionId the database id for the {@link JobExecution} to be stopped. 721 * @throws NoSuchJobExecutionException thrown if {@link JobExecution} instance does not exist. 722 * @throws JobExecutionNotRunningException thrown if {@link JobExecution} is not running. 723 */ 724 @Override 725 public void stop(long executionId) throws NoSuchJobExecutionException, 726 JobExecutionNotRunningException, JobSecurityException { 727 org.springframework.batch.core.JobExecution jobExecution = jobExplorer.getJobExecution(executionId); 728 // Indicate the execution should be stopped by setting it's status to 729 // 'STOPPING'. It is assumed that 730 // the step implementation will check this status at chunk boundaries. 731 BatchStatus status = jobExecution.getStatus(); 732 if (!(status == BatchStatus.STARTED || status == BatchStatus.STARTING)) { 733 throw new JobExecutionNotRunningException("JobExecution must be running so that it can be stopped: "+jobExecution); 734 } 735 jobExecution.setStatus(BatchStatus.STOPPING); 736 jobRepository.update(jobExecution); 737 738 try { 739 Job job = jobRegistry.getJob(jobExecution.getId()); 740 if (job instanceof StepLocator) {//can only process as StepLocator is the only way to get the step object 741 //get the current stepExecution 742 for (org.springframework.batch.core.StepExecution stepExecution : jobExecution.getStepExecutions()) { 743 if (stepExecution.getStatus().isRunning()) { 744 try { 745 //have the step execution that's running -> need to 'stop' it 746 Step step = ((StepLocator)job).getStep(stepExecution.getStepName()); 747 if (step instanceof TaskletStep) { 748 Tasklet tasklet = ((TaskletStep)step).getTasklet(); 749 if (tasklet instanceof StoppableTasklet) { 750 StepSynchronizationManager.register(stepExecution); 751 ((StoppableTasklet)tasklet).stop(); 752 StepSynchronizationManager.release(); 753 } 754 } 755 } 756 catch (NoSuchStepException e) { 757 logger.warn("Step not found",e); 758 } 759 } 760 } 761 } 762 } 763 catch (NoSuchJobException e) { 764 logger.warn("Cannot find Job object",e); 765 } 766 } 767 768 @Override 769 public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { 770 baseContext = applicationContext; 771 } 772 773 private static class ExecutingJobRegistry { 774 775 private Map<Long, Job> registry = new ConcurrentHashMap<Long, Job>(); 776 777 public void register(Job job, org.springframework.batch.core.JobExecution jobExecution) throws DuplicateJobException { 778 779 if(registry.containsKey(jobExecution.getId())) { 780 throw new DuplicateJobException("This job execution has already been registered"); 781 } else { 782 registry.put(jobExecution.getId(), job); 783 } 784 } 785 786 public void remove(org.springframework.batch.core.JobExecution jobExecution) { 787 if(!registry.containsKey(jobExecution.getId())) { 788 throw new NoSuchJobExecutionException("The job execution " + jobExecution.getId() + " was not found"); 789 } else { 790 registry.remove(jobExecution.getId()); 791 } 792 } 793 794 public boolean exists(long jobExecutionId) { 795 return registry.containsKey(jobExecutionId); 796 } 797 798 public Job getJob(long jobExecutionId) { 799 if(!registry.containsKey(jobExecutionId)) { 800 throw new NoSuchJobExecutionException("The job execution " + jobExecutionId + " was not found"); 801 } else { 802 return registry.get(jobExecutionId); 803 } 804 } 805 } 806 807 /** 808 * A singleton holder used to lazily bootstrap the base context used in JSR-352. 809 */ 810 protected static class BaseContextHolder { 811 812 private ApplicationContext context; 813 814 private static BaseContextHolder instance; 815 816 private BaseContextHolder() { 817 synchronized (BaseContextHolder.class) { 818 if(this.context == null) { 819 String overrideContextLocation = System.getProperty("JSR-352-BASE-CONTEXT"); 820 821 List<String> contextLocations = new ArrayList<>(); 822 823 contextLocations.add("jsrBaseContext.xml"); 824 825 if(overrideContextLocation != null) { 826 contextLocations.add(overrideContextLocation); 827 } 828 829 this.context = new GenericXmlApplicationContext( 830 contextLocations.toArray(new String[contextLocations.size()])); 831 } 832 } 833 } 834 835 public static BaseContextHolder getInstance() { 836 synchronized (BaseContextHolder.class) { 837 if(instance == null) { 838 instance = new BaseContextHolder(); 839 } 840 } 841 842 return instance; 843 } 844 845 public ApplicationContext getContext() { 846 return this.context; 847 } 848 } 849}