/*
 * Copyright 2006-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.core.repository.dao;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.SerializationUtils;

/**
 * In-memory implementation of {@link JobExecutionDao}.
 * <p>
 * Executions are held in a map keyed by execution id. Every execution is
 * copied through a serialization round trip on its way into and out of the
 * map, so callers never share an instance with the store.
 */
public class MapJobExecutionDao implements JobExecutionDao {

	/**
	 * Orders executions by descending id. Uses {@link Long#compareTo(Long)}
	 * rather than subtraction so the comparison cannot overflow.
	 */
	private static final Comparator<JobExecution> BY_ID_DESCENDING = new Comparator<JobExecution>() {

		@Override
		public int compare(JobExecution e1, JobExecution e2) {
			return e2.getId().compareTo(e1.getId());
		}
	};

	// Possible improvement: a ConcurrentSkipListMap, since adds and removes
	// tend to be very near the front or back.
	private final ConcurrentMap<Long, JobExecution> executionsById = new ConcurrentHashMap<Long, JobExecution>();

	// Source of ids handed out by saveJobExecution; the first id is 0.
	private final AtomicLong currentId = new AtomicLong(0L);

	/**
	 * Removes all stored job executions. The id counter is not reset.
	 */
	public void clear() {
		executionsById.clear();
	}

	/**
	 * Creates a detached copy of the given execution by serializing and
	 * deserializing it.
	 *
	 * @param original the execution to copy; {@code null} is passed straight
	 * through to {@link SerializationUtils}
	 * @return the deserialized copy (expected to be {@code null} for a
	 * {@code null} input, as {@link #getJobExecution(Long)} relies on)
	 */
	@Nullable
	private static JobExecution copy(@Nullable JobExecution original) {
		return (JobExecution) SerializationUtils.deserialize(SerializationUtils.serialize(original));
	}

	/**
	 * Assigns a fresh id to the execution, increments its version and stores
	 * a copy of it.
	 *
	 * @param jobExecution the execution to save; its id must be {@code null}
	 * @throws IllegalArgumentException if the execution already has an id
	 */
	@Override
	public void saveJobExecution(JobExecution jobExecution) {
		Assert.isTrue(jobExecution.getId() == null, "jobExecution id is not null");
		Long newId = currentId.getAndIncrement();
		jobExecution.setId(newId);
		jobExecution.incrementVersion();
		executionsById.put(newId, copy(jobExecution));
	}

	/**
	 * Finds all executions whose job instance equals the given one.
	 *
	 * @param jobInstance the job instance to match
	 * @return copies of the matching executions, sorted by descending id
	 */
	@Override
	public List<JobExecution> findJobExecutions(JobInstance jobInstance) {
		List<JobExecution> executions = new ArrayList<JobExecution>();
		for (JobExecution exec : executionsById.values()) {
			if (exec.getJobInstance().equals(jobInstance)) {
				executions.add(copy(exec));
			}
		}
		Collections.sort(executions, BY_ID_DESCENDING);
		return executions;
	}

	/**
	 * Replaces the stored copy of an execution, provided the caller's version
	 * matches the stored version. On success the caller's version is
	 * incremented before the copy is stored.
	 *
	 * @param jobExecution the execution to update; must have been saved
	 * @throws IllegalArgumentException if the execution has no id or is not
	 * in the store
	 * @throws OptimisticLockingFailureException if the versions differ
	 */
	@Override
	public void updateJobExecution(JobExecution jobExecution) {
		Long id = jobExecution.getId();
		Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)");
		JobExecution persistedExecution = executionsById.get(id);
		Assert.notNull(persistedExecution, "JobExecution must already be saved");

		// NOTE(review): the monitor is the caller's instance and the stored
		// copy is read before it is taken, so this only serializes updates
		// made through the same object. Confirm whether that is sufficient.
		synchronized (jobExecution) {
			if (!persistedExecution.getVersion().equals(jobExecution.getVersion())) {
				throw new OptimisticLockingFailureException("Attempt to update job execution id=" + id
						+ " with wrong version (" + jobExecution.getVersion() + "), where current version is "
						+ persistedExecution.getVersion());
			}
			jobExecution.incrementVersion();
			executionsById.put(id, copy(jobExecution));
		}
	}

	/**
	 * Finds the execution of the given job instance with the latest create
	 * time.
	 *
	 * @param jobInstance the job instance to match; nothing matches
	 * {@code null}
	 * @return a copy of the most recently created matching execution, or
	 * {@code null} if there is none
	 */
	@Override
	@Nullable
	public JobExecution getLastJobExecution(@Nullable JobInstance jobInstance) {
		JobExecution lastExec = null;
		for (JobExecution exec : executionsById.values()) {
			if (!exec.getJobInstance().equals(jobInstance)) {
				continue;
			}
			// Keep the first match, then only replace it with a strictly
			// later one.
			if (lastExec == null || lastExec.getCreateTime().before(exec.getCreateTime())) {
				lastExec = exec;
			}
		}
		return copy(lastExec);
	}

	/**
	 * Finds the executions of the named job for which
	 * {@link JobExecution#isRunning()} returns {@code true}.
	 *
	 * @param jobName the job name to match
	 * @return copies of the matching executions; empty if there are none
	 */
	@Override
	public Set<JobExecution> findRunningJobExecutions(String jobName) {
		Set<JobExecution> result = new HashSet<JobExecution>();
		for (JobExecution exec : executionsById.values()) {
			if (!exec.getJobInstance().getJobName().equals(jobName) || !exec.isRunning()) {
				continue;
			}
			result.add(copy(exec));
		}
		return result;
	}

	/**
	 * Looks up an execution by id.
	 *
	 * @param executionId the id of the execution to fetch
	 * @return a copy of the stored execution, or {@code null} if no execution
	 * is stored under that id
	 */
	@Override
	@Nullable
	public JobExecution getJobExecution(Long executionId) {
		return copy(executionsById.get(executionId));
	}

	/**
	 * Brings the caller's execution in line with the stored one: if the
	 * versions differ, the stored status is applied through
	 * {@link JobExecution#upgradeStatus} and the stored version is adopted.
	 *
	 * @param jobExecution the execution to synchronize; must have been saved
	 * @throws IllegalArgumentException if the execution has no id or is not
	 * in the store
	 */
	@Override
	public void synchronizeStatus(JobExecution jobExecution) {
		Long id = jobExecution.getId();
		Assert.notNull(id, "JobExecution is expected to have an id (should be saved already)");
		// getJobExecution is @Nullable: fail with a clear message instead of
		// a NullPointerException when the execution is unknown.
		JobExecution saved = getJobExecution(id);
		Assert.notNull(saved, "JobExecution must already be saved");
		if (saved.getVersion().intValue() != jobExecution.getVersion().intValue()) {
			jobExecution.upgradeStatus(saved.getStatus());
			jobExecution.setVersion(saved.getVersion());
		}
	}
}