001/*
002 * Copyright 2013-2014 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.core.jsr.configuration.xml;
017
018import javax.batch.api.Batchlet;
019import javax.batch.api.chunk.CheckpointAlgorithm;
020import javax.batch.api.chunk.ItemProcessor;
021import javax.batch.api.chunk.ItemReader;
022import javax.batch.api.chunk.ItemWriter;
023import javax.batch.api.partition.PartitionReducer;
024
025import org.springframework.batch.core.Step;
026import org.springframework.batch.core.configuration.xml.StepParserStepFactoryBean;
027import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext;
028import org.springframework.batch.core.jsr.partition.JsrPartitionHandler;
029import org.springframework.batch.core.jsr.step.batchlet.BatchletAdapter;
030import org.springframework.batch.core.jsr.step.builder.JsrBatchletStepBuilder;
031import org.springframework.batch.core.jsr.step.builder.JsrFaultTolerantStepBuilder;
032import org.springframework.batch.core.jsr.step.builder.JsrPartitionStepBuilder;
033import org.springframework.batch.core.jsr.step.builder.JsrSimpleStepBuilder;
034import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
035import org.springframework.batch.core.step.builder.SimpleStepBuilder;
036import org.springframework.batch.core.step.builder.StepBuilder;
037import org.springframework.batch.core.step.builder.TaskletStepBuilder;
038import org.springframework.batch.core.step.tasklet.Tasklet;
039import org.springframework.batch.core.step.tasklet.TaskletStep;
040import org.springframework.batch.jsr.item.ItemProcessorAdapter;
041import org.springframework.batch.jsr.item.ItemReaderAdapter;
042import org.springframework.batch.jsr.item.ItemWriterAdapter;
043import org.springframework.batch.jsr.repeat.CheckpointAlgorithmAdapter;
044import org.springframework.batch.repeat.CompletionPolicy;
045import org.springframework.batch.repeat.policy.CompositeCompletionPolicy;
046import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
047import org.springframework.batch.repeat.policy.TimeoutTerminationPolicy;
048import org.springframework.beans.factory.FactoryBean;
049import org.springframework.util.Assert;
050
051/**
052 * This {@link FactoryBean} is used by the JSR-352 namespace parser to create
053 * {@link Step} objects. It stores all of the properties that are
054 * configurable on the <step/>.
055 *
056 * @author Michael Minella
057 * @author Chris Schaefer
058 * @since 3.0
059 */
060public class StepFactoryBean<I, O> extends StepParserStepFactoryBean<I, O> {
061
062        @SuppressWarnings("unused")
063        private int partitions;
064        private BatchPropertyContext batchPropertyContext;
065
066        private PartitionReducer reducer;
067
068        private Integer timeout;
069
070        public void setPartitionReducer(PartitionReducer reducer) {
071                this.reducer = reducer;
072        }
073
074        public void setBatchPropertyContext(BatchPropertyContext context) {
075                this.batchPropertyContext = context;
076        }
077
078        public void setPartitions(int partitions) {
079                this.partitions = partitions;
080        }
081
082        /**
083         * Create a {@link Step} from the configuration provided.
084         *
085         * @see FactoryBean#getObject()
086         */
087        @Override
088        public Step getObject() throws Exception {
089                if(hasPartitionElement()) {
090                        return createPartitionStep();
091                }
092                else if (hasChunkElement()) {
093                        Assert.isTrue(!hasTasklet(), "Step [" + getName()
094                                        + "] has both a <chunk/> element and a 'ref' attribute  referencing a Tasklet.");
095
096                        validateFaultTolerantSettings();
097
098                        if (isFaultTolerant()) {
099                                return createFaultTolerantStep();
100                        }
101                        else {
102                                return createSimpleStep();
103                        }
104                }
105                else if (hasTasklet()) {
106                        return createTaskletStep();
107                }
108                else {
109                        return createFlowStep();
110                }
111        }
112
113        /**
114         * @return a new {@link TaskletStep}
115         */
116        @Override
117        protected TaskletStep createTaskletStep() {
118                JsrBatchletStepBuilder jsrBatchletStepBuilder = new JsrBatchletStepBuilder(new StepBuilder(getName()));
119                jsrBatchletStepBuilder.setBatchPropertyContext(batchPropertyContext);
120                TaskletStepBuilder builder = jsrBatchletStepBuilder.tasklet(getTasklet());
121                enhanceTaskletStepBuilder(builder);
122                return builder.build();
123        }
124
125        @Override
126        protected void setChunk(SimpleStepBuilder<I, O> builder) {
127                if(timeout != null && getCommitInterval() != null) {
128                        CompositeCompletionPolicy completionPolicy = new CompositeCompletionPolicy();
129                        CompletionPolicy [] policies = new CompletionPolicy[2];
130                        policies[0] = new SimpleCompletionPolicy(getCommitInterval());
131                        policies[1] = new TimeoutTerminationPolicy(timeout * 1000);
132                        completionPolicy.setPolicies(policies);
133                        builder.chunk(completionPolicy);
134                } else if(timeout != null) {
135                        builder.chunk(new TimeoutTerminationPolicy(timeout * 1000));
136                } else if(getCommitInterval() != null) {
137                        builder.chunk(getCommitInterval());
138                }
139
140                if(getCompletionPolicy() != null) {
141                        builder.chunk(getCompletionPolicy());
142                }
143        }
144
145
146        @Override
147        protected Step createPartitionStep() {
148                // Creating a partitioned step for the JSR needs to create two steps...the partitioned step and the step being executed.
149                Step executedStep = null;
150
151                if (hasChunkElement()) {
152                        Assert.isTrue(!hasTasklet(), "Step [" + getName()
153                                        + "] has both a <chunk/> element and a 'ref' attribute  referencing a Tasklet.");
154
155                        validateFaultTolerantSettings();
156
157                        if (isFaultTolerant()) {
158                                executedStep = createFaultTolerantStep();
159                        }
160                        else {
161                                executedStep = createSimpleStep();
162                        }
163                }
164                else if (hasTasklet()) {
165                        executedStep = createTaskletStep();
166                }
167
168                ((JsrPartitionHandler) super.getPartitionHandler()).setStep(executedStep);
169
170                JsrPartitionStepBuilder builder = new JsrSimpleStepBuilder<I, O>(new StepBuilder(executedStep.getName())).partitioner(executedStep);
171
172                enhanceCommonStep(builder);
173
174                if (getPartitionHandler() != null) {
175                        builder.partitionHandler(getPartitionHandler());
176                }
177
178                if(reducer != null) {
179                        builder.reducer(reducer);
180                }
181
182                builder.aggregator(getStepExecutionAggergator());
183
184                return builder.build();
185        }
186
187        /**
188         * Wraps a {@link Batchlet} in a {@link BatchletAdapter} if required for consumption
189         * by the rest of the framework.
190         *
191         * @param tasklet {@link Tasklet} or {@link Batchlet} implementation
192         * @throws IllegalArgumentException if tasklet does not implement either Tasklet or Batchlet
193         */
194        public void setStepTasklet(Object tasklet) {
195                if(tasklet instanceof Tasklet) {
196                        super.setTasklet((Tasklet) tasklet);
197                } else if(tasklet instanceof Batchlet){
198                        super.setTasklet(new BatchletAdapter((Batchlet) tasklet));
199                } else {
200                        throw new IllegalArgumentException("The field tasklet must reference an implementation of " +
201                                        "either org.springframework.batch.core.step.tasklet.Tasklet or javax.batch.api.Batchlet");
202                }
203        }
204
205        /**
206         * Wraps a {@link ItemReader} in a {@link ItemReaderAdapter} if required for consumption
207         * by the rest of the framework.
208         *
209         * @param itemReader {@link ItemReader} or {@link org.springframework.batch.item.ItemReader} implementation
210         * @throws IllegalArgumentException if itemReader does not implement either version of ItemReader
211         */
212        @SuppressWarnings("unchecked")
213        public void setStepItemReader(Object itemReader) {
214                if(itemReader instanceof org.springframework.batch.item.ItemReader) {
215                        super.setItemReader((org.springframework.batch.item.ItemReader<I>) itemReader);
216                } else if(itemReader instanceof ItemReader){
217                        super.setItemReader(new ItemReaderAdapter<I>((ItemReader) itemReader));
218                } else {
219                        throw new IllegalArgumentException("The definition of an item reader must implement either " +
220                                        "org.springframework.batch.item.ItemReader or javax.batch.api.chunk.ItemReader");
221                }
222        }
223
224        /**
225         * Wraps a {@link ItemProcessor} in a {@link ItemProcessorAdapter} if required for consumption
226         * by the rest of the framework.
227         *
228         * @param itemProcessor {@link ItemProcessor} or {@link org.springframework.batch.item.ItemProcessor} implementation
229         * @throws IllegalArgumentException if itemProcessor does not implement either version of ItemProcessor
230         */
231        @SuppressWarnings("unchecked")
232        public void setStepItemProcessor(Object itemProcessor) {
233                if(itemProcessor instanceof org.springframework.batch.item.ItemProcessor) {
234                        super.setItemProcessor((org.springframework.batch.item.ItemProcessor<I, O>) itemProcessor);
235                } else if(itemProcessor instanceof ItemProcessor){
236                        super.setItemProcessor(new ItemProcessorAdapter<I, O>((ItemProcessor)itemProcessor));
237                } else {
238                        throw new IllegalArgumentException("The definition of an item processor must implement either " +
239                                        "org.springframework.batch.item.ItemProcessor or javax.batch.api.chunk.ItemProcessor");
240                }
241        }
242
243        /**
244         * Wraps a {@link ItemWriter} in a {@link ItemWriterAdapter} if required for consumption
245         * by the rest of the framework.
246         *
247         * @param itemWriter {@link ItemWriter} or {@link org.springframework.batch.item.ItemWriter} implementation
248         * @throws IllegalArgumentException if itemWriter does not implement either version of ItemWriter
249         */
250        @SuppressWarnings("unchecked")
251        public void setStepItemWriter(Object itemWriter) {
252                if(itemWriter instanceof org.springframework.batch.item.ItemWriter) {
253                        super.setItemWriter((org.springframework.batch.item.ItemWriter<O>) itemWriter);
254                } else if(itemWriter instanceof ItemWriter){
255                        super.setItemWriter(new ItemWriterAdapter<O>((ItemWriter) itemWriter));
256                } else {
257                        throw new IllegalArgumentException("The definition of an item writer must implement either " +
258                                        "org.springframework.batch.item.ItemWriter or javax.batch.api.chunk.ItemWriter");
259                }
260        }
261
262        /**
263         * Wraps a {@link CheckpointAlgorithm} in a {@link CheckpointAlgorithmAdapter} if required for consumption
264         * by the rest of the framework.
265         *
266         * @param chunkCompletionPolicy {@link CompletionPolicy} or {@link CheckpointAlgorithm} implementation
267         * @throws IllegalArgumentException if chunkCompletionPolicy does not implement either CompletionPolicy or CheckpointAlgorithm
268         */
269        public void setStepChunkCompletionPolicy(Object chunkCompletionPolicy) {
270                if(chunkCompletionPolicy instanceof CompletionPolicy) {
271                        super.setChunkCompletionPolicy((CompletionPolicy) chunkCompletionPolicy);
272                } else if(chunkCompletionPolicy instanceof CheckpointAlgorithm) {
273                        super.setChunkCompletionPolicy(new CheckpointAlgorithmAdapter((CheckpointAlgorithm) chunkCompletionPolicy));
274                } else {
275                        throw new IllegalArgumentException("The definition of a chunk completion policy must implement either " +
276                                        "org.springframework.batch.repeat.CompletionPolicy or javax.batch.api.chunk.CheckpointAlgorithm");
277                }
278        }
279
280        @Override
281        protected FaultTolerantStepBuilder<I, O> getFaultTolerantStepBuilder(String stepName) {
282                JsrFaultTolerantStepBuilder<I, O> jsrFaultTolerantStepBuilder = new JsrFaultTolerantStepBuilder<I, O>(new StepBuilder(stepName));
283                jsrFaultTolerantStepBuilder.setBatchPropertyContext(batchPropertyContext);
284                return jsrFaultTolerantStepBuilder;
285        }
286
287        @Override
288        protected SimpleStepBuilder<I, O> getSimpleStepBuilder(String stepName) {
289                JsrSimpleStepBuilder<I, O> jsrSimpleStepBuilder = new JsrSimpleStepBuilder<I, O>(new StepBuilder(stepName));
290                jsrSimpleStepBuilder.setBatchPropertyContext(batchPropertyContext);
291                return jsrSimpleStepBuilder;
292        }
293
294        public void setTimeout(Integer timeout) {
295                this.timeout = timeout;
296        }
297}