我正在与foreachwriter[row]合作实现一个定制的spark sink。
对于过程函数,我想得到一个整型字段的值 val row = Row("city","name","age")
我想把年龄取为int,其余字段取为string。
def process(row: Row) = {
val fieldNames = row.schema.fieldNames
val rowAsMap = row.getValuesMap(fieldNames)
使用getvaluesmap,每个字段都被解析为一个字符串。
我考虑了模式匹配来更改getvaluesmap函数:
val rowAsMap = fieldNames.map {
case "age" => row.getAs[Int]("age")
case _ => row.getAs[String]
}.toMap
这是不起作用的,因为它总是作为一个字符串写在接收器中,任何帮助/想法,从行中获取预期类型的值
2条答案
按热度按时间yfjy0ee71#
你能补充一下“不工作”的细节吗?仍然以字符串形式返回“age”,抛出异常,是否发生其他问题?
总的来说,你的解决方案似乎还可以,不过我不确定
toMap
最后打电话-你没有提供Map的钥匙。也许可以试试u0njafvf2#
我不知道你为什么要把这种类型的铸造逻辑放进去
ForEachWriter[Row]
. 如果你想进入老年Int
它的调用者负责转换age
行内到Int
,没有?而且,我认为没有必要这样做-