001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.repository.support; 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021import org.springframework.batch.core.BatchStatus; 022import org.springframework.batch.core.JobExecution; 023import org.springframework.batch.core.JobInstance; 024import org.springframework.batch.core.JobParameters; 025import org.springframework.batch.core.StepExecution; 026import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; 027import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; 028import org.springframework.batch.core.repository.JobRepository; 029import org.springframework.batch.core.repository.JobRestartException; 030import org.springframework.batch.core.repository.dao.ExecutionContextDao; 031import org.springframework.batch.core.repository.dao.JobExecutionDao; 032import org.springframework.batch.core.repository.dao.JobInstanceDao; 033import org.springframework.batch.core.repository.dao.StepExecutionDao; 034import org.springframework.batch.item.ExecutionContext; 035import org.springframework.lang.Nullable; 036import org.springframework.util.Assert; 037 038import java.util.ArrayList; 039import java.util.Collection; 040import java.util.Date; 041import java.util.List; 042 043/** 044 * 045 * <p> 046 * Implementation of {@link JobRepository} that stores JobInstances, 047 * JobExecutions, and StepExecutions using the injected DAOs. 048 * <p> 049 * 050 * @author Lucas Ward 051 * @author Dave Syer 052 * @author Robert Kasanicky 053 * @author David Turanski 054 * @author Mahmoud Ben Hassine 055 * 056 * @see JobRepository 057 * @see JobInstanceDao 058 * @see JobExecutionDao 059 * @see StepExecutionDao 060 * 061 */ 062public class SimpleJobRepository implements JobRepository { 063 064 private static final Log logger = LogFactory.getLog(SimpleJobRepository.class); 065 066 private JobInstanceDao jobInstanceDao; 067 068 private JobExecutionDao jobExecutionDao; 069 070 private StepExecutionDao stepExecutionDao; 071 072 private ExecutionContextDao ecDao; 073 074 /** 075 * Provide default constructor with low visibility in case user wants to use 076 * use aop:proxy-target-class="true" for AOP interceptor. 077 */ 078 SimpleJobRepository() { 079 } 080 081 public SimpleJobRepository(JobInstanceDao jobInstanceDao, JobExecutionDao jobExecutionDao, 082 StepExecutionDao stepExecutionDao, ExecutionContextDao ecDao) { 083 super(); 084 this.jobInstanceDao = jobInstanceDao; 085 this.jobExecutionDao = jobExecutionDao; 086 this.stepExecutionDao = stepExecutionDao; 087 this.ecDao = ecDao; 088 } 089 090 @Override 091 public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) { 092 return jobInstanceDao.getJobInstance(jobName, jobParameters) != null; 093 } 094 095 @Override 096 public JobExecution createJobExecution(String jobName, JobParameters jobParameters) 097 throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { 098 099 Assert.notNull(jobName, "Job name must not be null."); 100 Assert.notNull(jobParameters, "JobParameters must not be null."); 101 102 /* 103 * Find all jobs matching the runtime information. 104 * 105 * If this method is transactional, and the isolation level is 106 * REPEATABLE_READ or better, another launcher trying to start the same 107 * job in another thread or process will block until this transaction 108 * has finished. 109 */ 110 111 JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters); 112 ExecutionContext executionContext; 113 114 // existing job instance found 115 if (jobInstance != null) { 116 117 List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance); 118 119 // check for running executions and find the last started 120 for (JobExecution execution : executions) { 121 if (execution.isRunning() || execution.isStopping()) { 122 throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: " 123 + jobInstance); 124 } 125 BatchStatus status = execution.getStatus(); 126 if (status == BatchStatus.UNKNOWN) { 127 throw new JobRestartException("Cannot restart job from UNKNOWN status. " 128 + "The last execution ended with a failure that could not be rolled back, " 129 + "so it may be dangerous to proceed. Manual intervention is probably necessary."); 130 } 131 if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) { 132 throw new JobInstanceAlreadyCompleteException( 133 "A job instance already exists and is complete for parameters=" + jobParameters 134 + ". If you want to run this job again, change the parameters."); 135 } 136 } 137 executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance)); 138 } 139 else { 140 // no job found, create one 141 jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters); 142 executionContext = new ExecutionContext(); 143 } 144 145 JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null); 146 jobExecution.setExecutionContext(executionContext); 147 jobExecution.setLastUpdated(new Date(System.currentTimeMillis())); 148 149 // Save the JobExecution so that it picks up an ID (useful for clients 150 // monitoring asynchronous executions): 151 jobExecutionDao.saveJobExecution(jobExecution); 152 ecDao.saveExecutionContext(jobExecution); 153 154 return jobExecution; 155 156 } 157 158 @Override 159 public void update(JobExecution jobExecution) { 160 161 Assert.notNull(jobExecution, "JobExecution cannot be null."); 162 Assert.notNull(jobExecution.getJobId(), "JobExecution must have a Job ID set."); 163 Assert.notNull(jobExecution.getId(), "JobExecution must be already saved (have an id assigned)."); 164 165 jobExecution.setLastUpdated(new Date(System.currentTimeMillis())); 166 167 jobExecutionDao.synchronizeStatus(jobExecution); 168 jobExecutionDao.updateJobExecution(jobExecution); 169 } 170 171 @Override 172 public void add(StepExecution stepExecution) { 173 validateStepExecution(stepExecution); 174 175 stepExecution.setLastUpdated(new Date(System.currentTimeMillis())); 176 stepExecutionDao.saveStepExecution(stepExecution); 177 ecDao.saveExecutionContext(stepExecution); 178 } 179 180 @Override 181 public void addAll(Collection<StepExecution> stepExecutions) { 182 Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions"); 183 for (StepExecution stepExecution : stepExecutions) { 184 validateStepExecution(stepExecution); 185 stepExecution.setLastUpdated(new Date(System.currentTimeMillis())); 186 } 187 stepExecutionDao.saveStepExecutions(stepExecutions); 188 ecDao.saveExecutionContexts(stepExecutions); 189 } 190 191 @Override 192 public void update(StepExecution stepExecution) { 193 validateStepExecution(stepExecution); 194 Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)"); 195 196 stepExecution.setLastUpdated(new Date(System.currentTimeMillis())); 197 stepExecutionDao.updateStepExecution(stepExecution); 198 checkForInterruption(stepExecution); 199 } 200 201 private void validateStepExecution(StepExecution stepExecution) { 202 Assert.notNull(stepExecution, "StepExecution cannot be null."); 203 Assert.notNull(stepExecution.getStepName(), "StepExecution's step name cannot be null."); 204 Assert.notNull(stepExecution.getJobExecutionId(), "StepExecution must belong to persisted JobExecution"); 205 } 206 207 @Override 208 public void updateExecutionContext(StepExecution stepExecution) { 209 validateStepExecution(stepExecution); 210 Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)"); 211 ecDao.updateExecutionContext(stepExecution); 212 } 213 214 @Override 215 public void updateExecutionContext(JobExecution jobExecution) { 216 ecDao.updateExecutionContext(jobExecution); 217 } 218 219 @Override 220 @Nullable 221 public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) { 222 List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance); 223 List<StepExecution> stepExecutions = new ArrayList<StepExecution>(jobExecutions.size()); 224 225 for (JobExecution jobExecution : jobExecutions) { 226 stepExecutionDao.addStepExecutions(jobExecution); 227 for (StepExecution stepExecution : jobExecution.getStepExecutions()) { 228 if (stepName.equals(stepExecution.getStepName())) { 229 stepExecutions.add(stepExecution); 230 } 231 } 232 } 233 234 StepExecution latest = null; 235 for (StepExecution stepExecution : stepExecutions) { 236 if (latest == null) { 237 latest = stepExecution; 238 } 239 if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) { 240 latest = stepExecution; 241 } 242 // Use step execution ID as the tie breaker if start time is identical 243 if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() && 244 latest.getId() < stepExecution.getId()) { 245 latest = stepExecution; 246 } 247 } 248 249 if (latest != null) { 250 ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest); 251 latest.setExecutionContext(stepExecutionContext); 252 ExecutionContext jobExecutionContext = ecDao.getExecutionContext(latest.getJobExecution()); 253 latest.getJobExecution().setExecutionContext(jobExecutionContext); 254 } 255 256 return latest; 257 } 258 259 /** 260 * @return number of executions of the step within given job instance 261 */ 262 @Override 263 public int getStepExecutionCount(JobInstance jobInstance, String stepName) { 264 int count = 0; 265 List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance); 266 for (JobExecution jobExecution : jobExecutions) { 267 stepExecutionDao.addStepExecutions(jobExecution); 268 for (StepExecution stepExecution : jobExecution.getStepExecutions()) { 269 if (stepName.equals(stepExecution.getStepName())) { 270 count++; 271 } 272 } 273 } 274 return count; 275 } 276 277 /** 278 * Check to determine whether or not the JobExecution that is the parent of 279 * the provided StepExecution has been interrupted. If, after synchronizing 280 * the status with the database, the status has been updated to STOPPING, 281 * then the job has been interrupted. 282 * 283 * @param stepExecution 284 */ 285 private void checkForInterruption(StepExecution stepExecution) { 286 JobExecution jobExecution = stepExecution.getJobExecution(); 287 jobExecutionDao.synchronizeStatus(jobExecution); 288 if (jobExecution.isStopping()) { 289 logger.info("Parent JobExecution is stopped, so passing message on to StepExecution"); 290 stepExecution.setTerminateOnly(); 291 } 292 } 293 294 @Override 295 @Nullable 296 public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) { 297 JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters); 298 if (jobInstance == null) { 299 return null; 300 } 301 JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance); 302 303 if (jobExecution != null) { 304 jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution)); 305 stepExecutionDao.addStepExecutions(jobExecution); 306 } 307 return jobExecution; 308 309 } 310 311 @Override 312 public JobInstance createJobInstance(String jobName, JobParameters jobParameters) { 313 Assert.notNull(jobName, "A job name is required to create a JobInstance"); 314 Assert.notNull(jobParameters, "Job parameters are required to create a JobInstance"); 315 316 JobInstance jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters); 317 318 return jobInstance; 319 } 320 321 @Override 322 public JobExecution createJobExecution(JobInstance jobInstance, 323 JobParameters jobParameters, String jobConfigurationLocation) { 324 325 Assert.notNull(jobInstance, "A JobInstance is required to associate the JobExecution with"); 326 Assert.notNull(jobParameters, "A JobParameters object is required to create a JobExecution"); 327 328 JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, jobConfigurationLocation); 329 ExecutionContext executionContext = new ExecutionContext(); 330 jobExecution.setExecutionContext(executionContext); 331 jobExecution.setLastUpdated(new Date(System.currentTimeMillis())); 332 333 // Save the JobExecution so that it picks up an ID (useful for clients 334 // monitoring asynchronous executions): 335 jobExecutionDao.saveJobExecution(jobExecution); 336 ecDao.saveExecutionContext(jobExecution); 337 338 return jobExecution; 339 } 340}