001/* 002 * Copyright 2009-2014 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.sample.common; 017 018import javax.sql.DataSource; 019 020import org.springframework.batch.item.ItemProcessor; 021import org.springframework.beans.factory.InitializingBean; 022import org.springframework.dao.OptimisticLockingFailureException; 023import org.springframework.jdbc.core.JdbcOperations; 024import org.springframework.jdbc.core.JdbcTemplate; 025import org.springframework.util.Assert; 026 027/** 028 * Marks the input row as 'processed'. (This change will rollback if there is 029 * problem later) 030 * 031 * @param <T> item type 032 * 033 * @see StagingItemReader 034 * @see StagingItemWriter 035 * @see ProcessIndicatorItemWrapper 036 * 037 * @author Robert Kasanicky 038 */ 039public class StagingItemProcessor<T> implements ItemProcessor<ProcessIndicatorItemWrapper<T>, T>, InitializingBean { 040 041 private JdbcOperations jdbcTemplate; 042 043 public void setJdbcTemplate(JdbcOperations jdbcTemplate) { 044 this.jdbcTemplate = jdbcTemplate; 045 } 046 047 public void setDataSource(DataSource dataSource) { 048 this.jdbcTemplate = new JdbcTemplate(dataSource); 049 } 050 051 @Override 052 public void afterPropertiesSet() throws Exception { 053 Assert.notNull(jdbcTemplate, "Either jdbcTemplate or dataSource must be set"); 054 } 055 056 /** 057 * Use the technical identifier to mark the input row as processed and 058 * return unwrapped item. 059 */ 060 @Override 061 public T process(ProcessIndicatorItemWrapper<T> wrapper) throws Exception { 062 063 int count = jdbcTemplate.update("UPDATE BATCH_STAGING SET PROCESSED=? WHERE ID=? AND PROCESSED=?", 064 StagingItemWriter.DONE, wrapper.getId(), StagingItemWriter.NEW); 065 if (count != 1) { 066 throw new OptimisticLockingFailureException("The staging record with ID=" + wrapper.getId() 067 + " was updated concurrently when trying to mark as complete (updated " + count + " records."); 068 } 069 return wrapper.getItem(); 070 } 071 072}