001/* 002 * Copyright 2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.integration.chunk; 017 018import org.springframework.batch.core.step.item.SimpleChunkProcessor; 019import org.springframework.batch.item.ItemProcessor; 020import org.springframework.batch.item.ItemWriter; 021import org.springframework.batch.item.support.PassThroughItemProcessor; 022import org.springframework.integration.dsl.IntegrationFlow; 023import org.springframework.integration.dsl.IntegrationFlows; 024import org.springframework.messaging.MessageChannel; 025import org.springframework.util.Assert; 026 027/** 028 * Builder for a worker in a remote chunking setup. This builder: 029 * 030 * <ul> 031 * <li>creates a {@link ChunkProcessorChunkHandler} with the provided 032 * item processor and writer. If no item processor is provided, a 033 * {@link PassThroughItemProcessor} will be used</li> 034 * <li>creates an {@link IntegrationFlow} with the 035 * {@link ChunkProcessorChunkHandler} as a service activator which listens 036 * to incoming requests on <code>inputChannel</code> and sends replies 037 * on <code>outputChannel</code></li> 038 * </ul> 039 * 040 * @param <I> type of input items 041 * @param <O> type of output items 042 * 043 * @since 4.1 044 * @author Mahmoud Ben Hassine 045 */ 046public class RemoteChunkingWorkerBuilder<I, O> { 047 048 private static final String SERVICE_ACTIVATOR_METHOD_NAME = "handleChunk"; 049 050 private ItemProcessor<I, O> itemProcessor; 051 private ItemWriter<O> itemWriter; 052 private MessageChannel inputChannel; 053 private MessageChannel outputChannel; 054 055 /** 056 * Set the {@link ItemProcessor} to use to process items sent by the master 057 * step. 058 * 059 * @param itemProcessor to use 060 * @return this builder instance for fluent chaining 061 */ 062 public RemoteChunkingWorkerBuilder<I, O> itemProcessor(ItemProcessor<I, O> itemProcessor) { 063 Assert.notNull(itemProcessor, "itemProcessor must not be null"); 064 this.itemProcessor = itemProcessor; 065 return this; 066 } 067 068 /** 069 * Set the {@link ItemWriter} to use to write items sent by the master step. 070 * 071 * @param itemWriter to use 072 * @return this builder instance for fluent chaining 073 */ 074 public RemoteChunkingWorkerBuilder<I, O> itemWriter(ItemWriter<O> itemWriter) { 075 Assert.notNull(itemWriter, "itemWriter must not be null"); 076 this.itemWriter = itemWriter; 077 return this; 078 } 079 080 /** 081 * Set the input channel on which items sent by the master are received. 082 * 083 * @param inputChannel the input channel 084 * @return this builder instance for fluent chaining 085 */ 086 public RemoteChunkingWorkerBuilder<I, O> inputChannel(MessageChannel inputChannel) { 087 Assert.notNull(inputChannel, "inputChannel must not be null"); 088 this.inputChannel = inputChannel; 089 return this; 090 } 091 092 /** 093 * Set the output channel on which replies will be sent to the master step. 094 * 095 * @param outputChannel the output channel 096 * @return this builder instance for fluent chaining 097 */ 098 public RemoteChunkingWorkerBuilder<I, O> outputChannel(MessageChannel outputChannel) { 099 Assert.notNull(outputChannel, "outputChannel must not be null"); 100 this.outputChannel = outputChannel; 101 return this; 102 } 103 104 /** 105 * Create an {@link IntegrationFlow} with a {@link ChunkProcessorChunkHandler} 106 * configured as a service activator listening to the input channel and replying 107 * on the output channel. 108 * 109 * @return the integration flow 110 */ 111 public IntegrationFlow build() { 112 Assert.notNull(this.itemWriter, "An ItemWriter must be provided"); 113 Assert.notNull(this.inputChannel, "An InputChannel must be provided"); 114 Assert.notNull(this.outputChannel, "An OutputChannel must be provided"); 115 116 if(this.itemProcessor == null) { 117 this.itemProcessor = new PassThroughItemProcessor(); 118 } 119 SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<>(this.itemProcessor, this.itemWriter); 120 121 ChunkProcessorChunkHandler<I> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); 122 chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); 123 124 return IntegrationFlows 125 .from(this.inputChannel) 126 .handle(chunkProcessorChunkHandler, SERVICE_ACTIVATOR_METHOD_NAME) 127 .channel(this.outputChannel) 128 .get(); 129 } 130 131}