001/* 002 * Copyright 2013-2014 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.jsr.configuration.xml; 017 018import javax.batch.api.Batchlet; 019import javax.batch.api.chunk.CheckpointAlgorithm; 020import javax.batch.api.chunk.ItemProcessor; 021import javax.batch.api.chunk.ItemReader; 022import javax.batch.api.chunk.ItemWriter; 023import javax.batch.api.partition.PartitionReducer; 024 025import org.springframework.batch.core.Step; 026import org.springframework.batch.core.configuration.xml.StepParserStepFactoryBean; 027import org.springframework.batch.core.jsr.configuration.support.BatchPropertyContext; 028import org.springframework.batch.core.jsr.partition.JsrPartitionHandler; 029import org.springframework.batch.core.jsr.step.batchlet.BatchletAdapter; 030import org.springframework.batch.core.jsr.step.builder.JsrBatchletStepBuilder; 031import org.springframework.batch.core.jsr.step.builder.JsrFaultTolerantStepBuilder; 032import org.springframework.batch.core.jsr.step.builder.JsrPartitionStepBuilder; 033import org.springframework.batch.core.jsr.step.builder.JsrSimpleStepBuilder; 034import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; 035import org.springframework.batch.core.step.builder.SimpleStepBuilder; 036import org.springframework.batch.core.step.builder.StepBuilder; 037import org.springframework.batch.core.step.builder.TaskletStepBuilder; 038import org.springframework.batch.core.step.tasklet.Tasklet; 039import org.springframework.batch.core.step.tasklet.TaskletStep; 040import org.springframework.batch.jsr.item.ItemProcessorAdapter; 041import org.springframework.batch.jsr.item.ItemReaderAdapter; 042import org.springframework.batch.jsr.item.ItemWriterAdapter; 043import org.springframework.batch.jsr.repeat.CheckpointAlgorithmAdapter; 044import org.springframework.batch.repeat.CompletionPolicy; 045import org.springframework.batch.repeat.policy.CompositeCompletionPolicy; 046import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; 047import org.springframework.batch.repeat.policy.TimeoutTerminationPolicy; 048import org.springframework.beans.factory.FactoryBean; 049import org.springframework.util.Assert; 050 051/** 052 * This {@link FactoryBean} is used by the JSR-352 namespace parser to create 053 * {@link Step} objects. It stores all of the properties that are 054 * configurable on the <step/>. 055 * 056 * @author Michael Minella 057 * @author Chris Schaefer 058 * @since 3.0 059 */ 060public class StepFactoryBean<I, O> extends StepParserStepFactoryBean<I, O> { 061 062 @SuppressWarnings("unused") 063 private int partitions; 064 private BatchPropertyContext batchPropertyContext; 065 066 private PartitionReducer reducer; 067 068 private Integer timeout; 069 070 public void setPartitionReducer(PartitionReducer reducer) { 071 this.reducer = reducer; 072 } 073 074 public void setBatchPropertyContext(BatchPropertyContext context) { 075 this.batchPropertyContext = context; 076 } 077 078 public void setPartitions(int partitions) { 079 this.partitions = partitions; 080 } 081 082 /** 083 * Create a {@link Step} from the configuration provided. 084 * 085 * @see FactoryBean#getObject() 086 */ 087 @Override 088 public Step getObject() throws Exception { 089 if(hasPartitionElement()) { 090 return createPartitionStep(); 091 } 092 else if (hasChunkElement()) { 093 Assert.isTrue(!hasTasklet(), "Step [" + getName() 094 + "] has both a <chunk/> element and a 'ref' attribute referencing a Tasklet."); 095 096 validateFaultTolerantSettings(); 097 098 if (isFaultTolerant()) { 099 return createFaultTolerantStep(); 100 } 101 else { 102 return createSimpleStep(); 103 } 104 } 105 else if (hasTasklet()) { 106 return createTaskletStep(); 107 } 108 else { 109 return createFlowStep(); 110 } 111 } 112 113 /** 114 * @return a new {@link TaskletStep} 115 */ 116 @Override 117 protected TaskletStep createTaskletStep() { 118 JsrBatchletStepBuilder jsrBatchletStepBuilder = new JsrBatchletStepBuilder(new StepBuilder(getName())); 119 jsrBatchletStepBuilder.setBatchPropertyContext(batchPropertyContext); 120 TaskletStepBuilder builder = jsrBatchletStepBuilder.tasklet(getTasklet()); 121 enhanceTaskletStepBuilder(builder); 122 return builder.build(); 123 } 124 125 @Override 126 protected void setChunk(SimpleStepBuilder<I, O> builder) { 127 if(timeout != null && getCommitInterval() != null) { 128 CompositeCompletionPolicy completionPolicy = new CompositeCompletionPolicy(); 129 CompletionPolicy [] policies = new CompletionPolicy[2]; 130 policies[0] = new SimpleCompletionPolicy(getCommitInterval()); 131 policies[1] = new TimeoutTerminationPolicy(timeout * 1000); 132 completionPolicy.setPolicies(policies); 133 builder.chunk(completionPolicy); 134 } else if(timeout != null) { 135 builder.chunk(new TimeoutTerminationPolicy(timeout * 1000)); 136 } else if(getCommitInterval() != null) { 137 builder.chunk(getCommitInterval()); 138 } 139 140 if(getCompletionPolicy() != null) { 141 builder.chunk(getCompletionPolicy()); 142 } 143 } 144 145 146 @Override 147 protected Step createPartitionStep() { 148 // Creating a partitioned step for the JSR needs to create two steps...the partitioned step and the step being executed. 149 Step executedStep = null; 150 151 if (hasChunkElement()) { 152 Assert.isTrue(!hasTasklet(), "Step [" + getName() 153 + "] has both a <chunk/> element and a 'ref' attribute referencing a Tasklet."); 154 155 validateFaultTolerantSettings(); 156 157 if (isFaultTolerant()) { 158 executedStep = createFaultTolerantStep(); 159 } 160 else { 161 executedStep = createSimpleStep(); 162 } 163 } 164 else if (hasTasklet()) { 165 executedStep = createTaskletStep(); 166 } 167 168 ((JsrPartitionHandler) super.getPartitionHandler()).setStep(executedStep); 169 170 JsrPartitionStepBuilder builder = new JsrSimpleStepBuilder<I, O>(new StepBuilder(executedStep.getName())).partitioner(executedStep); 171 172 enhanceCommonStep(builder); 173 174 if (getPartitionHandler() != null) { 175 builder.partitionHandler(getPartitionHandler()); 176 } 177 178 if(reducer != null) { 179 builder.reducer(reducer); 180 } 181 182 builder.aggregator(getStepExecutionAggergator()); 183 184 return builder.build(); 185 } 186 187 /** 188 * Wraps a {@link Batchlet} in a {@link BatchletAdapter} if required for consumption 189 * by the rest of the framework. 190 * 191 * @param tasklet {@link Tasklet} or {@link Batchlet} implementation 192 * @throws IllegalArgumentException if tasklet does not implement either Tasklet or Batchlet 193 */ 194 public void setStepTasklet(Object tasklet) { 195 if(tasklet instanceof Tasklet) { 196 super.setTasklet((Tasklet) tasklet); 197 } else if(tasklet instanceof Batchlet){ 198 super.setTasklet(new BatchletAdapter((Batchlet) tasklet)); 199 } else { 200 throw new IllegalArgumentException("The field tasklet must reference an implementation of " + 201 "either org.springframework.batch.core.step.tasklet.Tasklet or javax.batch.api.Batchlet"); 202 } 203 } 204 205 /** 206 * Wraps a {@link ItemReader} in a {@link ItemReaderAdapter} if required for consumption 207 * by the rest of the framework. 208 * 209 * @param itemReader {@link ItemReader} or {@link org.springframework.batch.item.ItemReader} implementation 210 * @throws IllegalArgumentException if itemReader does not implement either version of ItemReader 211 */ 212 @SuppressWarnings("unchecked") 213 public void setStepItemReader(Object itemReader) { 214 if(itemReader instanceof org.springframework.batch.item.ItemReader) { 215 super.setItemReader((org.springframework.batch.item.ItemReader<I>) itemReader); 216 } else if(itemReader instanceof ItemReader){ 217 super.setItemReader(new ItemReaderAdapter<I>((ItemReader) itemReader)); 218 } else { 219 throw new IllegalArgumentException("The definition of an item reader must implement either " + 220 "org.springframework.batch.item.ItemReader or javax.batch.api.chunk.ItemReader"); 221 } 222 } 223 224 /** 225 * Wraps a {@link ItemProcessor} in a {@link ItemProcessorAdapter} if required for consumption 226 * by the rest of the framework. 227 * 228 * @param itemProcessor {@link ItemProcessor} or {@link org.springframework.batch.item.ItemProcessor} implementation 229 * @throws IllegalArgumentException if itemProcessor does not implement either version of ItemProcessor 230 */ 231 @SuppressWarnings("unchecked") 232 public void setStepItemProcessor(Object itemProcessor) { 233 if(itemProcessor instanceof org.springframework.batch.item.ItemProcessor) { 234 super.setItemProcessor((org.springframework.batch.item.ItemProcessor<I, O>) itemProcessor); 235 } else if(itemProcessor instanceof ItemProcessor){ 236 super.setItemProcessor(new ItemProcessorAdapter<I, O>((ItemProcessor)itemProcessor)); 237 } else { 238 throw new IllegalArgumentException("The definition of an item processor must implement either " + 239 "org.springframework.batch.item.ItemProcessor or javax.batch.api.chunk.ItemProcessor"); 240 } 241 } 242 243 /** 244 * Wraps a {@link ItemWriter} in a {@link ItemWriterAdapter} if required for consumption 245 * by the rest of the framework. 246 * 247 * @param itemWriter {@link ItemWriter} or {@link org.springframework.batch.item.ItemWriter} implementation 248 * @throws IllegalArgumentException if itemWriter does not implement either version of ItemWriter 249 */ 250 @SuppressWarnings("unchecked") 251 public void setStepItemWriter(Object itemWriter) { 252 if(itemWriter instanceof org.springframework.batch.item.ItemWriter) { 253 super.setItemWriter((org.springframework.batch.item.ItemWriter<O>) itemWriter); 254 } else if(itemWriter instanceof ItemWriter){ 255 super.setItemWriter(new ItemWriterAdapter<O>((ItemWriter) itemWriter)); 256 } else { 257 throw new IllegalArgumentException("The definition of an item writer must implement either " + 258 "org.springframework.batch.item.ItemWriter or javax.batch.api.chunk.ItemWriter"); 259 } 260 } 261 262 /** 263 * Wraps a {@link CheckpointAlgorithm} in a {@link CheckpointAlgorithmAdapter} if required for consumption 264 * by the rest of the framework. 265 * 266 * @param chunkCompletionPolicy {@link CompletionPolicy} or {@link CheckpointAlgorithm} implementation 267 * @throws IllegalArgumentException if chunkCompletionPolicy does not implement either CompletionPolicy or CheckpointAlgorithm 268 */ 269 public void setStepChunkCompletionPolicy(Object chunkCompletionPolicy) { 270 if(chunkCompletionPolicy instanceof CompletionPolicy) { 271 super.setChunkCompletionPolicy((CompletionPolicy) chunkCompletionPolicy); 272 } else if(chunkCompletionPolicy instanceof CheckpointAlgorithm) { 273 super.setChunkCompletionPolicy(new CheckpointAlgorithmAdapter((CheckpointAlgorithm) chunkCompletionPolicy)); 274 } else { 275 throw new IllegalArgumentException("The definition of a chunk completion policy must implement either " + 276 "org.springframework.batch.repeat.CompletionPolicy or javax.batch.api.chunk.CheckpointAlgorithm"); 277 } 278 } 279 280 @Override 281 protected FaultTolerantStepBuilder<I, O> getFaultTolerantStepBuilder(String stepName) { 282 JsrFaultTolerantStepBuilder<I, O> jsrFaultTolerantStepBuilder = new JsrFaultTolerantStepBuilder<I, O>(new StepBuilder(stepName)); 283 jsrFaultTolerantStepBuilder.setBatchPropertyContext(batchPropertyContext); 284 return jsrFaultTolerantStepBuilder; 285 } 286 287 @Override 288 protected SimpleStepBuilder<I, O> getSimpleStepBuilder(String stepName) { 289 JsrSimpleStepBuilder<I, O> jsrSimpleStepBuilder = new JsrSimpleStepBuilder<I, O>(new StepBuilder(stepName)); 290 jsrSimpleStepBuilder.setBatchPropertyContext(batchPropertyContext); 291 return jsrSimpleStepBuilder; 292 } 293 294 public void setTimeout(Integer timeout) { 295 this.timeout = timeout; 296 } 297}