kafka生产者将消息接收到kafka主题,但在不同的分区上

wvt8vs2t  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(393)

现在,我的flink代码正在处理一个文件,并用1个分区将kafka主题的数据下沉。
现在我有一个2分区的主题,我希望flink代码使用defaultpartitioner接收这2个分区上的数据。
你能帮我一下吗。
以下是我当前代码的代码片段:

DataStream<String> speStream = inputStream..map(new MapFunction<Row, String>(){....}
Properties props = Producer.getProducerConfig(propertiesFilePath);
speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
zynd9foi

zynd9foi1#

通过将flinkproducer改为

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new SimpleStringSchema(), 
 props));

之前我用的是

speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,
new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
dkqlctbz

dkqlctbz2#

Flink 版本 1.11 (我正在用java)的 SimpleStringSchema 需要一个 Package 纸。 KeyedSerializationSchemaWrapper )它也被@ankit使用,但在下面我将从建议的解决方案中删除 constructor 相同原因导致的相关错误。

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
                        topic_name, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), 
                        properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

错误:

The constructor FlinkKafkaProducer<String>(String, SimpleStringSchema, Properties, FlinkKafkaProducer.Semantic) is undefined

相关问题