001/*
002 * Copyright 2006-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.sample.common;
018
019import java.sql.PreparedStatement;
020import java.sql.SQLException;
021import java.util.List;
022import java.util.ListIterator;
023
024import org.springframework.batch.core.ExitStatus;
025import org.springframework.batch.core.StepExecution;
026import org.springframework.batch.core.StepExecutionListener;
027import org.springframework.batch.item.ItemWriter;
028import org.springframework.jdbc.core.BatchPreparedStatementSetter;
029import org.springframework.jdbc.core.support.JdbcDaoSupport;
030import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
031import org.springframework.util.Assert;
032import org.springframework.util.ClassUtils;
033import org.springframework.util.SerializationUtils;
034
035/**
036 * Database {@link ItemWriter} implementing the process indicator pattern.
037 */
038public class StagingItemWriter<T> extends JdbcDaoSupport implements StepExecutionListener, ItemWriter<T> {
039
040        protected static final String NEW = "N";
041
042        protected static final String DONE = "Y";
043
044        private DataFieldMaxValueIncrementer incrementer;
045
046        private StepExecution stepExecution;
047
048        /**
049         * Check mandatory properties.
050         *
051         * @see org.springframework.dao.support.DaoSupport#initDao()
052         */
053        @Override
054        protected void initDao() throws Exception {
055                super.initDao();
056                Assert.notNull(incrementer, "DataFieldMaxValueIncrementer is required - set the incrementer property in the "
057                                + ClassUtils.getShortName(StagingItemWriter.class));
058        }
059
060        /**
061         * Setter for the key generator for the staging table.
062         *
063         * @param incrementer the {@link DataFieldMaxValueIncrementer} to set
064         */
065        public void setIncrementer(DataFieldMaxValueIncrementer incrementer) {
066                this.incrementer = incrementer;
067        }
068
069        /**
070         * Serialize the item to the staging table, and add a NEW processed flag.
071         *
072         * @see ItemWriter#write(java.util.List)
073         */
074        @Override
075        public void write(final List<? extends T> items) {
076                final ListIterator<? extends T> itemIterator = items.listIterator();
077
078                getJdbcTemplate().batchUpdate("INSERT into BATCH_STAGING (ID, JOB_ID, VALUE, PROCESSED) values (?,?,?,?)",
079                                new BatchPreparedStatementSetter() {
080                        @Override
081                        public int getBatchSize() {
082                                return items.size();
083                        }
084
085                        @Override
086                        public void setValues(PreparedStatement ps, int i) throws SQLException {
087                                Assert.state(itemIterator.nextIndex() == i, "Item ordering must be preserved in batch sql update");
088
089                                ps.setLong(1, incrementer.nextLongValue());
090                                ps.setLong(2, stepExecution.getJobExecution().getJobId());
091                                ps.setBytes(3, SerializationUtils.serialize(itemIterator.next()));
092                                ps.setString(4, NEW);
093                        }
094                });
095        }
096
097        /*
098         * (non-Javadoc)
099         *
100         * @see
101         * org.springframework.batch.core.domain.StepListener#afterStep(StepExecution
102         * )
103         */
104        @Override
105        public ExitStatus afterStep(StepExecution stepExecution) {
106                return null;
107        }
108
109        /*
110         * (non-Javadoc)
111         *
112         * @see org.springframework.batch.core.domain.StepListener#beforeStep(org.
113         * springframework.batch.core.domain.StepExecution)
114         */
115        @Override
116        public void beforeStep(StepExecution stepExecution) {
117                this.stepExecution = stepExecution;
118        }
119}