001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.repository.dao;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentMap;
027import java.util.concurrent.atomic.AtomicLong;
028
029import org.springframework.batch.core.JobExecution;
030import org.springframework.batch.core.JobInstance;
031import org.springframework.dao.OptimisticLockingFailureException;
032import org.springframework.lang.Nullable;
033import org.springframework.util.Assert;
034import org.springframework.util.SerializationUtils;
035
036/**
037 * In-memory implementation of {@link JobExecutionDao}.
038 */
039public class MapJobExecutionDao implements JobExecutionDao {
040
041        // JDK6 Make this into a ConcurrentSkipListMap: adds and removes tend to be very near the front or back
042        private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>();
043
044        private final AtomicLong currentId = new AtomicLong(0L);
045
046        public void clear() {
047                executionsById.clear();
048        }
049
050        private static JobExecution copy(JobExecution original) {
051                JobExecution copy = (JobExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
052                return copy;
053        }
054
055        @Override
056        public void saveJobExecution(JobExecution jobExecution) {
057                Assert.isTrue(jobExecution.getId() == null, "jobExecution id is not null");
058                Long newId = currentId.getAndIncrement();
059                jobExecution.setId(newId);
060                jobExecution.incrementVersion();
061                executionsById.put(newId, copy(jobExecution));
062        }
063
064        @Override
065        public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
066                List<JobExecution> executions = new ArrayList<JobExecution>();
067                for (JobExecution exec : executionsById.values()) {
068                        if (exec.getJobInstance().equals(jobInstance)) {
069                                executions.add(copy(exec));
070                        }
071                }
072                Collections.sort(executions, new Comparator<JobExecution>() {
073
074                        @Override
075                        public int compare(JobExecution e1, JobExecution e2) {
076                                long result = (e1.getId() - e2.getId());
077                                if (result > 0) {
078                                        return -1;
079                                }
080                                else if (result < 0) {
081                                        return 1;
082                                }
083                                else {
084                                        return 0;
085                                }
086                        }
087                });
088                return executions;
089        }
090
091        @Override
092        public void updateJobExecution(JobExecution jobExecution) {
093                Long id = jobExecution.getId();
094                Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)");
095                JobExecution persistedExecution = executionsById.get(id);
096                Assert.notNull(persistedExecution, "JobExecution must already be saved");
097
098                synchronized (jobExecution) {
099                        if (!persistedExecution.getVersion().equals(jobExecution.getVersion())) {
100                                throw new OptimisticLockingFailureException("Attempt to update job execution id=" + id
101                                                + " with wrong version (" + jobExecution.getVersion() + "), where current version is "
102                                                + persistedExecution.getVersion());
103                        }
104                        jobExecution.incrementVersion();
105                        executionsById.put(id, copy(jobExecution));
106                }
107        }
108
109        @Override
110        public JobExecution getLastJobExecution(@Nullable JobInstance jobInstance) {
111                JobExecution lastExec = null;
112                for (JobExecution exec : executionsById.values()) {
113                        if (!exec.getJobInstance().equals(jobInstance)) {
114                                continue;
115                        }
116                        if (lastExec == null) {
117                                lastExec = exec;
118                        }
119                        if (lastExec.getCreateTime().before(exec.getCreateTime())) {
120                                lastExec = exec;
121                        }
122                }
123                return copy(lastExec);
124        }
125
126        /*
127         * (non-Javadoc)
128         *
129         * @seeorg.springframework.batch.core.repository.dao.JobExecutionDao#
130         * findRunningJobExecutions(java.lang.String)
131         */
132        @Override
133        public Set<JobExecution> findRunningJobExecutions(String jobName) {
134                Set<JobExecution> result = new HashSet<JobExecution>();
135                for (JobExecution exec : executionsById.values()) {
136                        if (!exec.getJobInstance().getJobName().equals(jobName) || !exec.isRunning()) {
137                                continue;
138                        }
139                        result.add(copy(exec));
140                }
141                return result;
142        }
143
144        /*
145         * (non-Javadoc)
146         *
147         * @see
148         * org.springframework.batch.core.repository.dao.JobExecutionDao#getJobExecution
149         * (java.lang.Long)
150         */
151        @Override
152        @Nullable
153        public JobExecution getJobExecution(Long executionId) {
154                return copy(executionsById.get(executionId));
155        }
156
157        @Override
158        public void synchronizeStatus(JobExecution jobExecution) {
159                JobExecution saved = getJobExecution(jobExecution.getId());
160                if (saved.getVersion().intValue() != jobExecution.getVersion().intValue()) {
161                        jobExecution.upgradeStatus(saved.getStatus());
162                        jobExecution.setVersion(saved.getVersion());
163                }
164        }
165}