kafka byte[]输入在重写的flink abstractkafkadeserializationschema中转换为java字符串非常慢

g9icjywg  于 2021-06-26  发布在  Flink
关注(0)|答案(0)|浏览(195)

我有flink应用程序,它读取来自kafka的mysql cdc json消息。5个表json cdc字符串被读取和处理,我使用一个重写的abstractkafkadeserializationschema将kafka byte[]转换为我的定制bean对象。但我发现在5个表中,有2个表,它们的kafka输入byte[]比其他3个表转换为字符串花费了更多的时间,更糟糕的情况只是停留在那里几分钟甚至像永远一样,并且在源子任务的flink web ui中有反压。转换只是 String strValue = new String(valueByte) . 我也试过了 new String(valueByte, "UTF-8"), new String(valueByte, StandardCharsets.US_ASCII) ,没有区别。重写的方法只是:

deserialize(byte[] keyBytes, byte[] valueBytes, String topic, int partition, long offset) throws IOException

这个问题阻止了我发布应用程序一个星期,因为转换是如此简单,我找不到任何替代方法来做它,在stackoverflow搜索,并找到一些类似的投诉,但没有为我工作的解决方案。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题