001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.sample.remotepartitioning.aggregating; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020import org.springframework.batch.core.Job; 021import org.springframework.batch.core.Step; 022import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; 023import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; 024import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; 025import org.springframework.batch.integration.partition.RemotePartitioningMasterStepBuilderFactory; 026import org.springframework.batch.sample.remotepartitioning.BasicPartitioner; 027import org.springframework.batch.sample.remotepartitioning.BrokerConfiguration; 028import org.springframework.batch.sample.remotepartitioning.DataSourceConfiguration; 029import org.springframework.context.annotation.Bean; 030import org.springframework.context.annotation.Configuration; 031import org.springframework.context.annotation.Import; 032import org.springframework.integration.channel.DirectChannel; 033import org.springframework.integration.dsl.IntegrationFlow; 034import org.springframework.integration.dsl.IntegrationFlows; 035import org.springframework.integration.jms.dsl.Jms; 036 037/** 038 * This configuration class is for the master side of the remote partitioning sample. 039 * The master step will create 3 partitions for workers to process. 040 * 041 * @author Mahmoud Ben Hassine 042 */ 043@Configuration 044@EnableBatchProcessing 045@EnableBatchIntegration 046@Import(value = {DataSourceConfiguration.class, BrokerConfiguration.class}) 047public class MasterConfiguration { 048 049 private static final int GRID_SIZE = 3; 050 051 private final JobBuilderFactory jobBuilderFactory; 052 053 private final RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory; 054 055 056 public MasterConfiguration(JobBuilderFactory jobBuilderFactory, 057 RemotePartitioningMasterStepBuilderFactory masterStepBuilderFactory) { 058 059 this.jobBuilderFactory = jobBuilderFactory; 060 this.masterStepBuilderFactory = masterStepBuilderFactory; 061 } 062 063 /* 064 * Configure outbound flow (requests going to workers) 065 */ 066 @Bean 067 public DirectChannel requests() { 068 return new DirectChannel(); 069 } 070 071 @Bean 072 public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { 073 return IntegrationFlows 074 .from(requests()) 075 .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) 076 .get(); 077 } 078 079 /* 080 * Configure inbound flow (replies coming from workers) 081 */ 082 @Bean 083 public DirectChannel replies() { 084 return new DirectChannel(); 085 } 086 087 @Bean 088 public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { 089 return IntegrationFlows 090 .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies")) 091 .channel(replies()) 092 .get(); 093 } 094 095 /* 096 * Configure the master step 097 */ 098 @Bean 099 public Step masterStep() { 100 return this.masterStepBuilderFactory.get("masterStep") 101 .partitioner("workerStep", new BasicPartitioner()) 102 .gridSize(GRID_SIZE) 103 .outputChannel(requests()) 104 .inputChannel(replies()) 105 .build(); 106 } 107 108 @Bean 109 public Job remotePartitioningJob() { 110 return this.jobBuilderFactory.get("remotePartitioningJob") 111 .start(masterStep()) 112 .build(); 113 } 114 115}