Flink: Why is data lost when unioning multiple Kafka sources?

tuwxkamq asked 6 months ago in Apache

I'm new to Flink. I have five unbounded Kafka sources with different data schemas. I want to reduce the messages per key to keep only the latest one, and then outer-join all the Kafka sources on the same key. So I union them together and use a ProcessWindowFunction to merge them into one large object, which is then sent downstream. But there is always a lot of data loss after the union, and I suspect the data is lost because it arrives late.

class CommonObj {

    var id: Long = 0
    var entityType: String? = null
    var timestamp: Long = System.currentTimeMillis()
    val eventMetas: MutableList<EventMeta> = mutableListOf()

    // One slot per source; the names match the usages in EventProcessFunction below
    var kafkaStreamObj1: KafkaStreamObj1? = null
    var kafkaStreamObj2: KafkaStreamObj2? = null
    var kafkaStreamObj3: KafkaStreamObj3? = null
    var kafkaStreamObj4: KafkaStreamObj4? = null

    fun buildOutPutObj(): OutPutObj = ....
}

Here is one of the Kafka sources; the logic for the other Kafka sources is very similar.

val watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
    .withIdleness(Duration.ofMinutes(1))

val sourceStream1 = env.fromSource(
    getKafkaStream1(params),
    watermarkStrategy,
    "Kafka Source 1"
)

val kafkaSource1 = sourceStream1
    .filter { it != null }
    .map {
        EventObj<KafkaStreamObj1>(
            it.id.toString() + it.entity, // this is the key
            it,                           // obj
            it.sequence,                  // timestamp
            mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
        )
    }
    .returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
    .keyBy { it.key }
    .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
    .reduce { v1, v2 ->
        if (v1.obj.timestamp > v2.obj.timestamp) v1 else v2
    }
    .map {
        val commonObj = CommonObj()
        commonObj.id = it.obj.id
        commonObj.entityType = it.obj.entityType
        commonObj.timestamp = System.currentTimeMillis()
        commonObj.kafkaStreamObj1 = it.obj.entity // the other sources fill kafkaStreamObj2, kafkaStreamObj3, ...
        commonObj
    }
    .returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
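One thing worth double-checking: the watermark strategy above never sets a timestamp assigner, so the event-time windows run on the Kafka record timestamps rather than on the sequence field the reduce compares. If producers stamp records differently from sequence, elements can look late to the window and be silently dropped. A minimal sketch of attaching an assigner, assuming sequence is an epoch-millisecond event time:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import java.time.Duration

// Sketch: take event time from the sequence field (assumed epoch millis) so the
// windows and the reduce agree on what "latest" means.
val watermarkStrategy = WatermarkStrategy
    .forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
    .withTimestampAssigner(SerializableTimestampAssigner<OfferUpdated> { event, _ -> event.sequence })
    .withIdleness(Duration.ofMinutes(1))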


Here is the union code:

kafkaStream1.union(kafkaStream2, kafkaStream3, kafkaStream4, kafkaStream5)
    .keyBy(keySelector)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
    .process(EventProcessFunction(params))
    .sinkTo(kafkaSink())
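If the loss comes from late records being silently discarded by this post-union window, a side output makes that visible (and recoverable). A sketch against the pipeline above; lateTag and the dead-letter handling are illustrative only:

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.OutputTag

// OutputTag must be an anonymous subclass in Kotlin so Flink keeps the element type.
val lateTag = object : OutputTag<CommonObj>("late-common-obj") {}

val windowed = kafkaStream1.union(kafkaStream2, kafkaStream3, kafkaStream4, kafkaStream5)
    .keyBy(keySelector)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
    .allowedLateness(Time.seconds(30))    // grace period before records count as lost
    .sideOutputLateData(lateTag)          // anything later than that ends up here
    .process(EventProcessFunction(params))

windowed.sinkTo(kafkaSink())
windowed.getSideOutput(lateTag).print()   // or route to a dead-letter topic for inspection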


And here is the EventProcessFunction:

class EventProcessFunction(private val params: ParameterTool) : ProcessWindowFunction<CommonObj, OutPutObj, String, TimeWindow>() {
    override fun open(parameters: Configuration?) {
        super.open(parameters)
        //open data source
    }

    override fun close() {
        //close data source
    }

    override fun process(
        key: String,
        context: Context,
        elements: MutableIterable<CommonObj>,
        out: Collector<OutPutObj>
    ) {
        val commonObj = CommonObj()
        //LTS: latest time stamp
        var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
        var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
        val id = elements.first().id

        elements.forEach {
            commonObj.id = it.id
            commonObj.entityType = elements.first().entityType
            commonObj.timestamp = System.currentTimeMillis()
            if (it.id != id) {
                log.error { "id not equal ele id: ${it.id}, first id $id" }
            }

            if (it.kafkaStreamObj1 != null) {
                if (commonObj.kafkaStreamObj1 != null && kafkaStreamObj1LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj1LTS = it.timestamp
                commonObj.kafkaStreamObj1 = it.kafkaStreamObj1
            } else if (it.kafkaStreamObj2 != null) {
                if (commonObj.kafkaStreamObj2 != null && kafkaStreamObj2LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj2LTS = it.timestamp
                commonObj.kafkaStreamObj2 = it.kafkaStreamObj2
            } else if (it.kafkaStreamObj3 != null) {
                if (commonObj.kafkaStreamObj3 != null && kafkaStreamObj3LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj3LTS = it.timestamp
                commonObj.kafkaStreamObj3 = it.kafkaStreamObj3
            } else if (it.kafkaStreamObj4 != null) {
                if (commonObj.kafkaStreamObj4 != null && kafkaStreamObj4LTS > it.timestamp) {
                    return@forEach
                }
                kafkaStreamObj4LTS = it.timestamp
                commonObj.kafkaStreamObj4 = it.kafkaStreamObj4
            }
        }

        if (commonObj.kafkaStreamObj1 == null && commonObj.entityType == EntityType.V.name) {
            val kafkaStreamObj1Db = kafkaStreamObj1Repository.findKafkaStreamObj1(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj1 = kafkaStreamObj1Db
        }
        if (commonObj.kafkaStreamObj2 == null) {
            val kafkaStreamObj2Db = kafkaStreamObj2Repository.findKafkaStreamObj2(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj2 = kafkaStreamObj2Db
        }
        if (commonObj.kafkaStreamObj3 == null) {
            val kafkaStreamObj3Db =
                kafkaStreamObj3Repository.findKafkaStreamObj3(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj3 = kafkaStreamObj3Db
        }
        if (commonObj.kafkaStreamObj4 == null) {
            val kafkaStreamObj4Db =
                kafkaStreamObj4Repository.findKafkaStreamObj4(commonObj.id, commonObj.entityType!!)
            commonObj.kafkaStreamObj4 = kafkaStreamObj4Db
        }

        val outPutObj = commonObj.buildOutPutObj()
        out.collect(outPutObj)
    }

}


I removed some sensitive information. Why are messages lost after the union? As far as I understand, the watermark after a union is the minimum across all the Kafka sources, so nothing should be dropped; at most the faster Kafka sources would see some backpressure.
I also tried TumblingProcessingTimeWindows without watermarks. But when the Kafka topics have lag, there is heavy backpressure and the checkpoints time out. Even after increasing the checkpoint timeout, checkpoints start taking a very long time (10-30 minutes). The root cause appears to be that the DB queries take a long time, so processing after the union is blocked. When the Kafka topics have no lag, everything works fine.
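If the blocking DB lookups inside process() really are the bottleneck, the usual fix is to move the enrichment out of the window operator and into Flink's async I/O, so slow queries stop stalling the pipeline and its checkpoints. A rough sketch; the repository access is hypothetical and would be initialized in open(), and mergedStream stands for the output of the windowed merge:

import org.apache.flink.streaming.api.datastream.AsyncDataStream
import org.apache.flink.streaming.api.functions.async.ResultFuture
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Enrich merged records outside the window operator so slow queries
// no longer block the stream (and its checkpoints) under backpressure.
class DbEnrichFunction : RichAsyncFunction<CommonObj, CommonObj>() {
    override fun asyncInvoke(input: CommonObj, resultFuture: ResultFuture<CommonObj>) {
        CompletableFuture.supplyAsync {
            if (input.kafkaStreamObj2 == null) {
                input.kafkaStreamObj2 =
                    kafkaStreamObj2Repository.findKafkaStreamObj2(input.id, input.entityType!!)
            }
            // ... same pattern for the other missing payloads
            input
        }.thenAccept { enriched -> resultFuture.complete(listOf(enriched)) }
    }
}

// Up to 100 in-flight lookups, 5s timeout each; result order does not matter here.
val enriched = AsyncDataStream.unorderedWait(
    mergedStream, DbEnrichFunction(), 5, TimeUnit.SECONDS, 100
)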

Answer from pbpqsu0x:

You should make two changes...
1. Switch to processing time (as opposed to event time), i.e. use TumblingProcessingTimeWindows instead of TumblingEventTimeWindows. That should avoid any late events.
2. Use an aggregate function in the second (post-union) window operation, so Flink does not have to keep every record in state for 30 seconds. That should reduce the state size; see the sketch below.
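For point 2, a minimal sketch of such an aggregate, reusing the question's CommonObj: since each per-source window already emits at most one record per key and window, the accumulator only needs to merge the non-null payloads.

import org.apache.flink.api.common.functions.AggregateFunction

// Folds each window's records into a single CommonObj incrementally,
// so window state holds one accumulator per key instead of every element.
class MergeLatest : AggregateFunction<CommonObj, CommonObj, CommonObj> {
    override fun createAccumulator() = CommonObj()

    override fun add(value: CommonObj, acc: CommonObj): CommonObj {
        acc.id = value.id
        acc.entityType = value.entityType
        // each source contributes at most one record per window,
        // so merging the non-null payloads is enough
        value.kafkaStreamObj1?.let { acc.kafkaStreamObj1 = it }
        value.kafkaStreamObj2?.let { acc.kafkaStreamObj2 = it }
        value.kafkaStreamObj3?.let { acc.kafkaStreamObj3 = it }
        value.kafkaStreamObj4?.let { acc.kafkaStreamObj4 = it }
        return acc
    }

    override fun getResult(acc: CommonObj) = acc
    override fun merge(a: CommonObj, b: CommonObj) = add(b, a)
}

Wired in as .aggregate(MergeLatest(), EventProcessFunction(params)), the window state shrinks to one accumulator per key, and the existing process function receives just the merged value for its DB fallback.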
