来自单个主主题的多个流

2022-09-04 01:31:08

如何从单个主主题创建多个流?当我做这样的事情时:

KStreamBuilder builder = new KStreamBuilder();

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output1");

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();

我收到以下错误:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)

我是否需要为来自“master”的每个流创建另一个 KafkaStreams 实例?


答案 1

您可以创建可以重复使用的 KStream:

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");

然后你可以重用它:

inputStream.filter(..logic1)
        .to(Serdes.String(), Serdes.String(), "output1");
inputStream.filter(..logic2)
        .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();

答案 2

您也可以使用分支功能来实现此目的

KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");

然后使用分支将创建结果集数组

final KStream<String, String>[] splitStream = inputStream.branch(new Predicate<String, String>() {
                        @Override
                        public boolean test(String key, String value) {
                            //write logic to filter 
                            return true;
                        },
                   new Predicate<String, String>() {
                        @Override
                        public boolean test(String key, String value) {
                            //write logic to filter 
                            return true;
                        },....
                    //you can write multiple predicate to filter inputStream 
                    });

最后,在分支完成后,splitStream[0]将包含第一个滤波器输出,而splitStream[1]将包含第二个滤波器输出,依此类推。要将其发送到任何输出主题,您可以使用下面的代码。

splitStream[0].to("out_topic1");
splitStream[1].to("out_topic2");

推荐