/*
 * Copyright 2006-2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.item.database;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;

import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.SqlOutParameter;
import org.springframework.jdbc.core.SqlParameter;
import org.springframework.jdbc.core.metadata.CallMetaDataContext;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * <p>
 * Item reader implementation that executes a stored procedure and then reads the returned cursor
 * and continually retrieves the next row in the <code>ResultSet</code>.
 * </p>
 *
 * <p>
 * The callable statement used to open the cursor is created with the 'READ_ONLY' option as well as with the
 * 'TYPE_FORWARD_ONLY' option. By default the cursor will be opened using a separate connection which means
 * that it will not participate in any transactions created as part of the step processing.
 * </p>
 *
 * <p>
 * Each call to {@link #read()} will call the provided RowMapper, passing in the
 * ResultSet.
 * </p>
 *
 * <p>
 * This class is modeled after the similar <code>JdbcCursorItemReader</code> class.
 * </p>
 *
 * @author Thomas Risberg
 */
public class StoredProcedureItemReader<T> extends AbstractCursorItemReader<T> {

	private CallableStatement callableStatement;

	private PreparedStatementSetter preparedStatementSetter;

	private String procedureName;

	private String callString;

	private RowMapper<T> rowMapper;

	private SqlParameter[] parameters = new SqlParameter[0];

	private boolean function = false;

	private int refCursorPosition = 0;

	public StoredProcedureItemReader() {
		super();
		setName(ClassUtils.getShortName(StoredProcedureItemReader.class));
	}

	/**
	 * Set the RowMapper to be used for all calls to read().
	 *
	 * @param rowMapper the RowMapper to use to map the results
	 */
	public void setRowMapper(RowMapper<T> rowMapper) {
		this.rowMapper = rowMapper;
	}

	/**
	 * Set the name of the stored procedure (or function) to call. Only the
	 * name is expected here; the actual call string is generated from it and
	 * from the declared parameters when the cursor is opened.
	 *
	 * @param sprocedureName the name of the stored procedure to call
	 */
	public void setProcedureName(String sprocedureName) {
		this.procedureName = sprocedureName;
	}

