001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.sample.remotepartitioning.polling; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020import org.springframework.batch.core.Job; 021import org.springframework.batch.core.Step; 022import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; 023import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; 024import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; 025import org.springframework.batch.integration.partition.RemotePartitioningMasterStepBuilderFactory; 026import org.springframework.batch.sample.remotepartitioning.BasicPartitioner; 027import org.springframework.batch.sample.remotepartitioning.BrokerConfiguration; 028import org.springframework.batch.sample.remotepartitioning.DataSourceConfiguration; 029import org.springframework.context.annotation.Bean; 030import org.springframework.context.annotation.Configuration; 031import org.springframework.context.annotation.Import; 032import org.springframework.integration.channel.DirectChannel; 033import org.springframework.integration.dsl.IntegrationFlow; 034import org.springframework.integration.dsl.IntegrationFlows; 035import org.springframework.integration.jms.dsl.Jms; 036 037/** 038 * This configuration class is for the master side of the remote partitioning sample. 039 * The master step will create 3 partitions for workers to process. 040 * 041 * @author Mahmoud Ben Hassine 042 */ 043@Configuration 044@EnableBatchProcessing 045@EnableBatchIntegration 046@Import(value = {DataSourceConfiguration.class, BrokerConfiguration.class}) 047public class MasterConfiguration { 048 049 private static final int GRID_SIZE = 3; 050 051 private final JobBuilderFactory jobBuilderFactory; 052 053 private final RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory; 054 055 056 public MasterConfiguration(JobBuilderFactory jobBuilderFactory, 057 RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory) { 058 059 this.jobBuilderFactory = jobBuilderFactory; 060 this.masterStepBuilderFactory = masterStepBuilderFactory; 061 } 062 063 /* 064 * Configure outbound flow (requests going to workers) 065 */ 066 @Bean 067 public DirectChannel requests() { 068 return new DirectChannel(); 069 } 070 071 @Bean 072 public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { 073 return IntegrationFlows 074 .from(requests()) 075 .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) 076 .get(); 077 } 078 079 /* 080 * Configure the master step 081 */ 082 @Bean 083 public Step masterStep() { 084 return this.masterStepBuilderFactory.get("masterStep") 085 .partitioner("workerStep", new BasicPartitioner()) 086 .gridSize(GRID_SIZE) 087 .outputChannel(requests()) 088 .build(); 089 } 090 091 @Bean 092 public Job remotePartitioningJob() { 093 return this.jobBuilderFactory.get("remotePartitioningJob") 094 .start(masterStep()) 095 .build(); 096 } 097 098}