如何将数据集< row>转换为json消息的数据集以写入kafka?

cyej8jka  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(449)

我使用spark 2.1.1。
我有以下几点 DataSet<Row> ds1型;

name   | ratio | count  // column names
"hello" |  1.56 | 34

( ds1.isStreaming 给予 true )
我正在努力创造 DataSet<String> ds2。换句话说,当我写KafkaFlume我想写这样的东西

{"name": "hello", "ratio": 1.56, "count": 34}

我试过这样的方法 df2.toJSON().writeStream().foreach(new KafkaSink()).start() 但它给出了以下错误

Queries with streaming sources must be executed with writeStream.start()

to_json 以及 json_tuple 然而,我不知道如何利用他们在这里?
我试着用 json_tuple() 功能

Dataset<String> df4 = df3.select(json_tuple(new Column("result"), " name", "ratio", "count")).as(Encoders.STRING());

我得到以下错误:
无法解析' result '给定的输入列:[name,ratio,count];;

vmdwslir

vmdwslir1#

热释光;dr使用 struct 函数后跟 to_json (作为 toJSON 由于spark-17029(20天前刚刚修复),流式数据集已损坏)。
引用struct的scaladoc:
struct(colname:string,colnames:string*):column创建一个由多个输入列组成的新struct列。
如果您使用java api,那么struct function也有4种不同的变体:
公共静态列结构(列。。。cols)创建新的结构列。
使用tojson函数,您的案例包括:
public static column to \u json(column e)将包含structtype的列转换为具有指定模式的json字符串。
以下是scala代码(将其转换为java是您的家庭练习):

val ds1 = Seq(("hello", 1.56, 34)).toDF("name", "ratio", "count")
val recordCol = to_json(struct("name", "ratio", "count")) as "record"
scala> ds1.select(recordCol).show(truncate = false)
+----------------------------------------+
|record                                  |
+----------------------------------------+
|{"name":"hello","ratio":1.56,"count":34}|
+----------------------------------------+

我还尝试了一下您的解决方案(使用今天构建的spark2.3.0-snapshot),它似乎工作得非常完美。

val fromKafka = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select('value cast "string")
fromKafka.
  toJSON. // <-- JSON conversion
  writeStream.
  format("console"). // using console sink
  start
``` `format("kafka")` 在spark-19719中添加,在2.1.0中不可用。

相关问题