Update a map-type column of a Spark DataFrame with the required key and value

relj7zay posted on 2021-05-27 in Spark

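I have the Spark DataFrame below, in which every column except the primary-key column emp_id holds a map whose "from" and "to" keys can have null values. For each column other than emp_id, I want to evaluate 'from' and 'to' and add a new key named 'change' to the map, with the value:
a) 'insert' if 'from' is null and 'to' is not null
b) 'delete' if 'to' is null and 'from' is not null
c) 'update' if 'from' and 'to' are both non-null and 'from' differs from 'to'
Note: columns that are null stay unchanged.
How can we achieve this in Scala?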

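+------+---------------------+----------------------------+---------------------+--------------------------+----------------------------------+
|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                   |emp_site                          |
+------+---------------------+----------------------------+---------------------+--------------------------+----------------------------------+
|1     |null                 |[from -> Will, to -> Watson]|null                 |[from -> 1000, to -> 8000]|[from ->, to -> Seattle]          |
|3     |null                 |[from -> Norman, to -> Nate]|null                 |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4     |[from ->, to -> Iowa]|[from ->, to -> Ian]        |[from ->, to -> 1004]|[from ->, to -> 8000]     |[from ->, to -> Des Moines]       |
+------+---------------------+----------------------------+---------------------+--------------------------+----------------------------------+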
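
To reproduce the input above locally, here is a minimal sketch that builds it. It assumes a local SparkSession and that the blank `from ->` entries are genuine nulls (as the question states) rather than empty strings; the object name `SampleInput` and its `build` method are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SampleInput {
  type M = Map[String, String]

  // Builds the question's input; blank "from" entries are modeled as null values
  // (an assumption), and null columns are null maps.
  def build(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq[(Int, M, M, M, M, M)](
      (1, null, Map("from" -> "Will", "to" -> "Watson"), null,
        Map("from" -> "1000", "to" -> "8000"), Map("from" -> null, "to" -> "Seattle")),
      (3, null, Map("from" -> "Norman", "to" -> "Nate"), null,
        Map("from" -> "1000", "to" -> "8000"), Map("from" -> "CherryHill", "to" -> "Newark")),
      (4, Map("from" -> null, "to" -> "Iowa"), Map("from" -> null, "to" -> "Ian"),
        Map("from" -> null, "to" -> "1004"), Map("from" -> null, "to" -> "8000"),
        Map("from" -> null, "to" -> "Des Moines"))
    ).toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sample-input").getOrCreate()
    build(spark).show(false)
    spark.stop()
  }
}
```

Giving `Seq` an explicit tuple type is what lets the bare `null` entries compile as null maps instead of being inferred as `Null`.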

Expected:

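+------+---------------------------------------+----------------------------------------------+---------------------------------------+--------------------------------------------+----------------------------------------------------+
|emp_id|emp_city                               |emp_name                                      |emp_phone                              |emp_sal                                     |emp_site                                            |
+------+---------------------------------------+----------------------------------------------+---------------------------------------+--------------------------------------------+----------------------------------------------------+
|1     |null                                   |[from -> Will, to -> Watson, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert]          |
|3     |null                                   |[from -> Norman, to -> Nate, change -> update]|null                                   |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4     |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert]        |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert]     |[from ->, to -> Des Moines, change -> insert]       |
+------+---------------------------------------+----------------------------------------------+---------------------------------------+--------------------------------------------+----------------------------------------------------+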
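
The three rules can be pinned down as plain Scala before involving Spark at all. This is a sketch under two assumptions: a missing "from"/"to" key is treated the same as a null value, and no "change" key is added when both values are null or equal. The names `ChangeRule`, `changeFor`, and `addChange` are hypothetical.

```scala
object ChangeRule {
  // Value for the "change" key, or None when no key should be added
  // (both sides null, or from == to).
  def changeFor(from: Option[String], to: Option[String]): Option[String] =
    (from, to) match {
      case (None, Some(_))              => Some("insert")
      case (Some(_), None)              => Some("delete")
      case (Some(f), Some(t)) if f != t => Some("update")
      case _                            => None
    }

  // Applies the rule to one map value; a null map stays null, and
  // Some(null) from a null-valued entry is normalized to None.
  def addChange(m: Map[String, String]): Map[String, String] =
    if (m == null) m
    else
      changeFor(m.get("from").flatMap(Option(_)), m.get("to").flatMap(Option(_)))
        .fold(m)(c => m + ("change" -> c))
}
```

Keeping the decision in a pure function like this makes the edge cases (both null, equal values) easy to test independently of Spark.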

Answer by btxsgosb:

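One way to do this is with a UDF. It is not a great solution, but I couldn't think of another one.
Avoid UDFs wherever possible, since Spark's optimizer cannot see inside them.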

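import org.apache.spark.sql.functions.{col, udf}

// Adds a "change" key to a {from, to} map. Null maps, maps where both values
// are null, and maps where from == to are returned unchanged.
val updateMap = udf((input: Map[String, String]) => {
  if (input == null) input
  else {
    val from = input.getOrElse("from", null)
    val to = input.getOrElse("to", null)
    if (from == null && to != null)
      input + ("change" -> "insert")
    else if (from != null && to == null)
      input + ("change" -> "delete")
    else if (from != null && to != null && from != to)
      input + ("change" -> "update")
    else
      input
  }
})

// Apply the UDF to every column except the primary key emp_id.
val result = df.columns.filterNot(_ == "emp_id").foldLeft(df) { (acc, name) =>
  acc.withColumn(name, updateMap(col(name)))
}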

Make sure your columns are of type Map[String, String]. Hope this helps!
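
Since the answer recommends avoiding UDFs, here is a sketch of the same logic using only built-in column functions (`getItem`, `when`, `map`, `map_concat`), which the optimizer can inspect. It assumes Spark 2.4 or later (for `map_concat`), that every column other than the key column is a `Map[String, String]`, and that the blank `from` entries are nulls; `ChangeNoUdf` and `addChangeKeys` are hypothetical names. It builds all columns in one `select`, because the Spark documentation for `withColumn` warns that calling it in a loop can generate large query plans.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit, map, map_concat, when}

object ChangeNoUdf {
  // "change" value for one map column; null when no key should be added
  // (map is null, both values are null, or from == to).
  private def changeExpr(c: Column): Column = {
    val from = c.getItem("from")
    val to = c.getItem("to")
    when(from.isNull && to.isNotNull, lit("insert"))
      .when(from.isNotNull && to.isNull, lit("delete"))
      .when(from.isNotNull && to.isNotNull && from =!= to, lit("update"))
  }

  // Adds the "change" entry to one column, leaving null maps and no-op rows as-is.
  private def withChange(name: String): Column = {
    val c = col(name)
    val change = changeExpr(c)
    when(change.isNull, c)
      .otherwise(map_concat(c, map(lit("change"), change)))
      .as(name)
  }

  // Rewrites every column except keyCol in a single select.
  def addChangeKeys(df: DataFrame, keyCol: String = "emp_id"): DataFrame =
    df.select(df.columns.toSeq.map(n => if (n == keyCol) col(n) else withChange(n)): _*)
}
```

With the question's DataFrame loaded as `df`, `ChangeNoUdf.addChangeKeys(df).show(false)` should yield maps shaped like the expected output. If the blank `from` values are actually empty strings rather than nulls, they would be classified as `update`, so check how the data was loaded.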
