001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.integration.chunk; 017 018import org.springframework.batch.core.ChunkListener; 019import org.springframework.batch.core.ItemReadListener; 020import org.springframework.batch.core.ItemWriteListener; 021import org.springframework.batch.core.SkipListener; 022import org.springframework.batch.core.StepExecutionListener; 023import org.springframework.batch.core.repository.JobRepository; 024import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; 025import org.springframework.batch.core.step.builder.StepBuilder; 026import org.springframework.batch.core.step.item.KeyGenerator; 027import org.springframework.batch.core.step.skip.SkipPolicy; 028import org.springframework.batch.core.step.tasklet.TaskletStep; 029import org.springframework.batch.item.ItemProcessor; 030import org.springframework.batch.item.ItemReader; 031import org.springframework.batch.item.ItemStream; 032import org.springframework.batch.item.ItemWriter; 033import org.springframework.batch.repeat.CompletionPolicy; 034import org.springframework.batch.repeat.RepeatOperations; 035import org.springframework.batch.repeat.exception.ExceptionHandler; 036import org.springframework.integration.core.MessagingTemplate; 037import org.springframework.messaging.MessageChannel; 038import org.springframework.messaging.PollableChannel; 039import org.springframework.retry.RetryPolicy; 040import org.springframework.retry.backoff.BackOffPolicy; 041import org.springframework.retry.policy.RetryContextCache; 042import org.springframework.transaction.PlatformTransactionManager; 043import org.springframework.transaction.interceptor.TransactionAttribute; 044import org.springframework.util.Assert; 045 046/** 047 * Builder for a master step in a remote chunking setup. This builder creates and 048 * sets a {@link ChunkMessageChannelItemWriter} on the master step. 049 * 050 * <p>If no {@code messagingTemplate} is provided through 051 * {@link RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate)}, 052 * this builder will create one and set its default channel to the {@code outputChannel} 053 * provided through {@link RemoteChunkingMasterStepBuilder#outputChannel(MessageChannel)}.</p> 054 * 055 * <p>If a {@code messagingTemplate} is provided, it is assumed that it is fully configured 056 * and that its default channel is set to an output channel on which requests to workers 057 * will be sent.</p> 058 * 059 * @param <I> type of input items 060 * @param <O> type of output items 061 * 062 * @since 4.1 063 * @author Mahmoud Ben Hassine 064 */ 065public class RemoteChunkingMasterStepBuilder<I, O> extends FaultTolerantStepBuilder<I, O> { 066 067 private MessagingTemplate messagingTemplate; 068 private PollableChannel inputChannel; 069 private MessageChannel outputChannel; 070 071 private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40; 072 private static final long DEFAULT_THROTTLE_LIMIT = 6; 073 private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS; 074 private long throttleLimit = DEFAULT_THROTTLE_LIMIT; 075 076 /** 077 * Create a new {@link RemoteChunkingMasterStepBuilder}. 078 * 079 * @param stepName name of the master step 080 */ 081 public RemoteChunkingMasterStepBuilder(String stepName) { 082 super(new StepBuilder(stepName)); 083 } 084 085 /** 086 * Set the input channel on which replies from workers will be received. 087 * The provided input channel will be set as a reply channel on the 088 * {@link ChunkMessageChannelItemWriter} created by this builder. 089 * 090 * @param inputChannel the input channel 091 * @return this builder instance for fluent chaining 092 * 093 * @see ChunkMessageChannelItemWriter#setReplyChannel 094 */ 095 public RemoteChunkingMasterStepBuilder<I, O> inputChannel(PollableChannel inputChannel) { 096 Assert.notNull(inputChannel, "inputChannel must not be null"); 097 this.inputChannel = inputChannel; 098 return this; 099 } 100 101 /** 102 * Set the output channel on which requests to workers will be sent. By using 103 * this setter, a default messaging template will be created and the output 104 * channel will be set as its default channel. 105 * <p>Use either this setter or {@link RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate)} 106 * to provide a fully configured messaging template.</p> 107 * 108 * @param outputChannel the output channel. 109 * @return this builder instance for fluent chaining 110 * 111 * @see RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate) 112 */ 113 public RemoteChunkingMasterStepBuilder<I, O> outputChannel(MessageChannel outputChannel) { 114 Assert.notNull(outputChannel, "outputChannel must not be null"); 115 this.outputChannel = outputChannel; 116 return this; 117 } 118 119 /** 120 * Set the {@link MessagingTemplate} to use to send data to workers. 121 * <strong>The default channel of the messaging template must be set</strong>. 122 * <p>Use either this setter to provide a fully configured messaging template or 123 * provide an output channel through {@link RemoteChunkingMasterStepBuilder#outputChannel(MessageChannel)} 124 * and a default messaging template will be created.</p> 125 * 126 * @param messagingTemplate the messaging template to use 127 * @return this builder instance for fluent chaining 128 * @see RemoteChunkingMasterStepBuilder#outputChannel(MessageChannel) 129 */ 130 public RemoteChunkingMasterStepBuilder<I, O> messagingTemplate(MessagingTemplate messagingTemplate) { 131 Assert.notNull(messagingTemplate, "messagingTemplate must not be null"); 132 this.messagingTemplate = messagingTemplate; 133 return this; 134 } 135 136 /** 137 * The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a 138 * multiplier on the receive timeout set separately on the gateway. The ideal value is a compromise between allowing 139 * slow workers time to finish, and responsiveness if there is a dead worker. Defaults to 40. 140 * 141 * @param maxWaitTimeouts the maximum number of wait timeouts 142 * @see ChunkMessageChannelItemWriter#setMaxWaitTimeouts(int) 143 */ 144 public RemoteChunkingMasterStepBuilder<I, O> maxWaitTimeouts(int maxWaitTimeouts) { 145 Assert.isTrue(maxWaitTimeouts > 0, "maxWaitTimeouts must be greater than zero"); 146 this.maxWaitTimeouts = maxWaitTimeouts; 147 return this; 148 } 149 150 /** 151 * Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid 152 * overwhelming the receivers. 153 * 154 * @param throttleLimit the throttle limit to set 155 * @see ChunkMessageChannelItemWriter#setThrottleLimit(long) 156 */ 157 public RemoteChunkingMasterStepBuilder<I, O> throttleLimit(long throttleLimit) { 158 Assert.isTrue(throttleLimit > 0, "throttleLimit must be greater than zero"); 159 this.throttleLimit = throttleLimit; 160 return this; 161 } 162 163 /** 164 * Build a master {@link TaskletStep}. 165 * 166 * @return the configured master step 167 * @see RemoteChunkHandlerFactoryBean 168 */ 169 public TaskletStep build() { 170 Assert.notNull(this.inputChannel, "An InputChannel must be provided"); 171 Assert.state(this.outputChannel == null || this.messagingTemplate == null, 172 "You must specify either an outputChannel or a messagingTemplate but not both."); 173 174 // configure messaging template 175 if (this.messagingTemplate == null) { 176 this.messagingTemplate = new MessagingTemplate(); 177 this.messagingTemplate.setDefaultChannel(this.outputChannel); 178 if (this.logger.isDebugEnabled()) { 179 this.logger.debug("No messagingTemplate was provided, using a default one"); 180 } 181 } 182 183 // configure item writer 184 ChunkMessageChannelItemWriter<O> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>(); 185 chunkMessageChannelItemWriter.setMessagingOperations(this.messagingTemplate); 186 chunkMessageChannelItemWriter.setMaxWaitTimeouts(this.maxWaitTimeouts); 187 chunkMessageChannelItemWriter.setThrottleLimit(this.throttleLimit); 188 chunkMessageChannelItemWriter.setReplyChannel(this.inputChannel); 189 super.writer(chunkMessageChannelItemWriter); 190 191 return super.build(); 192 } 193 194 /* 195 * The following methods override those from parent builders and return 196 * the current builder type. 197 * FIXME: Change parent builders to be generic and return current builder 198 * type in each method. 199 */ 200 201 @Override 202 public RemoteChunkingMasterStepBuilder<I, O> reader(ItemReader<? extends I> reader) { 203 super.reader(reader); 204 return this; 205 } 206 207 @Override 208 public RemoteChunkingMasterStepBuilder<I, O> repository(JobRepository jobRepository) { 209 super.repository(jobRepository); 210 return this; 211 } 212 213 @Override 214 public RemoteChunkingMasterStepBuilder<I, O> transactionManager(PlatformTransactionManager transactionManager) { 215 super.transactionManager(transactionManager); 216 return this; 217 } 218 219 @Override 220 public RemoteChunkingMasterStepBuilder<I, O> listener(Object listener) { 221 super.listener(listener); 222 return this; 223 } 224 225 @Override 226 public RemoteChunkingMasterStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) { 227 super.listener(listener); 228 return this; 229 } 230 231 @Override 232 public RemoteChunkingMasterStepBuilder<I, O> listener(ChunkListener listener) { 233 super.listener(listener); 234 return this; 235 } 236 237 @Override 238 public RemoteChunkingMasterStepBuilder<I, O> transactionAttribute(TransactionAttribute transactionAttribute) { 239 super.transactionAttribute(transactionAttribute); 240 return this; 241 } 242 243 @Override 244 public RemoteChunkingMasterStepBuilder<I, O> listener(org.springframework.retry.RetryListener listener) { 245 super.listener(listener); 246 return this; 247 } 248 249 @Override 250 public RemoteChunkingMasterStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) { 251 super.keyGenerator(keyGenerator); 252 return this; 253 } 254 255 @Override 256 public RemoteChunkingMasterStepBuilder<I, O> retryLimit(int retryLimit) { 257 super.retryLimit(retryLimit); 258 return this; 259 } 260 261 @Override 262 public RemoteChunkingMasterStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) { 263 super.retryPolicy(retryPolicy); 264 return this; 265 } 266 267 @Override 268 public RemoteChunkingMasterStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) { 269 super.backOffPolicy(backOffPolicy); 270 return this; 271 } 272 273 @Override 274 public RemoteChunkingMasterStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) { 275 super.retryContextCache(retryContextCache); 276 return this; 277 } 278 279 @Override 280 public RemoteChunkingMasterStepBuilder<I, O> skipLimit(int skipLimit) { 281 super.skipLimit(skipLimit); 282 return this; 283 } 284 285 @Override 286 public RemoteChunkingMasterStepBuilder<I, O> noSkip(Class<? extends Throwable> type) { 287 super.noSkip(type); 288 return this; 289 } 290 291 @Override 292 public RemoteChunkingMasterStepBuilder<I, O> skip(Class<? extends Throwable> type) { 293 super.skip(type); 294 return this; 295 } 296 297 @Override 298 public RemoteChunkingMasterStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) { 299 super.skipPolicy(skipPolicy); 300 return this; 301 } 302 303 @Override 304 public RemoteChunkingMasterStepBuilder<I, O> noRollback(Class<? extends Throwable> type) { 305 super.noRollback(type); 306 return this; 307 } 308 309 @Override 310 public RemoteChunkingMasterStepBuilder<I, O> noRetry(Class<? extends Throwable> type) { 311 super.noRetry(type); 312 return this; 313 } 314 315 @Override 316 public RemoteChunkingMasterStepBuilder<I, O> retry(Class<? extends Throwable> type) { 317 super.retry(type); 318 return this; 319 } 320 321 @Override 322 public RemoteChunkingMasterStepBuilder<I, O> stream(ItemStream stream) { 323 super.stream(stream); 324 return this; 325 } 326 327 @Override 328 public RemoteChunkingMasterStepBuilder<I, O> chunk(int chunkSize) { 329 super.chunk(chunkSize); 330 return this; 331 } 332 333 @Override 334 public RemoteChunkingMasterStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) { 335 super.chunk(completionPolicy); 336 return this; 337 } 338 339 /** 340 * This method will throw a {@link UnsupportedOperationException} since 341 * the item writer of the master step in a remote chunking setup will be 342 * automatically set to an instance of {@link ChunkMessageChannelItemWriter}. 343 * 344 * When building a master step for remote chunking, no item writer must be 345 * provided. 346 * 347 * @throws UnsupportedOperationException if an item writer is provided 348 * @see ChunkMessageChannelItemWriter 349 * @see RemoteChunkHandlerFactoryBean#setChunkWriter(ItemWriter) 350 */ 351 @Override 352 public RemoteChunkingMasterStepBuilder<I, O> writer(ItemWriter<? super O> writer) throws UnsupportedOperationException { 353 throw new UnsupportedOperationException("When configuring a master step " + 354 "for remote chunking, the item writer will be automatically set " + 355 "to an instance of ChunkMessageChannelItemWriter. The item writer " + 356 "must not be provided in this case."); 357 } 358 359 @Override 360 public RemoteChunkingMasterStepBuilder<I, O> readerIsTransactionalQueue() { 361 super.readerIsTransactionalQueue(); 362 return this; 363 } 364 365 @Override 366 public RemoteChunkingMasterStepBuilder<I, O> listener(ItemReadListener<? super I> listener) { 367 super.listener(listener); 368 return this; 369 } 370 371 @Override 372 public RemoteChunkingMasterStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) { 373 super.listener(listener); 374 return this; 375 } 376 377 @Override 378 public RemoteChunkingMasterStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) { 379 super.chunkOperations(repeatTemplate); 380 return this; 381 } 382 383 @Override 384 public RemoteChunkingMasterStepBuilder<I, O> exceptionHandler(ExceptionHandler exceptionHandler) { 385 super.exceptionHandler(exceptionHandler); 386 return this; 387 } 388 389 @Override 390 public RemoteChunkingMasterStepBuilder<I, O> stepOperations(RepeatOperations repeatTemplate) { 391 super.stepOperations(repeatTemplate); 392 return this; 393 } 394 395 @Override 396 public RemoteChunkingMasterStepBuilder<I, O> startLimit(int startLimit) { 397 super.startLimit(startLimit); 398 return this; 399 } 400 401 @Override 402 public RemoteChunkingMasterStepBuilder<I, O> listener(StepExecutionListener listener) { 403 super.listener(listener); 404 return this; 405 } 406 407 @Override 408 public RemoteChunkingMasterStepBuilder<I, O> allowStartIfComplete(boolean allowStartIfComplete) { 409 super.allowStartIfComplete(allowStartIfComplete); 410 return this; 411 } 412 413 @Override 414 public RemoteChunkingMasterStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> itemProcessor) { 415 super.processor(itemProcessor); 416 return this; 417 } 418}