KafkaStreams Table -表内部连接产生重复

8xiog9wr  于 5个月前  发布在  Apache
关注(0)|答案(1)|浏览(66)

我的应用程序需要使用Kafka流在两个Kafka主题之间执行连接。我们曾经将一个建模为KStream(左)和一个KTable(右)并执行leftJoin,但我们发现很多次连接导致右值为空,这是不希望的行为。对于我们的用例来说,两个流上的窗口连接也有问题。所以现在我们将两者建模为KTable并执行内部连接,然后使用该流来生成我们的业务逻辑。但出乎意料的是,我发现在每个输入主题上使用相同的键触发1条消息实际上会导致2个连接发生,导致结果流上有2条相同的消息。
我使用TestTopologyDriver编写了一个单元测试,并且得到了正确的行为(每对1条结果消息),但是当使用TestCompanion和测试代理运行时,或者实际上运行应用程序时,我得到了重复的结果。
我们的流应用程序在Quarkus 2.16.x和Streams 3.3.2中,配置设置为

kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=500
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.consumer.heartbeat.interval.ms=200

字符串
下面描述的拓扑(忽略节点编号从41开始,下面是自封装的)。子拓扑1允许1个输入消息到达2个输入主题,以便于测试。子拓扑2使它们都是KTable,然后进行内部连接。实际上,我注意到外部连接也有相同的重复行为(一旦我过滤掉空的左/右值)

Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000041 (topics: [trigger])
      --> KSTREAM-FILTER-0000000044, KSTREAM-FILTER-0000000042
    Processor: KSTREAM-FILTER-0000000044 (stores: [])
      --> KSTREAM-MAPVALUES-0000000045
      <-- KSTREAM-SOURCE-0000000041
    Processor: KSTREAM-FILTER-0000000042 (stores: [])
      --> KSTREAM-SINK-0000000043
      <-- KSTREAM-SOURCE-0000000041
    Processor: KSTREAM-MAPVALUES-0000000045 (stores: [])
      --> KSTREAM-SINK-0000000046
      <-- KSTREAM-FILTER-0000000044
    Sink: KSTREAM-SINK-0000000043 (topic: value)
      <-- KSTREAM-FILTER-0000000042
    Sink: KSTREAM-SINK-0000000046 (topic: state)
      <-- KSTREAM-MAPVALUES-0000000045

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000047 (topics: [value])
      --> KSTREAM-TOTABLE-0000000048
    Source: KSTREAM-SOURCE-0000000051 (topics: [state])
      --> KTABLE-SOURCE-0000000052
    Processor: KSTREAM-TOTABLE-0000000048 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000049])
      --> KTABLE-JOINOTHER-0000000055
      <-- KSTREAM-SOURCE-0000000047
    Processor: KTABLE-SOURCE-0000000052 (stores: [status-STATE-STORE-0000000050])
      --> KTABLE-JOINTHIS-0000000054
      <-- KSTREAM-SOURCE-0000000051
    Processor: KTABLE-JOINOTHER-0000000055 (stores: [status-STATE-STORE-0000000050])
      --> KTABLE-MERGE-0000000053
      <-- KSTREAM-TOTABLE-0000000048
    Processor: KTABLE-JOINTHIS-0000000054 (stores: [KSTREAM-TOTABLE-STATE-STORE-0000000049])
      --> KTABLE-MERGE-0000000053
      <-- KTABLE-SOURCE-0000000052
    Processor: KTABLE-MERGE-0000000053 (stores: [])
      --> KTABLE-TOSTREAM-0000000056
      <-- KTABLE-JOINTHIS-0000000054, KTABLE-JOINOTHER-0000000055
    Processor: KTABLE-TOSTREAM-0000000056 (stores: [])
      --> KSTREAM-MAPVALUES-0000000057
      <-- KTABLE-MERGE-0000000053
    Processor: KSTREAM-MAPVALUES-0000000057 (stores: [])
      --> KSTREAM-SINK-0000000058
      <-- KTABLE-TOSTREAM-0000000056
    Sink: KSTREAM-SINK-0000000058 (topic: joined)
      <-- KSTREAM-MAPVALUES-0000000057


我看到一些帖子建议我需要在结果流上写一个去重处理器,但这似乎有点奇怪,因为我需要维护第三个存储?此外,我在文档中看不到为什么处理发生两次。如果它确实与此有关,在两个输入主题上具有相同键的消息的到达在时间上可能非常接近(即,在几毫秒内)

xn1cxnb4

xn1cxnb41#

但出乎意料的是,我发现在每个输入主题上使用相同的键触发1条消息实际上会导致2个连接发生,导致结果流上有2条相同的消息。
这是join操作符在Kafka流中的预期行为。使用join操作符,当接收到新记录时,将触发join,而不管它是从哪一边接收的。
如果这不是你想要的,你可以使用leftJoin

相关问题