001/*
002 * Copyright 2012-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.autoconfigure.kafka;
018
019import java.time.Duration;
020
021import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
022import org.springframework.boot.context.properties.PropertyMapper;
023import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
024import org.springframework.kafka.core.ConsumerFactory;
025import org.springframework.kafka.core.KafkaTemplate;
026import org.springframework.kafka.listener.AfterRollbackProcessor;
027import org.springframework.kafka.listener.ContainerProperties;
028import org.springframework.kafka.listener.ErrorHandler;
029import org.springframework.kafka.support.converter.RecordMessageConverter;
030import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
031
032/**
033 * Configure {@link ConcurrentKafkaListenerContainerFactory} with sensible defaults.
034 *
035 * @author Gary Russell
036 * @author Eddú Meléndez
037 * @since 1.5.0
038 */
039public class ConcurrentKafkaListenerContainerFactoryConfigurer {
040
041        private KafkaProperties properties;
042
043        private RecordMessageConverter messageConverter;
044
045        private KafkaTemplate<Object, Object> replyTemplate;
046
047        private KafkaAwareTransactionManager<Object, Object> transactionManager;
048
049        private ErrorHandler errorHandler;
050
051        private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
052
053        /**
054         * Set the {@link KafkaProperties} to use.
055         * @param properties the properties
056         */
057        void setKafkaProperties(KafkaProperties properties) {
058                this.properties = properties;
059        }
060
061        /**
062         * Set the {@link RecordMessageConverter} to use.
063         * @param messageConverter the message converter
064         */
065        void setMessageConverter(RecordMessageConverter messageConverter) {
066                this.messageConverter = messageConverter;
067        }
068
069        /**
070         * Set the {@link KafkaTemplate} to use to send replies.
071         * @param replyTemplate the reply template
072         */
073        void setReplyTemplate(KafkaTemplate<Object, Object> replyTemplate) {
074                this.replyTemplate = replyTemplate;
075        }
076
077        /**
078         * Set the {@link KafkaAwareTransactionManager} to use.
079         * @param transactionManager the transaction manager
080         */
081        void setTransactionManager(
082                        KafkaAwareTransactionManager<Object, Object> transactionManager) {
083                this.transactionManager = transactionManager;
084        }
085
086        /**
087         * Set the {@link ErrorHandler} to use.
088         * @param errorHandler the error handler
089         */
090        void setErrorHandler(ErrorHandler errorHandler) {
091                this.errorHandler = errorHandler;
092        }
093
094        /**
095         * Set the {@link AfterRollbackProcessor} to use.
096         * @param afterRollbackProcessor the after rollback processor
097         */
098        void setAfterRollbackProcessor(
099                        AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
100                this.afterRollbackProcessor = afterRollbackProcessor;
101        }
102
103        /**
104         * Configure the specified Kafka listener container factory. The factory can be
105         * further tuned and default settings can be overridden.
106         * @param listenerFactory the {@link ConcurrentKafkaListenerContainerFactory} instance
107         * to configure
108         * @param consumerFactory the {@link ConsumerFactory} to use
109         */
110        public void configure(
111                        ConcurrentKafkaListenerContainerFactory<Object, Object> listenerFactory,
112                        ConsumerFactory<Object, Object> consumerFactory) {
113                listenerFactory.setConsumerFactory(consumerFactory);
114                configureListenerFactory(listenerFactory);
115                configureContainer(listenerFactory.getContainerProperties());
116        }
117
118        private void configureListenerFactory(
119                        ConcurrentKafkaListenerContainerFactory<Object, Object> factory) {
120                PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
121                Listener properties = this.properties.getListener();
122                map.from(properties::getConcurrency).to(factory::setConcurrency);
123                map.from(this.messageConverter).to(factory::setMessageConverter);
124                map.from(this.replyTemplate).to(factory::setReplyTemplate);
125                map.from(properties::getType).whenEqualTo(Listener.Type.BATCH)
126                                .toCall(() -> factory.setBatchListener(true));
127                map.from(this.errorHandler).to(factory::setErrorHandler);
128                map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
129        }
130
131        private void configureContainer(ContainerProperties container) {
132                PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
133                Listener properties = this.properties.getListener();
134                map.from(properties::getAckMode).to(container::setAckMode);
135                map.from(properties::getClientId).to(container::setClientId);
136                map.from(properties::getAckCount).to(container::setAckCount);
137                map.from(properties::getAckTime).as(Duration::toMillis).to(container::setAckTime);
138                map.from(properties::getPollTimeout).as(Duration::toMillis)
139                                .to(container::setPollTimeout);
140                map.from(properties::getNoPollThreshold).to(container::setNoPollThreshold);
141                map.from(properties::getIdleEventInterval).as(Duration::toMillis)
142                                .to(container::setIdleEventInterval);
143                map.from(properties::getMonitorInterval).as(Duration::getSeconds)
144                                .as(Number::intValue).to(container::setMonitorInterval);
145                map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
146                map.from(this.transactionManager).to(container::setTransactionManager);
147        }
148
149}