001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.repository.support;
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import org.springframework.batch.core.BatchStatus;
022import org.springframework.batch.core.JobExecution;
023import org.springframework.batch.core.JobInstance;
024import org.springframework.batch.core.JobParameters;
025import org.springframework.batch.core.StepExecution;
026import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
027import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
028import org.springframework.batch.core.repository.JobRepository;
029import org.springframework.batch.core.repository.JobRestartException;
030import org.springframework.batch.core.repository.dao.ExecutionContextDao;
031import org.springframework.batch.core.repository.dao.JobExecutionDao;
032import org.springframework.batch.core.repository.dao.JobInstanceDao;
033import org.springframework.batch.core.repository.dao.StepExecutionDao;
034import org.springframework.batch.item.ExecutionContext;
035import org.springframework.lang.Nullable;
036import org.springframework.util.Assert;
037
038import java.util.ArrayList;
039import java.util.Collection;
040import java.util.Date;
041import java.util.List;
042
043/**
044 *
045 * <p>
046 * Implementation of {@link JobRepository} that stores JobInstances,
047 * JobExecutions, and StepExecutions using the injected DAOs.
048 * <p>
049 *
050 * @author Lucas Ward
051 * @author Dave Syer
052 * @author Robert Kasanicky
053 * @author David Turanski
054 * @author Mahmoud Ben Hassine
055 *
056 * @see JobRepository
057 * @see JobInstanceDao
058 * @see JobExecutionDao
059 * @see StepExecutionDao
060 *
061 */
062public class SimpleJobRepository implements JobRepository {
063
064        private static final Log logger = LogFactory.getLog(SimpleJobRepository.class);
065
066        private JobInstanceDao jobInstanceDao;
067
068        private JobExecutionDao jobExecutionDao;
069
070        private StepExecutionDao stepExecutionDao;
071
072        private ExecutionContextDao ecDao;
073
074        /**
075         * Provide default constructor with low visibility in case user wants to use
076         * use aop:proxy-target-class="true" for AOP interceptor.
077         */
078        SimpleJobRepository() {
079        }
080
081        public SimpleJobRepository(JobInstanceDao jobInstanceDao, JobExecutionDao jobExecutionDao,
082                        StepExecutionDao stepExecutionDao, ExecutionContextDao ecDao) {
083                super();
084                this.jobInstanceDao = jobInstanceDao;
085                this.jobExecutionDao = jobExecutionDao;
086                this.stepExecutionDao = stepExecutionDao;
087                this.ecDao = ecDao;
088        }
089
090        @Override
091        public boolean isJobInstanceExists(String jobName, JobParameters jobParameters) {
092                return jobInstanceDao.getJobInstance(jobName, jobParameters) != null;
093        }
094
095        @Override
096        public JobExecution createJobExecution(String jobName, JobParameters jobParameters)
097                        throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
098
099                Assert.notNull(jobName, "Job name must not be null.");
100                Assert.notNull(jobParameters, "JobParameters must not be null.");
101
102                /*
103                 * Find all jobs matching the runtime information.
104                 *
105                 * If this method is transactional, and the isolation level is
106                 * REPEATABLE_READ or better, another launcher trying to start the same
107                 * job in another thread or process will block until this transaction
108                 * has finished.
109                 */
110
111                JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
112                ExecutionContext executionContext;
113
114                // existing job instance found
115                if (jobInstance != null) {
116
117                        List<JobExecution> executions = jobExecutionDao.findJobExecutions(jobInstance);
118
119                        // check for running executions and find the last started
120                        for (JobExecution execution : executions) {
121                                if (execution.isRunning() || execution.isStopping()) {
122                                        throw new JobExecutionAlreadyRunningException("A job execution for this job is already running: "
123                                                        + jobInstance);
124                                }
125                                BatchStatus status = execution.getStatus();
126                                if (status == BatchStatus.UNKNOWN) {
127                                        throw new JobRestartException("Cannot restart job from UNKNOWN status. "
128                                                        + "The last execution ended with a failure that could not be rolled back, "
129                                                        + "so it may be dangerous to proceed. Manual intervention is probably necessary.");
130                                }
131                                if (execution.getJobParameters().getParameters().size() > 0 && (status == BatchStatus.COMPLETED || status == BatchStatus.ABANDONED)) {
132                                        throw new JobInstanceAlreadyCompleteException(
133                                                        "A job instance already exists and is complete for parameters=" + jobParameters
134                                                        + ".  If you want to run this job again, change the parameters.");
135                                }
136                        }
137                        executionContext = ecDao.getExecutionContext(jobExecutionDao.getLastJobExecution(jobInstance));
138                }
139                else {
140                        // no job found, create one
141                        jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
142                        executionContext = new ExecutionContext();
143                }
144
145                JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, null);
146                jobExecution.setExecutionContext(executionContext);
147                jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
148
149                // Save the JobExecution so that it picks up an ID (useful for clients
150                // monitoring asynchronous executions):
151                jobExecutionDao.saveJobExecution(jobExecution);
152                ecDao.saveExecutionContext(jobExecution);
153
154                return jobExecution;
155
156        }
157
158        @Override
159        public void update(JobExecution jobExecution) {
160
161                Assert.notNull(jobExecution, "JobExecution cannot be null.");
162                Assert.notNull(jobExecution.getJobId(), "JobExecution must have a Job ID set.");
163                Assert.notNull(jobExecution.getId(), "JobExecution must be already saved (have an id assigned).");
164
165                jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
166
167                jobExecutionDao.synchronizeStatus(jobExecution);
168                jobExecutionDao.updateJobExecution(jobExecution);
169        }
170
171        @Override
172        public void add(StepExecution stepExecution) {
173                validateStepExecution(stepExecution);
174
175                stepExecution.setLastUpdated(new Date(System.currentTimeMillis()));
176                stepExecutionDao.saveStepExecution(stepExecution);
177                ecDao.saveExecutionContext(stepExecution);
178        }
179
180        @Override
181        public void addAll(Collection<StepExecution> stepExecutions) {
182                Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
183                for (StepExecution stepExecution : stepExecutions) {
184                        validateStepExecution(stepExecution);
185                        stepExecution.setLastUpdated(new Date(System.currentTimeMillis()));
186                }
187                stepExecutionDao.saveStepExecutions(stepExecutions);
188                ecDao.saveExecutionContexts(stepExecutions);
189        }
190
191        @Override
192        public void update(StepExecution stepExecution) {
193                validateStepExecution(stepExecution);
194                Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)");
195
196                stepExecution.setLastUpdated(new Date(System.currentTimeMillis()));
197                stepExecutionDao.updateStepExecution(stepExecution);
198                checkForInterruption(stepExecution);
199        }
200
201        private void validateStepExecution(StepExecution stepExecution) {
202                Assert.notNull(stepExecution, "StepExecution cannot be null.");
203                Assert.notNull(stepExecution.getStepName(), "StepExecution's step name cannot be null.");
204                Assert.notNull(stepExecution.getJobExecutionId(), "StepExecution must belong to persisted JobExecution");
205        }
206
207        @Override
208        public void updateExecutionContext(StepExecution stepExecution) {
209                validateStepExecution(stepExecution);
210                Assert.notNull(stepExecution.getId(), "StepExecution must already be saved (have an id assigned)");
211                ecDao.updateExecutionContext(stepExecution);
212        }
213
214        @Override
215        public void updateExecutionContext(JobExecution jobExecution) {
216                ecDao.updateExecutionContext(jobExecution);
217        }
218
219        @Override
220        @Nullable
221        public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
222                List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
223                List<StepExecution> stepExecutions = new ArrayList<StepExecution>(jobExecutions.size());
224
225                for (JobExecution jobExecution : jobExecutions) {
226                        stepExecutionDao.addStepExecutions(jobExecution);
227                        for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
228                                if (stepName.equals(stepExecution.getStepName())) {
229                                        stepExecutions.add(stepExecution);
230                                }
231                        }
232                }
233
234                StepExecution latest = null;
235                for (StepExecution stepExecution : stepExecutions) {
236                        if (latest == null) {
237                                latest = stepExecution;
238                        }
239                        if (latest.getStartTime().getTime() < stepExecution.getStartTime().getTime()) {
240                                latest = stepExecution;
241                        }
242                        // Use step execution ID as the tie breaker if start time is identical
243                        if (latest.getStartTime().getTime() == stepExecution.getStartTime().getTime() && 
244                                latest.getId() < stepExecution.getId()) {
245                                latest = stepExecution;
246                        }
247                }
248
249                if (latest != null) {
250                        ExecutionContext stepExecutionContext = ecDao.getExecutionContext(latest);
251                        latest.setExecutionContext(stepExecutionContext);
252                        ExecutionContext jobExecutionContext = ecDao.getExecutionContext(latest.getJobExecution());
253                        latest.getJobExecution().setExecutionContext(jobExecutionContext);
254                }
255
256                return latest;
257        }
258
259        /**
260         * @return number of executions of the step within given job instance
261         */
262        @Override
263        public int getStepExecutionCount(JobInstance jobInstance, String stepName) {
264                int count = 0;
265                List<JobExecution> jobExecutions = jobExecutionDao.findJobExecutions(jobInstance);
266                for (JobExecution jobExecution : jobExecutions) {
267                        stepExecutionDao.addStepExecutions(jobExecution);
268                        for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
269                                if (stepName.equals(stepExecution.getStepName())) {
270                                        count++;
271                                }
272                        }
273                }
274                return count;
275        }
276
277        /**
278         * Check to determine whether or not the JobExecution that is the parent of
279         * the provided StepExecution has been interrupted. If, after synchronizing
280         * the status with the database, the status has been updated to STOPPING,
281         * then the job has been interrupted.
282         *
283         * @param stepExecution
284         */
285        private void checkForInterruption(StepExecution stepExecution) {
286                JobExecution jobExecution = stepExecution.getJobExecution();
287                jobExecutionDao.synchronizeStatus(jobExecution);
288                if (jobExecution.isStopping()) {
289                        logger.info("Parent JobExecution is stopped, so passing message on to StepExecution");
290                        stepExecution.setTerminateOnly();
291                }
292        }
293
294        @Override
295        @Nullable
296        public JobExecution getLastJobExecution(String jobName, JobParameters jobParameters) {
297                JobInstance jobInstance = jobInstanceDao.getJobInstance(jobName, jobParameters);
298                if (jobInstance == null) {
299                        return null;
300                }
301                JobExecution jobExecution = jobExecutionDao.getLastJobExecution(jobInstance);
302
303                if (jobExecution != null) {
304                        jobExecution.setExecutionContext(ecDao.getExecutionContext(jobExecution));
305                        stepExecutionDao.addStepExecutions(jobExecution);
306                }
307                return jobExecution;
308
309        }
310
311        @Override
312        public JobInstance createJobInstance(String jobName, JobParameters jobParameters) {
313                Assert.notNull(jobName, "A job name is required to create a JobInstance");
314                Assert.notNull(jobParameters, "Job parameters are required to create a JobInstance");
315
316                JobInstance jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
317
318                return jobInstance;
319        }
320
321        @Override
322        public JobExecution createJobExecution(JobInstance jobInstance,
323                        JobParameters jobParameters, String jobConfigurationLocation) {
324
325                Assert.notNull(jobInstance, "A JobInstance is required to associate the JobExecution with");
326                Assert.notNull(jobParameters, "A JobParameters object is required to create a JobExecution");
327
328                JobExecution jobExecution = new JobExecution(jobInstance, jobParameters, jobConfigurationLocation);
329                ExecutionContext executionContext = new ExecutionContext();
330                jobExecution.setExecutionContext(executionContext);
331                jobExecution.setLastUpdated(new Date(System.currentTimeMillis()));
332
333                // Save the JobExecution so that it picks up an ID (useful for clients
334                // monitoring asynchronous executions):
335                jobExecutionDao.saveJobExecution(jobExecution);
336                ecDao.saveExecutionContext(jobExecution);
337
338                return jobExecution;
339        }
340}