001/*
002 * Copyright 2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *       https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.integration.partition;
018
019import org.springframework.batch.core.Step;
020import org.springframework.batch.core.StepExecutionListener;
021import org.springframework.batch.core.explore.JobExplorer;
022import org.springframework.batch.core.partition.PartitionHandler;
023import org.springframework.batch.core.partition.StepExecutionSplitter;
024import org.springframework.batch.core.partition.support.Partitioner;
025import org.springframework.batch.core.partition.support.StepExecutionAggregator;
026import org.springframework.batch.core.repository.JobRepository;
027import org.springframework.batch.core.step.builder.PartitionStepBuilder;
028import org.springframework.batch.core.step.builder.StepBuilder;
029import org.springframework.beans.factory.BeanCreationException;
030import org.springframework.beans.factory.BeanFactory;
031import org.springframework.integration.channel.QueueChannel;
032import org.springframework.integration.core.MessagingTemplate;
033import org.springframework.integration.dsl.IntegrationFlows;
034import org.springframework.integration.dsl.StandardIntegrationFlow;
035import org.springframework.integration.dsl.context.IntegrationFlowContext;
036import org.springframework.messaging.MessageChannel;
037import org.springframework.messaging.PollableChannel;
038import org.springframework.transaction.PlatformTransactionManager;
039import org.springframework.util.Assert;
040
041/**
042 * Builder for a master step in a remote partitioning setup. This builder creates and
043 * sets a {@link MessageChannelPartitionHandler} on the master step.
044 *
045 * <p>If no {@code messagingTemplate} is provided through
046 * {@link RemotePartitioningMasterStepBuilder#messagingTemplate(MessagingTemplate)},
047 * this builder will create one and set its default channel to the {@code outputChannel}
048 * provided through {@link RemotePartitioningMasterStepBuilder#outputChannel(MessageChannel)}.</p>
049 *
050 * <p>If a {@code messagingTemplate} is provided, it is assumed that it is fully configured
051 * and that its default channel is set to an output channel on which requests to workers
052 * will be sent.</p>
053 *
054 * @since 4.1
055 * @author Mahmoud Ben Hassine
056 */
057public class RemotePartitioningMasterStepBuilder extends PartitionStepBuilder {
058
059        private static final long DEFAULT_POLL_INTERVAL = 10000L;
060        private static final long DEFAULT_TIMEOUT = -1L;
061
062        private MessagingTemplate messagingTemplate;
063        private MessageChannel inputChannel;
064        private MessageChannel outputChannel;
065        private JobExplorer jobExplorer;
066        private BeanFactory beanFactory;
067        private long pollInterval = DEFAULT_POLL_INTERVAL;
068        private long timeout = DEFAULT_TIMEOUT;
069
070        /**
071         * Create a new {@link RemotePartitioningMasterStepBuilder}.
072         * @param stepName name of the master step
073         */
074        public RemotePartitioningMasterStepBuilder(String stepName) {
075                super(new StepBuilder(stepName));
076        }
077
078        /**
079         * Set the input channel on which replies from workers will be received.
080         * @param inputChannel the input channel
081         * @return this builder instance for fluent chaining
082         */
083        public RemotePartitioningMasterStepBuilder inputChannel(MessageChannel inputChannel) {
084                Assert.notNull(inputChannel, "inputChannel must not be null");
085                this.inputChannel = inputChannel;
086                return this;
087        }
088
089        /**
090         * Set the output channel on which requests to workers will be sent. By using
091         * this setter, a default messaging template will be created and the output
092         * channel will be set as its default channel.
093         * <p>Use either this setter or {@link RemotePartitioningMasterStepBuilder#messagingTemplate(MessagingTemplate)}
094         * to provide a fully configured messaging template.</p>
095         *
096         * @param outputChannel the output channel.
097         * @return this builder instance for fluent chaining
098         * @see RemotePartitioningMasterStepBuilder#messagingTemplate(MessagingTemplate)
099         */
100        public RemotePartitioningMasterStepBuilder outputChannel(MessageChannel outputChannel) {
101                Assert.notNull(outputChannel, "outputChannel must not be null");
102                this.outputChannel = outputChannel;
103                return this;
104        }
105
106        /**
107         * Set the {@link MessagingTemplate} to use to send data to workers.
108         * <strong>The default channel of the messaging template must be set</strong>.
109         * <p>Use either this setter to provide a fully configured messaging template or
110         * provide an output channel through {@link RemotePartitioningMasterStepBuilder#outputChannel(MessageChannel)}
111         * and a default messaging template will be created.</p>
112         *
113         * @param messagingTemplate the messaging template to use
114         * @return this builder instance for fluent chaining
115         * @see RemotePartitioningMasterStepBuilder#outputChannel(MessageChannel)
116         */
117        public RemotePartitioningMasterStepBuilder messagingTemplate(MessagingTemplate messagingTemplate) {
118                Assert.notNull(messagingTemplate, "messagingTemplate must not be null");
119                this.messagingTemplate = messagingTemplate;
120                return this;
121        }
122
123        /**
124         * Set the job explorer.
125         * @param jobExplorer the job explorer to use.
126         * @return this builder instance for fluent chaining
127         */
128        public RemotePartitioningMasterStepBuilder jobExplorer(JobExplorer jobExplorer) {
129                Assert.notNull(jobExplorer, "jobExplorer must not be null");
130                this.jobExplorer = jobExplorer;
131                return this;
132        }
133
134        /**
135         * How often to poll the job repository for the status of the workers. Defaults to 10 seconds.
136         * @param pollInterval the poll interval value in milliseconds
137         * @return this builder instance for fluent chaining
138         */
139        public RemotePartitioningMasterStepBuilder pollInterval(long pollInterval) {
140                Assert.isTrue(pollInterval > 0, "The poll interval must be greater than zero");
141                this.pollInterval = pollInterval;
142                return this;
143        }
144
145        /**
146         * When using job repository polling, the time limit to wait. Defaults to -1 (no timeout).
147         * @param timeout the timeout value in milliseconds
148         * @return this builder instance for fluent chaining
149         */
150        public RemotePartitioningMasterStepBuilder timeout(long timeout) {
151                this.timeout = timeout;
152                return this;
153        }
154
155        /**
156         * Set the bean factory.
157         * @param beanFactory the bean factory to use
158         * @return this builder instance for fluent chaining
159         */
160        public RemotePartitioningMasterStepBuilder beanFactory(BeanFactory beanFactory) {
161                this.beanFactory = beanFactory;
162                return this;
163        }
164
165        public Step build() {
166                Assert.state(this.outputChannel == null || this.messagingTemplate == null,
167                                "You must specify either an outputChannel or a messagingTemplate but not both.");
168
169                // configure messaging template
170                if (this.messagingTemplate == null) {
171                        this.messagingTemplate = new MessagingTemplate();
172                        this.messagingTemplate.setDefaultChannel(this.outputChannel);
173                        if (this.logger.isDebugEnabled()) {
174                                this.logger.debug("No messagingTemplate was provided, using a default one");
175                        }
176                }
177
178                // Configure the partition handler
179                final MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
180                partitionHandler.setStepName(getStepName());
181                partitionHandler.setGridSize(getGridSize());
182                partitionHandler.setMessagingOperations(this.messagingTemplate);
183
184                if (isPolling()) {
185                        partitionHandler.setJobExplorer(this.jobExplorer);
186                        partitionHandler.setPollInterval(this.pollInterval);
187                        partitionHandler.setTimeout(this.timeout);
188                }
189                else {
190                        PollableChannel replies = new QueueChannel();
191                        partitionHandler.setReplyChannel(replies);
192                        StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows
193                                        .from(this.inputChannel)
194                                        .aggregate(aggregatorSpec -> aggregatorSpec.processor(partitionHandler))
195                                        .channel(replies)
196                                        .get();
197                        IntegrationFlowContext integrationFlowContext = this.beanFactory.getBean(IntegrationFlowContext.class);
198                        integrationFlowContext.registration(standardIntegrationFlow)
199                                        .autoStartup(false)
200                                        .register();
201                }
202
203                try {
204                        partitionHandler.afterPropertiesSet();
205                        super.partitionHandler(partitionHandler);
206                }
207                catch (Exception e) {
208                        throw new BeanCreationException("Unable to create a master step for remote partitioning", e);
209                }
210
211                return super.build();
212        }
213
214        private boolean isPolling() {
215                return this.inputChannel == null;
216        }
217
218        @Override
219        public RemotePartitioningMasterStepBuilder repository(JobRepository jobRepository) {
220                super.repository(jobRepository);
221                return this;
222        }
223
224        @Override
225        public  RemotePartitioningMasterStepBuilder transactionManager(PlatformTransactionManager transactionManager) {
226                super.transactionManager(transactionManager);
227                return this;
228        }
229
230        @Override
231        public RemotePartitioningMasterStepBuilder partitioner(String slaveStepName, Partitioner partitioner) {
232                super.partitioner(slaveStepName, partitioner);
233                return this;
234        }
235
236        @Override
237        public RemotePartitioningMasterStepBuilder gridSize(int gridSize) {
238                super.gridSize(gridSize);
239                return this;
240        }
241
242        @Override
243        public RemotePartitioningMasterStepBuilder step(Step step) {
244                super.step(step);
245                return this;
246        }
247
248        @Override
249        public RemotePartitioningMasterStepBuilder splitter(StepExecutionSplitter splitter) {
250                super.splitter(splitter);
251                return this;
252        }
253
254        @Override
255        public RemotePartitioningMasterStepBuilder aggregator(StepExecutionAggregator aggregator) {
256                super.aggregator(aggregator);
257                return this;
258        }
259
260        @Override
261        public RemotePartitioningMasterStepBuilder startLimit(int startLimit) {
262                super.startLimit(startLimit);
263                return this;
264        }
265
266        @Override
267        public RemotePartitioningMasterStepBuilder listener(Object listener) {
268                super.listener(listener);
269                return this;
270        }
271
272        @Override
273        public RemotePartitioningMasterStepBuilder listener(StepExecutionListener listener) {
274                super.listener(listener);
275                return this;
276        }
277
278        @Override
279        public RemotePartitioningMasterStepBuilder allowStartIfComplete(boolean allowStartIfComplete) {
280                super.allowStartIfComplete(allowStartIfComplete);
281                return this;
282        }
283
284        /**
285         * This method will throw a {@link UnsupportedOperationException} since
286         * the partition handler of the master step will be automatically set to an
287         * instance of {@link MessageChannelPartitionHandler}.
288         *
289         * When building a master step for remote partitioning using this builder,
290         * no partition handler must be provided.
291         *
292         * @param partitionHandler a partition handler
293         * @return this builder instance for fluent chaining
294         * @throws UnsupportedOperationException if a partition handler is provided
295         */
296        @Override
297        public RemotePartitioningMasterStepBuilder partitionHandler(PartitionHandler partitionHandler) throws UnsupportedOperationException {
298                throw new UnsupportedOperationException("When configuring a master step " +
299                                "for remote partitioning using the RemotePartitioningMasterStepBuilder, " +
300                                "the partition handler will be automatically set to an instance " +
301                                "of MessageChannelPartitionHandler. The partition handler must " +
302                                "not be provided in this case.");
303        }
304
305}