So I have a case class CustomerData and a case class AccountData, as follows:
case class CustomerData(
customerId: String,
forename: String,
surname: String
)
case class AccountData(
customerId: String,
accountId: String,
balance: Long
)
I need to join the two so that they form the following case class:
case class CustomerAccountOutput(
customerId: String,
forename: String,
surname: String,
//Accounts for this customer
accounts: Seq[AccountData],
//Statistics of the accounts
numberAccounts: Int,
totalBalance: Long,
averageBalance: Double
)
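I need to show that if a null appears in accountId or balance, then numberAccounts is 0 and both totalBalance and averageBalance are null. Replacing the nulls with 0 is also acceptable.
The final result should look like this: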
+----------+-----------+--------+---------------------------------------------------------------------+--------------+------------+-----------------+
|customerId|forename |surname |accounts |numberAccounts|totalBalance|averageBalance |
+----------+-----------+--------+---------------------------------------------------------------------+--------------+------------+-----------------+
|IND0113 |Leonard |Ball |[[IND0113,ACC0577,531]] |1 |531 |531.0 |
|IND0277 |Victoria |Hodges |[[IND0277,null,null]] |0 |null |null |
|IND0055 |Ella |Taylor |[[IND0055,ACC0156,137], [IND0055,ACC0117,148]] |2 |285 |142.5 |
|IND0129 |Christopher|Young |[[IND0129,null,null]] |0 |null
I have been given the two case classes to join; here is the code:
val customerDS = customerDF.as[CustomerData]
val accountDS = accountDF.withColumn("balance",'balance.cast("long")).as[AccountData]
//END GIVEN CODE
val customerAccountsDS = customerDS.joinWith(accountDS,customerDS("customerID") === accountDS("customerID"),"leftouter")
How can I get the result above? I am not allowed to use the **"spark.sql.function._"** library at all.
1 Answer
You should be able to do this with Spark's concat_ws and collect_list functions.
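Since concat_ws and collect_list live in org.apache.spark.sql.functions, which the question rules out, here is a minimal sketch of a different route that relies only on the typed Dataset API. It is not the approach named above. The object AccountSummary and the helper summarise are hypothetical names introduced for illustration, and the sketch assumes the "replace null with 0" variant the question allows, because Long and Double fields cannot hold null. The per-customer logic is plain Scala so it can be checked without a cluster; the assumed Spark wiring is shown only as a comment.

```scala
// Case classes copied from the question.
case class CustomerData(customerId: String, forename: String, surname: String)
case class AccountData(customerId: String, accountId: String, balance: Long)
case class CustomerAccountOutput(
  customerId: String,
  forename: String,
  surname: String,
  accounts: Seq[AccountData],
  numberAccounts: Int,
  totalBalance: Long,
  averageBalance: Double
)

// Hypothetical helper object, not part of the original post.
object AccountSummary {
  // Builds one output row from a customer and the right-hand sides of its
  // joined rows. A left outer joinWith puts null on the right-hand side when
  // a customer has no matching account, so nulls are dropped before counting.
  def summarise(customer: CustomerData, joined: Seq[AccountData]): CustomerAccountOutput = {
    val accounts = joined.filter(_ != null)
    val total = accounts.map(_.balance).sum
    // Assumption: use 0 rather than null for customers without accounts.
    val average = if (accounts.isEmpty) 0.0 else total.toDouble / accounts.size
    CustomerAccountOutput(
      customer.customerId, customer.forename, customer.surname,
      accounts, accounts.size, total, average
    )
  }
}

// Assumed Spark wiring (needs `import spark.implicits._` for the encoders,
// but nothing from org.apache.spark.sql.functions):
//
// val result = customerAccountsDS
//   .groupByKey(_._1)
//   .mapGroups((customer, rows) =>
//     AccountSummary.summarise(customer, rows.map(_._2).toList))
```

With this variant, the accounts column for a customer without accounts is an empty sequence instead of a row of nulls, so the output would differ slightly from the table in the question.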
You can see output like the following: