001/* 002 * Copyright 2006-2014 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.sample.common; 018 019import java.sql.PreparedStatement; 020import java.sql.SQLException; 021import java.util.List; 022import java.util.ListIterator; 023 024import org.springframework.batch.core.ExitStatus; 025import org.springframework.batch.core.StepExecution; 026import org.springframework.batch.core.StepExecutionListener; 027import org.springframework.batch.item.ItemWriter; 028import org.springframework.jdbc.core.BatchPreparedStatementSetter; 029import org.springframework.jdbc.core.support.JdbcDaoSupport; 030import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer; 031import org.springframework.util.Assert; 032import org.springframework.util.ClassUtils; 033import org.springframework.util.SerializationUtils; 034 035/** 036 * Database {@link ItemWriter} implementing the process indicator pattern. 037 */ 038public class StagingItemWriter<T> extends JdbcDaoSupport implements StepExecutionListener, ItemWriter<T> { 039 040 protected static final String NEW = "N"; 041 042 protected static final String DONE = "Y"; 043 044 private DataFieldMaxValueIncrementer incrementer; 045 046 private StepExecution stepExecution; 047 048 /** 049 * Check mandatory properties. 050 * 051 * @see org.springframework.dao.support.DaoSupport#initDao() 052 */ 053 @Override 054 protected void initDao() throws Exception { 055 super.initDao(); 056 Assert.notNull(incrementer, "DataFieldMaxValueIncrementer is required - set the incrementer property in the " 057 + ClassUtils.getShortName(StagingItemWriter.class)); 058 } 059 060 /** 061 * Setter for the key generator for the staging table. 062 * 063 * @param incrementer the {@link DataFieldMaxValueIncrementer} to set 064 */ 065 public void setIncrementer(DataFieldMaxValueIncrementer incrementer) { 066 this.incrementer = incrementer; 067 } 068 069 /** 070 * Serialize the item to the staging table, and add a NEW processed flag. 071 * 072 * @see ItemWriter#write(java.util.List) 073 */ 074 @Override 075 public void write(final List<? extends T> items) { 076 final ListIterator<? extends T> itemIterator = items.listIterator(); 077 078 getJdbcTemplate().batchUpdate("INSERT into BATCH_STAGING (ID, JOB_ID, VALUE, PROCESSED) values (?,?,?,?)", 079 new BatchPreparedStatementSetter() { 080 @Override 081 public int getBatchSize() { 082 return items.size(); 083 } 084 085 @Override 086 public void setValues(PreparedStatement ps, int i) throws SQLException { 087 Assert.state(itemIterator.nextIndex() == i, "Item ordering must be preserved in batch sql update"); 088 089 ps.setLong(1, incrementer.nextLongValue()); 090 ps.setLong(2, stepExecution.getJobExecution().getJobId()); 091 ps.setBytes(3, SerializationUtils.serialize(itemIterator.next())); 092 ps.setString(4, NEW); 093 } 094 }); 095 } 096 097 /* 098 * (non-Javadoc) 099 * 100 * @see 101 * org.springframework.batch.core.domain.StepListener#afterStep(StepExecution 102 * ) 103 */ 104 @Override 105 public ExitStatus afterStep(StepExecution stepExecution) { 106 return null; 107 } 108 109 /* 110 * (non-Javadoc) 111 * 112 * @see org.springframework.batch.core.domain.StepListener#beforeStep(org. 113 * springframework.batch.core.domain.StepExecution) 114 */ 115 @Override 116 public void beforeStep(StepExecution stepExecution) { 117 this.stepExecution = stepExecution; 118 } 119}