001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.step.item;
018
019import java.util.List;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.springframework.batch.core.StepContribution;
024import org.springframework.batch.core.StepListener;
025import org.springframework.batch.core.listener.MulticasterBatchListener;
026import org.springframework.batch.item.ItemReader;
027import org.springframework.batch.repeat.RepeatCallback;
028import org.springframework.batch.repeat.RepeatContext;
029import org.springframework.batch.repeat.RepeatOperations;
030import org.springframework.batch.repeat.RepeatStatus;
031import org.springframework.lang.Nullable;
032
033/**
034 * Simple implementation of the ChunkProvider interface that does basic chunk
035 * providing from an {@link ItemReader}.
036 *
037 * @author Dave Syer
038 * @author Michael Minella
039 * @author Mahmoud Ben Hassine
040 * @see ChunkOrientedTasklet
041 */
042public class SimpleChunkProvider<I> implements ChunkProvider<I> {
043
044        protected final Log logger = LogFactory.getLog(getClass());
045
046        protected final ItemReader<? extends I> itemReader;
047
048        private final MulticasterBatchListener<I, ?> listener = new MulticasterBatchListener<I, Object>();
049
050        private final RepeatOperations repeatOperations;
051
052        public SimpleChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
053                this.itemReader = itemReader;
054                this.repeatOperations = repeatOperations;
055        }
056
057        /**
058         * Register some {@link StepListener}s with the handler. Each will get the
059         * callbacks in the order specified at the correct stage.
060         *
061         * @param listeners list of {@link StepListener}s.
062         */
063        public void setListeners(List<? extends StepListener> listeners) {
064                for (StepListener listener : listeners) {
065                        registerListener(listener);
066                }
067        }
068
069        /**
070         * Register a listener for callbacks at the appropriate stages in a process.
071         *
072         * @param listener a {@link StepListener}
073         */
074        public void registerListener(StepListener listener) {
075                this.listener.register(listener);
076        }
077
078        /**
079         * @return the listener
080         */
081        protected MulticasterBatchListener<I, ?> getListener() {
082                return listener;
083        }
084
085        /**
086         * Surrounds the read call with listener callbacks.
087         * @return the item or {@code null} if the data source is exhausted
088         * @throws Exception is thrown if error occurs during read.
089         */
090        @Nullable
091        protected final I doRead() throws Exception {
092                try {
093                        listener.beforeRead();
094                        I item = itemReader.read();
095                        if(item != null) {
096                                listener.afterRead(item);
097                        }
098                        return item;
099                }
100                catch (Exception e) {
101                        if (logger.isDebugEnabled()) {
102                                logger.debug(e.getMessage() + " : " + e.getClass().getName());
103                        }
104                        listener.onReadError(e);
105                        throw e;
106                }
107        }
108
109        @Override
110        public Chunk<I> provide(final StepContribution contribution) throws Exception {
111
112                final Chunk<I> inputs = new Chunk<I>();
113                repeatOperations.iterate(new RepeatCallback() {
114
115                        @Override
116                        public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
117                                I item = null;
118                                try {
119                                        item = read(contribution, inputs);
120                                }
121                                catch (SkipOverflowException e) {
122                                        // read() tells us about an excess of skips by throwing an
123                                        // exception
124                                        return RepeatStatus.FINISHED;
125                                }
126                                if (item == null) {
127                                        inputs.setEnd();
128                                        return RepeatStatus.FINISHED;
129                                }
130                                inputs.add(item);
131                                contribution.incrementReadCount();
132                                return RepeatStatus.CONTINUABLE;
133                        }
134
135                });
136
137                return inputs;
138
139        }
140
141        @Override
142        public void postProcess(StepContribution contribution, Chunk<I> chunk) {
143                // do nothing
144        }
145
146        /**
147         * Delegates to {@link #doRead()}. Subclasses can add additional behaviour
148         * (e.g. exception handling).
149         *
150         * @param contribution the current step execution contribution
151         * @param chunk the current chunk
152         * @return a new item for processing or {@code null} if the data source is exhausted
153         *
154         * @throws SkipOverflowException if specifically the chunk is accumulating
155         * too much data (e.g. skips) and it wants to force a commit.
156         *
157         * @throws Exception if there is a generic issue
158         */
159        @Nullable
160        protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception {
161                return doRead();
162        }
163
164}