001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.database; 018 019import java.sql.Connection; 020import java.sql.PreparedStatement; 021import java.sql.ResultSet; 022import java.sql.SQLException; 023import java.sql.SQLWarning; 024import java.sql.Statement; 025 026import javax.sql.DataSource; 027 028import org.apache.commons.logging.Log; 029import org.apache.commons.logging.LogFactory; 030import org.springframework.batch.item.ExecutionContext; 031import org.springframework.batch.item.ItemStream; 032import org.springframework.batch.item.ReaderNotOpenException; 033import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; 034import org.springframework.beans.factory.InitializingBean; 035import org.springframework.dao.InvalidDataAccessApiUsageException; 036import org.springframework.dao.InvalidDataAccessResourceUsageException; 037import org.springframework.jdbc.SQLWarningException; 038import org.springframework.jdbc.datasource.DataSourceUtils; 039import org.springframework.jdbc.support.JdbcUtils; 040import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator; 041import org.springframework.jdbc.support.SQLExceptionTranslator; 042import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator; 043import org.springframework.lang.Nullable; 044import org.springframework.transaction.support.TransactionSynchronizationManager; 045import org.springframework.util.Assert; 046 047/** 048 * <p> 049 * Abstract base class for any simple item reader that opens a database cursor and continually retrieves 050 * the next row in the ResultSet. 051 * </p> 052 * 053 * <p> 054 * By default the cursor will be opened using a separate connection. The ResultSet for the cursor 055 * is held open regardless of commits or roll backs in a surrounding transaction. Clients of this 056 * reader are responsible for buffering the items in the case that they need to be re-presented on a 057 * rollback. This buffering is handled by the step implementations provided and is only a concern for 058 * anyone writing their own step implementations. 059 * </p> 060 * 061 * <p> 062 * There is an option ({@link #setUseSharedExtendedConnection(boolean)} that will share the connection 063 * used for the cursor with the rest of the step processing. If you set this flag to <code>true</code> 064 * then you must wrap the DataSource in a {@link ExtendedConnectionDataSourceProxy} to prevent the 065 * connection from being closed and released after each commit performed as part of the step processing. 066 * You must also use a JDBC driver supporting JDBC 3.0 or later since the cursor will be opened with the 067 * additional option of 'HOLD_CURSORS_OVER_COMMIT' enabled. 068 * </p> 069 * 070 * <p> 071 * Each call to {@link #read()} will attempt to map the row at the current position in the 072 * ResultSet. There is currently no wrapping of the ResultSet to suppress calls 073 * to next(). However, if the RowMapper (mistakenly) increments the current row, 074 * the next call to read will verify that the current row is at the expected 075 * position and throw a DataAccessException if it is not. The reason for such strictness on the 076 * ResultSet is due to the need to maintain control for transactions and 077 * restartability. This ensures that each call to {@link #read()} returns the 078 * ResultSet at the correct row, regardless of rollbacks or restarts. 079 * </p> 080 * 081 * <p> 082 * {@link ExecutionContext}: The current row is returned as restart data, and 083 * when restored from that same data, the cursor is opened and the current row 084 * set to the value within the restart data. See 085 * {@link #setDriverSupportsAbsolute(boolean)} for improving restart 086 * performance. 087 * </p> 088 * 089 * <p> 090 * Calling close on this {@link ItemStream} will cause all resources it is 091 * currently using to be freed. (Connection, ResultSet, etc). It is then illegal 092 * to call {@link #read()} again until it has been re-opened. 093 * </p> 094 * 095 * <p> 096 * Known limitation: when used with Derby 097 * {@link #setVerifyCursorPosition(boolean)} needs to be <code>false</code> 098 * because {@link ResultSet#getRow()} call used for cursor position verification 099 * is not available for 'TYPE_FORWARD_ONLY' result sets. 100 * </p> 101 * 102 * @author Lucas Ward 103 * @author Peter Zozom 104 * @author Robert Kasanicky 105 * @author Thomas Risberg 106 * @author Michael Minella 107 * @author Mahmoud Ben Hassine 108 */ 109public abstract class AbstractCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> 110implements InitializingBean { 111 112 /** Logger available to subclasses */ 113 protected final Log log = LogFactory.getLog(getClass()); 114 115 public static final int VALUE_NOT_SET = -1; 116 private Connection con; 117 118 protected ResultSet rs; 119 120 private DataSource dataSource; 121 122 private int fetchSize = VALUE_NOT_SET; 123 124 private int maxRows = VALUE_NOT_SET; 125 126 private int queryTimeout = VALUE_NOT_SET; 127 128 private boolean ignoreWarnings = true; 129 130 private boolean verifyCursorPosition = true; 131 132 private SQLExceptionTranslator exceptionTranslator; 133 134 private boolean initialized = false; 135 136 private boolean driverSupportsAbsolute = false; 137 138 private boolean useSharedExtendedConnection = false; 139 140 private Boolean connectionAutoCommit; 141 142 private boolean initialConnectionAutoCommit; 143 144 public AbstractCursorItemReader() { 145 super(); 146 } 147 148 /** 149 * Assert that mandatory properties are set. 150 * 151 * @throws IllegalArgumentException if either data source or SQL properties 152 * not set. 153 */ 154 @Override 155 public void afterPropertiesSet() throws Exception { 156 Assert.notNull(dataSource, "DataSource must be provided"); 157 } 158 159 /** 160 * Public setter for the data source for injection purposes. 161 * 162 * @param dataSource {@link javax.sql.DataSource} to be used 163 */ 164 public void setDataSource(DataSource dataSource) { 165 this.dataSource = dataSource; 166 } 167 168 /** 169 * Public getter for the data source. 170 * 171 * @return the dataSource 172 */ 173 public DataSource getDataSource() { 174 return this.dataSource; 175 } 176 177 /** 178 * Prepare the given JDBC Statement (or PreparedStatement or 179 * CallableStatement), applying statement settings such as fetch size, max 180 * rows, and query timeout. @param stmt the JDBC Statement to prepare 181 * 182 * @param stmt {@link java.sql.PreparedStatement} to be configured 183 * 184 * @throws SQLException if interactions with provided stmt fail 185 * 186 * @see #setFetchSize 187 * @see #setMaxRows 188 * @see #setQueryTimeout 189 */ 190 protected void applyStatementSettings(PreparedStatement stmt) throws SQLException { 191 if (fetchSize != VALUE_NOT_SET) { 192 stmt.setFetchSize(fetchSize); 193 stmt.setFetchDirection(ResultSet.FETCH_FORWARD); 194 } 195 if (maxRows != VALUE_NOT_SET) { 196 stmt.setMaxRows(maxRows); 197 } 198 if (queryTimeout != VALUE_NOT_SET) { 199 stmt.setQueryTimeout(queryTimeout); 200 } 201 } 202 203 /** 204 * Creates a default SQLErrorCodeSQLExceptionTranslator for the specified 205 * DataSource if none is set. 206 * 207 * @return the exception translator for this instance. 208 */ 209 protected SQLExceptionTranslator getExceptionTranslator() { 210 synchronized(this) { 211 if (exceptionTranslator == null) { 212 if (dataSource != null) { 213 exceptionTranslator = new SQLErrorCodeSQLExceptionTranslator(dataSource); 214 } 215 else { 216 exceptionTranslator = new SQLStateSQLExceptionTranslator(); 217 } 218 } 219 } 220 return exceptionTranslator; 221 } 222 223 /** 224 * Throw a SQLWarningException if we're not ignoring warnings, else log the 225 * warnings (at debug level). 226 * 227 * @param statement the current statement to obtain the warnings from, if there are any. 228 * @throws SQLException if interaction with provided statement fails. 229 * 230 * @see org.springframework.jdbc.SQLWarningException 231 */ 232 protected void handleWarnings(Statement statement) throws SQLWarningException, 233 SQLException { 234 if (ignoreWarnings) { 235 if (log.isDebugEnabled()) { 236 SQLWarning warningToLog = statement.getWarnings(); 237 while (warningToLog != null) { 238 log.debug("SQLWarning ignored: SQL state '" + warningToLog.getSQLState() + "', error code '" 239 + warningToLog.getErrorCode() + "', message [" + warningToLog.getMessage() + "]"); 240 warningToLog = warningToLog.getNextWarning(); 241 } 242 } 243 } 244 else { 245 SQLWarning warnings = statement.getWarnings(); 246 if (warnings != null) { 247 throw new SQLWarningException("Warning not ignored", warnings); 248 } 249 } 250 } 251 252 /** 253 * Moves the cursor in the ResultSet to the position specified by the row 254 * parameter by traversing the ResultSet. 255 * @param row The index of the row to move to 256 */ 257 private void moveCursorToRow(int row) { 258 try { 259 int count = 0; 260 while (row != count && rs.next()) { 261 count++; 262 } 263 } 264 catch (SQLException se) { 265 throw getExceptionTranslator().translate("Attempted to move ResultSet to last committed row", getSql(), se); 266 } 267 } 268 269 /** 270 * Gives the JDBC driver a hint as to the number of rows that should be 271 * fetched from the database when more rows are needed for this 272 * <code>ResultSet</code> object. If the fetch size specified is zero, the 273 * JDBC driver ignores the value. 274 * 275 * @param fetchSize the number of rows to fetch 276 * @see ResultSet#setFetchSize(int) 277 */ 278 public void setFetchSize(int fetchSize) { 279 this.fetchSize = fetchSize; 280 } 281 282 /** 283 * Sets the limit for the maximum number of rows that any 284 * <code>ResultSet</code> object can contain to the given number. 285 * 286 * @param maxRows the new max rows limit; zero means there is no limit 287 * @see Statement#setMaxRows(int) 288 */ 289 public void setMaxRows(int maxRows) { 290 this.maxRows = maxRows; 291 } 292 293 /** 294 * Sets the number of seconds the driver will wait for a 295 * <code>Statement</code> object to execute to the given number of seconds. 296 * If the limit is exceeded, an <code>SQLException</code> is thrown. 297 * 298 * @param queryTimeout seconds the new query timeout limit in seconds; zero 299 * means there is no limit 300 * @see Statement#setQueryTimeout(int) 301 */ 302 public void setQueryTimeout(int queryTimeout) { 303 this.queryTimeout = queryTimeout; 304 } 305 306 /** 307 * Set whether SQLWarnings should be ignored (only logged) or exception 308 * should be thrown. 309 * 310 * @param ignoreWarnings if TRUE, warnings are ignored 311 */ 312 public void setIgnoreWarnings(boolean ignoreWarnings) { 313 this.ignoreWarnings = ignoreWarnings; 314 } 315 316 /** 317 * Allow verification of cursor position after current row is processed by 318 * RowMapper or RowCallbackHandler. Default value is TRUE. 319 * 320 * @param verifyCursorPosition if true, cursor position is verified 321 */ 322 public void setVerifyCursorPosition(boolean verifyCursorPosition) { 323 this.verifyCursorPosition = verifyCursorPosition; 324 } 325 326 /** 327 * Indicate whether the JDBC driver supports setting the absolute row on a 328 * {@link ResultSet}. It is recommended that this is set to 329 * <code>true</code> for JDBC drivers that supports ResultSet.absolute() as 330 * it may improve performance, especially if a step fails while working with 331 * a large data set. 332 * 333 * @see ResultSet#absolute(int) 334 * 335 * @param driverSupportsAbsolute <code>false</code> by default 336 */ 337 public void setDriverSupportsAbsolute(boolean driverSupportsAbsolute) { 338 this.driverSupportsAbsolute = driverSupportsAbsolute; 339 } 340 341 /** 342 * Indicate whether the connection used for the cursor should be used by all other processing 343 * thus sharing the same transaction. If this is set to false, which is the default, then the 344 * cursor will be opened using in its connection and will not participate in any transactions 345 * started for the rest of the step processing. If you set this flag to true then you must 346 * wrap the DataSource in a {@link ExtendedConnectionDataSourceProxy} to prevent the 347 * connection from being closed and released after each commit. 348 * 349 * When you set this option to <code>true</code> then the statement used to open the cursor 350 * will be created with both 'READ_ONLY' and 'HOLD_CURSORS_OVER_COMMIT' options. This allows 351 * holding the cursor open over transaction start and commits performed in the step processing. 352 * To use this feature you need a database that supports this and a JDBC driver supporting 353 * JDBC 3.0 or later. 354 * 355 * @param useSharedExtendedConnection <code>false</code> by default 356 */ 357 public void setUseSharedExtendedConnection(boolean useSharedExtendedConnection) { 358 this.useSharedExtendedConnection = useSharedExtendedConnection; 359 } 360 361 public boolean isUseSharedExtendedConnection() { 362 return useSharedExtendedConnection; 363 } 364 365 /** 366 * Set whether "autoCommit" should be overridden for the connection used by the cursor. If not set, defaults to 367 * Connection / Datasource default configuration. 368 * 369 * @param autoCommit value used for {@link Connection#setAutoCommit(boolean)}. 370 * @since 4.0 371 */ 372 public void setConnectionAutoCommit(boolean autoCommit) { 373 this.connectionAutoCommit = autoCommit; 374 } 375 376 public abstract String getSql(); 377 378 /** 379 * Check the result set is in sync with the currentRow attribute. This is 380 * important to ensure that the user hasn't modified the current row. 381 */ 382 private void verifyCursorPosition(long expectedCurrentRow) throws SQLException { 383 if (verifyCursorPosition) { 384 if (expectedCurrentRow != this.rs.getRow()) { 385 throw new InvalidDataAccessResourceUsageException("Unexpected cursor position change."); 386 } 387 } 388 } 389 390 /** 391 * Close the cursor and database connection. Make call to cleanupOnClose so sub classes can cleanup 392 * any resources they have allocated. 393 */ 394 @Override 395 protected void doClose() throws Exception { 396 initialized = false; 397 JdbcUtils.closeResultSet(this.rs); 398 rs = null; 399 cleanupOnClose(); 400 401 if(this.con != null && !this.con.isClosed()) { 402 this.con.setAutoCommit(this.initialConnectionAutoCommit); 403 } 404 405 if (useSharedExtendedConnection && dataSource instanceof ExtendedConnectionDataSourceProxy) { 406 ((ExtendedConnectionDataSourceProxy)dataSource).stopCloseSuppression(this.con); 407 if (!TransactionSynchronizationManager.isActualTransactionActive()) { 408 DataSourceUtils.releaseConnection(con, dataSource); 409 } 410 } 411 else { 412 JdbcUtils.closeConnection(this.con); 413 } 414 } 415 416 protected abstract void cleanupOnClose() throws Exception; 417 418 /** 419 * Execute the statement to open the cursor. 420 */ 421 @Override 422 protected void doOpen() throws Exception { 423 424 Assert.state(!initialized, "Stream is already initialized. Close before re-opening."); 425 Assert.isNull(rs, "ResultSet still open! Close before re-opening."); 426 427 initializeConnection(); 428 openCursor(con); 429 initialized = true; 430 431 } 432 433 protected void initializeConnection() { 434 Assert.state(getDataSource() != null, "DataSource must not be null."); 435 436 try { 437 if (useSharedExtendedConnection) { 438 if (!(getDataSource() instanceof ExtendedConnectionDataSourceProxy)) { 439 throw new InvalidDataAccessApiUsageException( 440 "You must use a ExtendedConnectionDataSourceProxy for the dataSource when " + 441 "useSharedExtendedConnection is set to true."); 442 } 443 this.con = DataSourceUtils.getConnection(dataSource); 444 ((ExtendedConnectionDataSourceProxy)dataSource).startCloseSuppression(this.con); 445 } 446 else { 447 this.con = dataSource.getConnection(); 448 } 449 450 this.initialConnectionAutoCommit = this.con.getAutoCommit(); 451 452 if (this.connectionAutoCommit != null && this.con.getAutoCommit() != this.connectionAutoCommit) { 453 this.con.setAutoCommit(this.connectionAutoCommit); 454 } 455 } 456 catch (SQLException se) { 457 close(); 458 throw getExceptionTranslator().translate("Executing query", getSql(), se); 459 } 460 } 461 462 protected abstract void openCursor(Connection con); 463 464 /** 465 * Read next row and map it to item, verify cursor position if 466 * {@link #setVerifyCursorPosition(boolean)} is true. 467 */ 468 @Override 469 protected T doRead() throws Exception { 470 if (rs == null) { 471 throw new ReaderNotOpenException("Reader must be open before it can be read."); 472 } 473 474 try { 475 if (!rs.next()) { 476 return null; 477 } 478 int currentRow = getCurrentItemCount(); 479 T item = readCursor(rs, currentRow); 480 verifyCursorPosition(currentRow); 481 return item; 482 } 483 catch (SQLException se) { 484 throw getExceptionTranslator().translate("Attempt to process next row failed", getSql(), se); 485 } 486 } 487 488 /** 489 * Read the cursor and map to the type of object this reader should return. This method must be 490 * overridden by subclasses. 491 * 492 * @param rs The current result set 493 * @param currentRow Current position of the result set 494 * @return the mapped object at the cursor position 495 * @throws SQLException if interactions with the current result set fail 496 */ 497 @Nullable 498 protected abstract T readCursor(ResultSet rs, int currentRow) throws SQLException; 499 500 /** 501 * Use {@link ResultSet#absolute(int)} if possible, otherwise scroll by 502 * calling {@link ResultSet#next()}. 503 */ 504 @Override 505 protected void jumpToItem(int itemIndex) throws Exception { 506 if (driverSupportsAbsolute) { 507 try { 508 rs.absolute(itemIndex); 509 } 510 catch (SQLException e) { 511 // Driver does not support rs.absolute(int) revert to 512 // traversing ResultSet 513 log.warn("The JDBC driver does not appear to support ResultSet.absolute(). Consider" 514 + " reverting to the default behavior setting the driverSupportsAbsolute to false", e); 515 516 moveCursorToRow(itemIndex); 517 } 518 } 519 else { 520 moveCursorToRow(itemIndex); 521 } 522 } 523 524}