001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.item.database;
017
018import java.sql.PreparedStatement;
019import java.sql.SQLException;
020import java.util.ArrayList;
021import java.util.List;
022import java.util.Map;
023import javax.sql.DataSource;
024
025import org.apache.commons.logging.Log;
026import org.apache.commons.logging.LogFactory;
027
028import org.springframework.batch.item.ItemWriter;
029import org.springframework.beans.factory.InitializingBean;
030import org.springframework.dao.DataAccessException;
031import org.springframework.dao.EmptyResultDataAccessException;
032import org.springframework.dao.InvalidDataAccessApiUsageException;
033import org.springframework.jdbc.core.PreparedStatementCallback;
034import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
035import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
036import org.springframework.jdbc.core.namedparam.SqlParameterSource;
037import org.springframework.util.Assert;
038
039/**
040 * <p>{@link ItemWriter} that uses the batching features from
041 * {@link NamedParameterJdbcTemplate} to execute a batch of statements for all items
042 * provided.</p>
043 *
044 * The user must provide an SQL query and a special callback for either of
045 * {@link ItemPreparedStatementSetter} or {@link ItemSqlParameterSourceProvider}.
046 * You can use either named parameters or the traditional '?' placeholders. If you use the
047 * named parameter support then you should provide a {@link ItemSqlParameterSourceProvider},
048 * otherwise you should provide a  {@link ItemPreparedStatementSetter}.
049 * This callback would be responsible for mapping the item to the parameters needed to
050 * execute the SQL statement.<br>
051 *
052 * It is expected that {@link #write(List)} is called inside a transaction.<br>
053 *
054 * The writer is thread-safe after its properties are set (normal singleton
055 * behavior), so it can be used to write in multiple concurrent transactions.
056 *
057 * @author Dave Syer
058 * @author Thomas Risberg
059 * @author Michael Minella
060 * @since 2.0
061 */
062public class JdbcBatchItemWriter<T> implements ItemWriter<T>, InitializingBean {
063
064        protected static final Log logger = LogFactory.getLog(JdbcBatchItemWriter.class);
065
066        private NamedParameterJdbcOperations namedParameterJdbcTemplate;
067
068        private ItemPreparedStatementSetter<T> itemPreparedStatementSetter;
069
070        private ItemSqlParameterSourceProvider<T> itemSqlParameterSourceProvider;
071
072        private String sql;
073
074        private boolean assertUpdates = true;
075
076        private int parameterCount;
077
078        private boolean usingNamedParameters;
079
080        /**
081         * Public setter for the flag that determines whether an assertion is made
082         * that all items cause at least one row to be updated.
083         * @param assertUpdates the flag to set. Defaults to true;
084         */
085        public void setAssertUpdates(boolean assertUpdates) {
086                this.assertUpdates = assertUpdates;
087        }
088
089        /**
090         * Public setter for the query string to execute on write. The parameters
091         * should correspond to those known to the
092         * {@link ItemPreparedStatementSetter}.
093         * @param sql the query to set
094         */
095        public void setSql(String sql) {
096                this.sql = sql;
097        }
098
099        /**
100         * Public setter for the {@link ItemPreparedStatementSetter}.
101         * @param preparedStatementSetter the {@link ItemPreparedStatementSetter} to
102         * set. This is required when using traditional '?' placeholders for the SQL statement.
103         */
104        public void setItemPreparedStatementSetter(ItemPreparedStatementSetter<T> preparedStatementSetter) {
105                this.itemPreparedStatementSetter = preparedStatementSetter;
106        }
107
108        /**
109         * Public setter for the {@link ItemSqlParameterSourceProvider}.
110         * @param itemSqlParameterSourceProvider the {@link ItemSqlParameterSourceProvider} to
111         * set. This is required when using named parameters for the SQL statement and the type
112         * to be written does not implement {@link Map}.
113         */
114        public void setItemSqlParameterSourceProvider(ItemSqlParameterSourceProvider<T> itemSqlParameterSourceProvider) {
115                this.itemSqlParameterSourceProvider = itemSqlParameterSourceProvider;
116        }
117
118        /**
119         * Public setter for the data source for injection purposes.
120         *
121         * @param dataSource {@link javax.sql.DataSource} to use for querying against
122         */
123        public void setDataSource(DataSource dataSource) {
124                if (namedParameterJdbcTemplate == null) {
125                        this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
126                }
127        }
128
129        /**
130         * Public setter for the {@link NamedParameterJdbcOperations}.
131         * @param namedParameterJdbcTemplate the {@link NamedParameterJdbcOperations} to set
132         */
133        public void setJdbcTemplate(NamedParameterJdbcOperations namedParameterJdbcTemplate) {
134                this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
135        }
136
137        /**
138         * Check mandatory properties - there must be a SimpleJdbcTemplate and an SQL statement plus a
139         * parameter source.
140         */
141        @Override
142        public void afterPropertiesSet() {
143                Assert.notNull(namedParameterJdbcTemplate, "A DataSource or a NamedParameterJdbcTemplate is required.");
144                Assert.notNull(sql, "An SQL statement is required.");
145                List<String> namedParameters = new ArrayList<>();
146                parameterCount = JdbcParameterUtils.countParameterPlaceholders(sql, namedParameters);
147                if (namedParameters.size() > 0) {
148                        if (parameterCount != namedParameters.size()) {
149                                throw new InvalidDataAccessApiUsageException("You can't use both named parameters and classic \"?\" placeholders: " + sql);
150                        }
151                        usingNamedParameters = true;
152                }
153                if (!usingNamedParameters) {
154                        Assert.notNull(itemPreparedStatementSetter, "Using SQL statement with '?' placeholders requires an ItemPreparedStatementSetter");
155                }
156        }
157
158        /* (non-Javadoc)
159         * @see org.springframework.batch.item.ItemWriter#write(java.util.List)
160         */
161        @SuppressWarnings({"unchecked", "rawtypes"})
162        @Override
163        public void write(final List<? extends T> items) throws Exception {
164
165                if (!items.isEmpty()) {
166
167                        if (logger.isDebugEnabled()) {
168                                logger.debug("Executing batch with " + items.size() + " items.");
169                        }
170
171                        int[] updateCounts;
172
173                        if (usingNamedParameters) {
174                                if(items.get(0) instanceof Map && this.itemSqlParameterSourceProvider == null) {
175                                        updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, items.toArray(new Map[items.size()]));
176                                } else {
177                                        SqlParameterSource[] batchArgs = new SqlParameterSource[items.size()];
178                                        int i = 0;
179                                        for (T item : items) {
180                                                batchArgs[i++] = itemSqlParameterSourceProvider.createSqlParameterSource(item);
181                                        }
182                                        updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, batchArgs);
183                                }
184                        }
185                        else {
186                                updateCounts = namedParameterJdbcTemplate.getJdbcOperations().execute(sql, new PreparedStatementCallback<int[]>() {
187                                        @Override
188                                        public int[] doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
189                                                for (T item : items) {
190                                                        itemPreparedStatementSetter.setValues(item, ps);
191                                                        ps.addBatch();
192                                                }
193                                                return ps.executeBatch();
194                                        }
195                                });
196                        }
197
198                        if (assertUpdates) {
199                                for (int i = 0; i < updateCounts.length; i++) {
200                                        int value = updateCounts[i];
201                                        if (value == 0) {
202                                                throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
203                                                                + " did not update any rows: [" + items.get(i) + "]", 1);
204                                        }
205                                }
206                        }
207                }
208        }
209}