001/*
002 * Copyright 2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *       https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.sample.remotechunking;
017
018import org.apache.activemq.ActiveMQConnectionFactory;
019
020import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
021import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder;
022import org.springframework.batch.integration.config.annotation.EnableBatchIntegration;
023import org.springframework.batch.item.ItemProcessor;
024import org.springframework.batch.item.ItemWriter;
025import org.springframework.beans.factory.annotation.Autowired;
026import org.springframework.beans.factory.annotation.Value;
027import org.springframework.context.annotation.Bean;
028import org.springframework.context.annotation.Configuration;
029import org.springframework.context.annotation.PropertySource;
030import org.springframework.integration.channel.DirectChannel;
031import org.springframework.integration.config.EnableIntegration;
032import org.springframework.integration.dsl.IntegrationFlow;
033import org.springframework.integration.dsl.IntegrationFlows;
034import org.springframework.integration.jms.dsl.Jms;
035
036/**
037 * This configuration class is for the worker side of the remote chunking sample.
038 * It uses the {@link RemoteChunkingWorkerBuilder} to configure an
039 * {@link IntegrationFlow} in order to:
040 * <ul>
041 *     <li>receive requests from the master</li>
042 *     <li>process chunks with the configured item processor and writer</li>
043 *     <li>send replies to the master</li>
044 * </ul>
045 *
046 * @author Mahmoud Ben Hassine
047 */
048@Configuration
049@EnableBatchProcessing
050@EnableBatchIntegration
051@EnableIntegration
052@PropertySource("classpath:remote-chunking.properties")
053public class WorkerConfiguration {
054
055        @Value("${broker.url}")
056        private String brokerUrl;
057
058        @Autowired
059        private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;
060
061        @Bean
062        public ActiveMQConnectionFactory connectionFactory() {
063                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
064                connectionFactory.setBrokerURL(this.brokerUrl);
065                connectionFactory.setTrustAllPackages(true);
066                return connectionFactory;
067        }
068
069        /*
070         * Configure inbound flow (requests coming from the master)
071         */
072        @Bean
073        public DirectChannel requests() {
074                return new DirectChannel();
075        }
076
077        @Bean
078        public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
079                return IntegrationFlows
080                                .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
081                                .channel(requests())
082                                .get();
083        }
084
085        /*
086         * Configure outbound flow (replies going to the master)
087         */
088        @Bean
089        public DirectChannel replies() {
090                return new DirectChannel();
091        }
092
093        @Bean
094        public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
095                return IntegrationFlows
096                                .from(replies())
097                                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
098                                .get();
099        }
100
101        /*
102         * Configure worker components
103         */
104        @Bean
105        public ItemProcessor<Integer, Integer> itemProcessor() {
106                return item -> {
107                        System.out.println("processing item " + item);
108                        return item;
109                };
110        }
111
112        @Bean
113        public ItemWriter<Integer> itemWriter() {
114                return items -> {
115                        for (Integer item : items) {
116                                System.out.println("writing item " + item);
117                        }
118                };
119        }
120
121        @Bean
122        public IntegrationFlow workerIntegrationFlow() {
123                return this.remoteChunkingWorkerBuilder
124                                .itemProcessor(itemProcessor())
125                                .itemWriter(itemWriter())
126                                .inputChannel(requests())
127                                .outputChannel(replies())
128                                .build();
129        }
130
131}