	/**
	 * Set the PreparedStatementSetter to use if any parameter values need
	 * to be set on the callable statement before it is executed.
	 *
	 * @param preparedStatementSetter used to populate the call parameters
	 */
	public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
		this.preparedStatementSetter = preparedStatementSetter;
	}

	/**
	 * Add one or more declared parameters. Used for configuring this operation when used in a
	 * bean factory. Each parameter will specify SQL type and (optionally) the parameter's name.
	 *
	 * @param parameters Array containing the declared <code>SqlParameter</code> objects;
	 * <code>null</code> is treated as "no parameters declared"
	 */
	public void setParameters(SqlParameter[] parameters) {
		// Normalise null to an empty array so that openCursor can rely on parameters.length.
		this.parameters = (parameters != null ? parameters : new SqlParameter[0]);
	}

	/**
	 * Set whether this stored procedure is a function. When set, the cursor is
	 * registered as, and read from, out parameter 1 and the REF CURSOR position
	 * is not used.
	 *
	 * @param function indicator
	 */
	public void setFunction(boolean function) {
		this.function = function;
	}

	/**
	 * Set the parameter position of the REF CURSOR. Only used for Oracle and
	 * PostgreSQL that use REF CURSORs. For any other database this should be
	 * kept as 0 which is the default. The position is 1-based and, for a
	 * procedure (not a function), must not exceed the number of declared
	 * parameters.
	 *
	 * @param refCursorPosition The parameter position of the REF CURSOR
	 */
	public void setRefCursorPosition(int refCursorPosition) {
		this.refCursorPosition = refCursorPosition;
	}

	/**
	 * Assert that mandatory properties are set, after delegating to the
	 * superclass for its own checks.
	 *
	 * @throws IllegalArgumentException if either the procedure name or the
	 * RowMapper is not set.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		super.afterPropertiesSet();
		Assert.notNull(procedureName, "The name of the stored procedure must be provided");
		Assert.notNull(rowMapper, "RowMapper must be provided");
	}

	/**
	 * Build the call string for the configured procedure, prepare and execute
	 * the callable statement on the given connection and store the resulting
	 * <code>ResultSet</code> in the inherited <code>rs</code> field.
	 *
	 * @param con the connection on which the callable statement is prepared
	 * @throws IllegalStateException if the procedure name is missing or the
	 * REF CURSOR position is negative or (for a procedure) larger than the
	 * number of declared parameters
	 */
	@Override
	protected void openCursor(Connection con) {

		Assert.state(procedureName != null, "Procedure Name must not be null.");
		Assert.state(refCursorPosition >= 0,
				"invalid refCursorPosition specified as " + refCursorPosition + "; it can't be " +
				"specified as a negative number.");
		// Upper bound only matters for procedures: functions always use out parameter 1
		// and never look at refCursorPosition.
		Assert.state(function || refCursorPosition <= parameters.length,
				"invalid refCursorPosition specified as " + refCursorPosition + "; there are " +
				parameters.length + " parameters defined.");

		// Generate the call string from the declared parameters only; database
		// metadata lookup for call parameters is switched off.
		CallMetaDataContext callContext = new CallMetaDataContext();
		callContext.setAccessCallParameterMetaData(false);
		callContext.setProcedureName(procedureName);
		callContext.setFunction(function);
		callContext.initializeMetaData(getDataSource());
		callContext.processParameters(Arrays.asList(parameters));
		SqlParameter cursorParameter = callContext.createReturnResultSetParameter("cursor", rowMapper);
		this.callString = callContext.createCallString();

		if (log.isDebugEnabled()) {
			log.debug("Call string is: " + callString);
		}

		// Types.OTHER is the fallback when no more specific SQL type is available.
		int cursorSqlType = Types.OTHER;
		if (function) {
			if (cursorParameter instanceof SqlOutParameter) {
				cursorSqlType = cursorParameter.getSqlType();
			}
		}
		else {
			// The upper bound has already been asserted above.
			if (refCursorPosition > 0) {
				cursorSqlType = parameters[refCursorPosition - 1].getSqlType();
			}
		}

		try {
			if (isUseSharedExtendedConnection()) {
				callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
						ResultSet.HOLD_CURSORS_OVER_COMMIT);
			}
			else {
				callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
			}
			applyStatementSettings(callableStatement);
			if (this.preparedStatementSetter != null) {
				preparedStatementSetter.setValues(callableStatement);
			}

			if (function) {
				callableStatement.registerOutParameter(1, cursorSqlType);
			}
			else {
				if (refCursorPosition > 0) {
					callableStatement.registerOutParameter(refCursorPosition, cursorSqlType);
				}
			}
			boolean results = callableStatement.execute();
			if (results) {
				rs = callableStatement.getResultSet();
			}
			else {
				// No direct result set: fetch the cursor from the out parameter instead.
				if (function) {
					rs = (ResultSet) callableStatement.getObject(1);
				}
				else {
					// NOTE(review): with refCursorPosition == 0 this asks for parameter 0;
					// presumably the driver rejects that with an SQLException, which is
					// translated below - confirm.
					rs = (ResultSet) callableStatement.getObject(refCursorPosition);
				}
			}
			handleWarnings(callableStatement);
		}
		catch (SQLException se) {
			// Release whatever was opened before reporting the failure.
			close();
			throw getExceptionTranslator().translate("Executing stored procedure", getSql(), se);
		}

	}

	/**
	 * Map the current row of the cursor by delegating to the configured RowMapper.
	 *
	 * @param rs the ResultSet positioned on the row to map
	 * @param currentRow the row number passed on to the RowMapper
	 * @return the item produced by the RowMapper
	 * @throws SQLException if the RowMapper fails to read from the ResultSet
	 */
	@Override
	protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
		return rowMapper.mapRow(rs, currentRow);
	}

	/**
	 * Close the callable statement that was used to open the cursor. Nothing
	 * else is closed here; presumably the ResultSet and connection are released
	 * by the superclass - confirm.
	 */
	@Override
	protected void cleanupOnClose() throws Exception {
		JdbcUtils.closeStatement(this.callableStatement);
	}

	/**
	 * Return a description of the call for use in error reporting: the
	 * generated call string once the cursor has been opened, otherwise the
	 * procedure name prefixed with "PROCEDURE NAME: ".
	 *
	 * @return the call string or a description of the procedure name
	 */
	@Override
	public String getSql() {
		if (callString != null) {
			return this.callString;
		}
		else {
			return "PROCEDURE NAME: " + procedureName;
		}
	}

}