001/* 002 * Copyright 2012-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.core.step.builder; 017 018import java.lang.reflect.Method; 019import java.util.HashSet; 020import java.util.LinkedHashSet; 021import java.util.Set; 022 023import org.springframework.batch.core.ChunkListener; 024import org.springframework.batch.core.Step; 025import org.springframework.batch.core.StepExecutionListener; 026import org.springframework.batch.core.annotation.AfterChunk; 027import org.springframework.batch.core.annotation.AfterChunkError; 028import org.springframework.batch.core.annotation.BeforeChunk; 029import org.springframework.batch.core.listener.StepListenerFactoryBean; 030import org.springframework.batch.core.step.tasklet.Tasklet; 031import org.springframework.batch.core.step.tasklet.TaskletStep; 032import org.springframework.batch.item.ItemStream; 033import org.springframework.batch.repeat.RepeatOperations; 034import org.springframework.batch.repeat.exception.DefaultExceptionHandler; 035import org.springframework.batch.repeat.exception.ExceptionHandler; 036import org.springframework.batch.repeat.support.RepeatTemplate; 037import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; 038import org.springframework.batch.support.ReflectionUtils; 039import org.springframework.core.task.SyncTaskExecutor; 040import org.springframework.core.task.TaskExecutor; 041import org.springframework.transaction.interceptor.TransactionAttribute; 042 043/** 044 * Base class for step builders that want to build a {@link TaskletStep}. Handles common concerns across all tasklet 045 * step variants, which are mostly to do with the type of tasklet they carry. 046 * 047 * @author Dave Syer 048 * @author Michael Minella 049 * @author Mahmoud Ben Hassine 050 * 051 * @since 2.2 052 * 053 * @param <B> the type of builder represented 054 */ 055public abstract class AbstractTaskletStepBuilder<B extends AbstractTaskletStepBuilder<B>> extends 056StepBuilderHelper<AbstractTaskletStepBuilder<B>> { 057 058 protected Set<ChunkListener> chunkListeners = new LinkedHashSet<ChunkListener>(); 059 060 private RepeatOperations stepOperations; 061 062 private TransactionAttribute transactionAttribute; 063 064 private Set<ItemStream> streams = new LinkedHashSet<ItemStream>(); 065 066 private ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); 067 068 private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT; 069 070 private TaskExecutor taskExecutor; 071 072 public AbstractTaskletStepBuilder(StepBuilderHelper<?> parent) { 073 super(parent); 074 } 075 076 protected abstract Tasklet createTasklet(); 077 078 /** 079 * Build the step from the components collected by the fluent setters. Delegates first to {@link #enhance(Step)} and 080 * then to {@link #createTasklet()} in subclasses to create the actual tasklet. 081 * 082 * @return a tasklet step fully configured and ready to execute 083 */ 084 public TaskletStep build() { 085 086 registerStepListenerAsChunkListener(); 087 088 TaskletStep step = new TaskletStep(getName()); 089 090 super.enhance(step); 091 092 step.setChunkListeners(chunkListeners.toArray(new ChunkListener[0])); 093 094 if (transactionAttribute != null) { 095 step.setTransactionAttribute(transactionAttribute); 096 } 097 098 if (stepOperations == null) { 099 100 stepOperations = new RepeatTemplate(); 101 102 if (taskExecutor != null) { 103 TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate(); 104 repeatTemplate.setTaskExecutor(taskExecutor); 105 repeatTemplate.setThrottleLimit(throttleLimit); 106 stepOperations = repeatTemplate; 107 } 108 109 ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler); 110 111 } 112 step.setStepOperations(stepOperations); 113 step.setTasklet(createTasklet()); 114 115 step.setStreams(streams.toArray(new ItemStream[0])); 116 117 try { 118 step.afterPropertiesSet(); 119 } 120 catch (Exception e) { 121 throw new StepBuilderException(e); 122 } 123 124 return step; 125 126 } 127 128 protected void registerStepListenerAsChunkListener() { 129 for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){ 130 if (stepExecutionListener instanceof ChunkListener){ 131 listener((ChunkListener)stepExecutionListener); 132 } 133 } 134 } 135 136 /** 137 * Register a chunk listener. 138 * 139 * @param listener the listener to register 140 * @return this for fluent chaining 141 */ 142 public AbstractTaskletStepBuilder<B> listener(ChunkListener listener) { 143 chunkListeners.add(listener); 144 return this; 145 } 146 147 /** 148 * Registers objects using the annotation based listener configuration. 149 * 150 * @param listener the object that has a method configured with listener annotation 151 * @return this for fluent chaining 152 */ 153 @Override 154 public B listener(Object listener) { 155 super.listener(listener); 156 157 Set<Method> chunkListenerMethods = new HashSet<>(); 158 chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeChunk.class)); 159 chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunk.class)); 160 chunkListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterChunkError.class)); 161 162 if(chunkListenerMethods.size() > 0) { 163 StepListenerFactoryBean factory = new StepListenerFactoryBean(); 164 factory.setDelegate(listener); 165 this.listener((ChunkListener) factory.getObject()); 166 } 167 168 @SuppressWarnings("unchecked") 169 B result = (B) this; 170 return result; 171 } 172 173 /** 174 * Register a stream for callbacks that manage restart data. 175 * 176 * @param stream the stream to register 177 * @return this for fluent chaining 178 */ 179 public AbstractTaskletStepBuilder<B> stream(ItemStream stream) { 180 streams.add(stream); 181 return this; 182 } 183 184 /** 185 * Provide a task executor to use when executing the tasklet. Default is to use a single-threaded (synchronous) 186 * executor. 187 * 188 * @param taskExecutor the task executor to register 189 * @return this for fluent chaining 190 */ 191 public AbstractTaskletStepBuilder<B> taskExecutor(TaskExecutor taskExecutor) { 192 this.taskExecutor = taskExecutor; 193 return this; 194 } 195 196 /** 197 * In the case of an asynchronous {@link #taskExecutor(TaskExecutor)} the number of concurrent tasklet executions 198 * can be throttled (beyond any throttling provided by a thread pool). The throttle limit should be less than the 199 * data source pool size used in the job repository for this step. 200 * 201 * @param throttleLimit maximum number of concurrent tasklet executions allowed 202 * @return this for fluent chaining 203 */ 204 public AbstractTaskletStepBuilder<B> throttleLimit(int throttleLimit) { 205 this.throttleLimit = throttleLimit; 206 return this; 207 } 208 209 /** 210 * Sets the exception handler to use in the case of tasklet failures. Default is to rethrow everything. 211 * 212 * @param exceptionHandler the exception handler 213 * @return this for fluent chaining 214 */ 215 public AbstractTaskletStepBuilder<B> exceptionHandler(ExceptionHandler exceptionHandler) { 216 this.exceptionHandler = exceptionHandler; 217 return this; 218 } 219 220 /** 221 * Sets the repeat template used for iterating the tasklet execution. By default it will terminate only when the 222 * tasklet returns FINISHED (or null). 223 * 224 * @param repeatTemplate a repeat template with rules for iterating 225 * @return this for fluent chaining 226 */ 227 public AbstractTaskletStepBuilder<B> stepOperations(RepeatOperations repeatTemplate) { 228 this.stepOperations = repeatTemplate; 229 return this; 230 } 231 232 /** 233 * Sets the transaction attributes for the tasklet execution. Defaults to the default values for the transaction 234 * manager, but can be manipulated to provide longer timeouts for instance. 235 * 236 * @param transactionAttribute a transaction attribute set 237 * @return this for fluent chaining 238 */ 239 public AbstractTaskletStepBuilder<B> transactionAttribute(TransactionAttribute transactionAttribute) { 240 this.transactionAttribute = transactionAttribute; 241 return this; 242 } 243 244 /** 245 * Convenience method for subclasses to access the step operations that were injected by user. 246 * 247 * @return the repeat operations used to iterate the tasklet executions 248 */ 249 protected RepeatOperations getStepOperations() { 250 return stepOperations; 251 } 252 253 /** 254 * Convenience method for subclasses to access the exception handler that was injected by user. 255 * 256 * @return the exception handler 257 */ 258 protected ExceptionHandler getExceptionHandler() { 259 return exceptionHandler; 260 } 261 262 /** 263 * Convenience method for subclasses to determine if the step is concurrent. 264 * 265 * @return true if the tasklet is going to be run in multiple threads 266 */ 267 protected boolean concurrent() { 268 boolean concurrent = taskExecutor != null && !(taskExecutor instanceof SyncTaskExecutor); 269 return concurrent; 270 } 271 272 protected TaskExecutor getTaskExecutor() { 273 return taskExecutor; 274 } 275 276 protected int getThrottleLimit() { 277 return throttleLimit; 278 } 279 280 protected TransactionAttribute getTransactionAttribute() { 281 return transactionAttribute; 282 } 283 284 protected Set<ItemStream> getStreams() { 285 return this.streams; 286 } 287 288}