001/*
002 * Copyright 2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *       https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.sample.remotepartitioning.polling;
017
018import org.apache.activemq.ActiveMQConnectionFactory;
019
020import org.springframework.batch.core.Job;
021import org.springframework.batch.core.Step;
022import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
023import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
024import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
025import org.springframework.batch.integration.partition.RemotePartitioningMasterStepBuilderFactory;
026import org.springframework.batch.sample.remotepartitioning.BasicPartitioner;
027import org.springframework.batch.sample.remotepartitioning.BrokerConfiguration;
028import org.springframework.batch.sample.remotepartitioning.DataSourceConfiguration;
029import org.springframework.context.annotation.Bean;
030import org.springframework.context.annotation.Configuration;
031import org.springframework.context.annotation.Import;
032import org.springframework.integration.channel.DirectChannel;
033import org.springframework.integration.dsl.IntegrationFlow;
034import org.springframework.integration.dsl.IntegrationFlows;
035import org.springframework.integration.jms.dsl.Jms;
036
037/**
038 * This configuration class is for the master side of the remote partitioning sample.
039 * The master step will create 3 partitions for workers to process.
040 *
041 * @author Mahmoud Ben Hassine
042 */
043@Configuration
044@EnableBatchProcessing
045@EnableBatchIntegration
046@Import(value = {DataSourceConfiguration.class, BrokerConfiguration.class})
047public class MasterConfiguration {
048
049        private static final int GRID_SIZE = 3;
050
051        private final JobBuilderFactory jobBuilderFactory;
052
053        private final RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory;
054
055
056        public MasterConfiguration(JobBuilderFactory jobBuilderFactory,
057                                                           RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory) {
058
059                this.jobBuilderFactory = jobBuilderFactory;
060                this.masterStepBuilderFactory = masterStepBuilderFactory;
061        }
062
063        /*
064         * Configure outbound flow (requests going to workers)
065         */
066        @Bean
067        public DirectChannel requests() {
068                return new DirectChannel();
069        }
070
071        @Bean
072        public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
073                return IntegrationFlows
074                                .from(requests())
075                                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
076                                .get();
077        }
078
079        /*
080         * Configure the master step
081         */
082        @Bean
083        public Step masterStep() {
084                return this.masterStepBuilderFactory.get("masterStep")
085                                .partitioner("workerStep", new BasicPartitioner())
086                                .gridSize(GRID_SIZE)
087                                .outputChannel(requests())
088                                .build();
089        }
090
091        @Bean
092        public Job remotePartitioningJob() {
093                return this.jobBuilderFactory.get("remotePartitioningJob")
094                                .start(masterStep())
095                                .build();
096        }
097
098}