001/* 002 * Copyright 2006-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.item.database; 017 018import java.util.Map; 019 020import org.hibernate.ScrollableResults; 021import org.hibernate.Session; 022import org.hibernate.SessionFactory; 023import org.hibernate.StatelessSession; 024import org.springframework.batch.item.ExecutionContext; 025import org.springframework.batch.item.ItemStreamReader; 026import org.springframework.batch.item.ItemStreamException; 027import org.springframework.batch.item.database.orm.HibernateQueryProvider; 028import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; 029import org.springframework.beans.factory.InitializingBean; 030import org.springframework.util.Assert; 031import org.springframework.util.ClassUtils; 032 033/** 034 * {@link ItemStreamReader} for reading database records built on top of Hibernate. It 035 * executes the HQL query when initialized iterates over the result set as 036 * {@link #read()} method is called, returning an object corresponding to 037 * current row. The query can be set directly using 038 * {@link #setQueryString(String)}, a named query can be used by 039 * {@link #setQueryName(String)}, or a query provider strategy can be supplied 040 * via {@link #setQueryProvider(HibernateQueryProvider)}. 041 * 042 * 043 * <p> 044 * The reader can be configured to use either {@link StatelessSession} 045 * sufficient for simple mappings without the need to cascade to associated 046 * objects or standard hibernate {@link Session} for more advanced mappings or 047 * when caching is desired. When stateful session is used it will be cleared in 048 * the {@link #update(ExecutionContext)} method without being flushed (no data 049 * modifications are expected). 050 * </p> 051 * 052 * The implementation is <b>not</b> thread-safe. 053 * 054 * @author Robert Kasanicky 055 * @author Dave Syer 056 */ 057public class HibernateCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> 058 implements InitializingBean { 059 060 private HibernateItemReaderHelper<T> helper = new HibernateItemReaderHelper<>(); 061 062 public HibernateCursorItemReader() { 063 setName(ClassUtils.getShortName(HibernateCursorItemReader.class)); 064 } 065 066 private ScrollableResults cursor; 067 068 private boolean initialized = false; 069 070 private int fetchSize; 071 072 private Map<String, Object> parameterValues; 073 074 @Override 075 public void afterPropertiesSet() throws Exception { 076 Assert.state(fetchSize >= 0, "fetchSize must not be negative"); 077 helper.afterPropertiesSet(); 078 } 079 080 /** 081 * The parameter values to apply to a query (map of name:value). 082 * 083 * @param parameterValues the parameter values to set 084 */ 085 public void setParameterValues(Map<String, Object> parameterValues) { 086 this.parameterValues = parameterValues; 087 } 088 089 /** 090 * A query name for an externalized query. Either this or the { 091 * {@link #setQueryString(String) query string} or the { 092 * {@link #setQueryProvider(HibernateQueryProvider) query provider} should 093 * be set. 094 * 095 * @param queryName name of a hibernate named query 096 */ 097 public void setQueryName(String queryName) { 098 helper.setQueryName(queryName); 099 } 100 101 /** 102 * Fetch size used internally by Hibernate to limit amount of data fetched 103 * from database per round trip. 104 * 105 * @param fetchSize the fetch size to pass down to Hibernate 106 */ 107 public void setFetchSize(int fetchSize) { 108 this.fetchSize = fetchSize; 109 } 110 111 /** 112 * A query provider. Either this or the {{@link #setQueryString(String) 113 * query string} or the {{@link #setQueryName(String) query name} should be 114 * set. 115 * 116 * @param queryProvider Hibernate query provider 117 */ 118 public void setQueryProvider(HibernateQueryProvider<T> queryProvider) { 119 helper.setQueryProvider(queryProvider); 120 } 121 122 /** 123 * A query string in HQL. Either this or the { 124 * {@link #setQueryProvider(HibernateQueryProvider) query provider} or the { 125 * {@link #setQueryName(String) query name} should be set. 126 * 127 * @param queryString HQL query string 128 */ 129 public void setQueryString(String queryString) { 130 helper.setQueryString(queryString); 131 } 132 133 /** 134 * The Hibernate SessionFactory to use the create a session. 135 * 136 * @param sessionFactory the {@link SessionFactory} to set 137 */ 138 public void setSessionFactory(SessionFactory sessionFactory) { 139 helper.setSessionFactory(sessionFactory); 140 } 141 142 /** 143 * Can be set only in uninitialized state. 144 * 145 * @param useStatelessSession <code>true</code> to use 146 * {@link StatelessSession} <code>false</code> to use standard hibernate 147 * {@link Session} 148 */ 149 public void setUseStatelessSession(boolean useStatelessSession) { 150 helper.setUseStatelessSession(useStatelessSession); 151 } 152 153 @Override 154 protected T doRead() throws Exception { 155 if (cursor.next()) { 156 Object[] data = cursor.get(); 157 158 if (data.length > 1) { 159 // If there are multiple items this must be a projection 160 // and T is an array type. 161 @SuppressWarnings("unchecked") 162 T item = (T) data; 163 return item; 164 } 165 else { 166 // Assume if there is only one item that it is the data the user 167 // wants. 168 // If there is only one item this is going to be a nasty shock 169 // if T is an array type but there's not much else we can do... 170 @SuppressWarnings("unchecked") 171 T item = (T) data[0]; 172 return item; 173 } 174 175 } 176 return null; 177 } 178 179 /** 180 * Open hibernate session and create a forward-only cursor for the query. 181 */ 182 @Override 183 protected void doOpen() throws Exception { 184 Assert.state(!initialized, "Cannot open an already opened ItemReader, call close first"); 185 cursor = helper.getForwardOnlyCursor(fetchSize, parameterValues); 186 initialized = true; 187 } 188 189 /** 190 * Update the context and clear the session if stateful. 191 * 192 * @param executionContext the current {@link ExecutionContext} 193 * @throws ItemStreamException if there is a problem 194 */ 195 @Override 196 public void update(ExecutionContext executionContext) throws ItemStreamException { 197 super.update(executionContext); 198 helper.clear(); 199 } 200 201 /** 202 * Wind forward through the result set to the item requested. Also clears 203 * the session every now and then (if stateful) to avoid memory problems. 204 * The frequency of session clearing is the larger of the fetch size (if 205 * set) and 100. 206 * 207 * @param itemIndex the first item to read 208 * @throws Exception if there is a problem 209 * @see AbstractItemCountingItemStreamItemReader#jumpToItem(int) 210 */ 211 @Override 212 protected void jumpToItem(int itemIndex) throws Exception { 213 int flushSize = Math.max(fetchSize, 100); 214 helper.jumpToItem(cursor, itemIndex, flushSize); 215 } 216 217 /** 218 * Close the cursor and hibernate session. 219 */ 220 @Override 221 protected void doClose() throws Exception { 222 223 if(initialized) { 224 if (cursor != null) { 225 cursor.close(); 226 } 227 228 helper.close(); 229 } 230 231 initialized = false; 232 } 233}