001/* 002 * Copyright 2012-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.boot.kafka; 018 019import java.util.List; 020import java.util.Map; 021 022import org.apache.kafka.clients.CommonClientConfigs; 023import org.apache.kafka.common.metrics.KafkaMetric; 024import org.apache.kafka.common.metrics.MetricsReporter; 025 026import org.springframework.boot.autoconfigure.kafka.KafkaProperties; 027import org.springframework.context.annotation.Bean; 028import org.springframework.context.annotation.Configuration; 029import org.springframework.kafka.core.ConsumerFactory; 030import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 031import org.springframework.kafka.core.DefaultKafkaProducerFactory; 032import org.springframework.kafka.core.ProducerFactory; 033 034/** 035 * Example custom kafka configuration beans used when the user wants to apply different 036 * common properties to the producer and consumer. 037 * 038 * @author Gary Russell 039 * @since 1.5 040 */ 041public class KafkaSpecialProducerConsumerConfigExample { 042 043 // tag::configuration[] 044 @Configuration 045 public static class CustomKafkaBeans { 046 047 /** 048 * Customized ProducerFactory bean. 049 * @param properties the kafka properties. 050 * @return the bean. 051 */ 052 @Bean 053 public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) { 054 Map<String, Object> producerProperties = properties.buildProducerProperties(); 055 producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 056 MyProducerMetricsReporter.class); 057 return new DefaultKafkaProducerFactory<Object, Object>(producerProperties); 058 } 059 060 /** 061 * Customized ConsumerFactory bean. 062 * @param properties the kafka properties. 063 * @return the bean. 064 */ 065 @Bean 066 public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) { 067 Map<String, Object> consumerProperties = properties.buildConsumerProperties(); 068 consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 069 MyConsumerMetricsReporter.class); 070 return new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties); 071 } 072 073 } 074 // end::configuration[] 075 076 public static class MyConsumerMetricsReporter implements MetricsReporter { 077 078 @Override 079 public void configure(Map<String, ?> configs) { 080 } 081 082 @Override 083 public void init(List<KafkaMetric> metrics) { 084 } 085 086 @Override 087 public void metricChange(KafkaMetric metric) { 088 } 089 090 @Override 091 public void metricRemoval(KafkaMetric metric) { 092 } 093 094 @Override 095 public void close() { 096 } 097 098 } 099 100 public static class MyProducerMetricsReporter implements MetricsReporter { 101 102 @Override 103 public void configure(Map<String, ?> configs) { 104 } 105 106 @Override 107 public void init(List<KafkaMetric> metrics) { 108 } 109 110 @Override 111 public void metricChange(KafkaMetric metric) { 112 } 113 114 @Override 115 public void metricRemoval(KafkaMetric metric) { 116 } 117 118 @Override 119 public void close() { 120 } 121 122 } 123 124}