001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.step.item;
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021import org.springframework.batch.core.StepContribution;
022import org.springframework.batch.core.scope.context.ChunkContext;
023import org.springframework.batch.core.step.tasklet.Tasklet;
024import org.springframework.batch.repeat.RepeatStatus;
025
/**
 * A {@link Tasklet} implementing variations on read-process-write item
 * handling.
 *
 * @author Dave Syer
 *
 * @param <I> input item type
 */
034public class ChunkOrientedTasklet<I> implements Tasklet {
035
036        private static final String INPUTS_KEY = "INPUTS";
037
038        private final ChunkProcessor<I> chunkProcessor;
039
040        private final ChunkProvider<I> chunkProvider;
041
042        private boolean buffering = true;
043
044        private static Log logger = LogFactory.getLog(ChunkOrientedTasklet.class);
045
046        public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
047                this.chunkProvider = chunkProvider;
048                this.chunkProcessor = chunkProcessor;
049        }
050
051        /**
052         * Flag to indicate that items should be buffered once read. Defaults to
053         * true, which is appropriate for forward-only, non-transactional item
054         * readers. Main (or only) use case for setting this flag to false is a
055         * transactional JMS item reader.
056         *
057         * @param buffering indicator
058         */
059        public void setBuffering(boolean buffering) {
060                this.buffering = buffering;
061        }
062
063        @Override
064        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
065
066                @SuppressWarnings("unchecked")
067                Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
068                if (inputs == null) {
069                        inputs = chunkProvider.provide(contribution);
070                        if (buffering) {
071                                chunkContext.setAttribute(INPUTS_KEY, inputs);
072                        }
073                }
074
075                chunkProcessor.process(contribution, inputs);
076                chunkProvider.postProcess(contribution, inputs);
077
078                // Allow a message coming back from the processor to say that we
079                // are not done yet
080                if (inputs.isBusy()) {
081                        logger.debug("Inputs still busy");
082                        return RepeatStatus.CONTINUABLE;
083                }
084
085                chunkContext.removeAttribute(INPUTS_KEY);
086                chunkContext.setComplete();
087
088                if (logger.isDebugEnabled()) {
089                        logger.debug("Inputs not busy, ended: " + inputs.isEnd());
090                }
091                return RepeatStatus.continueIf(!inputs.isEnd());
092
093        }
094
095}