001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.repository.dao; 018 019import java.sql.PreparedStatement; 020import java.sql.ResultSet; 021import java.sql.SQLException; 022import java.sql.Timestamp; 023import java.sql.Types; 024import java.util.ArrayList; 025import java.util.Arrays; 026import java.util.Collection; 027import java.util.Iterator; 028import java.util.List; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032 033import org.springframework.batch.core.BatchStatus; 034import org.springframework.batch.core.ExitStatus; 035import org.springframework.batch.core.JobExecution; 036import org.springframework.batch.core.StepExecution; 037import org.springframework.beans.factory.InitializingBean; 038import org.springframework.dao.OptimisticLockingFailureException; 039import org.springframework.jdbc.core.BatchPreparedStatementSetter; 040import org.springframework.jdbc.core.RowMapper; 041import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; 042import org.springframework.lang.Nullable; 043import org.springframework.util.Assert; 044 045/** 046 * JDBC implementation of {@link StepExecutionDao}.<br> 047 * 048 * Allows customization of the tables names used by Spring Batch for step meta 049 * data via a prefix property.<br> 050 * 051 * Uses sequences or tables (via Spring's {@link DataFieldMaxValueIncrementer} 052 * abstraction) to create all primary keys before inserting a new row. All 053 * objects are checked to ensure all fields to be stored are not null. If any 054 * are found to be null, an IllegalArgumentException will be thrown. This could 055 * be left to JdbcTemplate, however, the exception will be fairly vague, and 056 * fails to highlight which field caused the exception.<br> 057 * 058 * @author Lucas Ward 059 * @author Dave Syer 060 * @author Robert Kasanicky 061 * @author David Turanski 062 * @author Mahmoud Ben Hassine 063 * 064 * @see StepExecutionDao 065 */ 066public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implements StepExecutionDao, InitializingBean { 067 068 private static final Log logger = LogFactory.getLog(JdbcStepExecutionDao.class); 069 070 private static final String SAVE_STEP_EXECUTION = "INSERT into %PREFIX%STEP_EXECUTION(STEP_EXECUTION_ID, VERSION, STEP_NAME, JOB_EXECUTION_ID, START_TIME, " 071 + "END_TIME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED) " 072 + "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; 073 074 private static final String UPDATE_STEP_EXECUTION = "UPDATE %PREFIX%STEP_EXECUTION set START_TIME = ?, END_TIME = ?, " 075 + "STATUS = ?, COMMIT_COUNT = ?, READ_COUNT = ?, FILTER_COUNT = ?, WRITE_COUNT = ?, EXIT_CODE = ?, " 076 + "EXIT_MESSAGE = ?, VERSION = ?, READ_SKIP_COUNT = ?, PROCESS_SKIP_COUNT = ?, WRITE_SKIP_COUNT = ?, ROLLBACK_COUNT = ?, LAST_UPDATED = ?" 077 + " where STEP_EXECUTION_ID = ? and VERSION = ?"; 078 079 private static final String GET_RAW_STEP_EXECUTIONS = "SELECT STEP_EXECUTION_ID, STEP_NAME, START_TIME, END_TIME, STATUS, COMMIT_COUNT," 080 + " READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED, VERSION from %PREFIX%STEP_EXECUTION where JOB_EXECUTION_ID = ?"; 081 082 private static final String GET_STEP_EXECUTIONS = GET_RAW_STEP_EXECUTIONS + " order by STEP_EXECUTION_ID"; 083 084 private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID = ?"; 085 086 private static final String CURRENT_VERSION_STEP_EXECUTION = "SELECT VERSION FROM %PREFIX%STEP_EXECUTION WHERE STEP_EXECUTION_ID=?"; 087 088 private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH; 089 090 private DataFieldMaxValueIncrementer stepExecutionIncrementer; 091 092 /** 093 * Public setter for the exit message length in database. Do not set this if 094 * you haven't modified the schema. 095 * @param exitMessageLength the exitMessageLength to set 096 */ 097 public void setExitMessageLength(int exitMessageLength) { 098 this.exitMessageLength = exitMessageLength; 099 } 100 101 public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) { 102 this.stepExecutionIncrementer = stepExecutionIncrementer; 103 } 104 105 @Override 106 public void afterPropertiesSet() throws Exception { 107 super.afterPropertiesSet(); 108 Assert.notNull(stepExecutionIncrementer, "StepExecutionIncrementer cannot be null."); 109 } 110 111 /** 112 * Save a StepExecution. A unique id will be generated by the 113 * stepExecutionIncrementer, and then set in the StepExecution. All values 114 * will then be stored via an INSERT statement. 115 * 116 * @see StepExecutionDao#saveStepExecution(StepExecution) 117 */ 118 @Override 119 public void saveStepExecution(StepExecution stepExecution) { 120 List<Object[]> parameters = buildStepExecutionParameters(stepExecution); 121 Object[] parameterValues = parameters.get(0); 122 123 //Template expects an int array fails with Integer 124 int[] parameterTypes = new int[parameters.get(1).length]; 125 for (int i = 0; i < parameterTypes.length; i++) { 126 parameterTypes[i] = (Integer)parameters.get(1)[i]; 127 } 128 129 getJdbcTemplate().update(getQuery(SAVE_STEP_EXECUTION), parameterValues, parameterTypes); 130 } 131 132 /** 133 * Batch insert StepExecutions 134 * @see StepExecutionDao#saveStepExecutions(Collection) 135 */ 136 @Override 137 public void saveStepExecutions(final Collection<StepExecution> stepExecutions) { 138 Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions"); 139 140 if (!stepExecutions.isEmpty()) { 141 final Iterator<StepExecution> iterator = stepExecutions.iterator(); 142 getJdbcTemplate().batchUpdate(getQuery(SAVE_STEP_EXECUTION), new BatchPreparedStatementSetter() { 143 144 @Override 145 public int getBatchSize() { 146 return stepExecutions.size(); 147 } 148 149 @Override 150 public void setValues(PreparedStatement ps, int i) throws SQLException { 151 StepExecution stepExecution = iterator.next(); 152 List<Object[]> parameters = buildStepExecutionParameters(stepExecution); 153 Object[] parameterValues = parameters.get(0); 154 Integer[] parameterTypes = (Integer[]) parameters.get(1); 155 for (int indx = 0; indx < parameterValues.length; indx++) { 156 switch (parameterTypes[indx]) { 157 case Types.INTEGER: 158 ps.setInt(indx + 1, (Integer) parameterValues[indx]); 159 break; 160 case Types.VARCHAR: 161 ps.setString(indx + 1, (String) parameterValues[indx]); 162 break; 163 case Types.TIMESTAMP: 164 if (parameterValues[indx] != null) { 165 ps.setTimestamp(indx + 1, new Timestamp(((java.util.Date) parameterValues[indx]).getTime())); 166 } else { 167 ps.setNull(indx + 1, Types.TIMESTAMP); 168 } 169 break; 170 case Types.BIGINT: 171 ps.setLong(indx + 1, (Long) parameterValues[indx]); 172 break; 173 default: 174 throw new IllegalArgumentException( 175 "unsupported SQL parameter type for step execution field index " + i); 176 } 177 } 178 } 179 }); 180 } 181 } 182 183 private List<Object[]> buildStepExecutionParameters(StepExecution stepExecution) { 184 Assert.isNull(stepExecution.getId(), 185 "to-be-saved (not updated) StepExecution can't already have an id assigned"); 186 Assert.isNull(stepExecution.getVersion(), 187 "to-be-saved (not updated) StepExecution can't already have a version assigned"); 188 validateStepExecution(stepExecution); 189 stepExecution.setId(stepExecutionIncrementer.nextLongValue()); 190 stepExecution.incrementVersion(); //Should be 0 191 List<Object[]> parameters = new ArrayList<Object[]>(); 192 String exitDescription = truncateExitDescription(stepExecution.getExitStatus().getExitDescription()); 193 Object[] parameterValues = new Object[] { stepExecution.getId(), stepExecution.getVersion(), 194 stepExecution.getStepName(), stepExecution.getJobExecutionId(), stepExecution.getStartTime(), 195 stepExecution.getEndTime(), stepExecution.getStatus().toString(), stepExecution.getCommitCount(), 196 stepExecution.getReadCount(), stepExecution.getFilterCount(), stepExecution.getWriteCount(), 197 stepExecution.getExitStatus().getExitCode(), exitDescription, stepExecution.getReadSkipCount(), 198 stepExecution.getWriteSkipCount(), stepExecution.getProcessSkipCount(), 199 stepExecution.getRollbackCount(), stepExecution.getLastUpdated() }; 200 Integer[] parameterTypes = new Integer[] { Types.BIGINT, Types.INTEGER, Types.VARCHAR, Types.BIGINT, 201 Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, 202 Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, 203 Types.INTEGER, Types.TIMESTAMP }; 204 parameters.add(0, Arrays.copyOf(parameterValues,parameterValues.length)); 205 parameters.add(1, Arrays.copyOf(parameterTypes,parameterTypes.length)); 206 return parameters; 207 } 208 209 /** 210 * Validate StepExecution. At a minimum, JobId, StartTime, and Status cannot 211 * be null. EndTime can be null for an unfinished job. 212 * 213 * @throws IllegalArgumentException 214 */ 215 private void validateStepExecution(StepExecution stepExecution) { 216 Assert.notNull(stepExecution, "stepExecution is required"); 217 Assert.notNull(stepExecution.getStepName(), "StepExecution step name cannot be null."); 218 Assert.notNull(stepExecution.getStartTime(), "StepExecution start time cannot be null."); 219 Assert.notNull(stepExecution.getStatus(), "StepExecution status cannot be null."); 220 } 221 222 @Override 223 public void updateStepExecution(StepExecution stepExecution) { 224 225 validateStepExecution(stepExecution); 226 Assert.notNull(stepExecution.getId(), "StepExecution Id cannot be null. StepExecution must saved" 227 + " before it can be updated."); 228 229 // Do not check for existence of step execution considering 230 // it is saved at every commit point. 231 232 String exitDescription = truncateExitDescription(stepExecution.getExitStatus().getExitDescription()); 233 234 // Attempt to prevent concurrent modification errors by blocking here if 235 // someone is already trying to do it. 236 synchronized (stepExecution) { 237 238 Integer version = stepExecution.getVersion() + 1; 239 Object[] parameters = new Object[] { stepExecution.getStartTime(), stepExecution.getEndTime(), 240 stepExecution.getStatus().toString(), stepExecution.getCommitCount(), stepExecution.getReadCount(), 241 stepExecution.getFilterCount(), stepExecution.getWriteCount(), 242 stepExecution.getExitStatus().getExitCode(), exitDescription, version, 243 stepExecution.getReadSkipCount(), stepExecution.getProcessSkipCount(), 244 stepExecution.getWriteSkipCount(), stepExecution.getRollbackCount(), 245 stepExecution.getLastUpdated(), stepExecution.getId(), stepExecution.getVersion() }; 246 int count = getJdbcTemplate() 247 .update(getQuery(UPDATE_STEP_EXECUTION), 248 parameters, 249 new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER, 250 Types.INTEGER, Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, 251 Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, 252 Types.BIGINT, Types.INTEGER }); 253 254 // Avoid concurrent modifications... 255 if (count == 0) { 256 int curentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_STEP_EXECUTION), 257 new Object[] { stepExecution.getId() }, Integer.class); 258 throw new OptimisticLockingFailureException("Attempt to update step execution id=" 259 + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion() 260 + "), where current version is " + curentVersion); 261 } 262 263 stepExecution.incrementVersion(); 264 265 } 266 } 267 268 /** 269 * Truncate the exit description if the length exceeds 270 * {@link #DEFAULT_EXIT_MESSAGE_LENGTH}. 271 * @param description the string to truncate 272 * @return truncated description 273 */ 274 private String truncateExitDescription(String description) { 275 if (description != null && description.length() > exitMessageLength) { 276 if (logger.isDebugEnabled()) { 277 logger.debug("Truncating long message before update of StepExecution, original message is: " + description); 278 } 279 return description.substring(0, exitMessageLength); 280 } else { 281 return description; 282 } 283 } 284 285 @Override 286 @Nullable 287 public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) { 288 List<StepExecution> executions = getJdbcTemplate().query(getQuery(GET_STEP_EXECUTION), 289 new StepExecutionRowMapper(jobExecution), jobExecution.getId(), stepExecutionId); 290 291 Assert.state(executions.size() <= 1, 292 "There can be at most one step execution with given name for single job execution"); 293 if (executions.isEmpty()) { 294 return null; 295 } else { 296 return executions.get(0); 297 } 298 } 299 300 @Override 301 public void addStepExecutions(JobExecution jobExecution) { 302 getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution), 303 jobExecution.getId()); 304 } 305 306 private static class StepExecutionRowMapper implements RowMapper<StepExecution> { 307 308 private final JobExecution jobExecution; 309 310 public StepExecutionRowMapper(JobExecution jobExecution) { 311 this.jobExecution = jobExecution; 312 } 313 314 @Override 315 public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException { 316 StepExecution stepExecution = new StepExecution(rs.getString(2), jobExecution, rs.getLong(1)); 317 stepExecution.setStartTime(rs.getTimestamp(3)); 318 stepExecution.setEndTime(rs.getTimestamp(4)); 319 stepExecution.setStatus(BatchStatus.valueOf(rs.getString(5))); 320 stepExecution.setCommitCount(rs.getInt(6)); 321 stepExecution.setReadCount(rs.getInt(7)); 322 stepExecution.setFilterCount(rs.getInt(8)); 323 stepExecution.setWriteCount(rs.getInt(9)); 324 stepExecution.setExitStatus(new ExitStatus(rs.getString(10), rs.getString(11))); 325 stepExecution.setReadSkipCount(rs.getInt(12)); 326 stepExecution.setWriteSkipCount(rs.getInt(13)); 327 stepExecution.setProcessSkipCount(rs.getInt(14)); 328 stepExecution.setRollbackCount(rs.getInt(15)); 329 stepExecution.setLastUpdated(rs.getTimestamp(16)); 330 stepExecution.setVersion(rs.getInt(17)); 331 return stepExecution; 332 } 333 334 } 335 336}