scala—如何使用kafka输入中的字段对spark数据集进行分区

kh212irz  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(372)

我有一个spark应用程序,可以读取Kafka主题的数据。我的密码是-

val df = spark.readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "server1")
 .option("subscribe", "topic1")
 .load()
df.printSchema()

这就产生了

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)

我需要
查询 key 以及 value 将二进制密钥反序列化为字符串
将二进制值反序列化为对象(我有一个反序列化程序)
根据步骤2中的键字符串重新划分输入元组/数据集
下面的代码是正确的方法吗?

val dataset = spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "server1")
     .option("subscribe", "topic1")
     .load()
     .select($"key", $"value")
     .selectExpr("CAST(key as STRING) AS partitionId", "value AS value")
     .repartition(col("partitionId"))
     .mapPartitions{row => createOutputMessages()}

它运行得很好,但是我无法知道分区是否正确。

hts6caw3

hts6caw31#

您可以了解如何使用内置函数对数据集进行分区 spark_partition_id() . 因此,类似这样的内容将显示跨spark分区的密钥分布:

dataset.groupBy(spark_partition_id,$"partitionId").count.show(false)

相关问题