我试图得到Kafka的消息,并处理它与Spark在独立。kafka以json格式存储数据。我可以得到kafka消息,但不能用定义的模式解析json数据。
当我运行 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_kafka_topic --from-beginning
命令查看Kafka主题中的Kafka消息,其输出如下:
"{\"timestamp\":1553792312117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":21,\"q\":true,\"t\":1553792311686}]}"
"{\"timestamp\":1553792317117,\"values\":[{\"id\":\"Simulation.Simulator.Temperature\",\"v\":22,\"q\":true,\"t\":1553792316688}]}"
我可以用spark中的代码块成功地获取这些数据:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(col("value").cast("string"))
模式如下:
df.printSchema()
root
|-- value: string (nullable = true)
然后将此Dataframe写入控制台并打印kafka消息:
Batch: 9
-------------------------------------------
+--------------------+
| value|
+--------------------+
|"{\"timestamp\":1...|
+--------------------+
但是我想解析json数据来定义模式和我尝试过的代码块:
schema = StructType([ StructField("timestamp", LongType(), False), StructField("values", ArrayType( StructType([ StructField("id", StringType(), True), StructField("v", IntegerType(), False), StructField("q", BooleanType(), False), StructField("t", LongType(), False) ]), True ), True) ])
parsed = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_kafka_topic") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("opc"))
以及 parsed
Dataframe:
parsed.printSchema()
root
|-- opc: struct (nullable = true)
| |-- timestamp: string (nullable = true)
| |-- values: struct (nullable = true)
| | |-- id: string (nullable = true)
| | |-- v: integer (nullable = true)
| | |-- q: boolean (nullable = true)
| | |-- t: string (nullable = true)
这些代码块运行正常。但是当我想写作的时候 parsed
控制台的Dataframe:
query = parsed\
.writeStream\
.format("console")\
.start()
query.awaitTermination()
它在写作 null
在控制台中如下所示:
+----+
| opc|
+----+
|null|
+----+
所以,解析json数据似乎有问题,但无法解决。
你能告诉我怎么了吗?
1条答案
按热度按时间c8ib6hqw1#
似乎架构不适合您的情况请尝试应用下一个:
还要记住
inferSchema
选项工作得很好,因此您可以让spark发现模式并保存它。另一个问题是json数据有前导双引号和尾随双引号
"
它还包含\
这些生成了一个无效的json,它阻止了spark解析消息。为了删除无效字符,您的代码应修改如下:
现在您的输出应该是:
祝你好运!