/*
 * Copyright 2013 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.core.jsr.step.item;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepListener;
import org.springframework.batch.core.listener.MulticasterBatchListener;
import org.springframework.batch.core.step.item.Chunk;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatCallback;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.RepeatStatus;

/**
 * {@link ChunkProcessor} implementation that follows JSR-352's chunking pattern:
 * items are read and processed one at a time in a loop until the chunk is
 * complete, and only then is the accumulated output written. This class drives
 * all three phases of chunk based processing (reading, processing and writing)
 * itself.
 *
 * @author Michael Minella
 *
 * @param <I> The input type for the step
 * @param <O> The output type for the step
 */
public class JsrChunkProcessor<I,O> implements ChunkProcessor<I> {

	private final Log logger = LogFactory.getLog(getClass());
	private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>();

	private ItemReader<? extends I> itemReader;
	private ItemProcessor<? super I, ? extends O> itemProcessor;
	private ItemWriter<? super O> itemWriter;
	private RepeatOperations repeatTemplate;

	/**
	 * Creates an instance whose reader, processor, writer and repeat template
	 * are all {@code null}.
	 */
	public JsrChunkProcessor() {
		this(null, null, null, null);
	}

	/**
	 * Creates an instance with the given collaborators. The arguments are stored
	 * as passed; no validation is performed here.
	 *
	 * @param reader the {@link ItemReader} used by the reading phase
	 * @param processor the {@link ItemProcessor} used by the processing phase;
	 * when {@code null}, {@link #transform(StepContribution, Object)} hands items
	 * through unchanged
	 * @param writer the {@link ItemWriter} used by the writing phase
	 * @param repeatTemplate the {@link RepeatOperations} that drives the
	 * read/process loop in {@link #process(StepContribution, Chunk)}
	 */
	public JsrChunkProcessor(ItemReader<? extends I> reader, ItemProcessor<? super I, ? extends O> processor, ItemWriter<? super O> writer, RepeatOperations repeatTemplate) {
		this.repeatTemplate = repeatTemplate;
		this.itemWriter = writer;
		this.itemProcessor = processor;
		this.itemReader = reader;
	}

	/**
	 * Exposes the multicaster through which all registered listeners are notified.
	 *
	 * @return the {@link MulticasterBatchListener} owned by this processor
	 */
	protected MulticasterBatchListener<I, O> getListener() {
		return listener;
	}

	/**
	 * Runs one chunk. Inside the repeat template's loop each iteration reads an
	 * item (via {@link #provide(StepContribution, Chunk)}) and processes it (via
	 * {@link #transform(StepContribution, Object)}). Once the loop ends, the
	 * collected results are written (via {@link #persist(StepContribution, Chunk)}),
	 * provided there is at least one of them.
	 *
	 * <p>A {@code null} read ends the loop without touching the read count. A
	 * {@code null} processing result is counted as filtered instead of being
	 * added to the output.</p>
	 *
	 * @see ChunkProcessor#process(StepContribution, Chunk)
	 * @param contribution a {@link StepContribution}
	 * @param chunk a {@link Chunk}
	 * @throws Exception propagated from the repeat template or from the write phase
	 */
	@Override
	public void process(final StepContribution contribution, final Chunk<I> chunk)
			throws Exception {

		final Chunk<O> output = new Chunk<O>();
		// Mutable holder: the anonymous callback below can only capture final locals.
		final AtomicInteger filtered = new AtomicInteger(0);

		repeatTemplate.iterate(new RepeatCallback() {

			@Override
			public RepeatStatus doInIteration(RepeatContext context) throws Exception {
				I readItem = provide(contribution, chunk);

				// Nothing was read: report completion before any counter is updated.
				if (readItem == null) {
					return RepeatStatus.FINISHED;
				}
				contribution.incrementReadCount();

				O result = transform(contribution, readItem);

				if (result != null) {
					output.add(result);
				}
				else {
					filtered.incrementAndGet();
				}

				return RepeatStatus.CONTINUABLE;
			}
		});

		contribution.incrementFilterCount(filtered.get());

		// The write phase is skipped entirely when no item survived processing.
		if (output.size() > 0) {
			persist(contribution, output);
		}
	}

	/**
	 * Registers each of the given {@link StepListener}s, in iteration order, by
	 * passing it to {@link #registerListener(StepListener)}.
	 *
	 * @param listeners list of listeners to be used within this step
	 */
	public void setListeners(List<? extends StepListener> listeners) {
		for (StepListener candidate : listeners) {
			registerListener(candidate);
		}
	}

