kafka如何防止在加入多个流时使用旧记录

mbzjlibv  于 2021-06-26  发布在  Java
关注(0)|答案(0)|浏览(214)

我对Kafka还比较陌生,我想用一个唯一的密钥加入一堆流。我有两个不同的流,我从听两个不同的主题。我重新分区并通过相应的键将它们连接起来。然后,我听另一个主题c,并将之前连接的数据与新的streamc连接起来。总之

A-B=>AB
C->
AB-C>ABC

这工作,并加入数据完全如我所愿,我很高兴。出于某种原因,如果只有数据到达主题c,比如说,它将从状态存储中获取数据(我相信它会将以前的数据写入状态存储)。因此,它将完成与旧记录的连接。据我所知,kafka有自己的缓存机制和状态存储,所以它使它更有效,但在我的情况下,我的数据是相当动态和变化的,所以我不希望它使用旧记录加入。本质上,我想要的是,只有当所有这些主题都分别获得了一个新的有效负载,并且每个主题也完成了自己的连接时,才能连接这些数据。-在任何情况下都不使用任何以前的记录。
这是相关代码

protected Topology createStreamTopology(){

        StreamsBuilder builder = new StreamsBuilder();

        //Stream A
        KStream<String, A> streamA = builder.stream(
                aTopic,
                Consumed.with(Serdes.String(), aSerde)
        ).selectKey((k, v) -> A.getOrderId(v));

        //Stream B
        //Make a table of the salesforce order line item events
        KStream<String, B> streamB = builder.stream(
                bTopic,
                Consumed.with(Serdes.String(), bSerde)
        ).filter(
                (k1, v1) -> v1.getPayload().getPlatformC() != null
        ).selectKey(
                (k, v) -> v.getOrder()
        );

        //Join stream a and stream b
        KStream<String, AB> abData = streamA.join(
                streamB,
                joinedAB::new,
                JoinWindows.of(Duration.ofHours(2)),
                Joined.with(Serdes.String(), aSerde, bSerde)
        ).selectKey(
                (k, v) -> v.getPayload.getUniqueID());  

        // Stream c
        KStream<String, C>stream streamC=builder.stream(cTopic, Consumed.with(Serdes.String(), serde))
                .mapValues((k,v)->transformToC(v))
                .filter((key, value) -> Objects.nonNull(value))
                .selectKey((key, value) -> value.getID());

        //join stream c and joined abData
        KStream<String, abcData> abcData = streamC.join(
                abData,
                joinedABC::new,
                JoinWindows.of(Duration.ofHours(2)),
                Joined.with(Serdes.String(), cSerde, abSerde)
        );
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题