001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.database;
018
019import java.util.HashMap;
020import java.util.List;
021import java.util.Map;
022import java.util.concurrent.CopyOnWriteArrayList;
023
024import javax.persistence.EntityManager;
025import javax.persistence.EntityManagerFactory;
026import javax.persistence.EntityTransaction;
027import javax.persistence.Query;
028
029import org.springframework.batch.item.ExecutionContext;
030import org.springframework.batch.item.database.orm.JpaQueryProvider;
031import org.springframework.dao.DataAccessResourceFailureException;
032import org.springframework.util.Assert;
033import org.springframework.util.ClassUtils;
034
035/**
036 * <p>
037 * {@link org.springframework.batch.item.ItemReader} for reading database
038 * records built on top of JPA.
039 * </p>
040 *
041 * <p>
042 * It executes the JPQL {@link #setQueryString(String)} to retrieve requested
043 * data. The query is executed using paged requests of a size specified in
044 * {@link #setPageSize(int)}. Additional pages are requested when needed as
045 * {@link #read()} method is called, returning an object corresponding to
046 * current position.
047 * </p>
048 *
049 * <p>
050 * The performance of the paging depends on the JPA implementation and its use
051 * of database specific features to limit the number of returned rows.
052 * </p>
053 *
054 * <p>
055 * Setting a fairly large page size and using a commit interval that matches the
056 * page size should provide better performance.
057 * </p>
058 *
059 * <p>
060 * In order to reduce the memory usage for large results the persistence context
061 * is flushed and cleared after each page is read. This causes any entities read
062 * to be detached. If you make changes to the entities and want the changes
063 * persisted then you must explicitly merge the entities.
064 * </p>
065 *
066 * <p>
067 * The reader must be configured with an
068 * {@link javax.persistence.EntityManagerFactory}. All entity access is
069 * performed within a new transaction, independent of any existing Spring
070 * managed transactions.
071 * </p>
072 *
073 * <p>
074 * The implementation is thread-safe in between calls to
075 * {@link #open(ExecutionContext)}, but remember to use
076 * <code>saveState=false</code> if used in a multi-threaded client (no restart
077 * available).
078 * </p>
079 *
080 *
081 * @author Thomas Risberg
082 * @author Dave Syer
083 * @author Will Schipp
084 * @since 2.0
085 */
086public class JpaPagingItemReader<T> extends AbstractPagingItemReader<T> {
087
088        private EntityManagerFactory entityManagerFactory;
089
090        private EntityManager entityManager;
091
092        private final Map<String, Object> jpaPropertyMap = new HashMap<>();
093
094        private String queryString;
095
096        private JpaQueryProvider queryProvider;
097
098        private Map<String, Object> parameterValues;
099        
100        private boolean transacted = true;//default value
101
102        public JpaPagingItemReader() {
103                setName(ClassUtils.getShortName(JpaPagingItemReader.class));
104        }
105
106        /**
107         * Create a query using an appropriate query provider (entityManager OR
108         * queryProvider).
109         */
110        private Query createQuery() {
111                if (queryProvider == null) {
112                        return entityManager.createQuery(queryString);
113                }
114                else {
115                        return queryProvider.createQuery();
116                }
117        }
118
119        public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
120                this.entityManagerFactory = entityManagerFactory;
121        }
122
123        /**
124         * The parameter values to be used for the query execution.
125         *
126         * @param parameterValues the values keyed by the parameter named used in
127         * the query string.
128         */
129        public void setParameterValues(Map<String, Object> parameterValues) {
130                this.parameterValues = parameterValues;
131        }
132        
133        /**
134         * By default (true) the EntityTransaction will be started and committed around the read.  
135         * Can be overridden (false) in cases where the JPA implementation doesn't support a 
136         * particular transaction.  (e.g. Hibernate with a JTA transaction).  NOTE: may cause 
137         * problems in guaranteeing the object consistency in the EntityManagerFactory.
138         * 
139         * @param transacted indicator
140         */
141        public void setTransacted(boolean transacted) {
142                this.transacted = transacted;
143        }       
144
145        @Override
146        public void afterPropertiesSet() throws Exception {
147                super.afterPropertiesSet();
148
149                if (queryProvider == null) {
150                        Assert.notNull(entityManagerFactory, "EntityManager is required when queryProvider is null");
151                        Assert.hasLength(queryString, "Query string is required when queryProvider is null");
152                }
153        }
154
155        /**
156         * @param queryString JPQL query string
157         */
158        public void setQueryString(String queryString) {
159                this.queryString = queryString;
160        }
161
162        /**
163         * @param queryProvider JPA query provider
164         */
165        public void setQueryProvider(JpaQueryProvider queryProvider) {
166                this.queryProvider = queryProvider;
167        }
168
169        @Override
170        protected void doOpen() throws Exception {
171                super.doOpen();
172
173                entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
174                if (entityManager == null) {
175                        throw new DataAccessResourceFailureException("Unable to obtain an EntityManager");
176                }
177                // set entityManager to queryProvider, so it participates
178                // in JpaPagingItemReader's managed transaction
179                if (queryProvider != null) {
180                        queryProvider.setEntityManager(entityManager);
181                }
182
183        }
184
185        @Override
186        @SuppressWarnings("unchecked")
187        protected void doReadPage() {
188
189                EntityTransaction tx = null;
190                
191                if (transacted) {
192                        tx = entityManager.getTransaction();
193                        tx.begin();
194                        
195                        entityManager.flush();
196                        entityManager.clear();
197                }//end if
198
199                Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
200
201                if (parameterValues != null) {
202                        for (Map.Entry<String, Object> me : parameterValues.entrySet()) {
203                                query.setParameter(me.getKey(), me.getValue());
204                        }
205                }
206
207                if (results == null) {
208                        results = new CopyOnWriteArrayList<>();
209                }
210                else {
211                        results.clear();
212                }
213                
214                if (!transacted) {
215                        List<T> queryResult = query.getResultList();
216                        for (T entity : queryResult) {
217                                entityManager.detach(entity);
218                                results.add(entity);
219                        }//end if
220                } else {
221                        results.addAll(query.getResultList());
222                        tx.commit();
223                }//end if
224        }
225
226        @Override
227        protected void doJumpToPage(int itemIndex) {
228        }
229
230        @Override
231        protected void doClose() throws Exception {
232                entityManager.close();
233                super.doClose();
234        }
235
236}