如何在java中使用结构化流对kafka的记录进行反序列化?

vaqhlq81  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(248)

我使用spark 2.1。
我试图读取Kafka记录使用Spark结构化流,反序列化他们和应用聚合之后。
我有以下代码:

SparkSession spark = SparkSession
        .builder()
        .appName("Statistics")
        .getOrCreate();

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUri)
        .option("subscribe", "Statistics")
        .option("startingOffsets", "earliest")
        .load();

df.selectExpr("CAST(value AS STRING)")

我想要的是反序列化 value 在我的对象中输入字段而不是作为 String .
我有一个自定义反序列化程序。

public StatisticsRecord deserialize(String s, byte[] bytes)

如何在java中实现这一点?
我找到的唯一相关链接是https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html,但这是给scala的。

bnl4lu3b

bnl4lu3b1#

为json消息定义模式。

StructType schema = DataTypes.createStructType(new StructField[] { 
                DataTypes.createStructField("Id", DataTypes.IntegerType, false),
                DataTypes.createStructField("Name", DataTypes.StringType, false),
                DataTypes.createStructField("DOB", DataTypes.DateType, false) });

现在阅读下面的信息。messagedata是json消息的javabean。

Dataset<MessageData> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"),schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(MessageData.class));
bnl4lu3b

bnl4lu3b2#

如果您有一个用于数据的java自定义反序列化程序,请在从kafka获得的字节上使用它 load .

df.select("value")

那条线给你 Dataset<Row> 只有一列 value .
我专门使用spark api for scala,因此我将在scala中执行以下操作来处理“反序列化”情况:

import org.apache.spark.sql.Encoders
implicit val statisticsRecordEncoder = Encoders.product[StatisticsRecord]
val myDeserializerUDF = udf { bytes => deserialize("hello", bytes) }
df.select(myDeserializerUDF($"value") as "value_des")

那应该会给你想要的…在斯卡拉。将其转换为java是您的家庭练习:)
请注意,自定义对象必须有可用的编码器,否则sparksql将拒绝将其对象放入数据集中。

相关问题