001/*
002 * Copyright 2009-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.sample.common;
017
018import javax.sql.DataSource;
019
020import org.springframework.batch.item.ItemProcessor;
021import org.springframework.beans.factory.InitializingBean;
022import org.springframework.dao.OptimisticLockingFailureException;
023import org.springframework.jdbc.core.JdbcOperations;
024import org.springframework.jdbc.core.JdbcTemplate;
025import org.springframework.util.Assert;
026
027/**
028 * Marks the input row as 'processed'. (This change will rollback if there is
029 * problem later)
030 *
031 * @param <T> item type
032 *
033 * @see StagingItemReader
034 * @see StagingItemWriter
035 * @see ProcessIndicatorItemWrapper
036 *
037 * @author Robert Kasanicky
038 */
039public class StagingItemProcessor<T> implements ItemProcessor<ProcessIndicatorItemWrapper<T>, T>, InitializingBean {
040
041        private JdbcOperations jdbcTemplate;
042
043        public void setJdbcTemplate(JdbcOperations jdbcTemplate) {
044                this.jdbcTemplate = jdbcTemplate;
045        }
046
047        public void setDataSource(DataSource dataSource) {
048                this.jdbcTemplate = new JdbcTemplate(dataSource);
049        }
050
051        @Override
052        public void afterPropertiesSet() throws Exception {
053                Assert.notNull(jdbcTemplate, "Either jdbcTemplate or dataSource must be set");
054        }
055
056        /**
057         * Use the technical identifier to mark the input row as processed and
058         * return unwrapped item.
059         */
060        @Override
061        public T process(ProcessIndicatorItemWrapper<T> wrapper) throws Exception {
062
063                int count = jdbcTemplate.update("UPDATE BATCH_STAGING SET PROCESSED=? WHERE ID=? AND PROCESSED=?",
064                                StagingItemWriter.DONE, wrapper.getId(), StagingItemWriter.NEW);
065                if (count != 1) {
066                        throw new OptimisticLockingFailureException("The staging record with ID=" + wrapper.getId()
067                                        + " was updated concurrently when trying to mark as complete (updated " + count + " records.");
068                }
069                return wrapper.getItem();
070        }
071
072}