001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.sample.remotepartitioning.polling; 017 018import org.apache.activemq.ActiveMQConnectionFactory; 019 020import org.springframework.batch.core.Step; 021import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; 022import org.springframework.batch.core.configuration.annotation.StepScope; 023import org.springframework.batch.core.step.tasklet.Tasklet; 024import org.springframework.batch.integration.config.annotation.EnableBatchIntegration; 025import org.springframework.batch.integration.partition.RemotePartitioningWorkerStepBuilderFactory; 026import org.springframework.batch.repeat.RepeatStatus; 027import org.springframework.batch.sample.remotepartitioning.BrokerConfiguration; 028import org.springframework.batch.sample.remotepartitioning.DataSourceConfiguration; 029import org.springframework.beans.factory.annotation.Value; 030import org.springframework.context.annotation.Bean; 031import org.springframework.context.annotation.Configuration; 032import org.springframework.context.annotation.Import; 033import org.springframework.integration.channel.DirectChannel; 034import org.springframework.integration.dsl.IntegrationFlow; 035import org.springframework.integration.dsl.IntegrationFlows; 036import org.springframework.integration.jms.dsl.Jms; 037 038/** 039 * This configuration class is for the worker side of the remote partitioning sample. 040 * Each worker will process a partition sent by the master step. 041 * 042 * @author Mahmoud Ben Hassine 043 */ 044@Configuration 045@EnableBatchProcessing 046@EnableBatchIntegration 047@Import(value = {DataSourceConfiguration.class, BrokerConfiguration.class}) 048public class WorkerConfiguration { 049 050 private final RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory; 051 052 053 public WorkerConfiguration(RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory) { 054 this.workerStepBuilderFactory = workerStepBuilderFactory; 055 } 056 057 /* 058 * Configure inbound flow (requests coming from the master) 059 */ 060 @Bean 061 public DirectChannel requests() { 062 return new DirectChannel(); 063 } 064 065 @Bean 066 public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { 067 return IntegrationFlows 068 .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests")) 069 .channel(requests()) 070 .get(); 071 } 072 073 /* 074 * Configure the worker step 075 */ 076 @Bean 077 public Step workerStep() { 078 return this.workerStepBuilderFactory.get("workerStep") 079 .inputChannel(requests()) 080 .tasklet(tasklet(null)) 081 .build(); 082 } 083 084 @Bean 085 @StepScope 086 public Tasklet tasklet(@Value("#{stepExecutionContext['partition']}") String partition) { 087 return (contribution, chunkContext) -> { 088 System.out.println("processing " + partition); 089 return RepeatStatus.FINISHED; 090 }; 091 } 092 093}