	/**
	 * Adds a single listener to this processor's multicaster.
	 *
	 * @param listener a {@link StepListener}
	 */
	public void registerListener(StepListener listener) {
		this.listener.register(listener);
	}

	/**
	 * Reading step of the chunking loop. This implementation simply delegates to
	 * {@link #doProvide(StepContribution, Chunk)}.
	 *
	 * @param contribution a {@link StepContribution}
	 * @param chunk a {@link Chunk}
	 * @return an item, or {@code null} when the reader returned none
	 * @throws Exception thrown if error occurs during the reading portion of the chunking loop.
	 */
	protected I provide(final StepContribution contribution, final Chunk<I> chunk) throws Exception {
		return doProvide(contribution, chunk);
	}

	/**
	 * Reads one item and issues the read-related listener callbacks:
	 * {@code beforeRead} first, then {@code afterRead} when an item was obtained.
	 * When the reader returns {@code null} the chunk is marked as ended instead.
	 * Any exception is logged at debug level, reported through
	 * {@code onReadError} and rethrown.
	 *
	 * @param contribution a {@link StepContribution}
	 * @param chunk a {@link Chunk}
	 * @return an item, or {@code null} when the reader returned none
	 * @throws Exception thrown if error occurs during reading or listener calls.
	 */
	protected final I doProvide(final StepContribution contribution, final Chunk<I> chunk) throws Exception {
		try {
			listener.beforeRead();
			I next = itemReader.read();

			if (next == null) {
				chunk.setEnd();
			}
			else {
				listener.afterRead(next);
			}

			return next;
		}
		catch (Exception readFailure) {
			if (logger.isDebugEnabled()) {
				logger.debug(readFailure.getMessage() + " : " + readFailure.getClass().getName());
			}
			listener.onReadError(readFailure);
			throw readFailure;
		}
	}

	/**
	 * Processing step of the chunking loop. When a processor is configured this
	 * delegates to {@link #doTransform(Object)}; otherwise the item is returned
	 * as is (cast to the output type) and no process listener callbacks occur.
	 *
	 * @param contribution a {@link StepContribution}
	 * @param item an item
	 * @return a processed item if a processor is present (the unmodified item if it is not)
	 * @throws Exception thrown if error occurs during the processing portion of the chunking loop.
	 */
	protected O transform(final StepContribution contribution, final I item) throws Exception {
		if (itemProcessor != null) {
			return doTransform(item);
		}

		// No processor: the input is handed through, so the cast is unchecked by design.
		@SuppressWarnings("unchecked")
		O passthrough = (O) item;
		return passthrough;
	}

	/**
	 * Invokes the processor on the item, surrounded by the {@code beforeProcess}
	 * and {@code afterProcess} listener callbacks. Any exception is reported
	 * through {@code onProcessError} and rethrown.
	 *
	 * @param item the item to be processed
	 * @return the processed item
	 * @throws Exception thrown if error occurs during processing.
	 */
	protected final O doTransform(I item) throws Exception {
		try {
			listener.beforeProcess(item);
			O processed = itemProcessor.process(item);
			listener.afterProcess(item, processed);
			return processed;
		}
		catch (Exception processFailure) {
			listener.onProcessError(item, processFailure);
			throw processFailure;
		}
	}

	/**
	 * Writing step of the chunking loop. This implementation delegates to
	 * {@link #doPersist(StepContribution, Chunk)} and, once that has returned
	 * normally, adds the number of items in the chunk to the write count.
	 *
	 * @param contribution a {@link StepContribution}
	 * @param chunk a {@link Chunk}
	 * @throws Exception thrown if error occurs during the writing portion of the chunking loop.
	 */
	protected void persist(final StepContribution contribution, final Chunk<O> chunk) throws Exception {
		doPersist(contribution, chunk);

		int written = chunk.getItems().size();
		contribution.incrementWriteCount(written);
	}

	/**
	 * Hands the chunk's items to the writer, surrounded by the
	 * {@code beforeWrite} and {@code afterWrite} listener callbacks. Any
	 * exception is reported through {@code onWriteError} together with the
	 * chunk's items and then rethrown.
	 *
	 * @param contribution a {@link StepContribution}
	 * @param chunk a {@link Chunk}
	 * @throws Exception thrown if error occurs during the writing portion of the chunking loop.
	 */
	protected final void doPersist(final StepContribution contribution, final Chunk<O> chunk) throws Exception {
		try {
			List<O> pending = chunk.getItems();
			listener.beforeWrite(pending);
			itemWriter.write(pending);
			listener.afterWrite(pending);
		}
		catch (Exception writeFailure) {
			listener.onWriteError(writeFailure, chunk.getItems());
			throw writeFailure;
		}
	}
}