/*
 * Copyright 2012-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.boot.autoconfigure.kafka;

import java.time.Duration;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;

/**
 * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
 * <p>
 * The listener section of {@link KafkaProperties} and the optional collaborators held by
 * this configurer (message converter, reply template, transaction manager, error handler
 * and after-rollback processor) are copied onto the factory and its
 * {@link ContainerProperties}. Every mapping goes through a {@link PropertyMapper}
 * obtained with {@code alwaysApplyingWhenNonNull()}.
 *
 * @author Gary Russell
 * @author Eddú Meléndez
 * @since 1.5.0
 */
public class ConcurrentKafkaListenerContainerFactoryConfigurer {

	private KafkaProperties properties;

	private RecordMessageConverter messageConverter;

	private KafkaTemplate<Object, Object> replyTemplate;

	private KafkaAwareTransactionManager<Object, Object> transactionManager;

	private ErrorHandler errorHandler;

	private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;

	/**
	 * Set the {@link KafkaProperties} whose listener settings are applied.
	 * @param properties the properties
	 */
	void setKafkaProperties(KafkaProperties properties) {
		this.properties = properties;
	}

	/**
	 * Set the {@link RecordMessageConverter} handed to the listener factory.
	 * @param messageConverter the message converter
	 */
	void setMessageConverter(RecordMessageConverter messageConverter) {
		this.messageConverter = messageConverter;
	}

	/**
	 * Set the {@link KafkaTemplate} handed to the listener factory for sending replies.
	 * @param replyTemplate the reply template
	 */
	void setReplyTemplate(KafkaTemplate<Object, Object> replyTemplate) {
		this.replyTemplate = replyTemplate;
	}

	/**
	 * Set the {@link KafkaAwareTransactionManager} applied to the container properties.
	 * @param transactionManager the transaction manager
	 */
	void setTransactionManager(
			KafkaAwareTransactionManager<Object, Object> transactionManager) {
		this.transactionManager = transactionManager;
	}

	/**
	 * Set the {@link ErrorHandler} handed to the listener factory.
	 * @param errorHandler the error handler
	 */
	void setErrorHandler(ErrorHandler errorHandler) {
		this.errorHandler = errorHandler;
	}

	/**
	 * Set the {@link AfterRollbackProcessor} handed to the listener factory.
	 * @param afterRollbackProcessor the after rollback processor
	 */
	void setAfterRollbackProcessor(
			AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
		this.afterRollbackProcessor = afterRollbackProcessor;
	}

	/**
	 * Configure the specified Kafka listener container factory. The consumer factory is
	 * assigned first, then the factory-level settings, then the settings of the factory's
	 * {@link ContainerProperties}. The factory can be further tuned and default settings
	 * can be overridden afterwards.
	 * @param listenerFactory the {@link ConcurrentKafkaListenerContainerFactory} instance
	 * to configure
	 * @param consumerFactory the {@link ConsumerFactory} to use
	 */
	public void configure(
			ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory,
			ConsumerFactory<Object, Object> consumerFactory) {
		listenerFactory.setConsumerFactory(consumerFactory);
		configureListenerFactory(listenerFactory);
		ContainerProperties containerProperties = listenerFactory
				.getContainerProperties();
		configureContainer(containerProperties);
	}

	/**
	 * Apply the factory-level settings: concurrency, message converter, reply template,
	 * batch listener flag, error handler and after-rollback processor.
	 * @param factory the listener container factory to update
	 */
	private void configureListenerFactory(
			ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
		Listener listener = this.properties.getListener();
		PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
		mapper.from(listener::getConcurrency).to(factory::setConcurrency);
		mapper.from(this.messageConverter).to(factory::setMessageConverter);
		mapper.from(this.replyTemplate).to(factory::setReplyTemplate);
		// The batch flag is only ever switched on, and only for the BATCH listener type.
		mapper.from(listener::getType)
				.whenEqualTo(Listener.Type.BATCH)
				.toCall(() -> factory.setBatchListener(true));
		mapper.from(this.errorHandler).to(factory::setErrorHandler);
		mapper.from(this.afterRollbackProcessor)
				.to(factory::setAfterRollbackProcessor);
	}

	/**
	 * Apply the container-level settings taken from the listener properties, plus the
	 * transaction manager held by this configurer.
	 * @param container the container properties to update
	 */
	private void configureContainer(ContainerProperties container) {
		Listener listener = this.properties.getListener();
		PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
		mapper.from(listener::getAckMode).to(container::setAckMode);
		mapper.from(listener::getClientId).to(container::setClientId);
		mapper.from(listener::getAckCount).to(container::setAckCount);
		// Duration-valued settings are handed over as milliseconds ...
		mapper.from(listener::getAckTime)
				.as(Duration::toMillis)
				.to(container::setAckTime);
		mapper.from(listener::getPollTimeout)
				.as(Duration::toMillis)
				.to(container::setPollTimeout);
		mapper.from(listener::getNoPollThreshold).to(container::setNoPollThreshold);
		mapper.from(listener::getIdleEventInterval)
				.as(Duration::toMillis)
				.to(container::setIdleEventInterval);
		// ... except the monitor interval, which is passed as whole seconds in an int.
		mapper.from(listener::getMonitorInterval)
				.as(Duration::getSeconds)
				.as(Number::intValue)
				.to(container::setMonitorInterval);
		mapper.from(listener::getLogContainerConfig)
				.to(container::setLogContainerConfig);
		mapper.from(this.transactionManager).to(container::setTransactionManager);
	}

}