dataframe—使用结构化流将数组扩展到spark中的列

rks48beu  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(394)

我有个问题:
我用结构化流媒体读取Kafka的数据,数据是csv行。当我从kafka获取数据时,我有一个流式Dataframe,csv行在“value”中,它是一个字节序列。

sDF2 = sDF.selectExpr("CAST(value as string)").select( split("value",","))

使用这个我有了一个新的Dataframe,其中“value”是一个字符串,它是csv行。
如何获得一个新的dataframe,在这里我已经解析了csv字段并将其拆分为dataframe列?
示例:csv行是“d,123,frgh,1321”

sDF schema, which contains the data downloaded from Kafka, is  
key, value, topic, timestamp etc... and here value is a byte sequence with no type

sDF2.schema has only a column ( named value of type string )

我喜欢新的Dataframe是

sDF3.col1 = abcd
sDF3.col2 = 123
sDF3.col3 = frgh ...etc

其中所有列都是字符串。
我仍然可以做到:

sDF3 = sDF2.select( sDF2.csv[0].alias("EventId").cast("string"),
 sDF2.csv[1].alias("DOEntitlementId").cast("string"),               
 sDF2.csv[3].alias("AmazonSubscriptionId").cast("string"),
 sDF2.csv[4].alias("AmazonPlanId").cast("string"),
 ... etc ...

但看起来很难看。

v6ylcynt

v6ylcynt1#

我没有试过,但像这样的东西应该管用。

sDF2 = 
      sDF.selectExpr("CAST(value as string)")
       .alias("csv").select("csv.*")
       .select("split(value,',')[0] as DOEntitlementId", 
               "split(value,',')[1] as AmazonSubscriptionId", 
               "split(value,',')[2] as AmazonPlanId")

相关问题