001/*
002 * Copyright 2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *       https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.integration.partition;
018
019import org.apache.commons.logging.Log;
020import org.apache.commons.logging.LogFactory;
021
022import org.springframework.batch.core.Job;
023import org.springframework.batch.core.Step;
024import org.springframework.batch.core.StepExecutionListener;
025import org.springframework.batch.core.explore.JobExplorer;
026import org.springframework.batch.core.job.flow.Flow;
027import org.springframework.batch.core.partition.support.Partitioner;
028import org.springframework.batch.core.repository.JobRepository;
029import org.springframework.batch.core.step.StepLocator;
030import org.springframework.batch.core.step.builder.FlowStepBuilder;
031import org.springframework.batch.core.step.builder.JobStepBuilder;
032import org.springframework.batch.core.step.builder.PartitionStepBuilder;
033import org.springframework.batch.core.step.builder.SimpleStepBuilder;
034import org.springframework.batch.core.step.builder.StepBuilder;
035import org.springframework.batch.core.step.builder.TaskletStepBuilder;
036import org.springframework.batch.core.step.tasklet.Tasklet;
037import org.springframework.batch.repeat.CompletionPolicy;
038import org.springframework.beans.factory.BeanFactory;
039import org.springframework.integration.channel.NullChannel;
040import org.springframework.integration.dsl.IntegrationFlow;
041import org.springframework.integration.dsl.IntegrationFlows;
042import org.springframework.integration.dsl.StandardIntegrationFlow;
043import org.springframework.integration.dsl.context.IntegrationFlowContext;
044import org.springframework.messaging.MessageChannel;
045import org.springframework.transaction.PlatformTransactionManager;
046import org.springframework.util.Assert;
047
048/**
049 * Builder for a worker step in a remote partitioning setup. This builder
050 * creates an {@link IntegrationFlow} that:
051 *
052 * <ul>
053 *     <li>listens to {@link StepExecutionRequest}s coming from the master
054 *     on the input channel</li>
055 *     <li>invokes the {@link StepExecutionRequestHandler} to execute the worker
056 *     step for each incoming request. The worker step is located using the provided
057 *     {@link StepLocator}. If no {@link StepLocator} is provided, a {@link BeanFactoryStepLocator}
058 *     configured with the current {@link BeanFactory} will be used
059 *     <li>replies to the master on the output channel (when the master step is
060 *     configured to aggregate replies from workers). If no output channel
061 *     is provided, a {@link NullChannel} will be used (assuming the master side
062 *     is configured to poll the job repository for workers status)</li>
063 * </ul>
064 *
065 * @since 4.1
066 * @author Mahmoud Ben Hassine
067 */
068public class RemotePartitioningWorkerStepBuilder extends StepBuilder {
069
070        private static final String SERVICE_ACTIVATOR_METHOD_NAME = "handle";
071        private static final Log logger = LogFactory.getLog(RemotePartitioningWorkerStepBuilder.class);
072
073        private MessageChannel inputChannel;
074        private MessageChannel outputChannel;
075        private JobExplorer jobExplorer;
076        private StepLocator stepLocator;
077        private BeanFactory beanFactory;
078
079        /**
080         * Initialize a step builder for a step with the given name.
081         * @param name the name of the step
082         */
083        public RemotePartitioningWorkerStepBuilder(String name) {
084                super(name);
085        }
086
087        /**
088         * Set the input channel on which step execution requests sent by the master
089         * are received.
090         * @param inputChannel the input channel
091         * @return this builder instance for fluent chaining
092         */
093        public RemotePartitioningWorkerStepBuilder inputChannel(MessageChannel inputChannel) {
094                Assert.notNull(inputChannel, "inputChannel must not be null");
095                this.inputChannel = inputChannel;
096                return this;
097        }
098
099        /**
100         * Set the output channel on which replies will be sent to the master step.
101         * @param outputChannel the input channel
102         * @return this builder instance for fluent chaining
103         */
104        public RemotePartitioningWorkerStepBuilder outputChannel(MessageChannel outputChannel) {
105                Assert.notNull(outputChannel, "outputChannel must not be null");
106                this.outputChannel = outputChannel;
107                return this;
108        }
109
110        /**
111         * Set the job explorer.
112         * @param jobExplorer the job explorer to use
113         * @return this builder instance for fluent chaining
114         */
115        public RemotePartitioningWorkerStepBuilder jobExplorer(JobExplorer jobExplorer) {
116                Assert.notNull(jobExplorer, "jobExplorer must not be null");
117                this.jobExplorer = jobExplorer;
118                return this;
119        }
120
121        /**
122         * Set the step locator used to locate the worker step to execute.
123         * @param stepLocator the step locator to use
124         * @return this builder instance for fluent chaining
125         */
126        public RemotePartitioningWorkerStepBuilder stepLocator(StepLocator stepLocator) {
127                Assert.notNull(stepLocator, "stepLocator must not be null");
128                this.stepLocator = stepLocator;
129                return this;
130        }
131
132        /**
133         * Set the bean factory.
134         * @param beanFactory the bean factory
135         * @return this builder instance for fluent chaining
136         */
137        public RemotePartitioningWorkerStepBuilder beanFactory(BeanFactory beanFactory) {
138                Assert.notNull(beanFactory, "beanFactory must not be null");
139                this.beanFactory = beanFactory;
140                return this;
141        }
142
143        @Override
144        public RemotePartitioningWorkerStepBuilder repository(JobRepository jobRepository) {
145                super.repository(jobRepository);
146                return this;
147        }
148
149        @Override
150        public RemotePartitioningWorkerStepBuilder transactionManager(PlatformTransactionManager transactionManager) {
151                super.transactionManager(transactionManager);
152                return this;
153        }
154
155        @Override
156        public RemotePartitioningWorkerStepBuilder startLimit(int startLimit) {
157                super.startLimit(startLimit);
158                return this;
159        }
160
161        @Override
162        public RemotePartitioningWorkerStepBuilder listener(Object listener) {
163                super.listener(listener);
164                return this;
165        }
166
167        @Override
168        public RemotePartitioningWorkerStepBuilder listener(StepExecutionListener listener) {
169                super.listener(listener);
170                return this;
171        }
172
173        @Override
174        public RemotePartitioningWorkerStepBuilder allowStartIfComplete(boolean allowStartIfComplete) {
175                super.allowStartIfComplete(allowStartIfComplete);
176                return this;
177        }
178
179        @Override
180        public TaskletStepBuilder tasklet(Tasklet tasklet) {
181                configureWorkerIntegrationFlow();
182                return super.tasklet(tasklet);
183        }
184
185        @Override
186        public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
187                configureWorkerIntegrationFlow();
188                return super.chunk(chunkSize);
189        }
190
191        @Override
192        public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
193                configureWorkerIntegrationFlow();
194                return super.chunk(completionPolicy);
195        }
196
197        @Override
198        public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
199                configureWorkerIntegrationFlow();
200                return super.partitioner(stepName, partitioner);
201        }
202
203        @Override
204        public PartitionStepBuilder partitioner(Step step) {
205                configureWorkerIntegrationFlow();
206                return super.partitioner(step);
207        }
208
209        @Override
210        public JobStepBuilder job(Job job) {
211                configureWorkerIntegrationFlow();
212                return super.job(job);
213        }
214
215        @Override
216        public FlowStepBuilder flow(Flow flow) {
217                configureWorkerIntegrationFlow();
218                return super.flow(flow);
219        }
220
221        /**
222         * Create an {@link IntegrationFlow} with a {@link StepExecutionRequestHandler}
223         * configured as a service activator listening to the input channel and replying
224         * on the output channel.
225         */
226        private void configureWorkerIntegrationFlow() {
227                Assert.notNull(this.inputChannel, "An InputChannel must be provided");
228                Assert.notNull(this.jobExplorer, "A JobExplorer must be provided");
229
230                if (this.stepLocator == null) {
231                        BeanFactoryStepLocator beanFactoryStepLocator = new BeanFactoryStepLocator();
232                        beanFactoryStepLocator.setBeanFactory(this.beanFactory);
233                        this.stepLocator = beanFactoryStepLocator;
234                }
235                if (this.outputChannel == null) {
236                        if (logger.isDebugEnabled()) {
237                                logger.debug("The output channel is set to a NullChannel. " +
238                                                "The master step must poll the job repository for workers status.");
239                        }
240                        this.outputChannel = new NullChannel();
241                }
242
243                StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();
244                stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);
245                stepExecutionRequestHandler.setStepLocator(this.stepLocator);
246
247                StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows
248                                .from(this.inputChannel)
249                                .handle(stepExecutionRequestHandler, SERVICE_ACTIVATOR_METHOD_NAME)
250                                .channel(this.outputChannel)
251                                .get();
252                IntegrationFlowContext integrationFlowContext = this.beanFactory.getBean(IntegrationFlowContext.class);
253                integrationFlowContext.registration(standardIntegrationFlow)
254                                .autoStartup(false)
255                                .register();
256        }
257
258}