001/*
002 * Copyright 2012-2016 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.autoconfigure.kafka;
018
019import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
020import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
021import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
022import org.springframework.boot.context.properties.EnableConfigurationProperties;
023import org.springframework.context.annotation.Bean;
024import org.springframework.context.annotation.Configuration;
025import org.springframework.context.annotation.Import;
026import org.springframework.kafka.core.ConsumerFactory;
027import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
028import org.springframework.kafka.core.DefaultKafkaProducerFactory;
029import org.springframework.kafka.core.KafkaTemplate;
030import org.springframework.kafka.core.ProducerFactory;
031import org.springframework.kafka.support.LoggingProducerListener;
032import org.springframework.kafka.support.ProducerListener;
033
034/**
035 * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
036 *
037 * @author Gary Russell
038 * @since 1.5.0
039 */
040@Configuration
041@ConditionalOnClass(KafkaTemplate.class)
042@EnableConfigurationProperties(KafkaProperties.class)
043@Import(KafkaAnnotationDrivenConfiguration.class)
044public class KafkaAutoConfiguration {
045
046        private final KafkaProperties properties;
047
048        public KafkaAutoConfiguration(KafkaProperties properties) {
049                this.properties = properties;
050        }
051
052        @Bean
053        @ConditionalOnMissingBean(KafkaTemplate.class)
054        public KafkaTemplate<?, ?> kafkaTemplate(
055                        ProducerFactory<Object, Object> kafkaProducerFactory,
056                        ProducerListener<Object, Object> kafkaProducerListener) {
057                KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
058                                kafkaProducerFactory);
059                kafkaTemplate.setProducerListener(kafkaProducerListener);
060                kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
061                return kafkaTemplate;
062        }
063
064        @Bean
065        @ConditionalOnMissingBean(ProducerListener.class)
066        public ProducerListener<Object, Object> kafkaProducerListener() {
067                return new LoggingProducerListener<Object, Object>();
068        }
069
070        @Bean
071        @ConditionalOnMissingBean(ConsumerFactory.class)
072        public ConsumerFactory<?, ?> kafkaConsumerFactory() {
073                return new DefaultKafkaConsumerFactory<Object, Object>(
074                                this.properties.buildConsumerProperties());
075        }
076
077        @Bean
078        @ConditionalOnMissingBean(ProducerFactory.class)
079        public ProducerFactory<?, ?> kafkaProducerFactory() {
080                return new DefaultKafkaProducerFactory<Object, Object>(
081                                this.properties.buildProducerProperties());
082        }
083
084}