001/*
002 * Copyright 2012-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.autoconfigure.kafka;
018
019import java.io.IOException;
020
021import org.springframework.beans.factory.ObjectProvider;
022import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
023import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
024import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
025import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
026import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
027import org.springframework.boot.context.properties.EnableConfigurationProperties;
028import org.springframework.context.annotation.Bean;
029import org.springframework.context.annotation.Configuration;
030import org.springframework.context.annotation.Import;
031import org.springframework.kafka.core.ConsumerFactory;
032import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
033import org.springframework.kafka.core.DefaultKafkaProducerFactory;
034import org.springframework.kafka.core.KafkaAdmin;
035import org.springframework.kafka.core.KafkaTemplate;
036import org.springframework.kafka.core.ProducerFactory;
037import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
038import org.springframework.kafka.support.LoggingProducerListener;
039import org.springframework.kafka.support.ProducerListener;
040import org.springframework.kafka.support.converter.RecordMessageConverter;
041import org.springframework.kafka.transaction.KafkaTransactionManager;
042
043/**
044 * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
045 *
046 * @author Gary Russell
047 * @author Stephane Nicoll
048 * @author Eddú Meléndez
049 * @author Nakul Mishra
050 * @since 1.5.0
051 */
052@Configuration
053@ConditionalOnClass(KafkaTemplate.class)
054@EnableConfigurationProperties(KafkaProperties.class)
055@Import({ KafkaAnnotationDrivenConfiguration.class,
056                KafkaStreamsAnnotationDrivenConfiguration.class })
057public class KafkaAutoConfiguration {
058
059        private final KafkaProperties properties;
060
061        private final RecordMessageConverter messageConverter;
062
063        public KafkaAutoConfiguration(KafkaProperties properties,
064                        ObjectProvider<RecordMessageConverter> messageConverter) {
065                this.properties = properties;
066                this.messageConverter = messageConverter.getIfUnique();
067        }
068
069        @Bean
070        @ConditionalOnMissingBean(KafkaTemplate.class)
071        public KafkaTemplate<?, ?> kafkaTemplate(
072                        ProducerFactory<Object, Object> kafkaProducerFactory,
073                        ProducerListener<Object, Object> kafkaProducerListener) {
074                KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
075                                kafkaProducerFactory);
076                if (this.messageConverter != null) {
077                        kafkaTemplate.setMessageConverter(this.messageConverter);
078                }
079                kafkaTemplate.setProducerListener(kafkaProducerListener);
080                kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
081                return kafkaTemplate;
082        }
083
084        @Bean
085        @ConditionalOnMissingBean(ProducerListener.class)
086        public ProducerListener<Object, Object> kafkaProducerListener() {
087                return new LoggingProducerListener<>();
088        }
089
090        @Bean
091        @ConditionalOnMissingBean(ConsumerFactory.class)
092        public ConsumerFactory<?, ?> kafkaConsumerFactory() {
093                return new DefaultKafkaConsumerFactory<>(
094                                this.properties.buildConsumerProperties());
095        }
096
097        @Bean
098        @ConditionalOnMissingBean(ProducerFactory.class)
099        public ProducerFactory<?, ?> kafkaProducerFactory() {
100                DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
101                                this.properties.buildProducerProperties());
102                String transactionIdPrefix = this.properties.getProducer()
103                                .getTransactionIdPrefix();
104                if (transactionIdPrefix != null) {
105                        factory.setTransactionIdPrefix(transactionIdPrefix);
106                }
107                return factory;
108        }
109
110        @Bean
111        @ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
112        @ConditionalOnMissingBean
113        public KafkaTransactionManager<?, ?> kafkaTransactionManager(
114                        ProducerFactory<?, ?> producerFactory) {
115                return new KafkaTransactionManager<>(producerFactory);
116        }
117
118        @Bean
119        @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
120        @ConditionalOnMissingBean
121        public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
122                KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
123                Jaas jaasProperties = this.properties.getJaas();
124                if (jaasProperties.getControlFlag() != null) {
125                        jaas.setControlFlag(jaasProperties.getControlFlag());
126                }
127                if (jaasProperties.getLoginModule() != null) {
128                        jaas.setLoginModule(jaasProperties.getLoginModule());
129                }
130                jaas.setOptions(jaasProperties.getOptions());
131                return jaas;
132        }
133
134        @Bean
135        @ConditionalOnMissingBean
136        public KafkaAdmin kafkaAdmin() {
137                KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
138                kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
139                return kafkaAdmin;
140        }
141
142}