001/* 002 * Copyright 2012-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.boot.autoconfigure.kafka; 018 019import java.io.IOException; 020 021import org.springframework.beans.factory.ObjectProvider; 022import org.springframework.boot.autoconfigure.EnableAutoConfiguration; 023import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; 024import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; 025import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 026import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; 027import org.springframework.boot.context.properties.EnableConfigurationProperties; 028import org.springframework.context.annotation.Bean; 029import org.springframework.context.annotation.Configuration; 030import org.springframework.context.annotation.Import; 031import org.springframework.kafka.core.ConsumerFactory; 032import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 033import org.springframework.kafka.core.DefaultKafkaProducerFactory; 034import org.springframework.kafka.core.KafkaAdmin; 035import org.springframework.kafka.core.KafkaTemplate; 036import org.springframework.kafka.core.ProducerFactory; 037import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; 038import org.springframework.kafka.support.LoggingProducerListener; 039import org.springframework.kafka.support.ProducerListener; 040import org.springframework.kafka.support.converter.RecordMessageConverter; 041import org.springframework.kafka.transaction.KafkaTransactionManager; 042 043/** 044 * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. 045 * 046 * @author Gary Russell 047 * @author Stephane Nicoll 048 * @author EddĂș MelĂ©ndez 049 * @author Nakul Mishra 050 * @since 1.5.0 051 */ 052@Configuration 053@ConditionalOnClass(KafkaTemplate.class) 054@EnableConfigurationProperties(KafkaProperties.class) 055@Import({ KafkaAnnotationDrivenConfiguration.class, 056 KafkaStreamsAnnotationDrivenConfiguration.class }) 057public class KafkaAutoConfiguration { 058 059 private final KafkaProperties properties; 060 061 private final RecordMessageConverter messageConverter; 062 063 public KafkaAutoConfiguration(KafkaProperties properties, 064 ObjectProvider<RecordMessageConverter> messageConverter) { 065 this.properties = properties; 066 this.messageConverter = messageConverter.getIfUnique(); 067 } 068 069 @Bean 070 @ConditionalOnMissingBean(KafkaTemplate.class) 071 public KafkaTemplate<?, ?> kafkaTemplate( 072 ProducerFactory<Object, Object> kafkaProducerFactory, 073 ProducerListener<Object, Object> kafkaProducerListener) { 074 KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>( 075 kafkaProducerFactory); 076 if (this.messageConverter != null) { 077 kafkaTemplate.setMessageConverter(this.messageConverter); 078 } 079 kafkaTemplate.setProducerListener(kafkaProducerListener); 080 kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); 081 return kafkaTemplate; 082 } 083 084 @Bean 085 @ConditionalOnMissingBean(ProducerListener.class) 086 public ProducerListener<Object, Object> kafkaProducerListener() { 087 return new LoggingProducerListener<>(); 088 } 089 090 @Bean 091 @ConditionalOnMissingBean(ConsumerFactory.class) 092 public ConsumerFactory<?, ?> kafkaConsumerFactory() { 093 return new DefaultKafkaConsumerFactory<>( 094 this.properties.buildConsumerProperties()); 095 } 096 097 @Bean 098 @ConditionalOnMissingBean(ProducerFactory.class) 099 public ProducerFactory<?, ?> kafkaProducerFactory() { 100 DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>( 101 this.properties.buildProducerProperties()); 102 String transactionIdPrefix = this.properties.getProducer() 103 .getTransactionIdPrefix(); 104 if (transactionIdPrefix != null) { 105 factory.setTransactionIdPrefix(transactionIdPrefix); 106 } 107 return factory; 108 } 109 110 @Bean 111 @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix") 112 @ConditionalOnMissingBean 113 public KafkaTransactionManager<?, ?> kafkaTransactionManager( 114 ProducerFactory<?, ?> producerFactory) { 115 return new KafkaTransactionManager<>(producerFactory); 116 } 117 118 @Bean 119 @ConditionalOnProperty(name = "spring.kafka.jaas.enabled") 120 @ConditionalOnMissingBean 121 public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { 122 KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); 123 Jaas jaasProperties = this.properties.getJaas(); 124 if (jaasProperties.getControlFlag() != null) { 125 jaas.setControlFlag(jaasProperties.getControlFlag()); 126 } 127 if (jaasProperties.getLoginModule() != null) { 128 jaas.setLoginModule(jaasProperties.getLoginModule()); 129 } 130 jaas.setOptions(jaasProperties.getOptions()); 131 return jaas; 132 } 133 134 @Bean 135 @ConditionalOnMissingBean 136 public KafkaAdmin kafkaAdmin() { 137 KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); 138 kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); 139 return kafkaAdmin; 140 } 141 142}