001/* 002 * Copyright 2006-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.job; 018 019import java.util.Collection; 020import java.util.Date; 021 022import org.apache.commons.logging.Log; 023import org.apache.commons.logging.LogFactory; 024import org.springframework.batch.core.BatchStatus; 025import org.springframework.batch.core.ExitStatus; 026import org.springframework.batch.core.Job; 027import org.springframework.batch.core.JobExecution; 028import org.springframework.batch.core.JobExecutionException; 029import org.springframework.batch.core.JobExecutionListener; 030import org.springframework.batch.core.JobInterruptedException; 031import org.springframework.batch.core.JobParametersIncrementer; 032import org.springframework.batch.core.JobParametersValidator; 033import org.springframework.batch.core.StartLimitExceededException; 034import org.springframework.batch.core.Step; 035import org.springframework.batch.core.StepExecution; 036import org.springframework.batch.core.launch.NoSuchJobException; 037import org.springframework.batch.core.launch.support.ExitCodeMapper; 038import org.springframework.batch.core.listener.CompositeJobExecutionListener; 039import org.springframework.batch.core.repository.JobRepository; 040import org.springframework.batch.core.repository.JobRestartException; 041import org.springframework.batch.core.scope.context.JobSynchronizationManager; 042import org.springframework.batch.core.step.StepLocator; 043import org.springframework.batch.repeat.RepeatException; 044import org.springframework.beans.factory.BeanNameAware; 045import org.springframework.beans.factory.InitializingBean; 046import org.springframework.lang.Nullable; 047import org.springframework.util.Assert; 048import org.springframework.util.ClassUtils; 049 050/** 051 * Abstract implementation of the {@link Job} interface. Common dependencies 052 * such as a {@link JobRepository}, {@link JobExecutionListener}s, and various 053 * configuration parameters are set here. Therefore, common error handling and 054 * listener calling activities are abstracted away from implementations. 055 * 056 * @author Lucas Ward 057 * @author Dave Syer 058 * @author Mahmoud Ben Hassine 059 */ 060public abstract class AbstractJob implements Job, StepLocator, BeanNameAware, 061InitializingBean { 062 063 protected static final Log logger = LogFactory.getLog(AbstractJob.class); 064 065 private String name; 066 067 private boolean restartable = true; 068 069 private JobRepository jobRepository; 070 071 private CompositeJobExecutionListener listener = new CompositeJobExecutionListener(); 072 073 private JobParametersIncrementer jobParametersIncrementer; 074 075 private JobParametersValidator jobParametersValidator = new DefaultJobParametersValidator(); 076 077 private StepHandler stepHandler; 078 079 /** 080 * Default constructor. 081 */ 082 public AbstractJob() { 083 super(); 084 } 085 086 /** 087 * Convenience constructor to immediately add name (which is mandatory but 088 * not final). 089 * 090 * @param name name of the job 091 */ 092 public AbstractJob(String name) { 093 super(); 094 this.name = name; 095 } 096 097 /** 098 * A validator for job parameters. Defaults to a vanilla 099 * {@link DefaultJobParametersValidator}. 100 * 101 * @param jobParametersValidator 102 * a validator instance 103 */ 104 public void setJobParametersValidator( 105 JobParametersValidator jobParametersValidator) { 106 this.jobParametersValidator = jobParametersValidator; 107 } 108 109 /** 110 * Assert mandatory properties: {@link JobRepository}. 111 * 112 * @see InitializingBean#afterPropertiesSet() 113 */ 114 @Override 115 public void afterPropertiesSet() throws Exception { 116 Assert.notNull(jobRepository, "JobRepository must be set"); 117 } 118 119 /** 120 * Set the name property if it is not already set. Because of the order of 121 * the callbacks in a Spring container the name property will be set first 122 * if it is present. Care is needed with bean definition inheritance - if a 123 * parent bean has a name, then its children need an explicit name as well, 124 * otherwise they will not be unique. 125 * 126 * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) 127 */ 128 @Override 129 public void setBeanName(String name) { 130 if (this.name == null) { 131 this.name = name; 132 } 133 } 134 135 /** 136 * Set the name property. Always overrides the default value if this object 137 * is a Spring bean. 138 * 139 * @param name the name to be associated with the job. 140 * 141 * @see #setBeanName(java.lang.String) 142 */ 143 public void setName(String name) { 144 this.name = name; 145 } 146 147 /* 148 * (non-Javadoc) 149 * 150 * @see org.springframework.batch.core.domain.IJob#getName() 151 */ 152 @Override 153 public String getName() { 154 return name; 155 } 156 157 /** 158 * Retrieve the step with the given name. If there is no Step with the given 159 * name, then return null. 160 * 161 * @param stepName name of the step 162 * @return the Step 163 */ 164 @Override 165 public abstract Step getStep(String stepName); 166 167 /** 168 * Retrieve the step names. 169 * 170 * @return the step names 171 */ 172 @Override 173 public abstract Collection<String> getStepNames(); 174 175 @Override 176 public JobParametersValidator getJobParametersValidator() { 177 return jobParametersValidator; 178 } 179 180 /** 181 * Boolean flag to prevent categorically a job from restarting, even if it 182 * has failed previously. 183 * 184 * @param restartable 185 * the value of the flag to set (default true) 186 */ 187 public void setRestartable(boolean restartable) { 188 this.restartable = restartable; 189 } 190 191 /** 192 * @see Job#isRestartable() 193 */ 194 @Override 195 public boolean isRestartable() { 196 return restartable; 197 } 198 199 /** 200 * Public setter for the {@link JobParametersIncrementer}. 201 * 202 * @param jobParametersIncrementer 203 * the {@link JobParametersIncrementer} to set 204 */ 205 public void setJobParametersIncrementer( 206 JobParametersIncrementer jobParametersIncrementer) { 207 this.jobParametersIncrementer = jobParametersIncrementer; 208 } 209 210 /* 211 * (non-Javadoc) 212 * 213 * @see org.springframework.batch.core.Job#getJobParametersIncrementer() 214 */ 215 @Override 216 @Nullable 217 public JobParametersIncrementer getJobParametersIncrementer() { 218 return this.jobParametersIncrementer; 219 } 220 221 /** 222 * Public setter for injecting {@link JobExecutionListener}s. They will all 223 * be given the listener callbacks at the appropriate point in the job. 224 * 225 * @param listeners 226 * the listeners to set. 227 */ 228 public void setJobExecutionListeners(JobExecutionListener[] listeners) { 229 for (int i = 0; i < listeners.length; i++) { 230 this.listener.register(listeners[i]); 231 } 232 } 233 234 /** 235 * Register a single listener for the {@link JobExecutionListener} 236 * callbacks. 237 * 238 * @param listener 239 * a {@link JobExecutionListener} 240 */ 241 public void registerJobExecutionListener(JobExecutionListener listener) { 242 this.listener.register(listener); 243 } 244 245 /** 246 * Public setter for the {@link JobRepository} that is needed to manage the 247 * state of the batch meta domain (jobs, steps, executions) during the life 248 * of a job. 249 * 250 * @param jobRepository repository to use during the job execution 251 */ 252 public void setJobRepository(JobRepository jobRepository) { 253 this.jobRepository = jobRepository; 254 stepHandler = new SimpleStepHandler(jobRepository); 255 } 256 257 /** 258 * Convenience method for subclasses to access the job repository. 259 * 260 * @return the jobRepository 261 */ 262 protected JobRepository getJobRepository() { 263 return jobRepository; 264 } 265 266 /** 267 * Extension point for subclasses allowing them to concentrate on processing 268 * logic and ignore listeners and repository calls. Implementations usually 269 * are concerned with the ordering of steps, and delegate actual step 270 * processing to {@link #handleStep(Step, JobExecution)}. 271 * 272 * @param execution 273 * the current {@link JobExecution} 274 * 275 * @throws JobExecutionException 276 * to signal a fatal batch framework error (not a business or 277 * validation exception) 278 */ 279 abstract protected void doExecute(JobExecution execution) 280 throws JobExecutionException; 281 282 /** 283 * Run the specified job, handling all listener and repository calls, and 284 * delegating the actual processing to {@link #doExecute(JobExecution)}. 285 * 286 * @see Job#execute(JobExecution) 287 * @throws StartLimitExceededException 288 * if start limit of one of the steps was exceeded 289 */ 290 @Override 291 public final void execute(JobExecution execution) { 292 293 Assert.notNull(execution, "jobExecution must not be null"); 294 295 if (logger.isDebugEnabled()) { 296 logger.debug("Job execution starting: " + execution); 297 } 298 299 JobSynchronizationManager.register(execution); 300 301 try { 302 303 jobParametersValidator.validate(execution.getJobParameters()); 304 305 if (execution.getStatus() != BatchStatus.STOPPING) { 306 307 execution.setStartTime(new Date()); 308 updateStatus(execution, BatchStatus.STARTED); 309 310 listener.beforeJob(execution); 311 312 try { 313 doExecute(execution); 314 if (logger.isDebugEnabled()) { 315 logger.debug("Job execution complete: " + execution); 316 } 317 } catch (RepeatException e) { 318 throw e.getCause(); 319 } 320 } else { 321 322 // The job was already stopped before we even got this far. Deal 323 // with it in the same way as any other interruption. 324 execution.setStatus(BatchStatus.STOPPED); 325 execution.setExitStatus(ExitStatus.COMPLETED); 326 if (logger.isDebugEnabled()) { 327 logger.debug("Job execution was stopped: " + execution); 328 } 329 330 } 331 332 } catch (JobInterruptedException e) { 333 logger.info("Encountered interruption executing job: " 334 + e.getMessage()); 335 if (logger.isDebugEnabled()) { 336 logger.debug("Full exception", e); 337 } 338 execution.setExitStatus(getDefaultExitStatusForFailure(e, execution)); 339 execution.setStatus(BatchStatus.max(BatchStatus.STOPPED, e.getStatus())); 340 execution.addFailureException(e); 341 } catch (Throwable t) { 342 logger.error("Encountered fatal error executing job", t); 343 execution.setExitStatus(getDefaultExitStatusForFailure(t, execution)); 344 execution.setStatus(BatchStatus.FAILED); 345 execution.addFailureException(t); 346 } finally { 347 try { 348 if (execution.getStatus().isLessThanOrEqualTo(BatchStatus.STOPPED) 349 && execution.getStepExecutions().isEmpty()) { 350 ExitStatus exitStatus = execution.getExitStatus(); 351 ExitStatus newExitStatus = 352 ExitStatus.NOOP.addExitDescription("All steps already completed or no steps configured for this job."); 353 execution.setExitStatus(exitStatus.and(newExitStatus)); 354 } 355 356 execution.setEndTime(new Date()); 357 358 try { 359 listener.afterJob(execution); 360 } catch (Exception e) { 361 logger.error("Exception encountered in afterJob callback", e); 362 } 363 364 jobRepository.update(execution); 365 } finally { 366 JobSynchronizationManager.release(); 367 } 368 369 } 370 371 } 372 373 /** 374 * Convenience method for subclasses to delegate the handling of a specific 375 * step in the context of the current {@link JobExecution}. Clients of this 376 * method do not need access to the {@link JobRepository}, nor do they need 377 * to worry about populating the execution context on a restart, nor 378 * detecting the interrupted state (in job or step execution). 379 * 380 * @param step 381 * the {@link Step} to execute 382 * @param execution 383 * the current {@link JobExecution} 384 * @return the {@link StepExecution} corresponding to this step 385 * 386 * @throws JobInterruptedException 387 * if the {@link JobExecution} has been interrupted, and in 388 * particular if {@link BatchStatus#ABANDONED} or 389 * {@link BatchStatus#STOPPING} is detected 390 * @throws StartLimitExceededException 391 * if the start limit has been exceeded for this step 392 * @throws JobRestartException 393 * if the job is in an inconsistent state from an earlier 394 * failure 395 */ 396 protected final StepExecution handleStep(Step step, JobExecution execution) 397 throws JobInterruptedException, JobRestartException, 398 StartLimitExceededException { 399 return stepHandler.handleStep(step, execution); 400 401 } 402 403 /** 404 * Default mapping from throwable to {@link ExitStatus}. 405 * 406 * @param ex the cause of the failure 407 * @param execution the {@link JobExecution} instance. 408 * @return an {@link ExitStatus} 409 */ 410 protected ExitStatus getDefaultExitStatusForFailure(Throwable ex, JobExecution execution) { 411 ExitStatus exitStatus; 412 if (ex instanceof JobInterruptedException 413 || ex.getCause() instanceof JobInterruptedException) { 414 exitStatus = ExitStatus.STOPPED 415 .addExitDescription(JobInterruptedException.class.getName()); 416 } else if (ex instanceof NoSuchJobException 417 || ex.getCause() instanceof NoSuchJobException) { 418 exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex 419 .getClass().getName()); 420 } else { 421 exitStatus = ExitStatus.FAILED.addExitDescription(ex); 422 } 423 424 return exitStatus; 425 } 426 427 private void updateStatus(JobExecution jobExecution, BatchStatus status) { 428 jobExecution.setStatus(status); 429 jobRepository.update(jobExecution); 430 } 431 432 @Override 433 public String toString() { 434 return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]"; 435 } 436 437}