001/*
002 * Copyright 2006-2013 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.step.factory;
017
018import org.apache.commons.logging.Log;
019import org.apache.commons.logging.LogFactory;
020import org.springframework.batch.core.ChunkListener;
021import org.springframework.batch.core.ItemProcessListener;
022import org.springframework.batch.core.ItemReadListener;
023import org.springframework.batch.core.ItemWriteListener;
024import org.springframework.batch.core.Step;
025import org.springframework.batch.core.StepExecutionListener;
026import org.springframework.batch.core.StepListener;
027import org.springframework.batch.core.repository.JobRepository;
028import org.springframework.batch.core.step.builder.SimpleStepBuilder;
029import org.springframework.batch.core.step.builder.StepBuilder;
030import org.springframework.batch.core.step.tasklet.TaskletStep;
031import org.springframework.batch.item.ItemProcessor;
032import org.springframework.batch.item.ItemReader;
033import org.springframework.batch.item.ItemStream;
034import org.springframework.batch.item.ItemWriter;
035import org.springframework.batch.repeat.CompletionPolicy;
036import org.springframework.batch.repeat.RepeatOperations;
037import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
038import org.springframework.batch.repeat.exception.ExceptionHandler;
039import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
040import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
041import org.springframework.beans.factory.BeanNameAware;
042import org.springframework.beans.factory.FactoryBean;
043import org.springframework.core.task.TaskExecutor;
044import org.springframework.transaction.PlatformTransactionManager;
045import org.springframework.transaction.annotation.Isolation;
046import org.springframework.transaction.annotation.Propagation;
047import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
048import org.springframework.transaction.interceptor.TransactionAttribute;
049
050/**
051 * Most common configuration options for simple steps should be found here. Use this factory bean instead of creating a
052 * {@link Step} implementation manually.
053 *
054 * This factory does not support configuration of fault-tolerant behavior, use appropriate subclass of this factory bean
055 * to configure skip or retry.
056 *
057 * @see FaultTolerantStepFactoryBean
058 *
059 * @author Dave Syer
060 * @author Robert Kasanicky
061 *
062 */
063public class SimpleStepFactoryBean<T, S> implements FactoryBean<Step>, BeanNameAware {
064
065        private String name;
066
067        private int startLimit = Integer.MAX_VALUE;
068
069        private boolean allowStartIfComplete;
070
071        private ItemReader<? extends T> itemReader;
072
073        private ItemProcessor<? super T, ? extends S> itemProcessor;
074
075        private ItemWriter<? super S> itemWriter;
076
077        private PlatformTransactionManager transactionManager;
078
079        private Propagation propagation = Propagation.REQUIRED;
080
081        private Isolation isolation = Isolation.DEFAULT;
082
083        private int transactionTimeout = DefaultTransactionAttribute.TIMEOUT_DEFAULT;
084
085        private JobRepository jobRepository;
086
087        private boolean singleton = true;
088
089        private ItemStream[] streams = new ItemStream[0];
090
091        private StepListener[] listeners = new StepListener[0];
092
093        protected final Log logger = LogFactory.getLog(getClass());
094
095        private int commitInterval = 0;
096
097        private TaskExecutor taskExecutor;
098
099        private RepeatOperations stepOperations;
100
101        private RepeatOperations chunkOperations;
102
103        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
104
105        private CompletionPolicy chunkCompletionPolicy;
106
107        private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT;
108
109        private boolean isReaderTransactionalQueue = false;
110
111        /**
112         * Default constructor for {@link SimpleStepFactoryBean}.
113         */
114        public SimpleStepFactoryBean() {
115                super();
116        }
117
118        /**
119         * Flag to signal that the reader is transactional (usually a JMS consumer) so that items are re-presented after a
120         * rollback. The default is false and readers are assumed to be forward-only.
121         *
122         * @param isReaderTransactionalQueue the value of the flag
123         */
124        public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) {
125                this.isReaderTransactionalQueue = isReaderTransactionalQueue;
126        }
127
128        /**
129         * Convenience method for subclasses.
130         * @return true if the flag is set (default false)
131         */
132        protected boolean isReaderTransactionalQueue() {
133                return isReaderTransactionalQueue;
134        }
135
136        /**
137         * Set the bean name property, which will become the name of the {@link Step} when it is created.
138         *
139         * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
140         */
141        @Override
142        public void setBeanName(String name) {
143                this.name = name;
144        }
145
146        /**
147         * Public getter for the name of the step.
148         * @return the name
149         */
150        public String getName() {
151                return name;
152        }
153
154        /**
155         * The timeout for an individual transaction in the step.
156         *
157         * @param transactionTimeout the transaction timeout to set, defaults to infinite
158         */
159        public void setTransactionTimeout(int transactionTimeout) {
160                this.transactionTimeout = transactionTimeout;
161        }
162
163        /**
164         * @param propagation the propagation to set for business transactions
165         */
166        public void setPropagation(Propagation propagation) {
167                this.propagation = propagation;
168        }
169
170        /**
171         * @param isolation the isolation to set for business transactions
172         */
173        public void setIsolation(Isolation isolation) {
174                this.isolation = isolation;
175        }
176
177        /**
178         * Public setter for the start limit for the step.
179         *
180         * @param startLimit the startLimit to set
181         */
182        public void setStartLimit(int startLimit) {
183                this.startLimit = startLimit;
184        }
185
186        /**
187         * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the
188         * first time.
189         *
190         * @param allowStartIfComplete the shouldAllowStartIfComplete to set
191         */
192        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
193                this.allowStartIfComplete = allowStartIfComplete;
194        }
195
196        /**
197         * @param itemReader the {@link ItemReader} to set
198         */
199        public void setItemReader(ItemReader<? extends T> itemReader) {
200                this.itemReader = itemReader;
201        }
202
203        /**
204         * @param itemWriter the {@link ItemWriter} to set
205         */
206        public void setItemWriter(ItemWriter<? super S> itemWriter) {
207                this.itemWriter = itemWriter;
208        }
209
210        /**
211         * @param itemProcessor the {@link ItemProcessor} to set
212         */
213        public void setItemProcessor(ItemProcessor<? super T, ? extends S> itemProcessor) {
214                this.itemProcessor = itemProcessor;
215        }
216
217        /**
218         * The streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be used, and will then
219         * receive callbacks at the appropriate stage in the step.
220         *
221         * @param streams an array of listeners
222         */
223        public void setStreams(ItemStream[] streams) {
224                this.streams = streams;
225        }
226
227        /**
228         * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then
229         * receive callbacks at the appropriate stage in the step.
230         *
231         * @param listeners an array of listeners
232         */
233        public void setListeners(StepListener[] listeners) {
234                this.listeners = listeners;
235        }
236
237        /**
238         * Protected getter for the {@link StepListener}s.
239         * @return the listeners
240         */
241        protected StepListener[] getListeners() {
242                return listeners;
243        }
244
245        /**
246         * Protected getter for the {@link ItemReader} for subclasses to use.
247         * @return the itemReader
248         */
249        protected ItemReader<? extends T> getItemReader() {
250                return itemReader;
251        }
252
253        /**
254         * Protected getter for the {@link ItemWriter} for subclasses to use
255         * @return the itemWriter
256         */
257        protected ItemWriter<? super S> getItemWriter() {
258                return itemWriter;
259        }
260
261        /**
262         * Protected getter for the {@link ItemProcessor} for subclasses to use
263         * @return the itemProcessor
264         */
265        protected ItemProcessor<? super T, ? extends S> getItemProcessor() {
266                return itemProcessor;
267        }
268
269        /**
270         * Public setter for {@link JobRepository}.
271         *
272         * @param jobRepository is a mandatory dependence (no default).
273         */
274        public void setJobRepository(JobRepository jobRepository) {
275                this.jobRepository = jobRepository;
276        }
277
278        /**
279         * Public setter for the {@link PlatformTransactionManager}.
280         *
281         * @param transactionManager the transaction manager to set
282         */
283        public void setTransactionManager(PlatformTransactionManager transactionManager) {
284                this.transactionManager = transactionManager;
285        }
286
287        /**
288         * Getter for the {@link TransactionAttribute} for subclasses only.
289         * @return the transactionAttribute
290         */
291        @SuppressWarnings("serial")
292        protected TransactionAttribute getTransactionAttribute() {
293
294                DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
295                attribute.setPropagationBehavior(propagation.value());
296                attribute.setIsolationLevel(isolation.value());
297                attribute.setTimeout(transactionTimeout);
298                return new DefaultTransactionAttribute(attribute) {
299
300                        /**
301                         * Ignore the default behaviour and rollback on all exceptions that bubble up to the tasklet level. The
302                         * tasklet has to deal with the rollback rules internally.
303                         */
304                        @Override
305                        public boolean rollbackOn(Throwable ex) {
306                                return true;
307                        }
308
309                };
310
311        }
312
313        /**
314         * Create a {@link Step} from the configuration provided.
315         *
316         * @see FactoryBean#getObject()
317         */
318        @Override
319        public final Step getObject() throws Exception {
320                SimpleStepBuilder<T, S> builder = createBuilder(getName());
321                applyConfiguration(builder);
322                TaskletStep step = builder.build();
323                return step;
324        }
325
326        protected SimpleStepBuilder<T, S> createBuilder(String name) {
327                return new SimpleStepBuilder<>(new StepBuilder(name));
328        }
329
330        @Override
331        public Class<TaskletStep> getObjectType() {
332                return TaskletStep.class;
333        }
334
335        /**
336         * Returns true by default, but in most cases a {@link Step} should <b>not</b> be treated as thread-safe. Clients are
337         * recommended to create a new step for each job execution.
338         *
339         * @see org.springframework.beans.factory.FactoryBean#isSingleton()
340         */
341        @Override
342        public boolean isSingleton() {
343                return this.singleton;
344        }
345
346        /**
347         * Public setter for the singleton flag.
348         * @param singleton the value to set. Defaults to true.
349         */
350        public void setSingleton(boolean singleton) {
351                this.singleton = singleton;
352        }
353
354        /**
355         * Set the commit interval. Either set this or the chunkCompletionPolicy but not both.
356         *
357         * @param commitInterval 1 by default
358         */
359        public void setCommitInterval(int commitInterval) {
360                this.commitInterval = commitInterval;
361        }
362
363        /**
364         * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed when
365         * this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to the
366         * commitInterval property.
367         *
368         * @param chunkCompletionPolicy the chunkCompletionPolicy to set
369         */
370        public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) {
371                this.chunkCompletionPolicy = chunkCompletionPolicy;
372        }
373
374        /**
375         * Protected getter for the step operations to make them available in subclasses.
376         * @return the step operations
377         */
378        protected RepeatOperations getStepOperations() {
379                return stepOperations;
380        }
381
382        /**
383         * Public setter for the stepOperations.
384         * @param stepOperations the stepOperations to set
385         */
386        public void setStepOperations(RepeatOperations stepOperations) {
387                this.stepOperations = stepOperations;
388        }
389
390        /**
391         * Public setter for the chunkOperations.
392         * @param chunkOperations the chunkOperations to set
393         */
394        public void setChunkOperations(RepeatOperations chunkOperations) {
395                this.chunkOperations = chunkOperations;
396        }
397
398        /**
399         * Protected getter for the chunk operations to make them available in subclasses.
400         * @return the step operations
401         */
402        protected RepeatOperations getChunkOperations() {
403                return chunkOperations;
404        }
405
406        /**
407         * Public setter for the {@link ExceptionHandler}.
408         * @param exceptionHandler the exceptionHandler to set
409         */
410        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
411                this.exceptionHandler = exceptionHandler;
412        }
413
414        /**
415         * Protected getter for the {@link ExceptionHandler}.
416         * @return the {@link ExceptionHandler}
417         */
418        protected ExceptionHandler getExceptionHandler() {
419                return exceptionHandler;
420        }
421
422        /**
423         * Public setter for the {@link TaskExecutor}. If this is set, then it will be used to execute the chunk processing
424         * inside the {@link Step}.
425         *
426         * @param taskExecutor the taskExecutor to set
427         */
428        public void setTaskExecutor(TaskExecutor taskExecutor) {
429                this.taskExecutor = taskExecutor;
430        }
431
432        /**
433         * Make the {@link TaskExecutor} available to subclasses
434         * @return the taskExecutor to be used to execute chunks
435         */
436        protected TaskExecutor getTaskExecutor() {
437                return taskExecutor;
438        }
439
440        /**
441         * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing to prevent
442         * thread pools from being overwhelmed. Defaults to {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}.
443         * @param throttleLimit the throttle limit to set.
444         */
445        public void setThrottleLimit(int throttleLimit) {
446                this.throttleLimit = throttleLimit;
447        }
448
449        protected void applyConfiguration(SimpleStepBuilder<T, S> builder) {
450
451                builder.reader(itemReader);
452                builder.processor(itemProcessor);
453                builder.writer(itemWriter);
454                for (StepExecutionListener listener : BatchListenerFactoryHelper.<StepExecutionListener> getListeners(
455                                listeners, StepExecutionListener.class)) {
456                        builder.listener(listener);
457                }
458                for (ChunkListener listener : BatchListenerFactoryHelper.<ChunkListener> getListeners(listeners,
459                                ChunkListener.class)) {
460                        builder.listener(listener);
461                }
462                for (ItemReadListener<T> listener : BatchListenerFactoryHelper.<ItemReadListener<T>> getListeners(listeners,
463                                ItemReadListener.class)) {
464                        builder.listener(listener);
465                }
466                for (ItemWriteListener<S> listener : BatchListenerFactoryHelper.<ItemWriteListener<S>> getListeners(listeners,
467                                ItemWriteListener.class)) {
468                        builder.listener(listener);
469                }
470                for (ItemProcessListener<T, S> listener : BatchListenerFactoryHelper.<ItemProcessListener<T, S>> getListeners(
471                                listeners, ItemProcessListener.class)) {
472                        builder.listener(listener);
473                }
474                builder.transactionManager(transactionManager);
475                builder.transactionAttribute(getTransactionAttribute());
476                builder.repository(jobRepository);
477                builder.startLimit(startLimit);
478                builder.allowStartIfComplete(allowStartIfComplete);
479                builder.chunk(commitInterval);
480                builder.chunk(chunkCompletionPolicy);
481                builder.chunkOperations(chunkOperations);
482                builder.stepOperations(stepOperations);
483                builder.taskExecutor(taskExecutor);
484                builder.throttleLimit(throttleLimit);
485                builder.exceptionHandler(exceptionHandler);
486                if (isReaderTransactionalQueue) {
487                        builder.readerIsTransactionalQueue();
488                }
489                for (ItemStream stream : streams) {
490                        builder.stream(stream);
491                }
492
493        }
494}