001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.integration.partition; 018 019import org.springframework.batch.core.Step; 020import org.springframework.batch.core.StepExecutionListener; 021import org.springframework.batch.core.explore.JobExplorer; 022import org.springframework.batch.core.partition.PartitionHandler; 023import org.springframework.batch.core.partition.StepExecutionSplitter; 024import org.springframework.batch.core.partition.support.Partitioner; 025import org.springframework.batch.core.partition.support.StepExecutionAggregator; 026import org.springframework.batch.core.repository.JobRepository; 027import org.springframework.batch.core.step.builder.PartitionStepBuilder; 028import org.springframework.batch.core.step.builder.StepBuilder; 029import org.springframework.beans.factory.BeanCreationException; 030import org.springframework.beans.factory.BeanFactory; 031import org.springframework.integration.channel.QueueChannel; 032import org.springframework.integration.core.MessagingTemplate; 033import org.springframework.integration.dsl.IntegrationFlows; 034import org.springframework.integration.dsl.StandardIntegrationFlow; 035import org.springframework.integration.dsl.context.IntegrationFlowContext; 036import org.springframework.messaging.MessageChannel; 037import org.springframework.messaging.PollableChannel; 038import org.springframework.transaction.PlatformTransactionManager; 039import org.springframework.util.Assert; 040 041/** 042 * Builder for a master step in a remote partitioning setup. This builder creates and 043 * sets a {@link MessageChannelPartitionHandler} on the master step. 044 * 045 * <p>If no {@code messagingTemplate} is provided through 046 * {@link RemotePartitioningMasterStepBuilder#messagingTemplate(MessagingTemplate)}, 047 * this builder will create one and set its default channel to the {@code outputChannel} 048 * provided through {@link RemotePartitioningMasterStepBuilder#outputChannel(MessageChannel)}.</p> 049 * 050 * <p>If a {@code messagingTemplate} is provided, it is assumed that it is fully configured 051 * and that its default channel is set to an output channel on which requests to workers 052 * will be sent.</p> 053 * 054 * @since 4.1 055 * @author Mahmoud Ben Hassine 056 */ 057public class RemotePartitioningMasterStepBuilder extends PartitionStepBuilder { 058 059 private static final long DEFAULT_POLL_INTERVAL = 10000L; 060 private static final long DEFAULT_TIMEOUT = -1L; 061 062 private MessagingTemplate messagingTemplate; 063 private MessageChannel inputChannel; 064 private MessageChannel outputChannel; 065 private JobExplorer jobExplorer; 066 private BeanFactory beanFactory; 067 private long pollInterval = DEFAULT_POLL_INTERVAL; 068 private long timeout = DEFAULT_TIMEOUT; 069 070 /** 071 * Create a new {@link RemotePartitioningMasterStepBuilder}. 072 * @param stepName name of the master step 073 */ 074 public RemotePartitioningMasterStepBuilder(String stepName) { 075 super(new StepBuilder(stepName)); 076 } 077 078 /** 079 * Set the input channel on which replies from workers will be received. 080 * @param inputChannel the input channel 081 * @return this builder instance for fluent chaining 082 */ 083 public RemotePartitioningMasterStepBuilder inputChannel(MessageChannel inputChannel) { 084 Assert.notNull(inputChannel, "inputChannel must not be null"); 085 this.inputChannel = inputChannel; 086 return this; 087 } 088 089 /** 090 * Set the output channel on which requests to workers will be sent. By using 091 * this setter, a default messaging template will be created and the output 092 * channel will be set as its default channel. 093 * <p>Use either this setter or {@link RemotePartitioningMasterStepBuilder#messagingTemplate(MessagingTemplate)} 094 * to provide a fully configured messaging template.</p> 095 * 096 * @param outputChannel the output channel. 097 * @return this builder instance for fluent chaining 098 * @see RemotePartitioningMasterStepBuilder#messagingTemplate(MessagingTemplate) 099 */ 100 public RemotePartitioningMasterStepBuilder outputChannel(MessageChannel outputChannel) { 101 Assert.notNull(outputChannel, "outputChannel must not be null"); 102 this.outputChannel = outputChannel; 103 return this; 104 } 105 106 /** 107 * Set the {@link MessagingTemplate} to use to send data to workers. 108 * <strong>The default channel of the messaging template must be set</strong>. 109 * <p>Use either this setter to provide a fully configured messaging template or 110 * provide an output channel through {@link RemotePartitioningMasterStepBuilder#outputChannel(MessageChannel)} 111 * and a default messaging template will be created.</p> 112 * 113 * @param messagingTemplate the messaging template to use 114 * @return this builder instance for fluent chaining 115 * @see RemotePartitioningMasterStepBuilder#outputChannel(MessageChannel) 116 */ 117 public RemotePartitioningMasterStepBuilder messagingTemplate(MessagingTemplate messagingTemplate) { 118 Assert.notNull(messagingTemplate, "messagingTemplate must not be null"); 119 this.messagingTemplate = messagingTemplate; 120 return this; 121 } 122 123 /** 124 * Set the job explorer. 125 * @param jobExplorer the job explorer to use. 126 * @return this builder instance for fluent chaining 127 */ 128 public RemotePartitioningMasterStepBuilder jobExplorer(JobExplorer jobExplorer) { 129 Assert.notNull(jobExplorer, "jobExplorer must not be null"); 130 this.jobExplorer = jobExplorer; 131 return this; 132 } 133 134 /** 135 * How often to poll the job repository for the status of the workers. Defaults to 10 seconds. 136 * @param pollInterval the poll interval value in milliseconds 137 * @return this builder instance for fluent chaining 138 */ 139 public RemotePartitioningMasterStepBuilder pollInterval(long pollInterval) { 140 Assert.isTrue(pollInterval > 0, "The poll interval must be greater than zero"); 141 this.pollInterval = pollInterval; 142 return this; 143 } 144 145 /** 146 * When using job repository polling, the time limit to wait. Defaults to -1 (no timeout). 147 * @param timeout the timeout value in milliseconds 148 * @return this builder instance for fluent chaining 149 */ 150 public RemotePartitioningMasterStepBuilder timeout(long timeout) { 151 this.timeout = timeout; 152 return this; 153 } 154 155 /** 156 * Set the bean factory. 157 * @param beanFactory the bean factory to use 158 * @return this builder instance for fluent chaining 159 */ 160 public RemotePartitioningMasterStepBuilder beanFactory(BeanFactory beanFactory) { 161 this.beanFactory = beanFactory; 162 return this; 163 } 164 165 public Step build() { 166 Assert.state(this.outputChannel == null || this.messagingTemplate == null, 167 "You must specify either an outputChannel or a messagingTemplate but not both."); 168 169 // configure messaging template 170 if (this.messagingTemplate == null) { 171 this.messagingTemplate = new MessagingTemplate(); 172 this.messagingTemplate.setDefaultChannel(this.outputChannel); 173 if (this.logger.isDebugEnabled()) { 174 this.logger.debug("No messagingTemplate was provided, using a default one"); 175 } 176 } 177 178 // Configure the partition handler 179 final MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler(); 180 partitionHandler.setStepName(getStepName()); 181 partitionHandler.setGridSize(getGridSize()); 182 partitionHandler.setMessagingOperations(this.messagingTemplate); 183 184 if (isPolling()) { 185 partitionHandler.setJobExplorer(this.jobExplorer); 186 partitionHandler.setPollInterval(this.pollInterval); 187 partitionHandler.setTimeout(this.timeout); 188 } 189 else { 190 PollableChannel replies = new QueueChannel(); 191 partitionHandler.setReplyChannel(replies); 192 StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows 193 .from(this.inputChannel) 194 .aggregate(aggregatorSpec -> aggregatorSpec.processor(partitionHandler)) 195 .channel(replies) 196 .get(); 197 IntegrationFlowContext integrationFlowContext = this.beanFactory.getBean(IntegrationFlowContext.class); 198 integrationFlowContext.registration(standardIntegrationFlow) 199 .autoStartup(false) 200 .register(); 201 } 202 203 try { 204 partitionHandler.afterPropertiesSet(); 205 super.partitionHandler(partitionHandler); 206 } 207 catch (Exception e) { 208 throw new BeanCreationException("Unable to create a master step for remote partitioning", e); 209 } 210 211 return super.build(); 212 } 213 214 private boolean isPolling() { 215 return this.inputChannel == null; 216 } 217 218 @Override 219 public RemotePartitioningMasterStepBuilder repository(JobRepository jobRepository) { 220 super.repository(jobRepository); 221 return this; 222 } 223 224 @Override 225 public RemotePartitioningMasterStepBuilder transactionManager(PlatformTransactionManager transactionManager) { 226 super.transactionManager(transactionManager); 227 return this; 228 } 229 230 @Override 231 public RemotePartitioningMasterStepBuilder partitioner(String slaveStepName, Partitioner partitioner) { 232 super.partitioner(slaveStepName, partitioner); 233 return this; 234 } 235 236 @Override 237 public RemotePartitioningMasterStepBuilder gridSize(int gridSize) { 238 super.gridSize(gridSize); 239 return this; 240 } 241 242 @Override 243 public RemotePartitioningMasterStepBuilder step(Step step) { 244 super.step(step); 245 return this; 246 } 247 248 @Override 249 public RemotePartitioningMasterStepBuilder splitter(StepExecutionSplitter splitter) { 250 super.splitter(splitter); 251 return this; 252 } 253 254 @Override 255 public RemotePartitioningMasterStepBuilder aggregator(StepExecutionAggregator aggregator) { 256 super.aggregator(aggregator); 257 return this; 258 } 259 260 @Override 261 public RemotePartitioningMasterStepBuilder startLimit(int startLimit) { 262 super.startLimit(startLimit); 263 return this; 264 } 265 266 @Override 267 public RemotePartitioningMasterStepBuilder listener(Object listener) { 268 super.listener(listener); 269 return this; 270 } 271 272 @Override 273 public RemotePartitioningMasterStepBuilder listener(StepExecutionListener listener) { 274 super.listener(listener); 275 return this; 276 } 277 278 @Override 279 public RemotePartitioningMasterStepBuilder allowStartIfComplete(boolean allowStartIfComplete) { 280 super.allowStartIfComplete(allowStartIfComplete); 281 return this; 282 } 283 284 /** 285 * This method will throw a {@link UnsupportedOperationException} since 286 * the partition handler of the master step will be automatically set to an 287 * instance of {@link MessageChannelPartitionHandler}. 288 * 289 * When building a master step for remote partitioning using this builder, 290 * no partition handler must be provided. 291 * 292 * @param partitionHandler a partition handler 293 * @return this builder instance for fluent chaining 294 * @throws UnsupportedOperationException if a partition handler is provided 295 */ 296 @Override 297 public RemotePartitioningMasterStepBuilder partitionHandler(PartitionHandler partitionHandler) throws UnsupportedOperationException { 298 throw new UnsupportedOperationException("When configuring a master step " + 299 "for remote partitioning using the RemotePartitioningMasterStepBuilder, " + 300 "the partition handler will be automatically set to an instance " + 301 "of MessageChannelPartitionHandler. The partition handler must " + 302 "not be provided in this case."); 303 } 304 305}