为什么Kafka流创建聚合和连接的主题

06odsfpq  于 5个月前  发布在  Apache
关注(0)|答案(1)|浏览(72)

我最近创建了我的第一个Kafka流应用程序用于学习。我使用了spring-cloud-stream-Kafka-binding。这是一个简单的电子商务系统,在其中我阅读一个名为产品的主题,每当有新的产品库存进来时,它都有所有的产品条目。我正在汇总数量以获得产品的总数量。
我有两个选择-
1.将聚合详细信息(KTable)发送到另一个名为aggregated-products的Kafka主题
1.具体化聚合数据
我选择了第二个选项,我发现应用程序自己创建了一个Kafka主题,当我使用来自该主题的消息时,就会得到聚合的消息。

.peek((k,v) -> LOGGER.info("Received product with key [{}] and value [{}]",k, v))
            .groupByKey()
            .aggregate(Product::new,
                    (key, value, aggregate) -> aggregate.process(value),
                    Materialized.<String, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_AGGREGATE_STATE_STORE).withValueSerde(productEventSerde)//.withKeySerde(keySerde)
                    // because keySerde is configured in application.properties
            );

字符串
使用InteractiveQueryService,我可以在应用程序中访问此状态存储,以了解产品的可用总量。
我有几个问题-
1.为什么应用程序创建了一个新的Kafka主题?
1.如果答案是“存储聚合数据”,那么这与选项1(我可以自己发送聚合数据)有何不同?

  1. RocksDB在哪里进入画面?
    我的应用程序的代码(它比我在这里解释的更多)可以从这个链接访问-
    https://github.com/prashantbhardwaj/kafka-stream-example/blob/master/src/main/java/com/appcloid/kafka/stream/example/config/SpringStreamBinderTopologyBuilderConfig.java
whhtz7ly

whhtz7ly1#

内部主题被称为更改日志主题,用于容错。聚合的状态以更改日志主题的形式存储在本地磁盘上(使用RocksDB)和Kafka代理上(本质上是“备份”)。如果任务移动到新机器或本地状态因其他原因丢失,Kafka Streams可以通过阅读来自更新日志主题的对原始状态的所有更改并将其应用于新的RocksDB示例来恢复本地状态。恢复完成后(整个更新日志主题已处理),新机器上应该有相同的状态,新机器可以在旧机器停止的地方继续加工。这里面有很多复杂的细节(例如,在默认设置中,当故障发生时,可能发生针对同一输入记录更新两次状态)。
参见https://developer.confluent.io/learn-kafka/kafka-streams/stateful-fault-tolerance/

相关问题