001/*
002 * Copyright 2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.step.item;
017
018import java.util.List;
019import java.util.concurrent.atomic.AtomicInteger;
020
021import org.apache.commons.logging.Log;
022import org.apache.commons.logging.LogFactory;
023import org.springframework.batch.core.StepContribution;
024import org.springframework.batch.core.StepListener;
025import org.springframework.batch.core.listener.MulticasterBatchListener;
026import org.springframework.batch.core.step.item.Chunk;
027import org.springframework.batch.core.step.item.ChunkProcessor;
028import org.springframework.batch.item.ItemProcessor;
029import org.springframework.batch.item.ItemReader;
030import org.springframework.batch.item.ItemWriter;
031import org.springframework.batch.repeat.RepeatCallback;
032import org.springframework.batch.repeat.RepeatContext;
033import org.springframework.batch.repeat.RepeatOperations;
034import org.springframework.batch.repeat.RepeatStatus;
035
036/**
037 * {@link ChunkProcessor} implementation that implements JSR-352's chunking pattern
038 * (read and process in a loop until the chunk is complete then write).  This
039 * implementation is responsible for all three phases of chunk based processing
040 * (reading, processing and writing).
041 *
042 * @author Michael Minella
043 *
044 * @param <I> The input type for the step
045 * @param <O> The output type for the step
046 */
047public class JsrChunkProcessor<I,O> implements ChunkProcessor<I> {
048
049        private final Log logger = LogFactory.getLog(getClass());
050        private ItemReader<? extends I> itemReader;
051        private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>();
052        private RepeatOperations repeatTemplate;
053        private ItemProcessor<? super I, ? extends O> itemProcessor;
054        private ItemWriter<? super O> itemWriter;
055
056        public JsrChunkProcessor() {
057                this(null, null, null, null);
058        }
059
060        public JsrChunkProcessor(ItemReader<? extends I> reader, ItemProcessor<? super I, ? extends O> processor, ItemWriter<? super O> writer, RepeatOperations repeatTemplate) {
061                this.itemReader = reader;
062                this.itemProcessor = processor;
063                this.itemWriter = writer;
064                this.repeatTemplate = repeatTemplate;
065        }
066
067        protected MulticasterBatchListener<I, O> getListener() {
068                return listener;
069        }
070
071        /**
072         * Loops through reading (via {@link #provide(StepContribution, Chunk)} and
073         * processing (via {@link #transform(StepContribution, Object)}) until the chunk
074         * is complete.  Once the chunk is complete, the results are written (via
075         * {@link #persist(StepContribution, Chunk)}.
076         *
077         * @see ChunkProcessor#process(StepContribution, Chunk)
078         * @param contribution a {@link StepContribution}
079         * @param chunk a {@link Chunk}
080         */
081        @Override
082        public void process(final StepContribution contribution, final Chunk<I> chunk)
083                        throws Exception {
084
085                final AtomicInteger filterCount = new AtomicInteger(0);
086                final Chunk<O> output = new Chunk<O>();
087
088                repeatTemplate.iterate(new RepeatCallback() {
089
090                        @Override
091                        public RepeatStatus doInIteration(RepeatContext context) throws Exception {
092                                I item = provide(contribution, chunk);
093
094                                if(item != null) {
095                                        contribution.incrementReadCount();
096                                } else {
097                                        return RepeatStatus.FINISHED;
098                                }
099
100                                O processedItem = transform(contribution, item);
101
102                                if(processedItem == null) {
103                                        filterCount.incrementAndGet();
104                                } else {
105                                        output.add(processedItem);
106                                }
107
108                                return RepeatStatus.CONTINUABLE;
109                        }
110                });
111
112                contribution.incrementFilterCount(filterCount.get());
113                if(output.size() > 0) {
114                        persist(contribution, output);
115                }
116        }
117
118        /**
119         * Register some {@link StepListener}s with the handler. Each will get the
120         * callbacks in the order specified at the correct stage.
121         *
122         * @param listeners list of listeners to be used within this step
123         */
124        public void setListeners(List<? extends StepListener> listeners) {
125                for (StepListener listener : listeners) {
126                        registerListener(listener);
127                }
128        }
129
130        /**
131         * Register a listener for callbacks at the appropriate stages in a process.
132         *
133         * @param listener a {@link StepListener}
134         */
135        public void registerListener(StepListener listener) {
136                this.listener.register(listener);
137        }
138
139        /**
140         * Responsible for the reading portion of the chunking loop.  In this implementation, delegates
141         * to {@link #doProvide(StepContribution, Chunk)}
142         *
143         * @param contribution a {@link StepContribution}
144         * @param chunk a {@link Chunk}
145         * @return an item
146         * @throws Exception thrown if error occurs during the reading portion of the chunking loop.
147         */
148        protected I provide(final StepContribution contribution, final Chunk<I> chunk) throws Exception {
149                return doProvide(contribution, chunk);
150        }
151
152        /**
153         * Implements reading as well as any related listener calls required.
154         *
155         * @param contribution a {@link StepContribution}
156         * @param chunk a {@link Chunk}
157         * @return an item
158         * @throws Exception thrown if error occurs during reading or listener calls.
159         */
160        protected final I doProvide(final StepContribution contribution, final Chunk<I> chunk) throws Exception {
161                try {
162                        listener.beforeRead();
163                        I item = itemReader.read();
164                        if(item != null) {
165                                listener.afterRead(item);
166                        } else {
167                                chunk.setEnd();
168                        }
169
170                        return item;
171                }
172                catch (Exception e) {
173                        if (logger.isDebugEnabled()) {
174                                logger.debug(e.getMessage() + " : " + e.getClass().getName());
175                        }
176                        listener.onReadError(e);
177                        throw e;
178                }
179        }
180
181        /**
182         * Responsible for the processing portion of the chunking loop.  In this implementation, delegates to the
183         * {@link #doTransform(Object)} if a processor is available (returns the item unmodified if it is not)
184         *
185         * @param contribution a {@link StepContribution}
186         * @param item an item
187         * @return a processed item if a processor is present (the unmodified item if it is not)
188         * @throws Exception thrown if error occurs during the processing portion of the chunking loop.
189         */
190        protected O transform(final StepContribution contribution, final I item) throws Exception {
191                if (itemProcessor == null) {
192                        @SuppressWarnings("unchecked")
193                        O result = (O) item;
194                        return result;
195                }
196
197                return doTransform(item);
198        }
199
200        /**
201         * Implements processing and all related listener calls.
202         *
203         * @param item the item to be processed
204         * @return the processed item
205         * @throws Exception thrown if error occurs during processing.
206         */
207        protected final O doTransform(I item) throws Exception {
208                try {
209                        listener.beforeProcess(item);
210                        O result = itemProcessor.process(item);
211                        listener.afterProcess(item, result);
212                        return result;
213                }
214                catch (Exception e) {
215                        listener.onProcessError(item, e);
216                        throw e;
217                }
218        }
219
220        /**
221         * Responsible for the writing portion of the chunking loop.  In this implementation, delegates to the
222         * {{@link #doPersist(StepContribution, Chunk)}.
223         *
224         * @param contribution a {@link StepContribution}
225         * @param chunk a {@link Chunk}
226         * @throws Exception thrown if error occurs during the writing portion of the chunking loop.
227         */
228        protected void persist(final StepContribution contribution, final Chunk<O> chunk) throws Exception {
229                doPersist(contribution, chunk);
230
231                contribution.incrementWriteCount(chunk.getItems().size());
232        }
233
234        /**
235         * Implements writing and all related listener calls
236         *
237         * @param contribution a {@link StepContribution}
238         * @param chunk a {@link Chunk}
239         * @throws Exception thrown if error occurs during the writing portion of the chunking loop.
240         */
241        protected final void doPersist(final StepContribution contribution, final Chunk<O> chunk) throws Exception {
242                try {
243                        List<O> items = chunk.getItems();
244                        listener.beforeWrite(items);
245                        itemWriter.write(items);
246                        listener.afterWrite(items);
247                }
248                catch (Exception e) {
249                        listener.onWriteError(e, chunk.getItems());
250                        throw e;
251                }
252        }
253}