001/* 002 * Copyright 2006-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.step.item; 018 019import java.util.List; 020 021import org.apache.commons.logging.Log; 022import org.apache.commons.logging.LogFactory; 023import org.springframework.batch.core.StepContribution; 024import org.springframework.batch.core.StepListener; 025import org.springframework.batch.core.listener.MulticasterBatchListener; 026import org.springframework.batch.item.ItemReader; 027import org.springframework.batch.repeat.RepeatCallback; 028import org.springframework.batch.repeat.RepeatContext; 029import org.springframework.batch.repeat.RepeatOperations; 030import org.springframework.batch.repeat.RepeatStatus; 031import org.springframework.lang.Nullable; 032 033/** 034 * Simple implementation of the ChunkProvider interface that does basic chunk 035 * providing from an {@link ItemReader}. 036 * 037 * @author Dave Syer 038 * @author Michael Minella 039 * @author Mahmoud Ben Hassine 040 * @see ChunkOrientedTasklet 041 */ 042public class SimpleChunkProvider<I> implements ChunkProvider<I> { 043 044 protected final Log logger = LogFactory.getLog(getClass()); 045 046 protected final ItemReader<? extends I> itemReader; 047 048 private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<I, Object>(); 049 050 private final RepeatOperations repeatOperations; 051 052 public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) { 053 this.itemReader = itemReader; 054 this.repeatOperations = repeatOperations; 055 } 056 057 /** 058 * Register some {@link StepListener}s with the handler. Each will get the 059 * callbacks in the order specified at the correct stage. 060 * 061 * @param listeners list of {@link StepListener}s. 062 */ 063 public void setListeners(List<? extends StepListener> listeners) { 064 for (StepListener listener : listeners) { 065 registerListener(listener); 066 } 067 } 068 069 /** 070 * Register a listener for callbacks at the appropriate stages in a process. 071 * 072 * @param listener a {@link StepListener} 073 */ 074 public void registerListener(StepListener listener) { 075 this.listener.register(listener); 076 } 077 078 /** 079 * @return the listener 080 */ 081 protected MulticasterBatchListener<I, ?> getListener() { 082 return listener; 083 } 084 085 /** 086 * Surrounds the read call with listener callbacks. 087 * @return the item or {@code null} if the data source is exhausted 088 * @throws Exception is thrown if error occurs during read. 089 */ 090 @Nullable 091 protected final I doRead() throws Exception { 092 try { 093 listener.beforeRead(); 094 I item = itemReader.read(); 095 if(item != null) { 096 listener.afterRead(item); 097 } 098 return item; 099 } 100 catch (Exception e) { 101 if (logger.isDebugEnabled()) { 102 logger.debug(e.getMessage() + " : " + e.getClass().getName()); 103 } 104 listener.onReadError(e); 105 throw e; 106 } 107 } 108 109 @Override 110 public Chunk<I> provide(final StepContribution contribution) throws Exception { 111 112 final Chunk<I> inputs = new Chunk<I>(); 113 repeatOperations.iterate(new RepeatCallback() { 114 115 @Override 116 public RepeatStatus doInIteration(final RepeatContext context) throws Exception { 117 I item = null; 118 try { 119 item = read(contribution, inputs); 120 } 121 catch (SkipOverflowException e) { 122 // read() tells us about an excess of skips by throwing an 123 // exception 124 return RepeatStatus.FINISHED; 125 } 126 if (item == null) { 127 inputs.setEnd(); 128 return RepeatStatus.FINISHED; 129 } 130 inputs.add(item); 131 contribution.incrementReadCount(); 132 return RepeatStatus.CONTINUABLE; 133 } 134 135 }); 136 137 return inputs; 138 139 } 140 141 @Override 142 public void postProcess(StepContribution contribution, Chunk<I> chunk) { 143 // do nothing 144 } 145 146 /** 147 * Delegates to {@link #doRead()}. Subclasses can add additional behaviour 148 * (e.g. exception handling). 149 * 150 * @param contribution the current step execution contribution 151 * @param chunk the current chunk 152 * @return a new item for processing or {@code null} if the data source is exhausted 153 * 154 * @throws SkipOverflowException if specifically the chunk is accumulating 155 * too much data (e.g. skips) and it wants to force a commit. 156 * 157 * @throws Exception if there is a generic issue 158 */ 159 @Nullable 160 protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception { 161 return doRead(); 162 } 163 164}