001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.sample.remotechunking; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; 021import org.springframework.batch.integration.chunk.RemoteChunkingWorkerBuilder; 022import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; 023import org.springframework.batch.item.ItemProcessor; 024import org.springframework.batch.item.ItemWriter; 025import org.springframework.beans.factory.annotation.Autowired; 026import org.springframework.beans.factory.annotation.Value; 027import org.springframework.context.annotation.Bean; 028import org.springframework.context.annotation.Configuration; 029import org.springframework.context.annotation.PropertySource; 030import org.springframework.integration.channel.DirectChannel; 031import org.springframework.integration.config.EnableIntegration; 032import org.springframework.integration.dsl.IntegrationFlow; 033import org.springframework.integration.dsl.IntegrationFlows; 034import org.springframework.integration.jms.dsl.Jms; 035 036/** 037 * This configuration class is for the worker side of the remote chunking sample. 038 * It uses the {@link RemoteChunkingWorkerBuilder} to configure an 039 * {@link IntegrationFlow} in order to: 040 * <ul> 041 * <li>receive requests from the master</li> 042 * <li>process chunks with the configured item processor and writer</li> 043 * <li>send replies to the master</li> 044 * </ul> 045 * 046 * @author Mahmoud Ben Hassine 047 */ 048@Configuration 049@EnableBatchProcessing 050@EnableBatchIntegration 051@EnableIntegration 052@PropertySource("classpath:remote-chunking.properties") 053public class WorkerConfiguration { 054 055 @Value("${broker.url}") 056 private String brokerUrl; 057 058 @Autowired 059 private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder; 060 061 @Bean 062 public ActiveMQConnectionFactory connectionFactory() { 063 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 064 connectionFactory.setBrokerURL(this.brokerUrl); 065 connectionFactory.setTrustAllPackages(true); 066 return connectionFactory; 067 } 068 069 /* 070 * Configure inbound flow (requests coming from the master) 071 */ 072 @Bean 073 public DirectChannel requests() { 074 return new DirectChannel(); 075 } 076 077 @Bean 078 public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { 079 return IntegrationFlows 080 .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests")) 081 .channel(requests()) 082 .get(); 083 } 084 085 /* 086 * Configure outbound flow (replies going to the master) 087 */ 088 @Bean 089 public DirectChannel replies() { 090 return new DirectChannel(); 091 } 092 093 @Bean 094 public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { 095 return IntegrationFlows 096 .from(replies()) 097 .handle(Jms.outboundAdapter(connectionFactory).destination("replies")) 098 .get(); 099 } 100 101 /* 102 * Configure worker components 103 */ 104 @Bean 105 public ItemProcessor<Integer, Integer> itemProcessor() { 106 return item -> { 107 System.out.println("processing item " + item); 108 return item; 109 }; 110 } 111 112 @Bean 113 public ItemWriter<Integer> itemWriter() { 114 return items -> { 115 for (Integer item : items) { 116 System.out.println("writing item " + item); 117 } 118 }; 119 } 120 121 @Bean 122 public IntegrationFlow workerIntegrationFlow() { 123 return this.remoteChunkingWorkerBuilder 124 .itemProcessor(itemProcessor()) 125 .itemWriter(itemWriter()) 126 .inputChannel(requests()) 127 .outputChannel(replies()) 128 .build(); 129 } 130 131}