使用Kafka方法和Spark流对Kafka进行消费会产生不同的结果

q43xntqr  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(368)

我正在尝试使用spark流媒体来使用Kafka的一些数据。
我创造了两个工作岗位,
一个简单的Kafka作业,使用:

consumeFirstStringMessageFrom(topic)

这给出了主题的期望值。

{
  "data": {
    "type": "SA_LIST",
    "login": "username@mycompany.com",
    "updateDate": "2020-09-09T14:58:39.775Z",
    "content": [
      {
        "sku": "800633955",
        "status": "ACTIVE",
        "quantity": 1
      }
    ],
    "saCode": "E40056",
    "clientId": "30179801688090",
    "$setOnInsert": {
      "__v": 0
    }
  },
  "operation": "UPDATE",
  "type": "List"
}

spark流媒体作业:

val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConfig.broker)
      .option("subscribe", kafkaConfig.topic)
      .option("startingOffsets", kafkaConfig.startingOffsets)
      .load()

 df.writeStream
      .outputMode(OutputMode.Append())
      .format("console")
      .option("truncate", false)
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start().awaitTermination()

它显示以下结果

{
  "key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==",
  "value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==",
  "topic": "PLP_GLOBAL_QA",
  "partition": 0,
  "offset": 1826,
  "timestamp": "2020-09-10T16:09:08.606Z",
  "timestampType": 0
}

它似乎显示了主题信息(键、值、主题、分区、偏移量等等)我是否遗漏了什么?
如果需要,我可以添加更多信息。

guykilcj

guykilcj1#

spark流作业以序列化的形式显示数据,而kafka使用者已经反序列化了数据。
根据spark结构化Kafka集成指南,您不仅可以获得Kafka信息的关键和价值,还可以获得其他(元)信息。以下是您从Kafka收到的每条消息的模式:

Column      Type
key         binary
value       binary
topic       string
partition   int
offset      long
timestamp   timestamp
timestampType   int

如果只想选择键和值,甚至只想选择值,则可以选择它们并将其转换为可读字符串:

[...]
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

相关问题