001/* 002 * Copyright 2006-2013 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.core.step.item; 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021import org.springframework.batch.core.StepContribution; 022import org.springframework.batch.core.scope.context.ChunkContext; 023import org.springframework.batch.core.step.tasklet.Tasklet; 024import org.springframework.batch.repeat.RepeatStatus; 025 026/** 027 * A {@link Tasklet} implementing variations on read-process-write item 028 * handling. 029 * 030 * @author Dave Syer 031 * 032 * @param <I> input item type 033 */ 034public class ChunkOrientedTasklet<I> implements Tasklet { 035 036 private static final String INPUTS_KEY = "INPUTS"; 037 038 private final ChunkProcessor<I> chunkProcessor; 039 040 private final ChunkProvider<I> chunkProvider; 041 042 private boolean buffering = true; 043 044 private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class); 045 046 public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) { 047 this.chunkProvider = chunkProvider; 048 this.chunkProcessor = chunkProcessor; 049 } 050 051 /** 052 * Flag to indicate that items should be buffered once read. Defaults to 053 * true, which is appropriate for forward-only, non-transactional item 054 * readers. Main (or only) use case for setting this flag to false is a 055 * transactional JMS item reader. 056 * 057 * @param buffering indicator 058 */ 059 public void setBuffering(boolean buffering) { 060 this.buffering = buffering; 061 } 062 063 @Override 064 public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { 065 066 @SuppressWarnings("unchecked") 067 Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY); 068 if (inputs == null) { 069 inputs = chunkProvider.provide(contribution); 070 if (buffering) { 071 chunkContext.setAttribute(INPUTS_KEY, inputs); 072 } 073 } 074 075 chunkProcessor.process(contribution, inputs); 076 chunkProvider.postProcess(contribution, inputs); 077 078 // Allow a message coming back from the processor to say that we 079 // are not done yet 080 if (inputs.isBusy()) { 081 logger.debug("Inputs still busy"); 082 return RepeatStatus.CONTINUABLE; 083 } 084 085 chunkContext.removeAttribute(INPUTS_KEY); 086 chunkContext.setComplete(); 087 088 if (logger.isDebugEnabled()) { 089 logger.debug("Inputs not busy, ended: " + inputs.isEnd()); 090 } 091 return RepeatStatus.continueIf(!inputs.isEnd()); 092 093 } 094 095}