001/*
002 * Copyright 2012-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.step.builder;
017
018import java.lang.reflect.Method;
019import java.util.HashSet;
020import java.util.LinkedHashSet;
021import java.util.Set;
022
023import org.springframework.batch.core.ChunkListener;
024import org.springframework.batch.core.Step;
025import org.springframework.batch.core.StepExecutionListener;
026import org.springframework.batch.core.annotation.AfterChunk;
027import org.springframework.batch.core.annotation.AfterChunkError;
028import org.springframework.batch.core.annotation.BeforeChunk;
029import org.springframework.batch.core.listener.StepListenerFactoryBean;
030import org.springframework.batch.core.step.tasklet.Tasklet;
031import org.springframework.batch.core.step.tasklet.TaskletStep;
032import org.springframework.batch.item.ItemStream;
033import org.springframework.batch.repeat.RepeatOperations;
034import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
035import org.springframework.batch.repeat.exception.ExceptionHandler;
036import org.springframework.batch.repeat.support.RepeatTemplate;
037import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
038import org.springframework.batch.support.ReflectionUtils;
039import org.springframework.core.task.SyncTaskExecutor;
040import org.springframework.core.task.TaskExecutor;
041import org.springframework.transaction.interceptor.TransactionAttribute;
042
043/**
044 * Base class for step builders that want to build a {@link TaskletStep}. Handles common concerns across all tasklet
045 * step variants, which are mostly to do with the type of tasklet they carry.
046 *
047 * @author Dave Syer
048 * @author Michael Minella
049 * @author Mahmoud Ben Hassine
050 *
051 * @since 2.2
052 *
053 * @param <B> the type of builder represented
054 */
055public abstract class AbstractTaskletStepBuilder<B extends AbstractTaskletStepBuilder<B>> extends
056StepBuilderHelper<AbstractTaskletStepBuilder<B>> {
057
058        protected Set<ChunkListener> chunkListeners = new LinkedHashSet<ChunkListener>();
059
060        private RepeatOperations stepOperations;
061
062        private TransactionAttribute transactionAttribute;
063
064        private Set<ItemStream> streams = new LinkedHashSet<ItemStream>();
065
066        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
067
068        private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT;
069
070        private TaskExecutor taskExecutor;
071
072        public AbstractTaskletStepBuilder(StepBuilderHelper<?> parent) {
073                super(parent);
074        }
075
076        protected abstract Tasklet createTasklet();
077
078        /**
079         * Build the step from the components collected by the fluent setters. Delegates first to {@link #enhance(Step)} and
080         * then to {@link #createTasklet()} in subclasses to create the actual tasklet.
081         *
082         * @return a tasklet step fully configured and ready to execute
083         */
084        public TaskletStep build() {
085
086                registerStepListenerAsChunkListener();
087
088                TaskletStep step = new TaskletStep(getName());
089
090                super.enhance(step);
091
092                step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0]));
093
094                if (transactionAttribute != null) {
095                        step.setTransactionAttribute(transactionAttribute);
096                }
097
098                if (stepOperations == null) {
099
100                        stepOperations = new RepeatTemplate();
101
102                        if (taskExecutor != null) {
103                                TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
104                                repeatTemplate.setTaskExecutor(taskExecutor);
105                                repeatTemplate.setThrottleLimit(throttleLimit);
106                                stepOperations = repeatTemplate;
107                        }
108
109                        ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
110
111                }
112                step.setStepOperations(stepOperations);
113                step.setTasklet(createTasklet());
114
115                step.setStreams(streams.toArray(new ItemStream[0]));
116
117                try {
118                        step.afterPropertiesSet();
119                }
120                catch (Exception e) {
121                        throw new StepBuilderException(e);
122                }
123
124                return step;
125
126        }
127
128        protected void registerStepListenerAsChunkListener() {
129                for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){
130                        if (stepExecutionListener instanceof ChunkListener){
131                                listener((ChunkListener)stepExecutionListener);
132                        }
133                }
134        }
135
136        /**
137         * Register a chunk listener.
138         *
139         * @param listener the listener to register
140         * @return this for fluent chaining
141         */
142        public AbstractTaskletStepBuilder<B> listener(ChunkListener listener) {
143                chunkListeners.add(listener);
144                return this;
145        }
146
147        /**
148         * Registers objects using the annotation based listener configuration.
149         *
150         * @param listener the object that has a method configured with listener annotation
151         * @return this for fluent chaining
152         */
153        @Override
154        public B listener(Object listener) {
155                super.listener(listener);
156
157                Set<Method> chunkListenerMethods = new HashSet<>();
158                chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeChunk.class));
159                chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunk.class));
160                chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunkError.class));
161
162                if(chunkListenerMethods.size() > 0) {
163                        StepListenerFactoryBean factory = new StepListenerFactoryBean();
164                        factory.setDelegate(listener);
165                        this.listener((ChunkListener) factory.getObject());
166                }
167
168                @SuppressWarnings("unchecked")
169                B result = (B) this;
170                return result;
171        }
172
173        /**
174         * Register a stream for callbacks that manage restart data.
175         *
176         * @param stream the stream to register
177         * @return this for fluent chaining
178         */
179        public AbstractTaskletStepBuilder<B> stream(ItemStream stream) {
180                streams.add(stream);
181                return this;
182        }
183
184        /**
185         * Provide a task executor to use when executing the tasklet. Default is to use a single-threaded (synchronous)
186         * executor.
187         *
188         * @param taskExecutor the task executor to register
189         * @return this for fluent chaining
190         */
191        public AbstractTaskletStepBuilder<B> taskExecutor(TaskExecutor taskExecutor) {
192                this.taskExecutor = taskExecutor;
193                return this;
194        }
195
196        /**
197         * In the case of an asynchronous {@link #taskExecutor(TaskExecutor)} the number of concurrent tasklet executions
198         * can be throttled (beyond any throttling provided by a thread pool). The throttle limit should be less than the
199         * data source pool size used in the job repository for this step.
200         *
201         * @param throttleLimit maximum number of concurrent tasklet executions allowed
202         * @return this for fluent chaining
203         */
204        public AbstractTaskletStepBuilder<B> throttleLimit(int throttleLimit) {
205                this.throttleLimit = throttleLimit;
206                return this;
207        }
208
209        /**
210         * Sets the exception handler to use in the case of tasklet failures. Default is to rethrow everything.
211         *
212         * @param exceptionHandler the exception handler
213         * @return this for fluent chaining
214         */
215        public AbstractTaskletStepBuilder<B> exceptionHandler(ExceptionHandler exceptionHandler) {
216                this.exceptionHandler = exceptionHandler;
217                return this;
218        }
219
220        /**
221         * Sets the repeat template used for iterating the tasklet execution. By default it will terminate only when the
222         * tasklet returns FINISHED (or null).
223         *
224         * @param repeatTemplate a repeat template with rules for iterating
225         * @return this for fluent chaining
226         */
227        public AbstractTaskletStepBuilder<B> stepOperations(RepeatOperations repeatTemplate) {
228                this.stepOperations = repeatTemplate;
229                return this;
230        }
231
232        /**
233         * Sets the transaction attributes for the tasklet execution. Defaults to the default values for the transaction
234         * manager, but can be manipulated to provide longer timeouts for instance.
235         *
236         * @param transactionAttribute a transaction attribute set
237         * @return this for fluent chaining
238         */
239        public AbstractTaskletStepBuilder<B> transactionAttribute(TransactionAttribute transactionAttribute) {
240                this.transactionAttribute = transactionAttribute;
241                return this;
242        }
243
244        /**
245         * Convenience method for subclasses to access the step operations that were injected by user.
246         *
247         * @return the repeat operations used to iterate the tasklet executions
248         */
249        protected RepeatOperations getStepOperations() {
250                return stepOperations;
251        }
252
253        /**
254         * Convenience method for subclasses to access the exception handler that was injected by user.
255         *
256         * @return the exception handler
257         */
258        protected ExceptionHandler getExceptionHandler() {
259                return exceptionHandler;
260        }
261
262        /**
263         * Convenience method for subclasses to determine if the step is concurrent.
264         *
265         * @return true if the tasklet is going to be run in multiple threads
266         */
267        protected boolean concurrent() {
268                boolean concurrent = taskExecutor != null && !(taskExecutor instanceof SyncTaskExecutor);
269                return concurrent;
270        }
271
272        protected TaskExecutor getTaskExecutor() {
273                return taskExecutor;
274        }
275
276        protected int getThrottleLimit() {
277                return throttleLimit;
278        }
279
280        protected TransactionAttribute getTransactionAttribute() {
281                return transactionAttribute;
282        }
283
284        protected Set<ItemStream> getStreams() {
285                return this.streams;
286        }
287
288}