001/*
002 * Copyright 2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *       https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.integration.chunk;
017
018import org.springframework.batch.core.ChunkListener;
019import org.springframework.batch.core.ItemReadListener;
020import org.springframework.batch.core.ItemWriteListener;
021import org.springframework.batch.core.SkipListener;
022import org.springframework.batch.core.StepExecutionListener;
023import org.springframework.batch.core.repository.JobRepository;
024import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
025import org.springframework.batch.core.step.builder.StepBuilder;
026import org.springframework.batch.core.step.item.KeyGenerator;
027import org.springframework.batch.core.step.skip.SkipPolicy;
028import org.springframework.batch.core.step.tasklet.TaskletStep;
029import org.springframework.batch.item.ItemProcessor;
030import org.springframework.batch.item.ItemReader;
031import org.springframework.batch.item.ItemStream;
032import org.springframework.batch.item.ItemWriter;
033import org.springframework.batch.repeat.CompletionPolicy;
034import org.springframework.batch.repeat.RepeatOperations;
035import org.springframework.batch.repeat.exception.ExceptionHandler;
036import org.springframework.integration.core.MessagingTemplate;
037import org.springframework.messaging.MessageChannel;
038import org.springframework.messaging.PollableChannel;
039import org.springframework.retry.RetryPolicy;
040import org.springframework.retry.backoff.BackOffPolicy;
041import org.springframework.retry.policy.RetryContextCache;
042import org.springframework.transaction.PlatformTransactionManager;
043import org.springframework.transaction.interceptor.TransactionAttribute;
044import org.springframework.util.Assert;
045
046/**
047 * Builder for a master step in a remote chunking setup. This builder creates and
048 * sets a {@link ChunkMessageChannelItemWriter} on the master step.
049 *
050 * <p>If no {@code messagingTemplate} is provided through
051 * {@link RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate)},
052 * this builder will create one and set its default channel to the {@code outputChannel}
053 * provided through {@link RemoteChunkingMasterStepBuilder#outputChannel(MessageChannel)}.</p>
054 *
055 * <p>If a {@code messagingTemplate} is provided, it is assumed that it is fully configured
056 * and that its default channel is set to an output channel on which requests to workers
057 * will be sent.</p>
058 *
059 * @param <I> type of input items
060 * @param <O> type of output items
061 *
062 * @since 4.1
063 * @author Mahmoud Ben Hassine
064 */
065public class RemoteChunkingMasterStepBuilder<I, O> extends FaultTolerantStepBuilder<I, O> {
066
067        private MessagingTemplate messagingTemplate;
068        private PollableChannel inputChannel;
069        private MessageChannel outputChannel;
070
071        private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40;
072        private static final long DEFAULT_THROTTLE_LIMIT = 6;
073        private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;
074        private long throttleLimit = DEFAULT_THROTTLE_LIMIT;
075
076        /**
077         * Create a new {@link RemoteChunkingMasterStepBuilder}.
078         *
079         * @param stepName name of the master step
080         */
081        public RemoteChunkingMasterStepBuilder(String stepName) {
082                super(new StepBuilder(stepName));
083        }
084
085        /**
086         * Set the input channel on which replies from workers will be received.
087         * The provided input channel will be set as a reply channel on the
088         * {@link ChunkMessageChannelItemWriter} created by this builder.
089         *
090         * @param inputChannel the input channel
091         * @return this builder instance for fluent chaining
092         *
093         * @see ChunkMessageChannelItemWriter#setReplyChannel
094         */
095        public RemoteChunkingMasterStepBuilder<I, O> inputChannel(PollableChannel inputChannel) {
096                Assert.notNull(inputChannel, "inputChannel must not be null");
097                this.inputChannel = inputChannel;
098                return this;
099        }
100
101        /**
102         * Set the output channel on which requests to workers will be sent. By using
103         * this setter, a default messaging template will be created and the output
104         * channel will be set as its default channel.
105         * <p>Use either this setter or {@link RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate)}
106         * to provide a fully configured messaging template.</p>
107         *
108         * @param outputChannel the output channel.
109         * @return this builder instance for fluent chaining
110         *
111         * @see RemoteChunkingMasterStepBuilder#messagingTemplate(MessagingTemplate)
112         */
113        public RemoteChunkingMasterStepBuilder<I, O> outputChannel(MessageChannel outputChannel) {
114                Assert.notNull(outputChannel, "outputChannel must not be null");
115                this.outputChannel = outputChannel;
116                return this;
117        }
118
119        /**
120         * Set the {@link MessagingTemplate} to use to send data to workers.
121         * <strong>The default channel of the messaging template must be set</strong>.
122         * <p>Use either this setter to provide a fully configured messaging template or
123         * provide an output channel through {@link RemoteChunkingMasterStepBuilder#outputChannel(MessageChannel)}
124         * and a default messaging template will be created.</p>
125         *
126         * @param messagingTemplate the messaging template to use
127         * @return this builder instance for fluent chaining
128         * @see RemoteChunkingMasterStepBuilder#outputChannel(MessageChannel)
129         */
130        public RemoteChunkingMasterStepBuilder<I, O> messagingTemplate(MessagingTemplate messagingTemplate) {
131                Assert.notNull(messagingTemplate, "messagingTemplate must not be null");
132                this.messagingTemplate = messagingTemplate;
133                return this;
134        }
135
136        /**
137         * The maximum number of times to wait at the end of a step for a non-null result from the remote workers. This is a
138         * multiplier on the receive timeout set separately on the gateway. The ideal value is a compromise between allowing
139         * slow workers time to finish, and responsiveness if there is a dead worker. Defaults to 40.
140         *
141         * @param maxWaitTimeouts the maximum number of wait timeouts
142         * @see ChunkMessageChannelItemWriter#setMaxWaitTimeouts(int)
143         */
144        public RemoteChunkingMasterStepBuilder<I, O> maxWaitTimeouts(int maxWaitTimeouts) {
145                Assert.isTrue(maxWaitTimeouts > 0, "maxWaitTimeouts must be greater than zero");
146                this.maxWaitTimeouts = maxWaitTimeouts;
147                return this;
148        }
149
150        /**
151         * Public setter for the throttle limit. This limits the number of pending requests for chunk processing to avoid
152         * overwhelming the receivers.
153         *
154         * @param throttleLimit the throttle limit to set
155         * @see ChunkMessageChannelItemWriter#setThrottleLimit(long)
156         */
157        public RemoteChunkingMasterStepBuilder<I, O> throttleLimit(long throttleLimit) {
158                Assert.isTrue(throttleLimit > 0, "throttleLimit must be greater than zero");
159                this.throttleLimit = throttleLimit;
160                return this;
161        }
162
163        /**
164         * Build a master {@link TaskletStep}.
165         *
166         * @return the configured master step
167         * @see RemoteChunkHandlerFactoryBean
168         */
169        public TaskletStep build() {
170                Assert.notNull(this.inputChannel, "An InputChannel must be provided");
171                Assert.state(this.outputChannel == null || this.messagingTemplate == null,
172                                "You must specify either an outputChannel or a messagingTemplate but not both.");
173
174                // configure messaging template
175                if (this.messagingTemplate == null) {
176                        this.messagingTemplate = new MessagingTemplate();
177                        this.messagingTemplate.setDefaultChannel(this.outputChannel);
178                        if (this.logger.isDebugEnabled()) {
179                                this.logger.debug("No messagingTemplate was provided, using a default one");
180                        }
181                }
182
183                // configure item writer
184                ChunkMessageChannelItemWriter<O> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
185                chunkMessageChannelItemWriter.setMessagingOperations(this.messagingTemplate);
186                chunkMessageChannelItemWriter.setMaxWaitTimeouts(this.maxWaitTimeouts);
187                chunkMessageChannelItemWriter.setThrottleLimit(this.throttleLimit);
188                chunkMessageChannelItemWriter.setReplyChannel(this.inputChannel);
189                super.writer(chunkMessageChannelItemWriter);
190
191                return super.build();
192        }
193
194        /*
195         * The following methods override those from parent builders and return
196         * the current builder type.
197         * FIXME: Change parent builders to be generic and return current builder
198          * type in each method.
199         */
200
201        @Override
202        public RemoteChunkingMasterStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
203                super.reader(reader);
204                return this;
205        }
206
207        @Override
208        public RemoteChunkingMasterStepBuilder<I, O>  repository(JobRepository jobRepository) {
209                super.repository(jobRepository);
210                return this;
211        }
212
213        @Override
214        public  RemoteChunkingMasterStepBuilder<I, O>  transactionManager(PlatformTransactionManager transactionManager) {
215                super.transactionManager(transactionManager);
216                return this;
217        }
218
219        @Override
220        public RemoteChunkingMasterStepBuilder<I, O> listener(Object listener) {
221                super.listener(listener);
222                return this;
223        }
224
225        @Override
226        public RemoteChunkingMasterStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) {
227                super.listener(listener);
228                return this;
229        }
230
231        @Override
232        public RemoteChunkingMasterStepBuilder<I, O> listener(ChunkListener listener) {
233                super.listener(listener);
234                return this;
235        }
236
237        @Override
238        public RemoteChunkingMasterStepBuilder<I, O> transactionAttribute(TransactionAttribute transactionAttribute) {
239                super.transactionAttribute(transactionAttribute);
240                return this;
241        }
242
243        @Override
244        public RemoteChunkingMasterStepBuilder<I, O> listener(org.springframework.retry.RetryListener listener) {
245                super.listener(listener);
246                return this;
247        }
248
249        @Override
250        public RemoteChunkingMasterStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) {
251                super.keyGenerator(keyGenerator);
252                return this;
253        }
254
255        @Override
256        public RemoteChunkingMasterStepBuilder<I, O> retryLimit(int retryLimit) {
257                super.retryLimit(retryLimit);
258                return this;
259        }
260
261        @Override
262        public RemoteChunkingMasterStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) {
263                super.retryPolicy(retryPolicy);
264                return this;
265        }
266
267        @Override
268        public RemoteChunkingMasterStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) {
269                super.backOffPolicy(backOffPolicy);
270                return this;
271        }
272
273        @Override
274        public RemoteChunkingMasterStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) {
275                super.retryContextCache(retryContextCache);
276                return this;
277        }
278
279        @Override
280        public RemoteChunkingMasterStepBuilder<I, O> skipLimit(int skipLimit) {
281                super.skipLimit(skipLimit);
282                return this;
283        }
284
285        @Override
286        public RemoteChunkingMasterStepBuilder<I, O> noSkip(Class<? extends Throwable> type) {
287                super.noSkip(type);
288                return this;
289        }
290
291        @Override
292        public RemoteChunkingMasterStepBuilder<I, O> skip(Class<? extends Throwable> type) {
293                super.skip(type);
294                return this;
295        }
296
297        @Override
298        public RemoteChunkingMasterStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) {
299                super.skipPolicy(skipPolicy);
300                return this;
301        }
302
303        @Override
304        public RemoteChunkingMasterStepBuilder<I, O> noRollback(Class<? extends Throwable> type) {
305                super.noRollback(type);
306                return this;
307        }
308
309        @Override
310        public RemoteChunkingMasterStepBuilder<I, O> noRetry(Class<? extends Throwable> type) {
311                super.noRetry(type);
312                return this;
313        }
314
315        @Override
316        public RemoteChunkingMasterStepBuilder<I, O> retry(Class<? extends Throwable> type) {
317                super.retry(type);
318                return this;
319        }
320
321        @Override
322        public RemoteChunkingMasterStepBuilder<I, O> stream(ItemStream stream) {
323                super.stream(stream);
324                return this;
325        }
326
327        @Override
328        public RemoteChunkingMasterStepBuilder<I, O> chunk(int chunkSize) {
329                super.chunk(chunkSize);
330                return this;
331        }
332
333        @Override
334        public RemoteChunkingMasterStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
335                super.chunk(completionPolicy);
336                return this;
337        }
338
339        /**
340         * This method will throw a {@link UnsupportedOperationException} since
341         * the item writer of the master step in a remote chunking setup will be
342         * automatically set to an instance of {@link ChunkMessageChannelItemWriter}.
343         *
344         * When building a master step for remote chunking, no item writer must be
345         * provided.
346         *
347         * @throws UnsupportedOperationException if an item writer is provided
348         * @see ChunkMessageChannelItemWriter
349         * @see RemoteChunkHandlerFactoryBean#setChunkWriter(ItemWriter)
350         */
351        @Override
352        public RemoteChunkingMasterStepBuilder<I, O> writer(ItemWriter<? super O> writer) throws UnsupportedOperationException {
353                throw new UnsupportedOperationException("When configuring a master step " +
354                                "for remote chunking, the item writer will be automatically set " +
355                                "to an instance of ChunkMessageChannelItemWriter. The item writer " +
356                                "must not be provided in this case.");
357        }
358
359        @Override
360        public RemoteChunkingMasterStepBuilder<I, O> readerIsTransactionalQueue() {
361                super.readerIsTransactionalQueue();
362                return this;
363        }
364
365        @Override
366        public RemoteChunkingMasterStepBuilder<I, O> listener(ItemReadListener<? super I> listener) {
367                super.listener(listener);
368                return this;
369        }
370
371        @Override
372        public RemoteChunkingMasterStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) {
373                super.listener(listener);
374                return this;
375        }
376
377        @Override
378        public RemoteChunkingMasterStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) {
379                super.chunkOperations(repeatTemplate);
380                return this;
381        }
382
383        @Override
384        public RemoteChunkingMasterStepBuilder<I, O> exceptionHandler(ExceptionHandler exceptionHandler) {
385                super.exceptionHandler(exceptionHandler);
386                return this;
387        }
388
389        @Override
390        public RemoteChunkingMasterStepBuilder<I, O> stepOperations(RepeatOperations repeatTemplate) {
391                super.stepOperations(repeatTemplate);
392                return this;
393        }
394
395        @Override
396        public RemoteChunkingMasterStepBuilder<I, O> startLimit(int startLimit) {
397                super.startLimit(startLimit);
398                return this;
399        }
400
401        @Override
402        public RemoteChunkingMasterStepBuilder<I, O> listener(StepExecutionListener listener) {
403                super.listener(listener);
404                return this;
405        }
406
407        @Override
408        public RemoteChunkingMasterStepBuilder<I, O> allowStartIfComplete(boolean allowStartIfComplete) {
409                super.allowStartIfComplete(allowStartIfComplete);
410                return this;
411        }
412
413        @Override
414        public RemoteChunkingMasterStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> itemProcessor) {
415                super.processor(itemProcessor);
416                return this;
417        }
418}