001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.repository.dao; 018 019import java.sql.ResultSet; 020import java.sql.SQLException; 021import java.sql.Timestamp; 022import java.sql.Types; 023import java.util.HashMap; 024import java.util.HashSet; 025import java.util.List; 026import java.util.Map; 027import java.util.Map.Entry; 028import java.util.Set; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032 033import org.springframework.batch.core.BatchStatus; 034import org.springframework.batch.core.ExitStatus; 035import org.springframework.batch.core.JobExecution; 036import org.springframework.batch.core.JobInstance; 037import org.springframework.batch.core.JobParameter; 038import org.springframework.batch.core.JobParameter.ParameterType; 039import org.springframework.batch.core.JobParameters; 040import org.springframework.beans.factory.InitializingBean; 041import org.springframework.dao.EmptyResultDataAccessException; 042import org.springframework.dao.OptimisticLockingFailureException; 043import org.springframework.jdbc.core.RowCallbackHandler; 044import org.springframework.jdbc.core.RowMapper; 045import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; 046import org.springframework.lang.Nullable; 047import org.springframework.util.Assert; 048 049/** 050 * JDBC implementation of {@link JobExecutionDao}. Uses sequences (via Spring's 051 * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys 052 * before inserting a new row. Objects are checked to ensure all mandatory 053 * fields to be stored are not null. If any are found to be null, an 054 * IllegalArgumentException will be thrown. This could be left to JdbcTemplate, 055 * however, the exception will be fairly vague, and fails to highlight which 056 * field caused the exception. 057 * 058 * @author Lucas Ward 059 * @author Dave Syer 060 * @author Robert Kasanicky 061 * @author Michael Minella 062 * @author Mahmoud Ben Hassine 063 * @author Dimitrios Liapis 064 */ 065public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean { 066 067 private static final Log logger = LogFactory.getLog(JdbcJobExecutionDao.class); 068 069 private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, " 070 + "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED, JOB_CONFIGURATION_LOCATION) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; 071 072 private static final String CHECK_JOB_EXECUTION_EXISTS = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID = ?"; 073 074 private static final String GET_STATUS = "SELECT STATUS from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; 075 076 private static final String UPDATE_JOB_EXECUTION = "UPDATE %PREFIX%JOB_EXECUTION set START_TIME = ?, END_TIME = ?, " 077 + " STATUS = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, CREATE_TIME = ?, LAST_UPDATED = ? where JOB_EXECUTION_ID = ? and VERSION = ?"; 078 079 private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION" 080 + " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc"; 081 082 private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION " 083 + "from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E2.JOB_INSTANCE_ID = ?)"; 084 085 private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION" 086 + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?"; 087 088 private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, " 089 + "E.JOB_INSTANCE_ID, E.JOB_CONFIGURATION_LOCATION from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc"; 090 091 private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?"; 092 093 private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, " 094 + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING from %PREFIX%JOB_EXECUTION_PARAMS where JOB_EXECUTION_ID = ?"; 095 096 private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_EXECUTION_PARAMS(JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, " 097 + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING) values (?, ?, ?, ?, ?, ?, ?, ?)"; 098 099 private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH; 100 101 private DataFieldMaxValueIncrementer jobExecutionIncrementer; 102 103 /** 104 * Public setter for the exit message length in database. Do not set this if 105 * you haven't modified the schema. 106 * @param exitMessageLength the exitMessageLength to set 107 */ 108 public void setExitMessageLength(int exitMessageLength) { 109 this.exitMessageLength = exitMessageLength; 110 } 111 112 /** 113 * Setter for {@link DataFieldMaxValueIncrementer} to be used when 114 * generating primary keys for {@link JobExecution} instances. 115 * 116 * @param jobExecutionIncrementer the {@link DataFieldMaxValueIncrementer} 117 */ 118 public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) { 119 this.jobExecutionIncrementer = jobExecutionIncrementer; 120 } 121 122 @Override 123 public void afterPropertiesSet() throws Exception { 124 super.afterPropertiesSet(); 125 Assert.notNull(jobExecutionIncrementer, "The jobExecutionIncrementer must not be null."); 126 } 127 128 @Override 129 public List<JobExecution> findJobExecutions(final JobInstance job) { 130 131 Assert.notNull(job, "Job cannot be null."); 132 Assert.notNull(job.getId(), "Job Id cannot be null."); 133 134 return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new JobExecutionRowMapper(job), job.getId()); 135 } 136 137 /** 138 * 139 * SQL implementation using Sequences via the Spring incrementer 140 * abstraction. Once a new id has been obtained, the JobExecution is saved 141 * via a SQL INSERT statement. 142 * 143 * @see JobExecutionDao#saveJobExecution(JobExecution) 144 * @throws IllegalArgumentException if jobExecution is null, as well as any 145 * of it's fields to be persisted. 146 */ 147 @Override 148 public void saveJobExecution(JobExecution jobExecution) { 149 150 validateJobExecution(jobExecution); 151 152 jobExecution.incrementVersion(); 153 154 jobExecution.setId(jobExecutionIncrementer.nextLongValue()); 155 Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(), 156 jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(), 157 jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(), 158 jobExecution.getVersion(), jobExecution.getCreateTime(), jobExecution.getLastUpdated(), 159 jobExecution.getJobConfigurationName() }; 160 getJdbcTemplate().update( 161 getQuery(SAVE_JOB_EXECUTION), 162 parameters, 163 new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, 164 Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR }); 165 166 insertJobParameters(jobExecution.getId(), jobExecution.getJobParameters()); 167 } 168 169 /** 170 * Validate JobExecution. At a minimum, JobId, StartTime, EndTime, and 171 * Status cannot be null. 172 * 173 * @param jobExecution 174 * @throws IllegalArgumentException 175 */ 176 private void validateJobExecution(JobExecution jobExecution) { 177 178 Assert.notNull(jobExecution, "jobExecution cannot be null"); 179 Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null."); 180 Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null."); 181 Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null"); 182 } 183 184 /** 185 * Update given JobExecution using a SQL UPDATE statement. The JobExecution 186 * is first checked to ensure all fields are not null, and that it has an 187 * ID. The database is then queried to ensure that the ID exists, which 188 * ensures that it is valid. 189 * 190 * @see JobExecutionDao#updateJobExecution(JobExecution) 191 */ 192 @Override 193 public void updateJobExecution(JobExecution jobExecution) { 194 195 validateJobExecution(jobExecution); 196 197 Assert.notNull(jobExecution.getId(), 198 "JobExecution ID cannot be null. JobExecution must be saved before it can be updated"); 199 200 Assert.notNull(jobExecution.getVersion(), 201 "JobExecution version cannot be null. JobExecution must be saved before it can be updated"); 202 203 synchronized (jobExecution) { 204 Integer version = jobExecution.getVersion() + 1; 205 206 String exitDescription = jobExecution.getExitStatus().getExitDescription(); 207 if (exitDescription != null && exitDescription.length() > exitMessageLength) { 208 exitDescription = exitDescription.substring(0, exitMessageLength); 209 if (logger.isDebugEnabled()) { 210 logger.debug("Truncating long message before update of JobExecution: " + jobExecution); 211 } 212 } 213 Object[] parameters = new Object[] { jobExecution.getStartTime(), jobExecution.getEndTime(), 214 jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), exitDescription, 215 version, jobExecution.getCreateTime(), jobExecution.getLastUpdated(), jobExecution.getId(), 216 jobExecution.getVersion() }; 217 218 // Check if given JobExecution's Id already exists, if none is found 219 // it 220 // is invalid and 221 // an exception should be thrown. 222 if (getJdbcTemplate().queryForObject(getQuery(CHECK_JOB_EXECUTION_EXISTS), Integer.class, 223 new Object[] { jobExecution.getId() }) != 1) { 224 throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found."); 225 } 226 227 int count = getJdbcTemplate().update( 228 getQuery(UPDATE_JOB_EXECUTION), 229 parameters, 230 new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, 231 Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER }); 232 233 // Avoid concurrent modifications... 234 if (count == 0) { 235 int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION), Integer.class, 236 new Object[] { jobExecution.getId() }); 237 throw new OptimisticLockingFailureException("Attempt to update job execution id=" 238 + jobExecution.getId() + " with wrong version (" + jobExecution.getVersion() 239 + "), where current version is " + currentVersion); 240 } 241 242 jobExecution.incrementVersion(); 243 } 244 } 245 246 @Override 247 public JobExecution getLastJobExecution(JobInstance jobInstance) { 248 249 Long id = jobInstance.getId(); 250 251 List<JobExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION), 252 new JobExecutionRowMapper(jobInstance), id, id); 253 254 Assert.state(executions.size() <= 1, "There must be at most one latest job execution"); 255 256 if (executions.isEmpty()) { 257 return null; 258 } 259 else { 260 return executions.get(0); 261 } 262 } 263 264 /* 265 * (non-Javadoc) 266 * 267 * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# 268 * getLastJobExecution(java.lang.String) 269 */ 270 @Override 271 @Nullable 272 public JobExecution getJobExecution(Long executionId) { 273 try { 274 JobExecution jobExecution = getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID), 275 new JobExecutionRowMapper(), executionId); 276 return jobExecution; 277 } 278 catch (EmptyResultDataAccessException e) { 279 return null; 280 } 281 } 282 283 /* 284 * (non-Javadoc) 285 * 286 * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao# 287 * findRunningJobExecutions(java.lang.String) 288 */ 289 @Override 290 public Set<JobExecution> findRunningJobExecutions(String jobName) { 291 292 final Set<JobExecution> result = new HashSet<JobExecution>(); 293 RowCallbackHandler handler = new RowCallbackHandler() { 294 @Override 295 public void processRow(ResultSet rs) throws SQLException { 296 JobExecutionRowMapper mapper = new JobExecutionRowMapper(); 297 result.add(mapper.mapRow(rs, 0)); 298 } 299 }; 300 getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), new Object[] { jobName }, handler); 301 302 return result; 303 } 304 305 @Override 306 public void synchronizeStatus(JobExecution jobExecution) { 307 int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION), Integer.class, 308 jobExecution.getId()); 309 310 if (currentVersion != jobExecution.getVersion().intValue()) { 311 String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId()); 312 jobExecution.upgradeStatus(BatchStatus.valueOf(status)); 313 jobExecution.setVersion(currentVersion); 314 } 315 } 316 317 /** 318 * Convenience method that inserts all parameters from the provided 319 * JobParameters. 320 * 321 */ 322 private void insertJobParameters(Long executionId, JobParameters jobParameters) { 323 324 for (Entry<String, JobParameter> entry : jobParameters.getParameters() 325 .entrySet()) { 326 JobParameter jobParameter = entry.getValue(); 327 insertParameter(executionId, jobParameter.getType(), entry.getKey(), 328 jobParameter.getValue(), jobParameter.isIdentifying()); 329 } 330 } 331 332 /** 333 * Convenience method that inserts an individual records into the 334 * JobParameters table. 335 */ 336 private void insertParameter(Long executionId, ParameterType type, String key, 337 Object value, boolean identifying) { 338 339 Object[] args = new Object[0]; 340 int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR, 341 Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.BIGINT, 342 Types.DOUBLE, Types.CHAR }; 343 344 String identifyingFlag = identifying? "Y":"N"; 345 346 if (type == ParameterType.STRING) { 347 args = new Object[] { executionId, key, type, value, new Timestamp(0L), 348 0L, 0D, identifyingFlag}; 349 } else if (type == ParameterType.LONG) { 350 args = new Object[] { executionId, key, type, "", new Timestamp(0L), 351 value, new Double(0), identifyingFlag}; 352 } else if (type == ParameterType.DOUBLE) { 353 args = new Object[] { executionId, key, type, "", new Timestamp(0L), 0L, 354 value, identifyingFlag}; 355 } else if (type == ParameterType.DATE) { 356 args = new Object[] { executionId, key, type, "", value, 0L, 0D, identifyingFlag}; 357 } 358 359 getJdbcTemplate().update(getQuery(CREATE_JOB_PARAMETERS), args, argTypes); 360 } 361 362 /** 363 * @param executionId {@link Long} containing the id for the execution. 364 * @return job parameters for the requested execution id 365 */ 366 protected JobParameters getJobParameters(Long executionId) { 367 final Map<String, JobParameter> map = new HashMap<String, JobParameter>(); 368 RowCallbackHandler handler = new RowCallbackHandler() { 369 @Override 370 public void processRow(ResultSet rs) throws SQLException { 371 ParameterType type = ParameterType.valueOf(rs.getString(3)); 372 JobParameter value = null; 373 374 if (type == ParameterType.STRING) { 375 value = new JobParameter(rs.getString(4), rs.getString(8).equalsIgnoreCase("Y")); 376 } else if (type == ParameterType.LONG) { 377 value = new JobParameter(rs.getLong(6), rs.getString(8).equalsIgnoreCase("Y")); 378 } else if (type == ParameterType.DOUBLE) { 379 value = new JobParameter(rs.getDouble(7), rs.getString(8).equalsIgnoreCase("Y")); 380 } else if (type == ParameterType.DATE) { 381 value = new JobParameter(rs.getTimestamp(5), rs.getString(8).equalsIgnoreCase("Y")); 382 } 383 384 // No need to assert that value is not null because it's an enum 385 map.put(rs.getString(2), value); 386 } 387 }; 388 389 getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), new Object[] { executionId }, handler); 390 391 return new JobParameters(map); 392 } 393 394 /** 395 * Re-usable mapper for {@link JobExecution} instances. 396 * 397 * @author Dave Syer 398 * 399 */ 400 private final class JobExecutionRowMapper implements RowMapper<JobExecution> { 401 402 private JobInstance jobInstance; 403 404 private JobParameters jobParameters; 405 406 public JobExecutionRowMapper() { 407 } 408 409 public JobExecutionRowMapper(JobInstance jobInstance) { 410 this.jobInstance = jobInstance; 411 } 412 413 @Override 414 public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException { 415 Long id = rs.getLong(1); 416 String jobConfigurationLocation = rs.getString(10); 417 JobExecution jobExecution; 418 if (jobParameters == null) { 419 jobParameters = getJobParameters(id); 420 } 421 422 if (jobInstance == null) { 423 jobExecution = new JobExecution(id, jobParameters, jobConfigurationLocation); 424 } 425 else { 426 jobExecution = new JobExecution(jobInstance, id, jobParameters, jobConfigurationLocation); 427 } 428 429 jobExecution.setStartTime(rs.getTimestamp(2)); 430 jobExecution.setEndTime(rs.getTimestamp(3)); 431 jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4))); 432 jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6))); 433 jobExecution.setCreateTime(rs.getTimestamp(7)); 434 jobExecution.setLastUpdated(rs.getTimestamp(8)); 435 jobExecution.setVersion(rs.getInt(9)); 436 return jobExecution; 437 } 438 439 } 440}