我有一个spark应用程序,可以读取Kafka主题的数据。我的密码是-
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.load()
df.printSchema()
这就产生了
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
我需要
查询 key
以及 value
将二进制密钥反序列化为字符串
将二进制值反序列化为对象(我有一个反序列化程序)
根据步骤2中的键字符串重新划分输入元组/数据集
下面的代码是正确的方法吗?
val dataset = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "server1")
.option("subscribe", "topic1")
.load()
.select($"key", $"value")
.selectExpr("CAST(key as STRING) AS partitionId", "value AS value")
.repartition(col("partitionId"))
.mapPartitions{row => createOutputMessages()}
它运行得很好,但是我无法知道分区是否正确。
1条答案
按热度按时间hts6caw31#
您可以了解如何使用内置函数对数据集进行分区
spark_partition_id()
. 因此,类似这样的内容将显示跨spark分区的密钥分布: