001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.database;
018
019import java.sql.CallableStatement;
020import java.sql.Connection;
021import java.sql.ResultSet;
022import java.sql.SQLException;
023import java.sql.Types;
024import java.util.Arrays;
025
026import org.springframework.jdbc.core.PreparedStatementSetter;
027import org.springframework.jdbc.core.RowMapper;
028import org.springframework.jdbc.core.SqlOutParameter;
029import org.springframework.jdbc.core.SqlParameter;
030import org.springframework.jdbc.core.metadata.CallMetaDataContext;
031import org.springframework.jdbc.support.JdbcUtils;
032import org.springframework.util.Assert;
033import org.springframework.util.ClassUtils;
034
035/**
036 * <p>
037 * Item reader implementation that executes a stored procedure and then reads the returned cursor
038 * and continually retrieves the next row in the <code>ResultSet</code>.
039 * </p>
040 *
041 * <p>
042 * The callable statement used to open the cursor is created with the 'READ_ONLY' option as well as with the
043 * 'TYPE_FORWARD_ONLY' option. By default the cursor will be opened using a separate connection which means
044 * that it will not participate in any transactions created as part of the step processing.
045 * </p>
046 *
047 * <p>
048 * Each call to {@link #read()} will call the provided RowMapper, passing in the
049 * ResultSet.
050 * </p>
051 *
052 * <p>
053 * This class is modeled after the similar <code>JdbcCursorItemReader</code> class.
054 * </p>
055 *
056 * @author Thomas Risberg
057 */
058public class StoredProcedureItemReader<T> extends AbstractCursorItemReader<T> {
059
060        private CallableStatement callableStatement;
061
062        private PreparedStatementSetter preparedStatementSetter;
063
064        private String procedureName;
065
066        private String callString;
067
068        private RowMapper<T> rowMapper;
069
070        private SqlParameter[] parameters = new SqlParameter[0];
071
072        private boolean function = false;
073
074        private int refCursorPosition = 0;
075
076        public StoredProcedureItemReader() {
077                super();
078                setName(ClassUtils.getShortName(StoredProcedureItemReader.class));
079        }
080
081        /**
082         * Set the RowMapper to be used for all calls to read().
083         *
084         * @param rowMapper the RowMapper to use to map the results
085         */
086        public void setRowMapper(RowMapper<T> rowMapper) {
087                this.rowMapper = rowMapper;
088        }
089
090        /**
091         * Set the SQL statement to be used when creating the cursor. This statement
092         * should be a complete and valid SQL statement, as it will be run directly
093         * without any modification.
094         *
095         * @param sprocedureName the SQL used to call the statement
096         */
097        public void setProcedureName(String sprocedureName) {
098                this.procedureName = sprocedureName;
099        }
100
101        /**
102         * Set the PreparedStatementSetter to use if any parameter values that need
103         * to be set in the supplied query.
104         *
105         * @param preparedStatementSetter used to populate the SQL
106         */
107        public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
108                this.preparedStatementSetter = preparedStatementSetter;
109        }
110
111        /**
112         * Add one or more declared parameters. Used for configuring this operation when used in a
113         * bean factory. Each parameter will specify SQL type and (optionally) the parameter's name.
114         *
115         * @param parameters Array containing the declared <code>SqlParameter</code> objects
116         */
117        public void setParameters(SqlParameter[] parameters) {
118                this.parameters = parameters;
119        }
120
121        /**
122         * Set whether this stored procedure is a function.
123         *
124         * @param function indicator
125         */
126        public void setFunction(boolean function) {
127                this.function = function;
128        }
129
130        /**
131         * Set the parameter position of the REF CURSOR. Only used for Oracle and
132         * PostgreSQL that use REF CURSORs. For any other database this should be
133         * kept as 0 which is the default.
134         *
135         * @param refCursorPosition The parameter position of the REF CURSOR
136         */
137        public void setRefCursorPosition(int refCursorPosition) {
138                this.refCursorPosition = refCursorPosition;
139        }
140
141        /**
142         * Assert that mandatory properties are set.
143         *
144         * @throws IllegalArgumentException if either data source or SQL properties
145         * not set.
146         */
147        @Override
148        public void afterPropertiesSet() throws Exception {
149                super.afterPropertiesSet();
150                Assert.notNull(procedureName, "The name of the stored procedure must be provided");
151                Assert.notNull(rowMapper, "RowMapper must be provided");
152        }
153
154        @Override
155        protected void openCursor(Connection con) {
156
157                Assert.state(procedureName != null, "Procedure Name must not be null.");
158                Assert.state(refCursorPosition >= 0,
159                                "invalid refCursorPosition specified as " + refCursorPosition + "; it can't be " +
160                                "specified as a negative number.");
161                Assert.state(refCursorPosition == 0 || refCursorPosition > 0,
162                                "invalid refCursorPosition specified as " + refCursorPosition + "; there are " +
163                                                parameters.length + " parameters defined.");
164
165                CallMetaDataContext callContext = new CallMetaDataContext();
166                callContext.setAccessCallParameterMetaData(false);
167                callContext.setProcedureName(procedureName);
168                callContext.setFunction(function);
169                callContext.initializeMetaData(getDataSource());
170                callContext.processParameters(Arrays.asList(parameters));
171                SqlParameter cursorParameter = callContext.createReturnResultSetParameter("cursor", rowMapper);
172                this.callString = callContext.createCallString();
173
174
175                if (log.isDebugEnabled()) {
176                        log.debug("Call string is: " + callString);
177                }
178
179                int cursorSqlType = Types.OTHER;
180                if (function) {
181                        if (cursorParameter instanceof SqlOutParameter) {
182                                cursorSqlType = cursorParameter.getSqlType();
183                        }
184                }
185                else {
186                        if (refCursorPosition > 0 && refCursorPosition <= parameters.length) {
187                                cursorSqlType = parameters[refCursorPosition - 1].getSqlType();
188                        }
189                }
190
191                try {
192                        if (isUseSharedExtendedConnection()) {
193                                callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
194                                                ResultSet.HOLD_CURSORS_OVER_COMMIT);
195                        }
196                        else {
197                                callableStatement = con.prepareCall(callString, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
198                        }
199                        applyStatementSettings(callableStatement);
200                        if (this.preparedStatementSetter != null) {
201                                preparedStatementSetter.setValues(callableStatement);
202                        }
203
204                        if (function) {
205                                callableStatement.registerOutParameter(1, cursorSqlType);
206                        }
207                        else {
208                                if (refCursorPosition > 0) {
209                                        callableStatement.registerOutParameter(refCursorPosition, cursorSqlType);
210                                }
211                        }
212                        boolean results = callableStatement.execute();
213                        if (results) {
214                                rs = callableStatement.getResultSet();
215                        }
216                        else {
217                                if (function) {
218                                        rs = (ResultSet) callableStatement.getObject(1);
219                                }
220                                else {
221                                        rs = (ResultSet) callableStatement.getObject(refCursorPosition);
222                                }
223                        }
224                        handleWarnings(callableStatement);
225                }
226                catch (SQLException se) {
227                        close();
228                        throw getExceptionTranslator().translate("Executing stored procedure", getSql(), se);
229                }
230
231        }
232
233        @Override
234        protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
235                return rowMapper.mapRow(rs, currentRow);
236        }
237
238        /**
239         * Close the cursor and database connection.
240         */
241        @Override
242        protected void cleanupOnClose() throws Exception {
243                JdbcUtils.closeStatement(this.callableStatement);
244        }
245
246        @Override
247        public String getSql() {
248                if (callString != null) {
249                        return this.callString;
250                }
251                else {
252                        return "PROCEDURE NAME: " + procedureName;
253                }
254        }
255
256}