001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.database;
018
019import java.sql.Connection;
020import java.sql.PreparedStatement;
021import java.sql.ResultSet;
022import java.sql.SQLException;
023
024import org.springframework.jdbc.core.PreparedStatementSetter;
025import org.springframework.jdbc.core.RowMapper;
026import org.springframework.jdbc.support.JdbcUtils;
027import org.springframework.util.Assert;
028import org.springframework.util.ClassUtils;
029
030/**
031 * <p>
032 * Simple item reader implementation that opens a JDBC cursor and continually retrieves the
033 * next row in the ResultSet.
034 * </p>
035 *
036 * <p>
037 * The statement used to open the cursor is created with the 'READ_ONLY' option since a non read-only
038 * cursor may unnecessarily lock tables or rows. It is also opened with 'TYPE_FORWARD_ONLY' option.
039 * By default the cursor will be opened using a separate connection which means that it will not participate
040 * in any transactions created as part of the step processing.
041 * </p>
042 *
043 * <p>
044 * Each call to {@link #read()} will call the provided RowMapper, passing in the
045 * ResultSet.
046 * </p>
047 *
048 * @author Lucas Ward
049 * @author Peter Zozom
050 * @author Robert Kasanicky
051 * @author Thomas Risberg
052 */
053public class JdbcCursorItemReader<T> extends AbstractCursorItemReader<T> {
054
055        PreparedStatement preparedStatement;
056
057        PreparedStatementSetter preparedStatementSetter;
058
059        String sql;
060
061        RowMapper<T> rowMapper;
062
063        public JdbcCursorItemReader() {
064                super();
065                setName(ClassUtils.getShortName(JdbcCursorItemReader.class));
066        }
067
068        /**
069         * Set the RowMapper to be used for all calls to read().
070         *
071         * @param rowMapper the mapper used to map each item
072         */
073        public void setRowMapper(RowMapper<T> rowMapper) {
074                this.rowMapper = rowMapper;
075        }
076
077        /**
078         * Set the SQL statement to be used when creating the cursor. This statement
079         * should be a complete and valid SQL statement, as it will be run directly
080         * without any modification.
081         *
082         * @param sql SQL statement
083         */
084        public void setSql(String sql) {
085                this.sql = sql;
086        }
087
088        /**
089         * Set the PreparedStatementSetter to use if any parameter values that need
090         * to be set in the supplied query.
091         *
092         * @param preparedStatementSetter PreparedStatementSetter responsible for filling out the statement
093         */
094        public void setPreparedStatementSetter(PreparedStatementSetter preparedStatementSetter) {
095                this.preparedStatementSetter = preparedStatementSetter;
096        }
097
098        /**
099         * Assert that mandatory properties are set.
100         *
101         * @throws IllegalArgumentException if either data source or SQL properties
102         * not set.
103         */
104        @Override
105        public void afterPropertiesSet() throws Exception {
106                super.afterPropertiesSet();
107                Assert.notNull(sql, "The SQL query must be provided");
108                Assert.notNull(rowMapper, "RowMapper must be provided");
109        }
110
111
112        @Override
113        protected void openCursor(Connection con) {
114                try {
115                        if (isUseSharedExtendedConnection()) {
116                                preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
117                                                ResultSet.HOLD_CURSORS_OVER_COMMIT);
118                        }
119                        else {
120                                preparedStatement = con.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
121                        }
122                        applyStatementSettings(preparedStatement);
123                        if (this.preparedStatementSetter != null) {
124                                preparedStatementSetter.setValues(preparedStatement);
125                        }
126                        this.rs = preparedStatement.executeQuery();
127                        handleWarnings(preparedStatement);
128                }
129                catch (SQLException se) {
130                        close();
131                        throw getExceptionTranslator().translate("Executing query", getSql(), se);
132                }
133
134        }
135
136
137        @Override
138        protected T readCursor(ResultSet rs, int currentRow) throws SQLException {
139                return rowMapper.mapRow(rs, currentRow);
140        }
141
142        /**
143         * Close the cursor and database connection.
144         */
145        @Override
146        protected void cleanupOnClose() throws Exception {
147                JdbcUtils.closeStatement(this.preparedStatement);
148        }
149
150        @Override
151        public String getSql() {
152                return this.sql;
153        }
154}