/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.item.database;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.database.orm.JpaQueryProvider;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * <p>
 * {@link org.springframework.batch.item.ItemReader} for reading database
 * records built on top of JPA.
 * </p>
 *
 * <p>
 * It executes the JPQL {@link #setQueryString(String)} to retrieve requested
 * data. The query is executed using paged requests of a size specified in
 * {@link #setPageSize(int)}. Additional pages are requested when needed as
 * {@link #read()} method is called, returning an object corresponding to
 * current position.
 * </p>
 *
 * <p>
 * The performance of the paging depends on the JPA implementation and its use
 * of database specific features to limit the number of returned rows.
 * </p>
 *
 * <p>
 * Setting a fairly large page size and using a commit interval that matches the
 * page size should provide better performance.
 * </p>
 *
 * <p>
 * In order to reduce the memory usage for large results the persistence context
 * is flushed and cleared after each page is read. This causes any entities read
 * to be detached. If you make changes to the entities and want the changes
 * persisted then you must explicitly merge the entities.
 * </p>
 *
 * <p>
 * The reader must be configured with an
 * {@link javax.persistence.EntityManagerFactory}. All entity access is
 * performed within a new transaction, independent of any existing Spring
 * managed transactions.
 * </p>
 *
 * <p>
 * The implementation is thread-safe in between calls to
 * {@link #open(ExecutionContext)}, but remember to use
 * <code>saveState=false</code> if used in a multi-threaded client (no restart
 * available).
 * </p>
 *
 *
 * @author Thomas Risberg
 * @author Dave Syer
 * @author Will Schipp
 * @since 2.0
 */
public class JpaPagingItemReader<T> extends AbstractPagingItemReader<T> {

	private EntityManagerFactory entityManagerFactory;

	private EntityManager entityManager;

	private final Map<String, Object> jpaPropertyMap = new HashMap<>();

	private String queryString;

	private JpaQueryProvider queryProvider;

	private Map<String, Object> parameterValues;

	private boolean transacted = true; // default value

	/**
	 * Create a reader whose name defaults to the short class name of
	 * {@link JpaPagingItemReader}.
	 */
	public JpaPagingItemReader() {
		setName(ClassUtils.getShortName(JpaPagingItemReader.class));
	}

	/**
	 * Create a query using an appropriate query provider (entityManager OR
	 * queryProvider).
	 *
	 * @return a query built from the JPQL query string when no query provider
	 * is configured, otherwise the query supplied by the query provider
	 */
	private Query createQuery() {
		if (queryProvider == null) {
			return entityManager.createQuery(queryString);
		}
		else {
			return queryProvider.createQuery();
		}
	}

	/**
	 * @param entityManagerFactory the factory used to create the
	 * {@link EntityManager} when the reader is opened
	 */
	public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
		this.entityManagerFactory = entityManagerFactory;
	}

	/**
	 * The parameter values to be used for the query execution.
	 *
	 * @param parameterValues the values keyed by the parameter name used in
	 * the query string.
	 */
	public void setParameterValues(Map<String, Object> parameterValues) {
		this.parameterValues = parameterValues;
	}

	/**
	 * By default (true) the EntityTransaction will be started and committed around the read.
	 * Can be overridden (false) in cases where the JPA implementation doesn't support a
	 * particular transaction. (e.g. Hibernate with a JTA transaction). NOTE: may cause
	 * problems in guaranteeing the object consistency in the EntityManagerFactory.
	 *
	 * @param transacted indicator
	 */
	public void setTransacted(boolean transacted) {
		this.transacted = transacted;
	}

	/**
	 * Validate the configuration: when no query provider is set, both the
	 * entity manager factory and a non-empty query string are required.
	 *
	 * <p>
	 * NOTE(review): the factory is only checked here when no query provider is
	 * configured, yet {@link #doOpen()} calls it unconditionally. Confirm
	 * whether it should always be required.
	 * </p>
	 *
	 * @throws Exception if the superclass validation or one of the assertions
	 * fails
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		super.afterPropertiesSet();

		if (queryProvider == null) {
			Assert.notNull(entityManagerFactory, "EntityManager is required when queryProvider is null");
			Assert.hasLength(queryString, "Query string is required when queryProvider is null");
		}
	}

	/**
	 * @param queryString JPQL query string
	 */
	public void setQueryString(String queryString) {
		this.queryString = queryString;
	}

	/**
	 * @param queryProvider JPA query provider
	 */
	public void setQueryProvider(JpaQueryProvider queryProvider) {
		this.queryProvider = queryProvider;
	}

	/**
	 * Create the {@link EntityManager} used for all page reads and hand it to
	 * the query provider, if one is configured.
	 *
	 * @throws DataAccessResourceFailureException if the factory returns
	 * <code>null</code> instead of an entity manager
	 */
	@Override
	protected void doOpen() throws Exception {
		super.doOpen();

		entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
		if (entityManager == null) {
			throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
		}
		// set entityManager to queryProvider, so it participates
		// in JpaPagingItemReader's managed transaction
		if (queryProvider != null) {
			queryProvider.setEntityManager(entityManager);
		}

	}

	/**
	 * Read the current page into <code>results</code>.
	 *
	 * <p>
	 * When transacted, the read runs inside its own {@link EntityTransaction}:
	 * the persistence context is flushed and cleared, the page is queried and
	 * the transaction is committed. If anything fails after the transaction
	 * has begun, it is rolled back (when still active) so that it is not left
	 * open on the shared entity manager; the original failure is propagated
	 * unchanged.
	 * </p>
	 *
	 * <p>
	 * When not transacted, no transaction is started and every entity is
	 * detached individually before being added to the results.
	 * </p>
	 */
	@Override
	@SuppressWarnings("unchecked")
	protected void doReadPage() {

		EntityTransaction tx = null;

		if (transacted) {
			tx = entityManager.getTransaction();
			tx.begin();
		}

		boolean completed = false;
		try {
			if (transacted) {
				entityManager.flush();
				entityManager.clear();
			}

			Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());

			if (parameterValues != null) {
				for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
					query.setParameter(me.getKey(), me.getValue());
				}
			}

			if (results == null) {
				results = new CopyOnWriteArrayList<>();
			}
			else {
				results.clear();
			}

			if (!transacted) {
				List<T> queryResult = query.getResultList();
				for (T entity : queryResult) {
					entityManager.detach(entity);
					results.add(entity);
				}
			}
			else {
				results.addAll(query.getResultList());
				tx.commit();
			}

			completed = true;
		}
		finally {
			if (!completed) {
				// tx is null when not transacted; the helper tolerates that.
				rollbackQuietly(tx);
			}
		}
	}

	/**
	 * Roll back the given transaction if it is still active, suppressing any
	 * failure of the rollback itself so the exception that triggered it is the
	 * one the caller sees.
	 *
	 * @param tx the transaction to roll back, may be <code>null</code>
	 */
	private static void rollbackQuietly(EntityTransaction tx) {
		if (tx == null) {
			return;
		}
		try {
			if (tx.isActive()) {
				tx.rollback();
			}
		}
		catch (RuntimeException ignored) {
			// Deliberately ignored: the original failure is more useful to the caller.
		}
	}

	/**
	 * Nothing to do here: {@link #doReadPage()} derives the first result of
	 * the query from the current page number and the page size.
	 *
	 * @param itemIndex the index of the item to jump to (unused)
	 */
	@Override
	protected void doJumpToPage(int itemIndex) {
	}

	/**
	 * Close the entity manager, if one was created, and then let the
	 * superclass clean up. The superclass is invoked even when closing the
	 * entity manager fails.
	 */
	@Override
	protected void doClose() throws Exception {
		try {
			// May still be null if doOpen() never got as far as creating it.
			if (entityManager != null) {
				entityManager.close();
			}
		}
		finally {
			super.doClose();
		}
	}

}