001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.integration.partition; 018 019import org.apache.commons.logging.Log; 020import org.apache.commons.logging.LogFactory; 021 022import org.springframework.batch.core.Job; 023import org.springframework.batch.core.Step; 024import org.springframework.batch.core.StepExecutionListener; 025import org.springframework.batch.core.explore.JobExplorer; 026import org.springframework.batch.core.job.flow.Flow; 027import org.springframework.batch.core.partition.support.Partitioner; 028import org.springframework.batch.core.repository.JobRepository; 029import org.springframework.batch.core.step.StepLocator; 030import org.springframework.batch.core.step.builder.FlowStepBuilder; 031import org.springframework.batch.core.step.builder.JobStepBuilder; 032import org.springframework.batch.core.step.builder.PartitionStepBuilder; 033import org.springframework.batch.core.step.builder.SimpleStepBuilder; 034import org.springframework.batch.core.step.builder.StepBuilder; 035import org.springframework.batch.core.step.builder.TaskletStepBuilder; 036import org.springframework.batch.core.step.tasklet.Tasklet; 037import org.springframework.batch.repeat.CompletionPolicy; 038import org.springframework.beans.factory.BeanFactory; 039import org.springframework.integration.channel.NullChannel; 040import org.springframework.integration.dsl.IntegrationFlow; 041import org.springframework.integration.dsl.IntegrationFlows; 042import org.springframework.integration.dsl.StandardIntegrationFlow; 043import org.springframework.integration.dsl.context.IntegrationFlowContext; 044import org.springframework.messaging.MessageChannel; 045import org.springframework.transaction.PlatformTransactionManager; 046import org.springframework.util.Assert; 047 048/** 049 * Builder for a worker step in a remote partitioning setup. This builder 050 * creates an {@link IntegrationFlow} that: 051 * 052 * <ul> 053 * <li>listens to {@link StepExecutionRequest}s coming from the master 054 * on the input channel</li> 055 * <li>invokes the {@link StepExecutionRequestHandler} to execute the worker 056 * step for each incoming request. The worker step is located using the provided 057 * {@link StepLocator}. If no {@link StepLocator} is provided, a {@link BeanFactoryStepLocator} 058 * configured with the current {@link BeanFactory} will be used 059 * <li>replies to the master on the output channel (when the master step is 060 * configured to aggregate replies from workers). If no output channel 061 * is provided, a {@link NullChannel} will be used (assuming the master side 062 * is configured to poll the job repository for workers status)</li> 063 * </ul> 064 * 065 * @since 4.1 066 * @author Mahmoud Ben Hassine 067 */ 068public class RemotePartitioningWorkerStepBuilder extends StepBuilder { 069 070 private static final String SERVICE_ACTIVATOR_METHOD_NAME = "handle"; 071 private static final Log logger = LogFactory.getLog(RemotePartitioningWorkerStepBuilder.class); 072 073 private MessageChannel inputChannel; 074 private MessageChannel outputChannel; 075 private JobExplorer jobExplorer; 076 private StepLocator stepLocator; 077 private BeanFactory beanFactory; 078 079 /** 080 * Initialize a step builder for a step with the given name. 081 * @param name the name of the step 082 */ 083 public RemotePartitioningWorkerStepBuilder(String name) { 084 super(name); 085 } 086 087 /** 088 * Set the input channel on which step execution requests sent by the master 089 * are received. 090 * @param inputChannel the input channel 091 * @return this builder instance for fluent chaining 092 */ 093 public RemotePartitioningWorkerStepBuilder inputChannel(MessageChannel inputChannel) { 094 Assert.notNull(inputChannel, "inputChannel must not be null"); 095 this.inputChannel = inputChannel; 096 return this; 097 } 098 099 /** 100 * Set the output channel on which replies will be sent to the master step. 101 * @param outputChannel the input channel 102 * @return this builder instance for fluent chaining 103 */ 104 public RemotePartitioningWorkerStepBuilder outputChannel(MessageChannel outputChannel) { 105 Assert.notNull(outputChannel, "outputChannel must not be null"); 106 this.outputChannel = outputChannel; 107 return this; 108 } 109 110 /** 111 * Set the job explorer. 112 * @param jobExplorer the job explorer to use 113 * @return this builder instance for fluent chaining 114 */ 115 public RemotePartitioningWorkerStepBuilder jobExplorer(JobExplorer jobExplorer) { 116 Assert.notNull(jobExplorer, "jobExplorer must not be null"); 117 this.jobExplorer = jobExplorer; 118 return this; 119 } 120 121 /** 122 * Set the step locator used to locate the worker step to execute. 123 * @param stepLocator the step locator to use 124 * @return this builder instance for fluent chaining 125 */ 126 public RemotePartitioningWorkerStepBuilder stepLocator(StepLocator stepLocator) { 127 Assert.notNull(stepLocator, "stepLocator must not be null"); 128 this.stepLocator = stepLocator; 129 return this; 130 } 131 132 /** 133 * Set the bean factory. 134 * @param beanFactory the bean factory 135 * @return this builder instance for fluent chaining 136 */ 137 public RemotePartitioningWorkerStepBuilder beanFactory(BeanFactory beanFactory) { 138 Assert.notNull(beanFactory, "beanFactory must not be null"); 139 this.beanFactory = beanFactory; 140 return this; 141 } 142 143 @Override 144 public RemotePartitioningWorkerStepBuilder repository(JobRepository jobRepository) { 145 super.repository(jobRepository); 146 return this; 147 } 148 149 @Override 150 public RemotePartitioningWorkerStepBuilder transactionManager(PlatformTransactionManager transactionManager) { 151 super.transactionManager(transactionManager); 152 return this; 153 } 154 155 @Override 156 public RemotePartitioningWorkerStepBuilder startLimit(int startLimit) { 157 super.startLimit(startLimit); 158 return this; 159 } 160 161 @Override 162 public RemotePartitioningWorkerStepBuilder listener(Object listener) { 163 super.listener(listener); 164 return this; 165 } 166 167 @Override 168 public RemotePartitioningWorkerStepBuilder listener(StepExecutionListener listener) { 169 super.listener(listener); 170 return this; 171 } 172 173 @Override 174 public RemotePartitioningWorkerStepBuilder allowStartIfComplete(boolean allowStartIfComplete) { 175 super.allowStartIfComplete(allowStartIfComplete); 176 return this; 177 } 178 179 @Override 180 public TaskletStepBuilder tasklet(Tasklet tasklet) { 181 configureWorkerIntegrationFlow(); 182 return super.tasklet(tasklet); 183 } 184 185 @Override 186 public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) { 187 configureWorkerIntegrationFlow(); 188 return super.chunk(chunkSize); 189 } 190 191 @Override 192 public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) { 193 configureWorkerIntegrationFlow(); 194 return super.chunk(completionPolicy); 195 } 196 197 @Override 198 public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) { 199 configureWorkerIntegrationFlow(); 200 return super.partitioner(stepName, partitioner); 201 } 202 203 @Override 204 public PartitionStepBuilder partitioner(Step step) { 205 configureWorkerIntegrationFlow(); 206 return super.partitioner(step); 207 } 208 209 @Override 210 public JobStepBuilder job(Job job) { 211 configureWorkerIntegrationFlow(); 212 return super.job(job); 213 } 214 215 @Override 216 public FlowStepBuilder flow(Flow flow) { 217 configureWorkerIntegrationFlow(); 218 return super.flow(flow); 219 } 220 221 /** 222 * Create an {@link IntegrationFlow} with a {@link StepExecutionRequestHandler} 223 * configured as a service activator listening to the input channel and replying 224 * on the output channel. 225 */ 226 private void configureWorkerIntegrationFlow() { 227 Assert.notNull(this.inputChannel, "An InputChannel must be provided"); 228 Assert.notNull(this.jobExplorer, "A JobExplorer must be provided"); 229 230 if (this.stepLocator == null) { 231 BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator(); 232 beanFactoryStepLocator.setBeanFactory(this.beanFactory); 233 this.stepLocator = beanFactoryStepLocator; 234 } 235 if (this.outputChannel == null) { 236 if (logger.isDebugEnabled()) { 237 logger.debug("The output channel is set to a NullChannel. " + 238 "The master step must poll the job repository for workers status."); 239 } 240 this.outputChannel = new NullChannel(); 241 } 242 243 StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler(); 244 stepExecutionRequestHandler.setJobExplorer(this.jobExplorer); 245 stepExecutionRequestHandler.setStepLocator(this.stepLocator); 246 247 StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows 248 .from(this.inputChannel) 249 .handle(stepExecutionRequestHandler, SERVICE_ACTIVATOR_METHOD_NAME) 250 .channel(this.outputChannel) 251 .get(); 252 IntegrationFlowContext integrationFlowContext = this.beanFactory.getBean(IntegrationFlowContext.class); 253 integrationFlowContext.registration(standardIntegrationFlow) 254 .autoStartup(false) 255 .register(); 256 } 257 258}