001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.repository.dao;
017
018import java.lang.reflect.Field;
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.Comparator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.atomic.AtomicLong;
027
028import org.springframework.batch.core.Entity;
029import org.springframework.batch.core.JobExecution;
030import org.springframework.batch.core.StepExecution;
031import org.springframework.dao.OptimisticLockingFailureException;
032import org.springframework.lang.Nullable;
033import org.springframework.util.Assert;
034import org.springframework.util.ReflectionUtils;
035import org.springframework.util.SerializationUtils;
036
037/**
038 * In-memory implementation of {@link StepExecutionDao}.
039 */
040public class MapStepExecutionDao implements StepExecutionDao {
041
042        private Map<Long, Map<Long, StepExecution>> executionsByJobExecutionId = new ConcurrentHashMap<Long, Map<Long,StepExecution>>();
043
044        private Map<Long, StepExecution> executionsByStepExecutionId = new ConcurrentHashMap<Long, StepExecution>();
045
046        private AtomicLong currentId = new AtomicLong();
047
048        public void clear() {
049                executionsByJobExecutionId.clear();
050                executionsByStepExecutionId.clear();
051        }
052
053        private static StepExecution copy(StepExecution original) {
054                return (StepExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
055        }
056
057        private static void copy(final StepExecution sourceExecution, final StepExecution targetExecution) {
058                // Cheaper than full serialization is a reflective field copy, which is
059                // fine for volatile storage
060                ReflectionUtils.doWithFields(StepExecution.class, new ReflectionUtils.FieldCallback() {
061                        @Override
062                        public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
063                                field.setAccessible(true);
064                                field.set(targetExecution, field.get(sourceExecution));
065                        }
066                }, ReflectionUtils.COPYABLE_FIELDS);
067        }
068
069        @Override
070        public void saveStepExecution(StepExecution stepExecution) {
071
072                Assert.isTrue(stepExecution.getId() == null, "stepExecution id was not null");
073                Assert.isTrue(stepExecution.getVersion() == null, "stepExecution version was not null");
074                Assert.notNull(stepExecution.getJobExecutionId(), "JobExecution must be saved already.");
075
076                Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
077                if (executions == null) {
078                        executions = new ConcurrentHashMap<Long, StepExecution>();
079                        executionsByJobExecutionId.put(stepExecution.getJobExecutionId(), executions);
080                }
081
082                stepExecution.setId(currentId.incrementAndGet());
083                stepExecution.incrementVersion();
084                StepExecution copy = copy(stepExecution);
085                executions.put(stepExecution.getId(), copy);
086                executionsByStepExecutionId.put(stepExecution.getId(), copy);
087
088        }
089
090        @Override
091        public void updateStepExecution(StepExecution stepExecution) {
092
093                Assert.notNull(stepExecution.getJobExecutionId(), "jobExecution id is null");
094
095                Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
096                Assert.notNull(executions, "step executions for given job execution are expected to be already saved");
097
098                final StepExecution persistedExecution = executionsByStepExecutionId.get(stepExecution.getId());
099                Assert.notNull(persistedExecution, "step execution is expected to be already saved");
100
101                synchronized (stepExecution) {
102                        if (!persistedExecution.getVersion().equals(stepExecution.getVersion())) {
103                                throw new OptimisticLockingFailureException("Attempt to update step execution id="
104                                                + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion()
105                                                + "), where current version is " + persistedExecution.getVersion());
106                        }
107
108                        stepExecution.incrementVersion();
109                        StepExecution copy = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
110                        copy(stepExecution, copy);
111                        executions.put(stepExecution.getId(), copy);
112                        executionsByStepExecutionId.put(stepExecution.getId(), copy);
113                }
114        }
115
116        @Override
117        @Nullable
118        public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
119                return executionsByStepExecutionId.get(stepExecutionId);
120        }
121
122        @Override
123        public void addStepExecutions(JobExecution jobExecution) {
124                Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId());
125                if (executions == null || executions.isEmpty()) {
126                        return;
127                }
128                List<StepExecution> result = new ArrayList<StepExecution>(executions.values());
129                Collections.sort(result, new Comparator<Entity>() {
130
131                        @Override
132                        public int compare(Entity o1, Entity o2) {
133                                return Long.signum(o2.getId() - o1.getId());
134                        }
135                });
136
137                List<StepExecution> copy = new ArrayList<StepExecution>(result.size());
138                for (StepExecution exec : result) {
139                        copy.add(copy(exec));
140                }
141                jobExecution.addStepExecutions(copy);
142        }
143
144        @Override
145        public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
146                Assert.notNull(stepExecutions,"Attempt to save an null collect of step executions");
147                for (StepExecution stepExecution: stepExecutions) {
148                        saveStepExecution(stepExecution);
149                }
150        }
151}