001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.repository.dao;
018
019import java.sql.ResultSet;
020import java.sql.SQLException;
021import java.sql.Types;
022import java.util.ArrayList;
023import java.util.List;
024
025import org.springframework.batch.core.DefaultJobKeyGenerator;
026import org.springframework.batch.core.JobExecution;
027import org.springframework.batch.core.JobInstance;
028import org.springframework.batch.core.JobKeyGenerator;
029import org.springframework.batch.core.JobParameters;
030import org.springframework.batch.core.launch.NoSuchJobException;
031import org.springframework.beans.factory.InitializingBean;
032import org.springframework.dao.DataAccessException;
033import org.springframework.dao.EmptyResultDataAccessException;
034import org.springframework.jdbc.core.ResultSetExtractor;
035import org.springframework.jdbc.core.RowMapper;
036import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
037import org.springframework.lang.Nullable;
038import org.springframework.util.Assert;
039import org.springframework.util.StringUtils;
040
041/**
042 * JDBC implementation of {@link JobInstanceDao}. Uses sequences (via Spring's
043 * {@link DataFieldMaxValueIncrementer} abstraction) to create all primary keys
044 * before inserting a new row. Objects are checked to ensure all mandatory
045 * fields to be stored are not null. If any are found to be null, an
046 * IllegalArgumentException will be thrown. This could be left to JdbcTemplate,
047 * however, the exception will be fairly vague, and fails to highlight which
048 * field caused the exception.
049 *
050 * @author Lucas Ward
051 * @author Dave Syer
052 * @author Robert Kasanicky
053 * @author Michael Minella
054 * @author Will Schipp
055 * @author Mahmoud Ben Hassine
056 */
057public class JdbcJobInstanceDao extends AbstractJdbcBatchMetadataDao implements
058JobInstanceDao, InitializingBean {
059
060        private static final String STAR_WILDCARD = "*";
061        
062        private static final String SQL_WILDCARD = "%";
063        
064        private static final String CREATE_JOB_INSTANCE = "INSERT into %PREFIX%JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION)"
065                        + " values (?, ?, ?, ?)";
066
067        private static final String FIND_JOBS_WITH_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ?";
068
069        private static final String FIND_JOBS_WITH_KEY = FIND_JOBS_WITH_NAME
070                        + " and JOB_KEY = ?";
071
072        private static final String COUNT_JOBS_WITH_NAME = "SELECT COUNT(*) from %PREFIX%JOB_INSTANCE where JOB_NAME = ?";
073
074        private static final String FIND_JOBS_WITH_EMPTY_KEY = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? and (JOB_KEY = ? OR JOB_KEY is NULL)";
075
076        private static final String GET_JOB_FROM_ID = "SELECT JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION from %PREFIX%JOB_INSTANCE where JOB_INSTANCE_ID = ?";
077
078        private static final String GET_JOB_FROM_EXECUTION_ID = "SELECT ji.JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, ji.VERSION from %PREFIX%JOB_INSTANCE ji, "
079                        + "%PREFIX%JOB_EXECUTION je where JOB_EXECUTION_ID = ? and ji.JOB_INSTANCE_ID = je.JOB_INSTANCE_ID";
080
081        private static final String FIND_JOB_NAMES = "SELECT distinct JOB_NAME from %PREFIX%JOB_INSTANCE order by JOB_NAME";
082
083        private static final String FIND_LAST_JOBS_BY_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME = ? order by JOB_INSTANCE_ID desc";
084        
085        private static final String FIND_LAST_JOBS_LIKE_NAME = "SELECT JOB_INSTANCE_ID, JOB_NAME from %PREFIX%JOB_INSTANCE where JOB_NAME like ? order by JOB_INSTANCE_ID desc";
086
087        private DataFieldMaxValueIncrementer jobIncrementer;
088
089        private JobKeyGenerator<JobParameters> jobKeyGenerator = new DefaultJobKeyGenerator();
090
091        /**
092         * In this JDBC implementation a job id is obtained by asking the
093         * jobIncrementer (which is likely a sequence) for the next long value, and
094         * then passing the Id and parameter values into an INSERT statement.
095         *
096         * @see JobInstanceDao#createJobInstance(String, JobParameters)
097         * @throws IllegalArgumentException
098         *             if any {@link JobParameters} fields are null.
099         */
100        @Override
101        public JobInstance createJobInstance(String jobName,
102                        JobParameters jobParameters) {
103
104                Assert.notNull(jobName, "Job name must not be null.");
105                Assert.notNull(jobParameters, "JobParameters must not be null.");
106
107                Assert.state(getJobInstance(jobName, jobParameters) == null,
108                                "JobInstance must not already exist");
109
110                Long jobId = jobIncrementer.nextLongValue();
111
112                JobInstance jobInstance = new JobInstance(jobId, jobName);
113                jobInstance.incrementVersion();
114
115                Object[] parameters = new Object[] { jobId, jobName,
116                                jobKeyGenerator.generateKey(jobParameters), jobInstance.getVersion() };
117                getJdbcTemplate().update(
118                                getQuery(CREATE_JOB_INSTANCE),
119                                parameters,
120                                new int[] { Types.BIGINT, Types.VARCHAR, Types.VARCHAR,
121                                        Types.INTEGER });
122
123                return jobInstance;
124        }
125
126        /**
127         * The job table is queried for <strong>any</strong> jobs that match the
128         * given identifier, adding them to a list via the RowMapper callback.
129         *
130         * @see JobInstanceDao#getJobInstance(String, JobParameters)
131         * @throws IllegalArgumentException
132         *             if any {@link JobParameters} fields are null.
133         */
134        @Override
135        @Nullable
136        public JobInstance getJobInstance(final String jobName,
137                        final JobParameters jobParameters) {
138
139                Assert.notNull(jobName, "Job name must not be null.");
140                Assert.notNull(jobParameters, "JobParameters must not be null.");
141
142                String jobKey = jobKeyGenerator.generateKey(jobParameters);
143
144                RowMapper<JobInstance> rowMapper = new JobInstanceRowMapper();
145
146                List<JobInstance> instances;
147                if (StringUtils.hasLength(jobKey)) {
148                        instances = getJdbcTemplate().query(getQuery(FIND_JOBS_WITH_KEY),
149                                        rowMapper, jobName, jobKey);
150                } else {
151                        instances = getJdbcTemplate().query(
152                                        getQuery(FIND_JOBS_WITH_EMPTY_KEY), rowMapper, jobName,
153                                        jobKey);
154                }
155
156                if (instances.isEmpty()) {
157                        return null;
158                } else {
159                        Assert.state(instances.size() == 1, "instance count must be 1 but was " + instances.size());
160                        return instances.get(0);
161                }
162        }
163
164        /*
165         * (non-Javadoc)
166         *
167         * @see
168         * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobInstance
169         * (java.lang.Long)
170         */
171        @Override
172        @Nullable
173        public JobInstance getJobInstance(@Nullable Long instanceId) {
174
175                try {
176                        return getJdbcTemplate().queryForObject(getQuery(GET_JOB_FROM_ID),
177                                        new JobInstanceRowMapper(), instanceId);
178                } catch (EmptyResultDataAccessException e) {
179                        return null;
180                }
181
182        }
183
184        /*
185         * (non-Javadoc)
186         *
187         * @see
188         * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobNames
189         * ()
190         */
191        @Override
192        public List<String> getJobNames() {
193                return getJdbcTemplate().query(getQuery(FIND_JOB_NAMES),
194                                new RowMapper<String>() {
195                        @Override
196                        public String mapRow(ResultSet rs, int rowNum)
197                                        throws SQLException {
198                                return rs.getString(1);
199                        }
200                });
201        }
202
203        /*
204         * (non-Javadoc)
205         *
206         * @seeorg.springframework.batch.core.repository.dao.JobInstanceDao#
207         * getLastJobInstances(java.lang.String, int)
208         */
209        @Override
210        public List<JobInstance> getJobInstances(String jobName, final int start,
211                        final int count) {
212
213                ResultSetExtractor<List<JobInstance>> extractor = new ResultSetExtractor<List<JobInstance>>() {
214
215                        private List<JobInstance> list = new ArrayList<JobInstance>();
216
217                        @Override
218                        public List<JobInstance> extractData(ResultSet rs) throws SQLException,
219                        DataAccessException {
220                                int rowNum = 0;
221                                while (rowNum < start && rs.next()) {
222                                        rowNum++;
223                                }
224                                while (rowNum < start + count && rs.next()) {
225                                        RowMapper<JobInstance> rowMapper = new JobInstanceRowMapper();
226                                        list.add(rowMapper.mapRow(rs, rowNum));
227                                        rowNum++;
228                                }
229                                return list;
230                        }
231
232                };
233
234                List<JobInstance> result = getJdbcTemplate().query(getQuery(FIND_LAST_JOBS_BY_NAME),
235                                new Object[] { jobName }, extractor);
236
237                return result;
238        }
239
240        /*
241         * (non-Javadoc)
242         *
243         * @see
244         * org.springframework.batch.core.repository.dao.JobInstanceDao#getJobInstance
245         * (org.springframework.batch.core.JobExecution)
246         */
247        @Override
248        @Nullable
249        public JobInstance getJobInstance(JobExecution jobExecution) {
250
251                try {
252                        return getJdbcTemplate().queryForObject(
253                                        getQuery(GET_JOB_FROM_EXECUTION_ID),
254                                        new JobInstanceRowMapper(), jobExecution.getId());
255                } catch (EmptyResultDataAccessException e) {
256                        return null;
257                }
258        }
259
260        /* (non-Javadoc)
261         * @see org.springframework.batch.core.repository.dao.JobInstanceDao#getJobInstanceCount(java.lang.String)
262         */
263        @Override
264        public int getJobInstanceCount(@Nullable String jobName) throws NoSuchJobException {
265
266                try {
267                        return getJdbcTemplate().queryForObject(
268                                        getQuery(COUNT_JOBS_WITH_NAME),
269                                        Integer.class,
270                                        jobName);
271                } catch (EmptyResultDataAccessException e) {
272                        throw new NoSuchJobException("No job instances were found for job name " + jobName);
273                }
274        }
275
276        /**
277         * Setter for {@link DataFieldMaxValueIncrementer} to be used when
278         * generating primary keys for {@link JobInstance} instances.
279         *
280         * @param jobIncrementer
281         *            the {@link DataFieldMaxValueIncrementer}
282         */
283        public void setJobIncrementer(DataFieldMaxValueIncrementer jobIncrementer) {
284                this.jobIncrementer = jobIncrementer;
285        }
286
287        @Override
288        public void afterPropertiesSet() throws Exception {
289                super.afterPropertiesSet();
290                Assert.notNull(jobIncrementer, "JobIncrementer is required");
291        }
292
293        /**
294         * @author Dave Syer
295         *
296         */
297        private final class JobInstanceRowMapper implements RowMapper<JobInstance> {
298
299                public JobInstanceRowMapper() {
300                }
301
302                @Override
303                public JobInstance mapRow(ResultSet rs, int rowNum) throws SQLException {
304                        JobInstance jobInstance = new JobInstance(rs.getLong(1), rs.getString(2));
305                        // should always be at version=0 because they never get updated
306                        jobInstance.incrementVersion();
307                        return jobInstance;
308                }
309        }
310
311        @Override
312        public List<JobInstance> findJobInstancesByName(String jobName, final int start, final int count) {
313                @SuppressWarnings("rawtypes")
314                ResultSetExtractor extractor = new ResultSetExtractor() {
315                        private List<JobInstance> list = new ArrayList<JobInstance>();
316
317                        @Override
318                        public Object extractData(ResultSet rs) throws SQLException,
319                        DataAccessException {
320                                int rowNum = 0;
321                                while (rowNum < start && rs.next()) {
322                                        rowNum++;
323                                }
324                                while (rowNum < start + count && rs.next()) {
325                                        RowMapper<JobInstance> rowMapper = new JobInstanceRowMapper();
326                                        list.add(rowMapper.mapRow(rs, rowNum));
327                                        rowNum++;
328                                }
329                                return list;
330                        }
331                };
332
333                if (jobName.contains(STAR_WILDCARD)) {
334                        jobName = jobName.replaceAll("\\" + STAR_WILDCARD, SQL_WILDCARD);
335                }
336                
337                @SuppressWarnings("unchecked")
338                List<JobInstance> result = (List<JobInstance>) getJdbcTemplate().query(getQuery(FIND_LAST_JOBS_LIKE_NAME),
339                                new Object[] { jobName }, extractor);
340
341                return result;
342        }
343}