我是一个 Flink 新手。我有五个无限的 Kafka 源,各自有不同的数据模式(schema)。我想先按 key 去重、只保留最新一条消息,然后把所有 Kafka 源按相同的 key 做外连接(outer join)。因此我先用 union 把它们合并在一起,再用 ProcessWindowFunction 把它们组装成一个大对象发送到下游。但是 union 之后总是有大量数据丢失,我怀疑是因为这些消息被当成迟到(late)数据丢弃了。
// Wide "join" record: carries the latest payload observed from each of the
// Kafka sources for one key; buildOutPutObjt() converts it to the sink type.
// NOTE(review): fields here are named kafkaStreamValue1..4, but the
// EventProcessFunction below reads/writes kafkaStreamObj1..4 — presumably an
// anonymization slip in the post; confirm the real field names match.
class CommonObj {
var id: Long = 0
var entityType: String? = null
// Wall-clock time at creation (processing time, not event time).
var timestamp: Long = System.currentTimeMillis()
val eventMetas: MutableList<EventMeta> = mutableListOf()
// Before the union each record populates exactly one of these; after the
// windowed join several may be non-null.
var kafkaStreamValue1: KafkaStreamObj1? = null
var kafkaStreamValue2: KafkaStreamObj2? = null
var kafkaStreamValue3: KafkaStreamObj3? = null
var kafkaStreamValue4: KafkaStreamObj4? = null
// Body elided in the original post ("...."); not compilable as quoted.
fun buildOutPutObjt(): OutPutObj = ....
}
(以上为 Kotlin 代码)
下面是其中一个 Kafka 源的代码,其余几个 Kafka 源的逻辑非常相似:
// One of the five per-source pipelines (the others are analogous).
// Watermarks: tolerate up to 5s of out-of-orderness; declare a partition idle
// after 1 minute so an empty/slow partition does not hold back the watermark.
// NOTE(review): there is no withTimestampAssigner, so event time falls back to
// the Kafka record timestamp — confirm that agrees with `it.sequence`, which
// the reduce below treats as the ordering timestamp.
val watermarkStrategy = WatermarkStrategy.forBoundedOutOfOrderness<OfferUpdated>(Duration.ofSeconds(5))
.withIdleness(Duration.ofMinutes(1))
val sourceStream1 = env.fromSource(
getKafkaStream1(params),
watermarkStrategy,
"Kafka Source 1"
)
val kafkaSource1 = sourceStream1
.filter { it != null }
// Wrap each record with its join key and event metadata.
.map {
EventObj<KafkaStreamObj1>(
it.id.toString() + it.entity, //this is key
it, //obj
it.sequence, //timestamp
mutableListOf(EventMeta(it.transactionId, it.type, it.sequence, it.changed))
)
}
.returns(TypeInformation.of(object : TypeHint<EventObj<KafkaStreamObj1>>() {}))
.keyBy {
it.key }
// Per-key dedup: keep only the newest record within each 30s event-time window.
.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
.reduce { v1, v2 ->
if (v1.obj.timestamp > v2.obj.timestamp) {
v1
} else {
v2
}
}
// Lift the surviving record into the shared CommonObj envelope.
// NOTE(review): `timestamp` is set to wall-clock time here; elements emitted
// by a window operator carry the window's max timestamp as their event time,
// so the downstream event-time window does not use this field — verify
// against the Flink version in use.
.map {
val commonObj = CommonObj()
commonObj.id = it.obj.id
commonObj.entityType = it.obj.entityType
commonObj.timestamp = System.currentTimeMillis()
commonObj.kafkaStreamValue1 = it.obj.entity // For other kafka stream, it will use kafkaStreamValue2 or kafkaStreamValue3
commonObj
}
.returns(TypeInformation.of(object : TypeHint<CommonObj>() {}))
(以上为 Kotlin 代码)
下面是 union 部分的代码:
// Union all five per-source streams, re-key by the shared key selector, and
// assemble the outer-joined record within a second 30s event-time window.
// NOTE(review): the unioned operator's watermark is the minimum across all
// inputs; combined with withIdleness(1 min) upstream, a source flipping
// between idle and active can let the watermark jump and make elements late.
// Consider attaching sideOutputLateData(...) to this window to observe
// exactly which records are being dropped before changing anything.
kafkaStream1.union(kafkaStream2,kafkaStream3,kafkaStream4,kafkaStream5)
.keyBy(keySelector)
.window(TumblingEventTimeWindows.of(Time.milliseconds(30000)))
.process(EventProcessFunction(params))
.sinkTo(kafkaSink())
(以上为 Kotlin 代码)
下面是 EventProcessFunction 的实现:
// Windowed join: folds every CommonObj buffered for one key in a 30s window
// into a single record, keeping — per source — the value with the latest
// timestamp, then backfills any missing sources from the database and emits.
// NOTE(review): this reads/writes commonObj.kafkaStreamObj1..4, but the
// CommonObj quoted above declares kafkaStreamValue1..4 — presumably an
// anonymization slip; confirm the real field names.
class EventProcessFunction(private val params: ParameterTool) : ProcessWindowFunction<CommonObj, OutPutObj, String, TimeWindow>() {
override fun open(parameters: Configuration?) {
super.open(parameters)
//open data source
}
override fun close() {
//close data source
}
// Invoked once per key per window fire, with all elements of the window.
override fun process(
key: String,
context: Context,
elements: MutableIterable<CommonObj>,
out: Collector<OutPutObj>
) {
val commonObj = CommonObj()
//LTS: latest time stamp
// Per-source latest-seen timestamps, used to keep only the newest value.
var kafkaStreamObj1LTS: Long = Long.MIN_VALUE
var kafkaStreamObj2LTS: Long = Long.MIN_VALUE
var kafkaStreamObj3LTS: Long = Long.MIN_VALUE
var kafkaStreamObj4LTS: Long = Long.MIN_VALUE
val id = elements.first().id
// The four branches below are structurally identical; consider extracting
// a shared helper to avoid the copies drifting apart.
elements.forEach {
commonObj.id = it.id
commonObj.entityType = elements.first().entityType
commonObj.timestamp = System.currentTimeMillis()
// Sanity check: all elements in one keyed window should share an id;
// a mismatch would indicate a key-selector bug.
if (it.id != id) {
log.error { "id not equal ele id: ${it.id}, first id $id" }
}
if (it.kafkaStreamObj1 != null) {
// Skip this element if we already hold a strictly newer value.
if (commonObj.kafkaStreamObj1 != null && kafkaStreamObj1LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj1LTS = it.timestamp
commonObj.kafkaStreamObj1 = it.kafkaStreamObj1
} else if (it.kafkaStreamObj2 != null) {
if (commonObj.kafkaStreamObj2 != null && kafkaStreamObj2LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj2LTS = it.timestamp
commonObj.kafkaStreamObj2 = it.kafkaStreamObj2
} else if (it.kafkaStreamObj3 != null) {
if (commonObj.kafkaStreamObj3 != null && kafkaStreamObj3LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj3LTS = it.timestamp
commonObj.kafkaStreamObj3 = it.kafkaStreamObj3
} else if (it.kafkaStreamObj4 != null) {
if (commonObj.kafkaStreamObj4 != null && kafkaStreamObj4LTS > it.timestamp) {
return@forEach
}
kafkaStreamObj4LTS = it.timestamp
commonObj.kafkaStreamObj4 = it.kafkaStreamObj4
}
}
// Backfill from DB for every source that produced nothing in this window.
// NOTE(review): these are synchronous, per-window-fire DB lookups on the
// operator's hot path — under topic lag this stalls processing, which is
// consistent with the backpressure and checkpoint timeouts described in
// the question; consider Flink Async I/O or a keyed-state cache instead.
// NOTE(review): entityType!! throws NPE if entityType is null — guard it.
if (commonObj.kafkaStreamObj1 == null && commonObj.entityType == EntityType.V.name) {
val kafkaStreamObj1Db = kafkaStreamObj1Repository.findKafkaStreamObj1(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj1 = kafkaStreamObj1Db
}
if (commonObj.kafkaStreamObj2 == null) {
val kafkaStreamObj2Db = kafkaStreamObj2Repository.findKafkaStreamObj2(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj2 = kafkaStreamObj2Db
}
if (commonObj.kafkaStreamObj3 == null) {
// NOTE(review): method name equals the repository name (cf. the
// findKafkaStreamObj1/2 pattern above) — likely a paste/anonymization
// artifact; confirm the intended method.
val kafkaStreamObj3Db =
kafkaStreamObj3Repository.kafkaStreamObj3Repository(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj3 = kafkaStreamObj3Db
}
if (commonObj.kafkaStreamObj4 == null) {
val kafkaStreamObj4Db =
kafkaStreamObj4Repository.kafkaStreamObj4Repository(commonObj.id, commonObj.entityType!!)
commonObj.kafkaStreamObj4 = kafkaStreamObj4Db
}
val outPutObj = commonObj.buildOutPutObjt()
out.collect(outPutObj)
}
}
(以上为 Kotlin 代码)
我删除了一些敏感信息。为什么 union 之后消息会丢失?据我所知,union 之后的水印(watermark)会取所有输入流水印的最小值,按理不应该丢失任何一路的数据,最多只是对较快的 Kafka 源产生反压。
我也试过不带水印的 TumblingProcessingTimeWindows。但当 Kafka 主题出现积压(lag)时会产生很大的反压,进而导致检查点(checkpoint)超时;即使增大检查点超时时间,检查点耗时也会长达 10~30 分钟。根本原因应该是 DB 查询耗时太长,union 之后的处理被阻塞。当 Kafka 主题没有积压时则一切正常。
1 条答案
你应该做两个改变:
1. 切换到处理时间(processing time)而不是事件时间(event time),即使用 TumblingProcessingTimeWindows 而不是 TumblingEventTimeWindows。这样就不会再有事件因迟到而被丢弃。
2. 在第二次(union 之后)的窗口操作中改用聚合函数(AggregateFunction),这样 Flink 就不必在状态中把窗口内的每条记录保存 30 秒,可以显著减小状态大小。