我的应用程序需要使用Kafka流在两个Kafka主题之间执行连接。我们曾经将一个建模为KStream(左)和一个KTable(右)并执行leftJoin,但我们发现很多次连接导致右值为空,这是不希望的行为。对于我们的用例来说,两个流上的窗口连接也有问题。所以现在我们将两者建模为KTable并执行内部连接,然后使用该流来生成我们的业务逻辑。但出乎意料的是,我发现在每个输入主题上使用相同的键触发1条消息实际上会导致2个连接发生,导致结果流上有2条相同的消息。
我使用TestTopologyDriver编写了一个单元测试,并且得到了正确的行为(每对1条结果消息),但是当使用TestCompanion和测试代理运行时,或者实际上运行应用程序时,我得到了重复的结果。
我们的流应用程序在Quarkus 2.16.x和Streams 3.3.2中,配置设置为
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=500
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.consumer.heartbeat.interval.ms=200
字符串
下面描述的拓扑(忽略节点编号从41开始,下面是自封装的)。子拓扑1允许1个输入消息到达2个输入主题,以便于测试。子拓扑2使它们都是KTable,然后进行内部连接。实际上,我注意到外部连接也有相同的重复行为(一旦我过滤掉空的左/右值)
Sub-topology: 1
Source: KSTREAM-SOURCE-0000000041 (topics: [trigger])
--> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000042
Processor: KSTREAM-FILTER-0000000044 (stores: [])
--> KSTREAM-MAPVALUES-0000000045
<-- KSTREAM-SOURCE-0000000041
Processor: KSTREAM-FILTER-0000000042 (stores: [])
--> KSTREAM-SINK-0000000043
<-- KSTREAM-SOURCE-0000000041
Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
--> KSTREAM-SINK-0000000046
<-- KSTREAM-FILTER-0000000044
Sink: KSTREAM-SINK-0000000043 (topic: value)
<-- KSTREAM-FILTER-0000000042
Sink: KSTREAM-SINK-0000000046 (topic: state)
<-- KSTREAM-MAPVALUES-0000000045
Sub-topology: 2
Source: KSTREAM-SOURCE-0000000047 (topics: [value])
--> KSTREAM-TOTABLE-0000000048
Source: KSTREAM-SOURCE-0000000051 (topics: [state])
--> KTABLE-SOURCE-0000000052
Processor: KSTREAM-TOTABLE-0000000048 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000049])
--> KTABLE-JOINOTHER-0000000055
<-- KSTREAM-SOURCE-0000000047
Processor: KTABLE-SOURCE-0000000052 (stores: [status-STATE-STORE-0000000050])
--> KTABLE-JOINTHIS-0000000054
<-- KSTREAM-SOURCE-0000000051
Processor: KTABLE-JOINOTHER-0000000055 (stores: [status-STATE-STORE-0000000050])
--> KTABLE-MERGE-0000000053
<-- KSTREAM-TOTABLE-0000000048
Processor: KTABLE-JOINTHIS-0000000054 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000049])
--> KTABLE-MERGE-0000000053
<-- KTABLE-SOURCE-0000000052
Processor: KTABLE-MERGE-0000000053 (stores: [])
--> KTABLE-TOSTREAM-0000000056
<-- KTABLE-JOINTHIS-0000000054, KTABLE-JOINOTHER-0000000055
Processor: KTABLE-TOSTREAM-0000000056 (stores: [])
--> KSTREAM-MAPVALUES-0000000057
<-- KTABLE-MERGE-0000000053
Processor: KSTREAM-MAPVALUES-0000000057 (stores: [])
--> KSTREAM-SINK-0000000058
<-- KTABLE-TOSTREAM-0000000056
Sink: KSTREAM-SINK-0000000058 (topic: joined)
<-- KSTREAM-MAPVALUES-0000000057
型
我看到一些帖子建议我需要在结果流上写一个去重处理器,但这似乎有点奇怪,因为我需要维护第三个存储?此外,我在文档中看不到为什么处理发生两次。如果它确实与此有关,在两个输入主题上具有相同键的消息的到达在时间上可能非常接近(即,在几毫秒内)
1条答案
按热度按时间xn1cxnb41#
但出乎意料的是,我发现在每个输入主题上使用相同的键触发1条消息实际上会导致2个连接发生,导致结果流上有2条相同的消息。
这是
join
操作符在Kafka流中的预期行为。使用join
操作符,当接收到新记录时,将触发join,而不管它是从哪一边接收的。如果这不是你想要的,你可以使用
leftJoin
。