001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.repository.dao;
018
019import java.sql.ResultSet;
020import java.sql.SQLException;
021import java.sql.Timestamp;
022import java.sql.Types;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.Set;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032
033import org.springframework.batch.core.BatchStatus;
034import org.springframework.batch.core.ExitStatus;
035import org.springframework.batch.core.JobExecution;
036import org.springframework.batch.core.JobInstance;
037import org.springframework.batch.core.JobParameter;
038import org.springframework.batch.core.JobParameter.ParameterType;
039import org.springframework.batch.core.JobParameters;
040import org.springframework.beans.factory.InitializingBean;
041import org.springframework.dao.EmptyResultDataAccessException;
042import org.springframework.dao.OptimisticLockingFailureException;
043import org.springframework.jdbc.core.RowCallbackHandler;
044import org.springframework.jdbc.core.RowMapper;
045import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
046import org.springframework.lang.Nullable;
047import org.springframework.util.Assert;
048
049/**
050 * JDBC implementation of {@link JobExecutionDao}. Uses sequences (via Spring's
051 * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys
052 * before inserting a new row. Objects are checked to ensure all mandatory
053 * fields to be stored are not null. If any are found to be null, an
054 * IllegalArgumentException will be thrown. This could be left to JdbcTemplate,
055 * however, the exception will be fairly vague, and fails to highlight which
056 * field caused the exception.
057 *
058 * @author Lucas Ward
059 * @author Dave Syer
060 * @author Robert Kasanicky
061 * @author Michael Minella
062 * @author Mahmoud Ben Hassine
063 * @author Dimitrios Liapis
064 */
065public class JdbcJobExecutionDao extends AbstractJdbcBatchMetadataDao implements JobExecutionDao, InitializingBean {
066
067        private static final Log logger = LogFactory.getLog(JdbcJobExecutionDao.class);
068
069        private static final String SAVE_JOB_EXECUTION = "INSERT into %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, JOB_INSTANCE_ID, START_TIME, "
070                        + "END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED, JOB_CONFIGURATION_LOCATION) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
071
072        private static final String CHECK_JOB_EXECUTION_EXISTS = "SELECT COUNT(*) FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID = ?";
073
074        private static final String GET_STATUS = "SELECT STATUS from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
075
076        private static final String UPDATE_JOB_EXECUTION = "UPDATE %PREFIX%JOB_EXECUTION set START_TIME = ?, END_TIME = ?, "
077                        + " STATUS = ?, EXIT_CODE = ?, EXIT_MESSAGE = ?, VERSION = ?, CREATE_TIME = ?, LAST_UPDATED = ? where JOB_EXECUTION_ID = ? and VERSION = ?";
078
079        private static final String FIND_JOB_EXECUTIONS = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION"
080                        + " from %PREFIX%JOB_EXECUTION where JOB_INSTANCE_ID = ? order by JOB_EXECUTION_ID desc";
081
082        private static final String GET_LAST_EXECUTION = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION "
083                        + "from %PREFIX%JOB_EXECUTION E where JOB_INSTANCE_ID = ? and JOB_EXECUTION_ID in (SELECT max(JOB_EXECUTION_ID) from %PREFIX%JOB_EXECUTION E2 where E2.JOB_INSTANCE_ID = ?)";
084
085        private static final String GET_EXECUTION_BY_ID = "SELECT JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, CREATE_TIME, LAST_UPDATED, VERSION, JOB_CONFIGURATION_LOCATION"
086                        + " from %PREFIX%JOB_EXECUTION where JOB_EXECUTION_ID = ?";
087
088        private static final String GET_RUNNING_EXECUTIONS = "SELECT E.JOB_EXECUTION_ID, E.START_TIME, E.END_TIME, E.STATUS, E.EXIT_CODE, E.EXIT_MESSAGE, E.CREATE_TIME, E.LAST_UPDATED, E.VERSION, "
089                        + "E.JOB_INSTANCE_ID, E.JOB_CONFIGURATION_LOCATION from %PREFIX%JOB_EXECUTION E, %PREFIX%JOB_INSTANCE I where E.JOB_INSTANCE_ID=I.JOB_INSTANCE_ID and I.JOB_NAME=? and E.START_TIME is not NULL and E.END_TIME is NULL order by E.JOB_EXECUTION_ID desc";
090
091        private static final String CURRENT_VERSION_JOB_EXECUTION = "SELECT VERSION FROM %PREFIX%JOB_EXECUTION WHERE JOB_EXECUTION_ID=?";
092
093        private static final String FIND_PARAMS_FROM_ID = "SELECT JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, "
094                        + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING from %PREFIX%JOB_EXECUTION_PARAMS where JOB_EXECUTION_ID = ?";
095
096        private static final String CREATE_JOB_PARAMETERS = "INSERT into %PREFIX%JOB_EXECUTION_PARAMS(JOB_EXECUTION_ID, KEY_NAME, TYPE_CD, "
097                        + "STRING_VAL, DATE_VAL, LONG_VAL, DOUBLE_VAL, IDENTIFYING) values (?, ?, ?, ?, ?, ?, ?, ?)";
098
099        private int exitMessageLength = DEFAULT_EXIT_MESSAGE_LENGTH;
100
101        private DataFieldMaxValueIncrementer jobExecutionIncrementer;
102
103        /**
104         * Public setter for the exit message length in database. Do not set this if
105         * you haven't modified the schema.
106         * @param exitMessageLength the exitMessageLength to set
107         */
108        public void setExitMessageLength(int exitMessageLength) {
109                this.exitMessageLength = exitMessageLength;
110        }
111
112        /**
113         * Setter for {@link DataFieldMaxValueIncrementer} to be used when
114         * generating primary keys for {@link JobExecution} instances.
115         *
116         * @param jobExecutionIncrementer the {@link DataFieldMaxValueIncrementer}
117         */
118        public void setJobExecutionIncrementer(DataFieldMaxValueIncrementer jobExecutionIncrementer) {
119                this.jobExecutionIncrementer = jobExecutionIncrementer;
120        }
121
122        @Override
123        public void afterPropertiesSet() throws Exception {
124                super.afterPropertiesSet();
125                Assert.notNull(jobExecutionIncrementer, "The jobExecutionIncrementer must not be null.");
126        }
127
128        @Override
129        public List<JobExecution> findJobExecutions(final JobInstance job) {
130
131                Assert.notNull(job, "Job cannot be null.");
132                Assert.notNull(job.getId(), "Job Id cannot be null.");
133
134                return getJdbcTemplate().query(getQuery(FIND_JOB_EXECUTIONS), new JobExecutionRowMapper(job), job.getId());
135        }
136
137        /**
138         *
139         * SQL implementation using Sequences via the Spring incrementer
140         * abstraction. Once a new id has been obtained, the JobExecution is saved
141         * via a SQL INSERT statement.
142         *
143         * @see JobExecutionDao#saveJobExecution(JobExecution)
144         * @throws IllegalArgumentException if jobExecution is null, as well as any
145         * of it's fields to be persisted.
146         */
147        @Override
148        public void saveJobExecution(JobExecution jobExecution) {
149
150                validateJobExecution(jobExecution);
151
152                jobExecution.incrementVersion();
153
154                jobExecution.setId(jobExecutionIncrementer.nextLongValue());
155                Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(),
156                                jobExecution.getStartTime(), jobExecution.getEndTime(), jobExecution.getStatus().toString(),
157                                jobExecution.getExitStatus().getExitCode(), jobExecution.getExitStatus().getExitDescription(),
158                                jobExecution.getVersion(), jobExecution.getCreateTime(), jobExecution.getLastUpdated(),
159                                jobExecution.getJobConfigurationName() };
160                getJdbcTemplate().update(
161                                getQuery(SAVE_JOB_EXECUTION),
162                                parameters,
163                                new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR,
164                                        Types.VARCHAR, Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR });
165
166                insertJobParameters(jobExecution.getId(), jobExecution.getJobParameters());
167        }
168
169        /**
170         * Validate JobExecution. At a minimum, JobId, StartTime, EndTime, and
171         * Status cannot be null.
172         *
173         * @param jobExecution
174         * @throws IllegalArgumentException
175         */
176        private void validateJobExecution(JobExecution jobExecution) {
177
178                Assert.notNull(jobExecution, "jobExecution cannot be null");
179                Assert.notNull(jobExecution.getJobId(), "JobExecution Job-Id cannot be null.");
180                Assert.notNull(jobExecution.getStatus(), "JobExecution status cannot be null.");
181                Assert.notNull(jobExecution.getCreateTime(), "JobExecution create time cannot be null");
182        }
183
184        /**
185         * Update given JobExecution using a SQL UPDATE statement. The JobExecution
186         * is first checked to ensure all fields are not null, and that it has an
187         * ID. The database is then queried to ensure that the ID exists, which
188         * ensures that it is valid.
189         *
190         * @see JobExecutionDao#updateJobExecution(JobExecution)
191         */
192        @Override
193        public void updateJobExecution(JobExecution jobExecution) {
194
195                validateJobExecution(jobExecution);
196
197                Assert.notNull(jobExecution.getId(),
198                                "JobExecution ID cannot be null. JobExecution must be saved before it can be updated");
199
200                Assert.notNull(jobExecution.getVersion(),
201                                "JobExecution version cannot be null. JobExecution must be saved before it can be updated");
202
203                synchronized (jobExecution) {
204                        Integer version = jobExecution.getVersion() + 1;
205
206                        String exitDescription = jobExecution.getExitStatus().getExitDescription();
207                        if (exitDescription != null && exitDescription.length() > exitMessageLength) {
208                                exitDescription = exitDescription.substring(0, exitMessageLength);
209                                if (logger.isDebugEnabled()) {
210                                        logger.debug("Truncating long message before update of JobExecution: " + jobExecution);
211                                }
212                        }
213                        Object[] parameters = new Object[] { jobExecution.getStartTime(), jobExecution.getEndTime(),
214                                        jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(), exitDescription,
215                                        version, jobExecution.getCreateTime(), jobExecution.getLastUpdated(), jobExecution.getId(),
216                                        jobExecution.getVersion() };
217
218                        // Check if given JobExecution's Id already exists, if none is found
219                        // it
220                        // is invalid and
221                        // an exception should be thrown.
222                        if (getJdbcTemplate().queryForObject(getQuery(CHECK_JOB_EXECUTION_EXISTS), Integer.class,
223                                        new Object[] { jobExecution.getId() }) != 1) {
224                                throw new NoSuchObjectException("Invalid JobExecution, ID " + jobExecution.getId() + " not found.");
225                        }
226
227                        int count = getJdbcTemplate().update(
228                                        getQuery(UPDATE_JOB_EXECUTION),
229                                        parameters,
230                                        new int[] { Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
231                                                Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP, Types.BIGINT, Types.INTEGER });
232
233                        // Avoid concurrent modifications...
234                        if (count == 0) {
235                                int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION), Integer.class,
236                                                new Object[] { jobExecution.getId() });
237                                throw new OptimisticLockingFailureException("Attempt to update job execution id="
238                                                + jobExecution.getId() + " with wrong version (" + jobExecution.getVersion()
239                                                + "), where current version is " + currentVersion);
240                        }
241
242                        jobExecution.incrementVersion();
243                }
244        }
245
246        @Override
247        public JobExecution getLastJobExecution(JobInstance jobInstance) {
248
249                Long id = jobInstance.getId();
250
251                List<JobExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_EXECUTION),
252                                new JobExecutionRowMapper(jobInstance), id, id);
253
254                Assert.state(executions.size() <= 1, "There must be at most one latest job execution");
255
256                if (executions.isEmpty()) {
257                        return null;
258                }
259                else {
260                        return executions.get(0);
261                }
262        }
263
264        /*
265         * (non-Javadoc)
266         *
267         * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao#
268         * getLastJobExecution(java.lang.String)
269         */
270        @Override
271        @Nullable
272        public JobExecution getJobExecution(Long executionId) {
273                try {
274                        JobExecution jobExecution = getJdbcTemplate().queryForObject(getQuery(GET_EXECUTION_BY_ID),
275                                        new JobExecutionRowMapper(), executionId);
276                        return jobExecution;
277                }
278                catch (EmptyResultDataAccessException e) {
279                        return null;
280                }
281        }
282
283        /*
284         * (non-Javadoc)
285         *
286         * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao#
287         * findRunningJobExecutions(java.lang.String)
288         */
289        @Override
290        public Set<JobExecution> findRunningJobExecutions(String jobName) {
291
292                final Set<JobExecution> result = new HashSet<JobExecution>();
293                RowCallbackHandler handler = new RowCallbackHandler() {
294                        @Override
295                        public void processRow(ResultSet rs) throws SQLException {
296                                JobExecutionRowMapper mapper = new JobExecutionRowMapper();
297                                result.add(mapper.mapRow(rs, 0));
298                        }
299                };
300                getJdbcTemplate().query(getQuery(GET_RUNNING_EXECUTIONS), new Object[] { jobName }, handler);
301
302                return result;
303        }
304
305        @Override
306        public void synchronizeStatus(JobExecution jobExecution) {
307                int currentVersion = getJdbcTemplate().queryForObject(getQuery(CURRENT_VERSION_JOB_EXECUTION), Integer.class,
308                                jobExecution.getId());
309
310                if (currentVersion != jobExecution.getVersion().intValue()) {
311                        String status = getJdbcTemplate().queryForObject(getQuery(GET_STATUS), String.class, jobExecution.getId());
312                        jobExecution.upgradeStatus(BatchStatus.valueOf(status));
313                        jobExecution.setVersion(currentVersion);
314                }
315        }
316
317        /**
318         * Convenience method that inserts all parameters from the provided
319         * JobParameters.
320         *
321         */
322        private void insertJobParameters(Long executionId, JobParameters jobParameters) {
323
324                for (Entry<String, JobParameter> entry : jobParameters.getParameters()
325                                .entrySet()) {
326                        JobParameter jobParameter = entry.getValue();
327                        insertParameter(executionId, jobParameter.getType(), entry.getKey(),
328                                        jobParameter.getValue(), jobParameter.isIdentifying());
329                }
330        }
331
332        /**
333         * Convenience method that inserts an individual records into the
334         * JobParameters table.
335         */
336        private void insertParameter(Long executionId, ParameterType type, String key,
337                        Object value, boolean identifying) {
338
339                Object[] args = new Object[0];
340                int[] argTypes = new int[] { Types.BIGINT, Types.VARCHAR,
341                                Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.BIGINT,
342                                Types.DOUBLE, Types.CHAR };
343
344                String identifyingFlag = identifying? "Y":"N";
345
346                if (type == ParameterType.STRING) {
347                        args = new Object[] { executionId, key, type, value, new Timestamp(0L),
348                                        0L, 0D, identifyingFlag};
349                } else if (type == ParameterType.LONG) {
350                        args = new Object[] { executionId, key, type, "", new Timestamp(0L),
351                                        value, new Double(0), identifyingFlag};
352                } else if (type == ParameterType.DOUBLE) {
353                        args = new Object[] { executionId, key, type, "", new Timestamp(0L), 0L,
354                                        value, identifyingFlag};
355                } else if (type == ParameterType.DATE) {
356                        args = new Object[] { executionId, key, type, "", value, 0L, 0D, identifyingFlag};
357                }
358
359                getJdbcTemplate().update(getQuery(CREATE_JOB_PARAMETERS), args, argTypes);
360        }
361
362        /**
363         * @param executionId {@link Long} containing the id for the execution.
364         * @return job parameters for the requested execution id
365         */
366        protected JobParameters getJobParameters(Long executionId) {
367                final Map<String, JobParameter> map = new HashMap<String, JobParameter>();
368                RowCallbackHandler handler = new RowCallbackHandler() {
369                        @Override
370                        public void processRow(ResultSet rs) throws SQLException {
371                                ParameterType type = ParameterType.valueOf(rs.getString(3));
372                                JobParameter value = null;
373
374                                if (type == ParameterType.STRING) {
375                                        value = new JobParameter(rs.getString(4), rs.getString(8).equalsIgnoreCase("Y"));
376                                } else if (type == ParameterType.LONG) {
377                                        value = new JobParameter(rs.getLong(6), rs.getString(8).equalsIgnoreCase("Y"));
378                                } else if (type == ParameterType.DOUBLE) {
379                                        value = new JobParameter(rs.getDouble(7), rs.getString(8).equalsIgnoreCase("Y"));
380                                } else if (type == ParameterType.DATE) {
381                                        value = new JobParameter(rs.getTimestamp(5), rs.getString(8).equalsIgnoreCase("Y"));
382                                }
383
384                                // No need to assert that value is not null because it's an enum
385                                map.put(rs.getString(2), value);
386                        }
387                };
388
389                getJdbcTemplate().query(getQuery(FIND_PARAMS_FROM_ID), new Object[] { executionId }, handler);
390
391                return new JobParameters(map);
392        }
393
394        /**
395         * Re-usable mapper for {@link JobExecution} instances.
396         *
397         * @author Dave Syer
398         *
399         */
400        private final class JobExecutionRowMapper implements RowMapper<JobExecution> {
401
402                private JobInstance jobInstance;
403
404                private JobParameters jobParameters;
405
406                public JobExecutionRowMapper() {
407                }
408
409                public JobExecutionRowMapper(JobInstance jobInstance) {
410                        this.jobInstance = jobInstance;
411                }
412
413                @Override
414                public JobExecution mapRow(ResultSet rs, int rowNum) throws SQLException {
415                        Long id = rs.getLong(1);
416                        String jobConfigurationLocation = rs.getString(10);
417                        JobExecution jobExecution;
418                        if (jobParameters == null) {
419                                jobParameters = getJobParameters(id);
420                        }
421
422                        if (jobInstance == null) {
423                                jobExecution = new JobExecution(id, jobParameters, jobConfigurationLocation);
424                        }
425                        else {
426                                jobExecution = new JobExecution(jobInstance, id, jobParameters, jobConfigurationLocation);
427                        }
428
429                        jobExecution.setStartTime(rs.getTimestamp(2));
430                        jobExecution.setEndTime(rs.getTimestamp(3));
431                        jobExecution.setStatus(BatchStatus.valueOf(rs.getString(4)));
432                        jobExecution.setExitStatus(new ExitStatus(rs.getString(5), rs.getString(6)));
433                        jobExecution.setCreateTime(rs.getTimestamp(7));
434                        jobExecution.setLastUpdated(rs.getTimestamp(8));
435                        jobExecution.setVersion(rs.getInt(9));
436                        return jobExecution;
437                }
438
439        }
440}