001/*
002 * Copyright 2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *       https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.sample.remotechunking;
017
018import java.util.Arrays;
019
020import org.apache.activemq.ActiveMQConnectionFactory;
021
022import org.springframework.batch.core.Job;
023import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
024import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
025import org.springframework.batch.core.step.tasklet.TaskletStep;
026import org.springframework.batch.integration.chunk.RemoteChunkingMasterStepBuilderFactory;
027import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
028import org.springframework.batch.item.support.ListItemReader;
029import org.springframework.beans.factory.annotation.Autowired;
030import org.springframework.beans.factory.annotation.Value;
031import org.springframework.context.annotation.Bean;
032import org.springframework.context.annotation.Configuration;
033import org.springframework.context.annotation.PropertySource;
034import org.springframework.integration.channel.DirectChannel;
035import org.springframework.integration.channel.QueueChannel;
036import org.springframework.integration.config.EnableIntegration;
037import org.springframework.integration.dsl.IntegrationFlow;
038import org.springframework.integration.dsl.IntegrationFlows;
039import org.springframework.integration.jms.dsl.Jms;
040
041/**
042 * This configuration class is for the master side of the remote chunking sample.
043 * The master step reads numbers from 1 to 6 and sends 2 chunks {1, 2, 3} and
044 * {4, 5, 6} to workers for processing and writing.
045 *
046 * @author Mahmoud Ben Hassine
047 */
048@Configuration
049@EnableBatchProcessing
050@EnableBatchIntegration
051@EnableIntegration
052@PropertySource("classpath:remote-chunking.properties")
053public class MasterConfiguration {
054
055        @Value("${broker.url}")
056        private String brokerUrl;
057
058        @Autowired
059        private JobBuilderFactory jobBuilderFactory;
060
061        @Autowired
062        private RemoteChunkingMasterStepBuilderFactory masterStepBuilderFactory;
063
064        @Bean
065        public ActiveMQConnectionFactory connectionFactory() {
066                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
067                connectionFactory.setBrokerURL(this.brokerUrl);
068                connectionFactory.setTrustAllPackages(true);
069                return connectionFactory;
070        }
071
072        /*
073         * Configure outbound flow (requests going to workers)
074         */
075        @Bean
076        public DirectChannel requests() {
077                return new DirectChannel();
078        }
079
080        @Bean
081        public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
082                return IntegrationFlows
083                                .from(requests())
084                                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
085                                .get();
086        }
087
088        /*
089         * Configure inbound flow (replies coming from workers)
090         */
091        @Bean
092        public QueueChannel replies() {
093                return new QueueChannel();
094        }
095
096        @Bean
097        public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
098                return IntegrationFlows
099                                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
100                                .channel(replies())
101                                .get();
102        }
103
104        /*
105         * Configure master step components
106         */
107        @Bean
108        public ListItemReader<Integer> itemReader() {
109                return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
110        }
111
112        @Bean
113        public TaskletStep masterStep() {
114                return this.masterStepBuilderFactory.get("masterStep")
115                                .<Integer, Integer>chunk(3)
116                                .reader(itemReader())
117                                .outputChannel(requests())
118                                .inputChannel(replies())
119                                .build();
120        }
121
122        @Bean
123        public Job remoteChunkingJob() {
124                return this.jobBuilderFactory.get("remoteChunkingJob")
125                                .start(masterStep())
126                                .build();
127        }
128
129}