001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.core.step.item;
018
019import java.util.List;
020
021import org.springframework.batch.core.StepContribution;
022import org.springframework.batch.core.StepListener;
023import org.springframework.batch.core.listener.MulticasterBatchListener;
024import org.springframework.batch.item.ItemProcessor;
025import org.springframework.batch.item.ItemWriter;
026import org.springframework.beans.factory.InitializingBean;
027import org.springframework.lang.Nullable;
028import org.springframework.util.Assert;
029
030/**
031 * Simple implementation of the {@link ChunkProcessor} interface that handles
032 * basic item writing and processing. Any exceptions encountered will be
033 * rethrown.
034 *
035 * @see ChunkOrientedTasklet
036 */
037public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
038
039        private ItemProcessor<? super I, ? extends O> itemProcessor;
040
041        private ItemWriter<? super O> itemWriter;
042
043        private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>();
044
045        /**
046         * Default constructor for ease of configuration.
047         */
048        @SuppressWarnings("unused")
049        private SimpleChunkProcessor() {
050                this(null, null);
051        }
052
053        public SimpleChunkProcessor(@Nullable ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
054                this.itemProcessor = itemProcessor;
055                this.itemWriter = itemWriter;
056        }
057
058        public SimpleChunkProcessor(ItemWriter<? super O> itemWriter) {
059                this(null, itemWriter);
060        }
061
062        /**
063         * @param itemProcessor the {@link ItemProcessor} to set
064         */
065        public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
066                this.itemProcessor = itemProcessor;
067        }
068
069        /**
070         * @param itemWriter the {@link ItemWriter} to set
071         */
072        public void setItemWriter(ItemWriter<? super O> itemWriter) {
073                this.itemWriter = itemWriter;
074        }
075
076        /**
077         * Check mandatory properties.
078         *
079         * @see InitializingBean#afterPropertiesSet()
080         */
081        @Override
082        public void afterPropertiesSet() throws Exception {
083                Assert.notNull(itemWriter, "ItemWriter must be set");
084        }
085
086        /**
087         * Register some {@link StepListener}s with the handler. Each will get the
088         * callbacks in the order specified at the correct stage.
089         *
090         * @param listeners list of {@link StepListener} instances.
091         */
092        public void setListeners(List<? extends StepListener> listeners) {
093                for (StepListener listener : listeners) {
094                        registerListener(listener);
095                }
096        }
097
098        /**
099         * Register a listener for callbacks at the appropriate stages in a process.
100         *
101         * @param listener a {@link StepListener}
102         */
103        public void registerListener(StepListener listener) {
104                this.listener.register(listener);
105        }
106
107        /**
108         * @return the listener
109         */
110        protected MulticasterBatchListener<I, O> getListener() {
111                return listener;
112        }
113
114        /**
115         * @param item the input item
116         * @return the result of the processing
117         * @throws Exception thrown if error occurs.
118         */
119        protected final O doProcess(I item) throws Exception {
120
121                if (itemProcessor == null) {
122                        @SuppressWarnings("unchecked")
123                        O result = (O) item;
124                        return result;
125                }
126
127                try {
128                        listener.beforeProcess(item);
129                        O result = itemProcessor.process(item);
130                        listener.afterProcess(item, result);
131                        return result;
132                }
133                catch (Exception e) {
134                        listener.onProcessError(item, e);
135                        throw e;
136                }
137
138        }
139
140        /**
141         * Surrounds the actual write call with listener callbacks.
142         *
143         * @param items list of items to be written.
144         * @throws Exception thrown if error occurs.
145         */
146        protected final void doWrite(List<O> items) throws Exception {
147
148                if (itemWriter == null) {
149                        return;
150                }
151
152                try {
153                        listener.beforeWrite(items);
154                        writeItems(items);
155                        doAfterWrite(items);
156                }
157                catch (Exception e) {
158                        doOnWriteError(e, items);
159                        throw e;
160                }
161
162        }
163
164        /**
165         * Call the listener's after write method.
166         *
167         * @param items list of items that were just written.
168         */
169        protected final void doAfterWrite(List<O> items) {
170                listener.afterWrite(items);
171        }
172
173        /**
174         * Call listener's writerError method.
175         * @param e exception that occurred.
176         * @param items list of items that failed to be written.
177         */
178        protected final void doOnWriteError(Exception e, List<O> items) {
179                listener.onWriteError(e, items);
180        }
181
182        /**
183         * @param items list of items to be written.
184         * @throws Exception thrown if error occurs.
185         */
186        protected void writeItems(List<O> items) throws Exception {
187                if (itemWriter != null) {
188                        itemWriter.write(items);
189                }
190        }
191
192        @Override
193        public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
194
195                // Allow temporary state to be stored in the user data field
196                initializeUserData(inputs);
197
198                // If there is no input we don't have to do anything more
199                if (isComplete(inputs)) {
200                        return;
201                }
202
203                // Make the transformation, calling remove() on the inputs iterator if
204                // any items are filtered. Might throw exception and cause rollback.
205                Chunk<O> outputs = transform(contribution, inputs);
206
207                // Adjust the filter count based on available data
208                contribution.incrementFilterCount(getFilterCount(inputs, outputs));
209
210                // Adjust the outputs if necessary for housekeeping purposes, and then
211                // write them out...
212                write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
213
214        }
215
216        /**
217         * Extension point for subclasses to allow them to memorise the contents of
218         * the inputs, in case they are needed for accounting purposes later. The
219         * default implementation sets up some user data to remember the original
220         * size of the inputs. If this method is overridden then some or all of
221         * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and
222         * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to
223         * ensure that the user data is handled consistently.
224         *
225         * @param inputs the inputs for the process
226         */
227        protected void initializeUserData(Chunk<I> inputs) {
228                inputs.setUserData(inputs.size());
229        }
230
231        /**
232         * Extension point for subclasses to calculate the filter count. Defaults to
233         * the difference between input size and output size.
234         *
235         * @param inputs the inputs after transformation
236         * @param outputs the outputs after transformation
237         *
238         * @return the difference in sizes
239         *
240         * @see #initializeUserData(Chunk)
241         */
242        protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
243                return (Integer) inputs.getUserData() - outputs.size();
244        }
245
246        /**
247         * Extension point for subclasses that want to store additional data in the
248         * inputs. Default just checks if inputs are empty.
249         *
250         * @param inputs the input chunk
251         * @return true if it is empty
252         *
253         * @see #initializeUserData(Chunk)
254         */
255        protected boolean isComplete(Chunk<I> inputs) {
256                return inputs.isEmpty();
257        }
258
259        /**
260         * Extension point for subclasses that want to adjust the outputs based on
261         * additional saved data in the inputs. Default implementation just returns
262         * the outputs unchanged.
263         *
264         * @param inputs the inputs for the transformation
265         * @param outputs the result of the transformation
266         * @return the outputs unchanged
267         *
268         * @see #initializeUserData(Chunk)
269         */
270        protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
271                return outputs;
272        }
273
274        /**
275         * Simple implementation delegates to the {@link #doWrite(List)} method and
276         * increments the write count in the contribution. Subclasses can handle
277         * more complicated scenarios, e.g.with fault tolerance. If output items are
278         * skipped they should be removed from the inputs as well.
279         *
280         * @param contribution the current step contribution
281         * @param inputs the inputs that gave rise to the outputs
282         * @param outputs the outputs to write
283         * @throws Exception if there is a problem
284         */
285        protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
286                try {
287                        doWrite(outputs.getItems());
288                }
289                catch (Exception e) {
290                        /*
291                         * For a simple chunk processor (no fault tolerance) we are done
292                         * here, so prevent any more processing of these inputs.
293                         */
294                        inputs.clear();
295                        throw e;
296                }
297                contribution.incrementWriteCount(outputs.size());
298        }
299
300        protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
301                Chunk<O> outputs = new Chunk<O>();
302                for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
303                        final I item = iterator.next();
304                        O output;
305                        try {
306                                output = doProcess(item);
307                        }
308                        catch (Exception e) {
309                                /*
310                                 * For a simple chunk processor (no fault tolerance) we are done
311                                 * here, so prevent any more processing of these inputs.
312                                 */
313                                inputs.clear();
314                                throw e;
315                        }
316                        if (output != null) {
317                                outputs.add(output);
318                        }
319                        else {
320                                iterator.remove();
321                        }
322                }
323                return outputs;
324        }
325
326}