001/*
002 * Copyright 2012-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.kafka;
018
019import java.util.List;
020import java.util.Map;
021
022import org.apache.kafka.clients.CommonClientConfigs;
023import org.apache.kafka.common.metrics.KafkaMetric;
024import org.apache.kafka.common.metrics.MetricsReporter;
025
026import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
027import org.springframework.context.annotation.Bean;
028import org.springframework.context.annotation.Configuration;
029import org.springframework.kafka.core.ConsumerFactory;
030import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
031import org.springframework.kafka.core.DefaultKafkaProducerFactory;
032import org.springframework.kafka.core.ProducerFactory;
033
034/**
035 * Example custom kafka configuration beans used when the user wants to apply different
036 * common properties to the producer and consumer.
037 *
038 * @author Gary Russell
039 * @since 1.5
040 */
041public class KafkaSpecialProducerConsumerConfigExample {
042
043        // tag::configuration[]
044        @Configuration
045        public static class CustomKafkaBeans {
046
047                /**
048                 * Customized ProducerFactory bean.
049                 * @param properties the kafka properties.
050                 * @return the bean.
051                 */
052                @Bean
053                public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
054                        Map<String, Object> producerProperties = properties.buildProducerProperties();
055                        producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
056                                        MyProducerMetricsReporter.class);
057                        return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
058                }
059
060                /**
061                 * Customized ConsumerFactory bean.
062                 * @param properties the kafka properties.
063                 * @return the bean.
064                 */
065                @Bean
066                public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
067                        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
068                        consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
069                                        MyConsumerMetricsReporter.class);
070                        return new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
071                }
072
073        }
074        // end::configuration[]
075
076        public static class MyConsumerMetricsReporter implements MetricsReporter {
077
078                @Override
079                public void configure(Map<String, ?> configs) {
080                }
081
082                @Override
083                public void init(List<KafkaMetric> metrics) {
084                }
085
086                @Override
087                public void metricChange(KafkaMetric metric) {
088                }
089
090                @Override
091                public void metricRemoval(KafkaMetric metric) {
092                }
093
094                @Override
095                public void close() {
096                }
097
098        }
099
100        public static class MyProducerMetricsReporter implements MetricsReporter {
101
102                @Override
103                public void configure(Map<String, ?> configs) {
104                }
105
106                @Override
107                public void init(List<KafkaMetric> metrics) {
108                }
109
110                @Override
111                public void metricChange(KafkaMetric metric) {
112                }
113
114                @Override
115                public void metricRemoval(KafkaMetric metric) {
116                }
117
118                @Override
119                public void close() {
120                }
121
122        }
123
124}