/*
 * Copyright 2006-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.repository.dao;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.core.Entity;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.SerializationUtils;

/**
 * In-memory implementation of {@link StepExecutionDao}.
 *
 * <p>Step executions are kept in two indexes that always reference the same stored
 * instance: one keyed by job execution id (and then by step execution id) and one
 * keyed by step execution id alone. The save and update operations store a
 * <em>copy</em> of the object handed in, never the caller's instance.
 */
public class MapStepExecutionDao implements StepExecutionDao {

    /** Orders entities by id, highest id first. */
    private static final Comparator<Entity> DESCENDING_ID =
            (left, right) -> Long.compare(right.getId(), left.getId());

    /** Stored step executions, grouped by the id of their job execution. */
    private final Map<Long, Map<Long, StepExecution>> executionsByJobExecutionId = new ConcurrentHashMap<>();

    /** Stored step executions, keyed directly by step execution id. */
    private final Map<Long, StepExecution> executionsByStepExecutionId = new ConcurrentHashMap<>();

    /** Source of step execution ids; the first id handed out is 1. */
    private final AtomicLong currentId = new AtomicLong();

    /**
     * Removes every stored step execution from both indexes. The id counter is
     * not reset.
     */
    public void clear() {
        executionsByJobExecutionId.clear();
        executionsByStepExecutionId.clear();
    }

    /**
     * Creates an independent copy of a step execution by serializing and
     * deserializing it.
     *
     * @param original the step execution to copy
     * @return the deserialized copy
     */
    private static StepExecution copy(StepExecution original) {
        return (StepExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
    }

    /**
     * Copies field values from one step execution onto another, using the
     * {@link ReflectionUtils#COPYABLE_FIELDS} filter.
     *
     * <p>Cheaper than full serialization is a reflective field copy, which is
     * fine for volatile storage. Values are assigned as-is, so any object
     * referenced by a field ends up shared between source and target.
     *
     * @param sourceExecution the step execution to read field values from
     * @param targetExecution the step execution whose fields are overwritten
     */
    private static void copy(final StepExecution sourceExecution, final StepExecution targetExecution) {
        ReflectionUtils.doWithFields(StepExecution.class, (Field field) -> {
            field.setAccessible(true);
            field.set(targetExecution, field.get(sourceExecution));
        }, ReflectionUtils.COPYABLE_FIELDS);
    }

    /**
     * Stores a new step execution. The given object is assigned the next id and
     * has its version incremented; a serialized copy of it is then placed in
     * both indexes.
     *
     * @param stepExecution the step execution to save; must have neither an id
     * nor a version yet, and its job execution must already have an id
     */
    @Override
    public void saveStepExecution(StepExecution stepExecution) {

        Assert.isTrue(stepExecution.getId() == null, "stepExecution id was not null");
        Assert.isTrue(stepExecution.getVersion() == null, "stepExecution version was not null");
        Long jobExecutionId = stepExecution.getJobExecutionId();
        Assert.notNull(jobExecutionId, "JobExecution must be saved already.");

        // Atomic get-or-create: a separate get/put pair would let two threads saving
        // the first steps of the same job execution each install their own map, and
        // the step stored in the overwritten map would vanish from this index.
        Map<Long, StepExecution> executions = executionsByJobExecutionId.computeIfAbsent(jobExecutionId,
                id -> new ConcurrentHashMap<Long, StepExecution>());

        stepExecution.setId(currentId.incrementAndGet());
        stepExecution.incrementVersion();

        // Both indexes reference the same stored copy.
        StepExecution copy = copy(stepExecution);
        executions.put(stepExecution.getId(), copy);
        executionsByStepExecutionId.put(stepExecution.getId(), copy);

    }

    /**
     * Replaces the stored state of an already saved step execution.
     *
     * <p>The version of the given object must equal the version of the stored
     * one. On success the given object's version is incremented and a reflective
     * field copy of it replaces the stored instance in both indexes.
     *
     * @param stepExecution the step execution to update; it must have been saved
     * before, so both its id and its job execution id must be set
     * @throws OptimisticLockingFailureException if the versions differ
     */
    @Override
    public void updateStepExecution(StepExecution stepExecution) {

        Assert.notNull(stepExecution.getJobExecutionId(), "jobExecution id is null");
        // Fail with a clear message instead of the bare NullPointerException that
        // ConcurrentHashMap.get(null) would raise further down.
        Assert.notNull(stepExecution.getId(), "step execution id is null, it is expected to be already saved");

        Map<Long, StepExecution> executions = executionsByJobExecutionId.get(stepExecution.getJobExecutionId());
        Assert.notNull(executions, "step executions for given job execution are expected to be already saved");

        final StepExecution persistedExecution = executionsByStepExecutionId.get(stepExecution.getId());
        Assert.notNull(persistedExecution, "step execution is expected to be already saved");

        // NOTE(review): the monitor is the caller's object, so this only serializes
        // threads that share the same StepExecution instance - confirm that is the
        // intended scope of the version check.
        synchronized (stepExecution) {
            if (!persistedExecution.getVersion().equals(stepExecution.getVersion())) {
                throw new OptimisticLockingFailureException("Attempt to update step execution id="
                        + stepExecution.getId() + " with wrong version (" + stepExecution.getVersion()
                        + "), where current version is " + persistedExecution.getVersion());
            }

            stepExecution.incrementVersion();
            StepExecution copy = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
            copy(stepExecution, copy);
            executions.put(stepExecution.getId(), copy);
            executionsByStepExecutionId.put(stepExecution.getId(), copy);
        }
    }

    /**
     * Looks up a stored step execution by its id.
     *
     * <p>The stored instance itself is returned, not a copy.
     *
     * @param jobExecution not used by this implementation
     * @param stepExecutionId the id of the step execution to look up
     * @return the stored step execution, or {@code null} if none has that id
     */
    @Override
    @Nullable
    public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId) {
        return executionsByStepExecutionId.get(stepExecutionId);
    }

    /**
     * Adds copies of all step executions stored for the given job execution to
     * it, ordered by id with the highest id first. Does nothing when no step
     * executions are stored for that job execution.
     *
     * @param jobExecution the job execution to populate; its id selects the
     * step executions
     */
    @Override
    public void addStepExecutions(JobExecution jobExecution) {
        Map<Long, StepExecution> executions = executionsByJobExecutionId.get(jobExecution.getId());
        if (executions == null || executions.isEmpty()) {
            return;
        }
        List<StepExecution> result = new ArrayList<>(executions.values());
        // Long.compare-based ordering; comparing by subtraction could overflow.
        Collections.sort(result, DESCENDING_ID);

        // Hand out serialized copies so the stored instances are not exposed here.
        List<StepExecution> copy = new ArrayList<>(result.size());
        for (StepExecution exec : result) {
            copy.add(copy(exec));
        }
        jobExecution.addStepExecutions(copy);
    }

    /**
     * Saves each step execution of the collection in iteration order by
     * delegating to {@link #saveStepExecution(StepExecution)}.
     *
     * @param stepExecutions the step executions to save; must not be {@code null}
     */
    @Override
    public void saveStepExecutions(Collection<StepExecution> stepExecutions) {
        Assert.notNull(stepExecutions, "Attempt to save a null collection of step executions");
        for (StepExecution stepExecution : stepExecutions) {
            saveStepExecution(stepExecution);
        }
    }
}