001/*
002 * Copyright 2006-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.item.database;
017
018import java.util.Map;
019
020import org.hibernate.ScrollableResults;
021import org.hibernate.Session;
022import org.hibernate.SessionFactory;
023import org.hibernate.StatelessSession;
024import org.springframework.batch.item.ExecutionContext;
025import org.springframework.batch.item.ItemStreamReader;
026import org.springframework.batch.item.ItemStreamException;
027import org.springframework.batch.item.database.orm.HibernateQueryProvider;
028import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
029import org.springframework.beans.factory.InitializingBean;
030import org.springframework.util.Assert;
031import org.springframework.util.ClassUtils;
032
033/**
034 * {@link ItemStreamReader} for reading database records built on top of Hibernate. It
035 * executes the HQL query when initialized iterates over the result set as
036 * {@link #read()} method is called, returning an object corresponding to
037 * current row. The query can be set directly using
038 * {@link #setQueryString(String)}, a named query can be used by
039 * {@link #setQueryName(String)}, or a query provider strategy can be supplied
040 * via {@link #setQueryProvider(HibernateQueryProvider)}.
041 *
042 *
043 * <p>
044 * The reader can be configured to use either {@link StatelessSession}
045 * sufficient for simple mappings without the need to cascade to associated
046 * objects or standard hibernate {@link Session} for more advanced mappings or
047 * when caching is desired. When stateful session is used it will be cleared in
048 * the {@link #update(ExecutionContext)} method without being flushed (no data
049 * modifications are expected).
050 * </p>
051 *
052 * The implementation is <b>not</b> thread-safe.
053 *
054 * @author Robert Kasanicky
055 * @author Dave Syer
056 */
057public class HibernateCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> 
058        implements InitializingBean {
059
060        private HibernateItemReaderHelper<T> helper = new HibernateItemReaderHelper<>();
061
062        public HibernateCursorItemReader() {
063                setName(ClassUtils.getShortName(HibernateCursorItemReader.class));
064        }
065
066        private ScrollableResults cursor;
067
068        private boolean initialized = false;
069
070        private int fetchSize;
071
072        private Map<String, Object> parameterValues;
073
074        @Override
075        public void afterPropertiesSet() throws Exception {
076                Assert.state(fetchSize >= 0, "fetchSize must not be negative");
077                helper.afterPropertiesSet();
078        }
079
080        /**
081         * The parameter values to apply to a query (map of name:value).
082         *
083         * @param parameterValues the parameter values to set
084         */
085        public void setParameterValues(Map<String, Object> parameterValues) {
086                this.parameterValues = parameterValues;
087        }
088
089        /**
090         * A query name for an externalized query. Either this or the {
091         * {@link #setQueryString(String) query string} or the {
092         * {@link #setQueryProvider(HibernateQueryProvider) query provider} should
093         * be set.
094         *
095         * @param queryName name of a hibernate named query
096         */
097        public void setQueryName(String queryName) {
098                helper.setQueryName(queryName);
099        }
100
101        /**
102         * Fetch size used internally by Hibernate to limit amount of data fetched
103         * from database per round trip.
104         *
105         * @param fetchSize the fetch size to pass down to Hibernate
106         */
107        public void setFetchSize(int fetchSize) {
108                this.fetchSize = fetchSize;
109        }
110
111        /**
112         * A query provider. Either this or the {{@link #setQueryString(String)
113         * query string} or the {{@link #setQueryName(String) query name} should be
114         * set.
115         *
116         * @param queryProvider Hibernate query provider
117         */
118        public void setQueryProvider(HibernateQueryProvider<T> queryProvider) {
119                helper.setQueryProvider(queryProvider);
120        }
121
122        /**
123         * A query string in HQL. Either this or the {
124         * {@link #setQueryProvider(HibernateQueryProvider) query provider} or the {
125         * {@link #setQueryName(String) query name} should be set.
126         *
127         * @param queryString HQL query string
128         */
129        public void setQueryString(String queryString) {
130                helper.setQueryString(queryString);
131        }
132
133        /**
134         * The Hibernate SessionFactory to use the create a session.
135         *
136         * @param sessionFactory the {@link SessionFactory} to set
137         */
138        public void setSessionFactory(SessionFactory sessionFactory) {
139                helper.setSessionFactory(sessionFactory);
140        }
141
142        /**
143         * Can be set only in uninitialized state.
144         *
145         * @param useStatelessSession <code>true</code> to use
146         * {@link StatelessSession} <code>false</code> to use standard hibernate
147         * {@link Session}
148         */
149        public void setUseStatelessSession(boolean useStatelessSession) {
150                helper.setUseStatelessSession(useStatelessSession);
151        }
152
153        @Override
154        protected T doRead() throws Exception {
155                if (cursor.next()) {
156                        Object[] data = cursor.get();
157
158                        if (data.length > 1) {
159                                // If there are multiple items this must be a projection
160                                // and T is an array type.
161                                @SuppressWarnings("unchecked")
162                                T item = (T) data;
163                                return item;
164                        }
165                        else {
166                                // Assume if there is only one item that it is the data the user
167                                // wants.
168                                // If there is only one item this is going to be a nasty shock
169                                // if T is an array type but there's not much else we can do...
170                                @SuppressWarnings("unchecked")
171                                T item = (T) data[0];
172                                return item;
173                        }
174
175                }
176                return null;
177        }
178
179        /**
180         * Open hibernate session and create a forward-only cursor for the query.
181         */
182        @Override
183        protected void doOpen() throws Exception {
184                Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first");
185                cursor = helper.getForwardOnlyCursor(fetchSize, parameterValues);
186                initialized = true;
187        }
188
189        /**
190         * Update the context and clear the session if stateful.
191         *
192         * @param executionContext the current {@link ExecutionContext}
193         * @throws ItemStreamException if there is a problem
194         */
195        @Override
196        public void update(ExecutionContext executionContext) throws ItemStreamException {
197                super.update(executionContext);
198                helper.clear();
199        }
200
201        /**
202         * Wind forward through the result set to the item requested. Also clears
203         * the session every now and then (if stateful) to avoid memory problems.
204         * The frequency of session clearing is the larger of the fetch size (if
205         * set) and 100.
206         *
207         * @param itemIndex the first item to read
208         * @throws Exception if there is a problem
209         * @see AbstractItemCountingItemStreamItemReader#jumpToItem(int)
210         */
211        @Override
212        protected void jumpToItem(int itemIndex) throws Exception {
213                int flushSize = Math.max(fetchSize, 100);
214                helper.jumpToItem(cursor, itemIndex, flushSize);
215        }
216
217        /**
218         * Close the cursor and hibernate session.
219         */
220        @Override
221        protected void doClose() throws Exception {
222
223                if(initialized) {
224                        if (cursor != null) {
225                                cursor.close();
226                        }
227
228                        helper.close();
229                }
230
231                initialized = false;
232        }
233}