001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.sample.remotechunking; 017 018import java.util.Arrays; 019 020import org.apache.activemq.ActiveMQConnectionFactory; 021 022import org.springframework.batch.core.Job; 023import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; 024import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; 025import org.springframework.batch.core.step.tasklet.TaskletStep; 026import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory; 027import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; 028import org.springframework.batch.item.support.ListItemReader; 029import org.springframework.beans.factory.annotation.Autowired; 030import org.springframework.beans.factory.annotation.Value; 031import org.springframework.context.annotation.Bean; 032import org.springframework.context.annotation.Configuration; 033import org.springframework.context.annotation.PropertySource; 034import org.springframework.integration.channel.DirectChannel; 035import org.springframework.integration.channel.QueueChannel; 036import org.springframework.integration.config.EnableIntegration; 037import org.springframework.integration.dsl.IntegrationFlow; 038import org.springframework.integration.dsl.IntegrationFlows; 039import org.springframework.integration.jms.dsl.Jms; 040 041/** 042 * This configuration class is for the master side of the remote chunking sample. 043 * The master step reads numbers from 1 to 6 and sends 2 chunks {1, 2, 3} and 044 * {4, 5, 6} to workers for processing and writing. 045 * 046 * @author Mahmoud Ben Hassine 047 */ 048@Configuration 049@EnableBatchProcessing 050@EnableBatchIntegration 051@EnableIntegration 052@PropertySource("classpath:remote-chunking.properties") 053public class MasterConfiguration { 054 055 @Value("${broker.url}") 056 private String brokerUrl; 057 058 @Autowired 059 private JobBuilderFactory jobBuilderFactory; 060 061 @Autowired 062 private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory; 063 064 @Bean 065 public ActiveMQConnectionFactory connectionFactory() { 066 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 067 connectionFactory.setBrokerURL(this.brokerUrl); 068 connectionFactory.setTrustAllPackages(true); 069 return connectionFactory; 070 } 071 072 /* 073 * Configure outbound flow (requests going to workers) 074 */ 075 @Bean 076 public DirectChannel requests() { 077 return new DirectChannel(); 078 } 079 080 @Bean 081 public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { 082 return IntegrationFlows 083 .from(requests()) 084 .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) 085 .get(); 086 } 087 088 /* 089 * Configure inbound flow (replies coming from workers) 090 */ 091 @Bean 092 public QueueChannel replies() { 093 return new QueueChannel(); 094 } 095 096 @Bean 097 public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { 098 return IntegrationFlows 099 .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies")) 100 .channel(replies()) 101 .get(); 102 } 103 104 /* 105 * Configure master step components 106 */ 107 @Bean 108 public ListItemReader<Integer> itemReader() { 109 return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6)); 110 } 111 112 @Bean 113 public TaskletStep masterStep() { 114 return this.masterStepBuilderFactory.get("masterStep") 115 .<Integer, Integer>chunk(3) 116 .reader(itemReader()) 117 .outputChannel(requests()) 118 .inputChannel(replies()) 119 .build(); 120 } 121 122 @Bean 123 public Job remoteChunkingJob() { 124 return this.jobBuilderFactory.get("remoteChunkingJob") 125 .start(masterStep()) 126 .build(); 127 } 128 129}