现在,我的flink代码正在处理一个文件,并用1个分区将kafka主题的数据下沉。
现在我有一个2分区的主题,我希望flink代码使用defaultpartitioner接收这2个分区上的数据。
你能帮我一下吗。
以下是我当前代码的代码片段:
DataStream<String> speStream = inputStream..map(new MapFunction<Row, String>(){....}
Properties props = Producer.getProducerConfig(propertiesFilePath);
speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
2条答案
按热度按时间zynd9foi1#
通过将flinkproducer改为
之前我用的是
dkqlctbz2#
在
Flink
版本1.11
(我正在用java)的SimpleStringSchema
需要一个 Package 纸。KeyedSerializationSchemaWrapper
)它也被@ankit使用,但在下面我将从建议的解决方案中删除constructor
相同原因导致的相关错误。错误: