001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.repository.dao;
018
019import java.sql.PreparedStatement;
020import java.sql.ResultSet;
021import java.sql.SQLException;
022import java.sql.Timestamp;
023import java.sql.Types;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collection;
027import java.util.Iterator;
028import java.util.List;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032
033import org.springframework.batch.core.BatchStatus;
034import org.springframework.batch.core.ExitStatus;
035import org.springframework.batch.core.JobExecution;
036import org.springframework.batch.core.StepExecution;
037import org.springframework.beans.factory.InitializingBean;
038import org.springframework.dao.OptimisticLockingFailureException;
039import org.springframework.jdbc.core.BatchPreparedStatementSetter;
040import org.springframework.jdbc.core.RowMapper;
041import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
042import org.springframework.lang.Nullable;
043import org.springframework.util.Assert;
044
045/**
046 * JDBC implementation of {@link StepExecutionDao}.<br>
047 *
 * Allows customization of the table names used by Spring Batch for step meta
 * data via a prefix property.<br>
050 *
051 * Uses sequences or tables (via Spring's {@link DataFieldMaxValueIncrementer}
052 * abstraction) to create all primary keys before inserting a new row. All
053 * objects are checked to ensure all fields to be stored are not null. If any
054 * are found to be null, an IllegalArgumentException will be thrown. This could
055 * be left to JdbcTemplate, however, the exception will be fairly vague, and
056 * fails to highlight which field caused the exception.<br>
057 *
058 * @author Lucas Ward
059 * @author Dave Syer
060 * @author Robert Kasanicky
061 * @author David Turanski
062 * @author Mahmoud Ben Hassine
063 *
064 * @see StepExecutionDao
065 */
066public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implements StepExecutionDao, InitializingBean {
067
068        private static final Log logger = LogFactory.getLog(JdbcStepExecutionDao.class);
069
070        private static final String SAVE_STEP_EXECUTION = "INSERT into %PREFIX%STEP_EXECUTION(STEP_EXECUTION_ID, VERSION, STEP_NAME, JOB_EXECUTION_ID, START_TIME, "
071                        + "END_TIME, STATUS, COMMIT_COUNT, READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED) "
072                        + "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
073
074        private static final String UPDATE_STEP_EXECUTION = "UPDATE %PREFIX%STEP_EXECUTION set START_TIME = ?, END_TIME = ?, "
075                        + "STATUS = ?, COMMIT_COUNT = ?, READ_COUNT = ?, FILTER_COUNT = ?, WRITE_COUNT = ?, EXIT_CODE = ?, "
076                        + "EXIT_MESSAGE = ?, VERSION = ?, READ_SKIP_COUNT = ?, PROCESS_SKIP_COUNT = ?, WRITE_SKIP_COUNT = ?, ROLLBACK_COUNT = ?, LAST_UPDATED = ?"
077                        + " where STEP_EXECUTION_ID = ? and VERSION = ?";
078
079        private static final String GET_RAW_STEP_EXECUTIONS = "SELECT STEP_EXECUTION_ID, STEP_NAME, START_TIME, END_TIME, STATUS, COMMIT_COUNT,"
080                        + " READ_COUNT, FILTER_COUNT, WRITE_COUNT, EXIT_CODE, EXIT_MESSAGE, READ_SKIP_COUNT, WRITE_SKIP_COUNT, PROCESS_SKIP_COUNT, ROLLBACK_COUNT, LAST_UPDATED, VERSION from %PREFIX%STEP_EXECUTION where JOB_EXECUTION_ID = ?";
081
082        private static final String GET_STEP_EXECUTIONS = GET_RAW_STEP_EXECUTIONS + " order by STEP_EXECUTION_ID";
083
084        private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID = ?";
085
086        private static final String CURRENT_VERSION_STEP_EXECUTION = "SELECT VERSION FROM %PREFIX%STEP_EXECUTION WHERE STEP_EXECUTION_ID=?";
087
088        private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
089
090        private DataFieldMaxValueIncrementer stepExecutionIncrementer;
091
092        /**
093         * Public setter for the exit message length in database. Do not set this if
094         * you haven't modified the schema.
095         * @param exitMessageLength the exitMessageLength to set
096         */
097        public void setExitMessageLength(int exitMessageLength) {
098                this.exitMessageLength = exitMessageLength;
099        }
100
101        public void setStepExecutionIncrementer(DataFieldMaxValueIncrementer stepExecutionIncrementer) {
102                this.stepExecutionIncrementer = stepExecutionIncrementer;
103        }
104
105        @Override
106        public void afterPropertiesSet() throws Exception {
107                super.afterPropertiesSet();
108                Assert.notNull(stepExecutionIncrementer, "StepExecutionIncrementer cannot be null.");
109        }
110
111        /**
112         * Save a StepExecution. A unique id will be generated by the
113         * stepExecutionIncrementer, and then set in the StepExecution. All values
114         * will then be stored via an INSERT statement.
115         *
116         * @see StepExecutionDao#saveStepExecution(StepExecution)
117         */
118        @Override
119        public void saveStepExecution(StepExecution stepExecution) {
120                List<Object[]> parameters = buildStepExecutionParameters(stepExecution);
121                Object[] parameterValues = parameters.get(0);
122
123                //Template expects an int array fails with Integer
124                int[] parameterTypes = new int[parameters.get(1).length];
125                for (int i = 0; i < parameterTypes.length; i++) {
126                        parameterTypes[i] = (Integer)parameters.get(1)[i];
127                }
128
129                getJdbcTemplate().update(getQuery(SAVE_STEP_EXECUTION), parameterValues, parameterTypes);
130        }
131
132        /**
133         * Batch insert StepExecutions
134         * @see StepExecutionDao#saveStepExecutions(Collection)
135         */
136        @Override
137        public void saveStepExecutions(final Collection<StepExecution> stepExecutions) {
138                Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
139
140        if (!stepExecutions.isEmpty()) {
141            final Iterator<StepExecution> iterator = stepExecutions.iterator();
142            getJdbcTemplate().batchUpdate(getQuery(SAVE_STEP_EXECUTION), new BatchPreparedStatementSetter() {
143
144                @Override
145                public int getBatchSize() {
146                    return stepExecutions.size();
147                }
148
149                @Override
150                public void setValues(PreparedStatement ps, int i) throws SQLException {
151                    StepExecution stepExecution = iterator.next();
152                    List<Object[]> parameters = buildStepExecutionParameters(stepExecution);
153                    Object[] parameterValues = parameters.get(0);
154                    Integer[] parameterTypes = (Integer[]) parameters.get(1);
155                    for (int indx = 0; indx < parameterValues.length; indx++) {
156                        switch (parameterTypes[indx]) {
157                            case Types.INTEGER:
158                                ps.setInt(indx + 1, (Integer) parameterValues[indx]);
159                                break;
160                            case Types.VARCHAR:
161                                ps.setString(indx + 1, (String) parameterValues[indx]);
162                                break;
163                            case Types.TIMESTAMP:
164                                if (parameterValues[indx] != null) {
165                                    ps.setTimestamp(indx + 1, new Timestamp(((java.util.Date) parameterValues[indx]).getTime()));
166                                } else {
167                                    ps.setNull(indx + 1, Types.TIMESTAMP);
168                                }
169                                break;
170                            case Types.BIGINT:
171                                ps.setLong(indx + 1, (Long) parameterValues[indx]);
172                                break;
173                            default:
174                                throw new IllegalArgumentException(
175                                        "unsupported SQL parameter type for step execution field index " + i);
176                        }
177                    }
178                }
179            });
180        }
181    }
182
183        private List<Object[]> buildStepExecutionParameters(StepExecution stepExecution) {
184                Assert.isNull(stepExecution.getId(),
185                                "to-be-saved (not updated) StepExecution can't already have an id assigned");
186                Assert.isNull(stepExecution.getVersion(),
187                                "to-be-saved (not updated) StepExecution can't already have a version assigned");
188                validateStepExecution(stepExecution);
189                stepExecution.setId(stepExecutionIncrementer.nextLongValue());
190                stepExecution.incrementVersion(); //Should be 0
191                List<Object[]> parameters = new ArrayList<Object[]>();
192                String exitDescription = truncateExitDescription(stepExecution.getExitStatus().getExitDescription());
193                Object[] parameterValues = new Object[] { stepExecution.getId(), stepExecution.getVersion(),
194                                stepExecution.getStepName(), stepExecution.getJobExecutionId(), stepExecution.getStartTime(),
195                                stepExecution.getEndTime(), stepExecution.getStatus().toString(), stepExecution.getCommitCount(),
196                                stepExecution.getReadCount(), stepExecution.getFilterCount(), stepExecution.getWriteCount(),
197                                stepExecution.getExitStatus().getExitCode(), exitDescription, stepExecution.getReadSkipCount(),
198                                stepExecution.getWriteSkipCount(), stepExecution.getProcessSkipCount(),
199                                stepExecution.getRollbackCount(), stepExecution.getLastUpdated() };
200                Integer[] parameterTypes = new Integer[] { Types.BIGINT, Types.INTEGER, Types.VARCHAR, Types.BIGINT,
201                                Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER,
202                                Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER,
203                                Types.INTEGER, Types.TIMESTAMP };
204                parameters.add(0, Arrays.copyOf(parameterValues,parameterValues.length));
205                parameters.add(1, Arrays.copyOf(parameterTypes,parameterTypes.length));
206                return parameters;
207        }
208
209        /**
210         * Validate StepExecution. At a minimum, JobId, StartTime, and Status cannot
211         * be null. EndTime can be null for an unfinished job.
212         *
213         * @throws IllegalArgumentException
214         */
215        private void validateStepExecution(StepExecution stepExecution) {
216                Assert.notNull(stepExecution, "stepExecution is required");
217                Assert.notNull(stepExecution.getStepName(), "StepExecution step name cannot be null.");
218                Assert.notNull(stepExecution.getStartTime(), "StepExecution start time cannot be null.");
219                Assert.notNull(stepExecution.getStatus(), "StepExecution status cannot be null.");
220        }
221
222        @Override
223        public void updateStepExecution(StepExecution stepExecution) {
224
225                validateStepExecution(stepExecution);
226                Assert.notNull(stepExecution.getId(), "StepExecution Id cannot be null. StepExecution must saved"
227                                + " before it can be updated.");
228
229                // Do not check for existence of step execution considering
230                // it is saved at every commit point.
231
232                String exitDescription = truncateExitDescription(stepExecution.getExitStatus().getExitDescription());
233
234                // Attempt to prevent concurrent modification errors by blocking here if
235                // someone is already trying to do it.
236                synchronized (stepExecution) {
237
238                        Integer version = stepExecution.getVersion() + 1;
239                        Object[] parameters = new Object[] { stepExecution.getStartTime(), stepExecution.getEndTime(),
240                                        stepExecution.getStatus().toString(), stepExecution.getCommitCount(), stepExecution.getReadCount(),
241                                        stepExecution.getFilterCount(), stepExecution.getWriteCount(),
242                                        stepExecution.getExitStatus().getExitCode(), exitDescription, version,
243                                        stepExecution.getReadSkipCount(), stepExecution.getProcessSkipCount(),
244                                        stepExecution.getWriteSkipCount(), stepExecution.getRollbackCount(),
245                                        stepExecution.getLastUpdated(), stepExecution.getId(), stepExecution.getVersion() };
246                        int count = getJdbcTemplate()
247                                        .update(getQuery(UPDATE_STEP_EXECUTION),
248                                                        parameters,
249                                                        new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER, Types.INTEGER,
250                                                                        Types.INTEGER, Types.INTEGER, Types.VARCHAR, Types.VARCHAR, Types.INTEGER,
251                                                                        Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP,
252                                                                        Types.BIGINT, Types.INTEGER });
253
254                        // Avoid concurrent modifications...
255                        if (count == 0) {
256                                int curentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_STEP_EXECUTION),
257                                                new Object[] { stepExecution.getId() }, Integer.class);
258                                throw new OptimisticLockingFailureException("Attempt to update step execution id="
259                                                + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion()
260                                                + "), where current version is " + curentVersion);
261                        }
262
263                        stepExecution.incrementVersion();
264
265                }
266        }
267
268        /**
269         * Truncate the exit description if the length exceeds
270         * {@link #DEFAULT_EXIT_MESSAGE_LENGTH}.
271         * @param description the string to truncate
272         * @return truncated description
273         */
274        private String truncateExitDescription(String description) {
275                if (description != null && description.length() > exitMessageLength) {
276                        if (logger.isDebugEnabled()) {
277                                logger.debug("Truncating long message before update of StepExecution, original message is: " + description);
278                        }
279                        return description.substring(0, exitMessageLength);
280                } else {
281                        return description;
282                }
283        }
284
285        @Override
286        @Nullable
287        public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
288                List<StepExecution> executions = getJdbcTemplate().query(getQuery(GET_STEP_EXECUTION),
289                                new StepExecutionRowMapper(jobExecution), jobExecution.getId(), stepExecutionId);
290
291                Assert.state(executions.size() <= 1,
292                                "There can be at most one step execution with given name for single job execution");
293                if (executions.isEmpty()) {
294                        return null;
295                } else {
296                        return executions.get(0);
297                }
298        }
299
300        @Override
301        public void addStepExecutions(JobExecution jobExecution) {
302                getJdbcTemplate().query(getQuery(GET_STEP_EXECUTIONS), new StepExecutionRowMapper(jobExecution),
303                                jobExecution.getId());
304        }
305
306        private static class StepExecutionRowMapper implements RowMapper<StepExecution> {
307
308                private final JobExecution jobExecution;
309
310                public StepExecutionRowMapper(JobExecution jobExecution) {
311                        this.jobExecution = jobExecution;
312                }
313
314                @Override
315                public StepExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
316                        StepExecution stepExecution = new StepExecution(rs.getString(2), jobExecution, rs.getLong(1));
317                        stepExecution.setStartTime(rs.getTimestamp(3));
318                        stepExecution.setEndTime(rs.getTimestamp(4));
319                        stepExecution.setStatus(BatchStatus.valueOf(rs.getString(5)));
320                        stepExecution.setCommitCount(rs.getInt(6));
321                        stepExecution.setReadCount(rs.getInt(7));
322                        stepExecution.setFilterCount(rs.getInt(8));
323                        stepExecution.setWriteCount(rs.getInt(9));
324                        stepExecution.setExitStatus(new ExitStatus(rs.getString(10), rs.getString(11)));
325                        stepExecution.setReadSkipCount(rs.getInt(12));
326                        stepExecution.setWriteSkipCount(rs.getInt(13));
327                        stepExecution.setProcessSkipCount(rs.getInt(14));
328                        stepExecution.setRollbackCount(rs.getInt(15));
329                        stepExecution.setLastUpdated(rs.getTimestamp(16));
330                        stepExecution.setVersion(rs.getInt(17));
331                        return stepExecution;
332                }
333
334        }
335
336}