/*
 * Copyright 2006-2014 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.batch.sample.common;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;

import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.util.Assert;
import org.springframework.util.SerializationUtils;

/**
 * Thread-safe database {@link ItemReader} implementing the process indicator
 * pattern.
 *
 * <p>
 * The IDs of all unprocessed rows of {@code BATCH_STAGING} belonging to the
 * current job are loaded once in {@link #beforeStep(StepExecution)}. Each call
 * to {@link #read()} then hands out the next ID and loads the matching payload.
 *
 * <p>
 * To achieve restartability use together with {@link StagingItemProcessor}.
 *
 * @param <T> type of the deserialized item stored in the {@code VALUE} column
 */
public class StagingItemReader<T> implements ItemReader<ProcessIndicatorItemWrapper<T>>, StepExecutionListener,
		InitializingBean, DisposableBean {

	private static final Log logger = LogFactory.getLog(StagingItemReader.class);

	/** Step execution captured in {@link #beforeStep}; supplies the job ID for the key query. */
	private StepExecution stepExecution;

	/** Monitor protecting {@link #keys} and {@link #initialized}. */
	private final Object lock = new Object();

	/** {@code true} once the keys have been loaded; reset by {@link #destroy()}. */
	private volatile boolean initialized = false;

	/** Iterator over the IDs still to be handed out; {@code null} while the reader is not open. */
	private volatile Iterator<Long> keys;

	private JdbcOperations jdbcTemplate;

	/**
	 * Configures the database access by wrapping the given data source in a
	 * {@link JdbcTemplate}.
	 * @param dataSource the data source holding the {@code BATCH_STAGING} table
	 */
	public void setDataSource(DataSource dataSource) {
		jdbcTemplate = new JdbcTemplate(dataSource);
	}

	/**
	 * Closes the reader: drops the key iterator and marks the reader as not
	 * initialized, so that a later {@link #beforeStep(StepExecution)} reloads the
	 * keys.
	 */
	@Override
	public void destroy() throws Exception {
		// Mutate the state under the same lock read() uses, so a concurrent
		// read() never sees initialized == true together with keys == null.
		synchronized (lock) {
			initialized = false;
			keys = null;
		}
	}

	/**
	 * Verifies that a data source has been supplied via
	 * {@link #setDataSource(DataSource)}.
	 */
	@Override
	public final void afterPropertiesSet() throws Exception {
		Assert.notNull(jdbcTemplate, "You must provide a DataSource.");
	}

	/**
	 * Loads, in ascending order, the IDs of all staging rows of the current job
	 * whose {@code PROCESSED} flag equals {@link StagingItemWriter#NEW}.
	 * @return the IDs returned by the query
	 */
	private List<Long> retrieveKeys() {

		synchronized (lock) {

			return jdbcTemplate.query(

					"SELECT ID FROM BATCH_STAGING WHERE JOB_ID=? AND PROCESSED=? ORDER BY ID",

					new RowMapper<Long>() {
						@Override
						public Long mapRow(ResultSet rs, int rowNum) throws SQLException {
							return rs.getLong(1);
						}
					},

					stepExecution.getJobExecution().getJobId(), StagingItemWriter.NEW);

		}

	}

	/**
	 * Takes the next staged ID and loads the item stored under it.
	 * @return the item wrapped together with its staging ID, or {@code null} when
	 * all keys have been handed out
	 * @throws ReaderNotOpenException if the keys have not been loaded yet (or the
	 * reader has been destroyed)
	 */
	@Override
	public ProcessIndicatorItemWrapper<T> read() {
		Long id = null;
		synchronized (lock) {
			// The open check lives inside the lock so that it and the iterator
			// access form one atomic step with respect to destroy().
			if (!initialized) {
				throw new ReaderNotOpenException("Reader must be open before it can be used.");
			}
			if (keys.hasNext()) {
				id = keys.next();
			}
		}
		if (logger.isDebugEnabled()) {
			logger.debug("Retrieved key from list: " + id);
		}

		if (id == null) {
			return null;
		}

		// The payload query runs outside the lock; only key hand-out is serialized.
		@SuppressWarnings("unchecked")
		T result = (T) jdbcTemplate.queryForObject("SELECT VALUE FROM BATCH_STAGING WHERE ID=?",
				new RowMapper<Object>() {
					@Override
					public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
						byte[] blob = rs.getBytes(1);
						// NOTE(review): Java-native deserialization of the stored bytes;
						// the staging table must only ever be written by trusted code.
						return SerializationUtils.deserialize(blob);
					}
				}, id);

		return new ProcessIndicatorItemWrapper<T>(id, result);
	}

	/**
	 * No-op step callback.
	 * @return always {@code null}; this listener does not supply an exit status
	 */
	@Override
	public ExitStatus afterStep(StepExecution stepExecution) {
		return null;
	}

	/**
	 * Remembers the step execution and, if no keys are currently held, loads the
	 * keys of the unprocessed staging rows and marks the reader as initialized.
	 * @param stepExecution the execution of the step about to start
	 */
	@Override
	public void beforeStep(StepExecution stepExecution) {
		this.stepExecution = stepExecution;
		synchronized (lock) {
			if (keys == null) {
				keys = retrieveKeys().iterator();
				logger.info("Keys obtained for staging.");
				initialized = true;
			}
		}
	}

}