001/*
002 * Copyright 2012-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.docs.kafka;
018
019import org.apache.kafka.common.serialization.Serdes;
020import org.apache.kafka.streams.KeyValue;
021import org.apache.kafka.streams.StreamsBuilder;
022import org.apache.kafka.streams.kstream.KStream;
023import org.apache.kafka.streams.kstream.Produced;
024
025import org.springframework.context.annotation.Bean;
026import org.springframework.context.annotation.Configuration;
027import org.springframework.kafka.annotation.EnableKafkaStreams;
028import org.springframework.kafka.support.serializer.JsonSerde;
029
030/**
031 * Example to show usage of {@link StreamsBuilder}.
032 *
033 * @author Stephane Nicoll
034 */
035public class KafkaStreamsBeanExample {
036
037        // tag::configuration[]
038        @Configuration
039        @EnableKafkaStreams
040        static class KafkaStreamsExampleConfiguration {
041
042                @Bean
043                public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
044                        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
045                        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
046                                        Produced.with(Serdes.Integer(), new JsonSerde<>()));
047                        return stream;
048                }
049
050        }
051        // end::configuration[]
052
053}