我有个问题:
我用结构化流媒体读取Kafka的数据,数据是csv行。当我从kafka获取数据时,我有一个流式Dataframe,csv行在“value”中,它是一个字节序列。
sDF2 = sDF.selectExpr("CAST(value as string)").select( split("value",","))
使用这个我有了一个新的Dataframe,其中“value”是一个字符串,它是csv行。
如何获得一个新的dataframe,在这里我已经解析了csv字段并将其拆分为dataframe列?
示例:csv行是“d,123,frgh,1321”
sDF schema, which contains the data downloaded from Kafka, is
key, value, topic, timestamp etc... and here value is a byte sequence with no type
sDF2.schema has only a column ( named value of type string )
我喜欢新的Dataframe是
sDF3.col1 = abcd
sDF3.col2 = 123
sDF3.col3 = frgh ...etc
其中所有列都是字符串。
我仍然可以做到:
sDF3 = sDF2.select( sDF2.csv[0].alias("EventId").cast("string"),
sDF2.csv[1].alias("DOEntitlementId").cast("string"),
sDF2.csv[3].alias("AmazonSubscriptionId").cast("string"),
sDF2.csv[4].alias("AmazonPlanId").cast("string"),
... etc ...
但看起来很难看。
1条答案
按热度按时间v6ylcynt1#
我没有试过,但像这样的东西应该管用。