Sample dataset:
id,Amount1,Amount2,Account1,Account2
1,10000,20000,AAA,ABC
2,33333,30000,BBB,CDE
3,50000,45000,CCC,BAA
4,60000,65600,DDD,DSP
5,45000,56000,EEE,CBMN
6,20000,25000,FFF,FSS
7,46000,25000,GGG,LAA
8,85000,15000,HHH,MIS
9,90000,10000,III,GML
10,78000,8000,JJJ,SMA
Account1 takes the value of Amount1,
Account2 takes the value of Amount2,
and ACC = Amount1 + Amount2.
The final output should be JSON, like this:
{
"id":1,
"AAA":10000,
"ABC":20000,
"ACC":30000
}
{
"id":2,
"BBB":33333,
"CDE":30000,
"ACC":63333
}
{
....
....
....
....
}
2 answers

abithluo1#
You can use `rdd.map` to build a dict for each row and then collect the result. For JSON output you can use `json.dumps()`, but that turns every row into a string, so as a workaround you can pass the output back through `json.loads`, as shown below.
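A minimal sketch of that per-row logic in plain Python (the sample row tuple and the function name `to_json_record` are assumptions; in Spark you would pass this function to `rdd.map` after splitting each CSV line):

```python
import json

# One parsed CSV row: (id, Amount1, Amount2, Account1, Account2).
# In Spark this function would be passed to rdd.map; here we call it
# directly on a sample row to show the logic.
def to_json_record(row):
    _id, amt1, amt2, acc1, acc2 = row
    d = {
        "id": int(_id),
        acc1: int(amt1),
        acc2: int(amt2),
        "ACC": int(amt1) + int(amt2),
    }
    # json.dumps serializes the dict to a string; downstream code can
    # use json.loads to get objects back instead of strings.
    return json.dumps(d)

print(to_json_record(("1", "10000", "20000", "AAA", "ABC")))
# → {"id": 1, "AAA": 10000, "ABC": 20000, "ACC": 30000}
```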
5t7ly7z52#
For the Scala world, define a case class for the input schema and read the CSV into a typed Dataset:

```
case class Input(id: String, Amount1: String, Amount2: String, Account1: String, Account2: String)

val df = spark.read.option("header", true).csv(inputFile).as[Input]
df.show
+---+-------+-------+--------+--------+
| id|Amount1|Amount2|Account1|Account2|
+---+-------+-------+--------+--------+
|  1|  10000|  20000|     AAA|     ABC|
|  2|  33333|  30000|     BBB|     CDE|
|  3|  50000|  45000|     CCC|     BAA|
|  4|  60000|  65600|     DDD|     DSP|
|  5|  45000|  56000|     EEE|    CBMN|
|  6|  20000|  25000|     FFF|     FSS|
|  7|  46000|  25000|     GGG|     LAA|
|  8|  85000|  15000|     HHH|     MIS|
|  9|  90000|  10000|     III|     GML|
| 10|  78000|   8000|     JJJ|     SMA|
+---+-------+-------+--------+--------+
```

Then build the JSON string per row, trimming the fields and converting the amounts to `Int` before summing (note the `|` margins, which `stripMargin` needs in order to strip the leading whitespace):

```
df.map(x => {
  s"""{
     |  "id": ${x.id.trim},
     |  "${x.Account1.trim}": ${x.Amount1.trim},
     |  "${x.Account2.trim}": ${x.Amount2.trim},
     |  "ACC": ${x.Amount1.trim.toInt + x.Amount2.trim.toInt}
     |}""".stripMargin
}).show(false)
```