001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.step.builder;
017
018import java.lang.reflect.Method;
019import java.util.ArrayList;
020import java.util.HashSet;
021import java.util.LinkedHashSet;
022import java.util.Set;
023import java.util.function.Function;
024
025import org.springframework.batch.core.ChunkListener;
026import org.springframework.batch.core.ItemProcessListener;
027import org.springframework.batch.core.ItemReadListener;
028import org.springframework.batch.core.ItemWriteListener;
029import org.springframework.batch.core.StepExecutionListener;
030import org.springframework.batch.core.StepListener;
031import org.springframework.batch.core.annotation.AfterProcess;
032import org.springframework.batch.core.annotation.AfterRead;
033import org.springframework.batch.core.annotation.AfterWrite;
034import org.springframework.batch.core.annotation.BeforeProcess;
035import org.springframework.batch.core.annotation.BeforeRead;
036import org.springframework.batch.core.annotation.BeforeWrite;
037import org.springframework.batch.core.annotation.OnProcessError;
038import org.springframework.batch.core.annotation.OnReadError;
039import org.springframework.batch.core.annotation.OnWriteError;
040import org.springframework.batch.core.listener.StepListenerFactoryBean;
041import org.springframework.batch.core.step.item.ChunkOrientedTasklet;
042import org.springframework.batch.core.step.item.SimpleChunkProcessor;
043import org.springframework.batch.core.step.item.SimpleChunkProvider;
044import org.springframework.batch.core.step.tasklet.Tasklet;
045import org.springframework.batch.core.step.tasklet.TaskletStep;
046import org.springframework.batch.item.ItemProcessor;
047import org.springframework.batch.item.ItemReader;
048import org.springframework.batch.item.ItemStream;
049import org.springframework.batch.item.ItemWriter;
050import org.springframework.batch.item.function.FunctionItemProcessor;
051import org.springframework.batch.repeat.CompletionPolicy;
052import org.springframework.batch.repeat.RepeatOperations;
053import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
054import org.springframework.batch.repeat.support.RepeatTemplate;
055import org.springframework.batch.support.ReflectionUtils;
056import org.springframework.util.Assert;
057
058/**
059 * Step builder for simple item processing (chunk oriented) steps. Items are read and cached in chunks, and then
060 * processed (transformed) and written (optionally either the processor or the writer can be omitted) all in the same
061 * transaction.
062 *
063 * @see FaultTolerantStepBuilder for a step that handles retry and skip of failed items
064 *
065 * @author Dave Syer
066 * @author Mahmoud Ben Hassine
067 *
068 * @since 2.2
069 */
070public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {
071
072        private static final int DEFAULT_COMMIT_INTERVAL = 1;
073
074        private ItemReader<? extends I> reader;
075
076        private ItemWriter<? super O> writer;
077
078        private ItemProcessor<? super I, ? extends O> processor;
079
080        private Function<? super I, ? extends O> itemProcessorFunction;
081
082        private int chunkSize = 0;
083
084        private RepeatOperations chunkOperations;
085
086        private CompletionPolicy completionPolicy;
087
088        private Set<StepListener> itemListeners = new LinkedHashSet<>();
089
090        private boolean readerTransactionalQueue = false;
091
092        /**
093         * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
094         *
095         * @param parent a parent helper containing common step properties
096         */
097        public SimpleStepBuilder(StepBuilderHelper<?> parent) {
098                super(parent);
099        }
100
101        /**
102         * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used.
103         *
104         * @param parent a parent helper containing common step properties
105         */
106        protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) {
107                super(parent);
108                this.chunkSize = parent.chunkSize;
109                this.completionPolicy = parent.completionPolicy;
110                this.chunkOperations = parent.chunkOperations;
111                this.reader = parent.reader;
112                this.writer = parent.writer;
113                this.processor = parent.processor;
114                this.itemListeners = parent.itemListeners;
115                this.readerTransactionalQueue = parent.readerTransactionalQueue;
116        }
117
118        public FaultTolerantStepBuilder<I, O> faultTolerant() {
119                return new FaultTolerantStepBuilder<>(this);
120        }
121
122        /**
123         * Build a step with the reader, writer, processor as provided.
124         *
125         * @see org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder#build()
126         */
127        @Override
128        public TaskletStep build() {
129
130                registerStepListenerAsItemListener();
131                registerAsStreamsAndListeners(reader, processor, writer);
132                return super.build();
133        }
134
135        protected void registerStepListenerAsItemListener() {
136                for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){
137                        checkAndAddItemListener(stepExecutionListener);
138                }
139                for (ChunkListener chunkListener: this.chunkListeners){
140                        checkAndAddItemListener(chunkListener);
141                }
142        }
143
144        @SuppressWarnings("unchecked")
145        private void checkAndAddItemListener(StepListener stepListener) {
146                if (stepListener instanceof ItemReadListener){
147                        listener((ItemReadListener<I>)stepListener);
148                }
149                if (stepListener instanceof ItemProcessListener){
150                        listener((ItemProcessListener<I,O>)stepListener);
151                }
152                if (stepListener instanceof ItemWriteListener){
153                        listener((ItemWriteListener<O>)stepListener);
154                }
155        }
156
157        @Override
158        protected Tasklet createTasklet() {
159                Assert.state(reader != null, "ItemReader must be provided");
160                Assert.state(writer != null, "ItemWriter must be provided");
161                RepeatOperations repeatOperations = createChunkOperations();
162                SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<>(getReader(), repeatOperations);
163                SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>(getProcessor(), getWriter());
164                chunkProvider.setListeners(new ArrayList<>(itemListeners));
165                chunkProcessor.setListeners(new ArrayList<>(itemListeners));
166                ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<>(chunkProvider, chunkProcessor);
167                tasklet.setBuffering(!readerTransactionalQueue);
168                return tasklet;
169        }
170
171        /**
172         * Sets the chunk size or commit interval for this step. This is the maximum number of items that will be read
173         * before processing starts in a single transaction. Not compatible with {@link #completionPolicy}
174         * .
175         *
176         * @param chunkSize the chunk size (a.k.a commit interval)
177         * @return this for fluent chaining
178         */
179        public SimpleStepBuilder<I, O> chunk(int chunkSize) {
180                Assert.state(completionPolicy == null || chunkSize == 0,
181                                "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
182                this.chunkSize = chunkSize;
183                return this;
184        }
185
186        /**
187         * Sets a completion policy for the chunk processing. Items are read until this policy determines that a chunk is
188         * complete, giving more control than with just the {@link #chunk(int) chunk size} (or commit interval).
189         *
190         * @param completionPolicy a completion policy for the chunk
191         * @return this for fluent chaining
192         */
193        public SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
194                Assert.state(chunkSize == 0 || completionPolicy == null,
195                                "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
196                this.completionPolicy = completionPolicy;
197                return this;
198        }
199
200        /**
201         * An item reader that provides a stream of items. Will be automatically registered as a {@link #stream(ItemStream)}
202         * or listener if it implements the corresponding interface. By default assumed to be non-transactional.
203         *
204         * @see #readerTransactionalQueue
205         * @param reader an item reader
206         * @return this for fluent chaining
207         */
208        public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
209                this.reader = reader;
210                return this;
211        }
212
213        /**
214         * An item writer that writes a chunk of items. Will be automatically registered as a {@link #stream(ItemStream)} or
215         * listener if it implements the corresponding interface.
216         *
217         * @param writer an item writer
218         * @return this for fluent chaining
219         */
220        public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
221                this.writer = writer;
222                return this;
223        }
224
225        /**
226         * An item processor that processes or transforms a stream of items. Will be automatically registered as a
227         * {@link #stream(ItemStream)} or listener if it implements the corresponding interface.
228         *
229         * @param processor an item processor
230         * @return this for fluent chaining
231         */
232        public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
233                this.processor = processor;
234                return this;
235        }
236
237        /**
238         * A {@link Function} to be delegated to as an {@link ItemProcessor}.  If this is set,
239         * it will take precedence over any {@code ItemProcessor} configured via
240         * {@link #processor(ItemProcessor)}.
241         *
242         * @param function the function to delegate item processing to
243         * @return this for fluent chaining
244         */
245        public SimpleStepBuilder<I, O> processor(Function<? super I, ? extends O> function) {
246                this.itemProcessorFunction = function;
247                return this;
248        }
249
250        /**
251         * Sets a flag to say that the reader is transactional (usually a queue), which is to say that failed items might be
252         * rolled back and re-presented in a subsequent transaction. Default is false, meaning that the items are read
253         * outside a transaction and possibly cached.
254         *
255         * @return this for fluent chaining
256         */
257        public SimpleStepBuilder<I, O> readerIsTransactionalQueue() {
258                this.readerTransactionalQueue = true;
259                return this;
260        }
261
262        /**
263         * Registers objects using the annotation based listener configuration.
264         *
265         * @param listener the object that has a method configured with listener annotation
266         * @return this for fluent chaining
267         */
268        @SuppressWarnings("unchecked")
269        @Override
270        public SimpleStepBuilder<I, O> listener(Object listener) {
271                super.listener(listener);
272
273                Set<Method> itemListenerMethods = new HashSet<>();
274                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeRead.class));
275                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterRead.class));
276                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeProcess.class));
277                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterProcess.class));
278                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeWrite.class));
279                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterWrite.class));
280                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnReadError.class));
281                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnProcessError.class));
282                itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnWriteError.class));
283
284                if(itemListenerMethods.size() > 0) {
285                        StepListenerFactoryBean factory = new StepListenerFactoryBean();
286                        factory.setDelegate(listener);
287                        itemListeners.add((StepListener) factory.getObject());
288                }
289
290                @SuppressWarnings("unchecked")
291                SimpleStepBuilder<I, O> result = this;
292                return result;
293        }
294
295
296        /**
297         * Register an item reader listener.
298         *
299         * @param listener the listener to register
300         * @return this for fluent chaining
301         */
302        public SimpleStepBuilder<I, O> listener(ItemReadListener<? super I> listener) {
303                itemListeners.add(listener);
304                return this;
305        }
306
307        /**
308         * Register an item writer listener.
309         *
310         * @param listener the listener to register
311         * @return this for fluent chaining
312         */
313        public SimpleStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) {
314                itemListeners.add(listener);
315                return this;
316        }
317
318        /**
319         * Register an item processor listener.
320         *
321         * @param listener the listener to register
322         * @return this for fluent chaining
323         */
324        public SimpleStepBuilder<I, O> listener(ItemProcessListener<? super I, ? super O> listener) {
325                itemListeners.add(listener);
326                return this;
327        }
328
329        /**
330         * Instead of a {@link #chunk(int) chunk size} or {@link #chunk(CompletionPolicy) completion policy} you can provide
331         * a complete repeat operations instance that handles the iteration over the item reader.
332         *
333         * @param repeatTemplate a complete repeat template for the chunk
334         * @return this for fluent chaining
335         */
336        public SimpleStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) {
337                this.chunkOperations = repeatTemplate;
338                return this;
339        }
340
341        protected RepeatOperations createChunkOperations() {
342                RepeatOperations repeatOperations = chunkOperations;
343                if (repeatOperations == null) {
344                        RepeatTemplate repeatTemplate = new RepeatTemplate();
345                        repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy());
346                        repeatOperations = repeatTemplate;
347                }
348                return repeatOperations;
349        }
350
351        protected ItemReader<? extends I> getReader() {
352                return reader;
353        }
354
355        protected ItemWriter<? super O> getWriter() {
356                return writer;
357        }
358
359        protected ItemProcessor<? super I, ? extends O> getProcessor() {
360                if(this.itemProcessorFunction != null) {
361                        this.processor = new FunctionItemProcessor<>(this.itemProcessorFunction);
362                }
363
364                return processor;
365        }
366
367        protected int getChunkSize() {
368                return chunkSize;
369        }
370
371        protected boolean isReaderTransactionalQueue() {
372                return readerTransactionalQueue;
373        }
374
375        protected Set<StepListener> getItemListeners() {
376                return itemListeners;
377        }
378
379        /**
380         * @return a {@link CompletionPolicy} consistent with the chunk size and injected policy (if present).
381         */
382        protected CompletionPolicy getChunkCompletionPolicy() {
383                Assert.state(!(completionPolicy != null && chunkSize > 0),
384                                "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
385                Assert.state(chunkSize >= 0, "The commitInterval must be positive or zero (for default value).");
386
387                if (completionPolicy != null) {
388                        return completionPolicy;
389                }
390                if (chunkSize == 0) {
391                        logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");
392                        chunkSize = DEFAULT_COMMIT_INTERVAL;
393                }
394                return new SimpleCompletionPolicy(chunkSize);
395        }
396
397        protected void registerAsStreamsAndListeners(ItemReader<? extends I> itemReader,
398                        ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
399                for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) {
400                        if (itemHandler instanceof ItemStream) {
401                                stream((ItemStream) itemHandler);
402                        }
403                        if (StepListenerFactoryBean.isListener(itemHandler)) {
404                                StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
405                                if (listener instanceof StepExecutionListener) {
406                                        listener((StepExecutionListener) listener);
407                                }
408                                if (listener instanceof ChunkListener) {
409                                        listener((ChunkListener) listener);
410                                }
411                                if (listener instanceof ItemReadListener<?> || listener instanceof ItemProcessListener<?, ?>
412                                || listener instanceof ItemWriteListener<?>) {
413                                        itemListeners.add(listener);
414                                }
415                        }
416                }
417        }
418
419}