scala—计算spark.sql数据库列中包含的列表中特定元素的最长序列

balp4ylt  于 2021-07-26  发布在  Java
关注(0)|答案(2)|浏览(259)

我想解决以下问题。
我从一个查询中创建了以下Dataframe val temp = spark.sql("select Id, collect_list(from) as letter from f group by Id") ```
|Id| letter|
+-----------+---------------+
| 106| [c]|
| 101| [p]|
| 104|[c, c, c, t, u]|
| 100|[d, t, j, j, c]|
| 110| [p, n, f]|
| 113|[s, c, c, b, ..|
| 115|[u, s, t, c, ..|
| 11| [c, c, i, s]|
| 117| [d, d, p, s]|
| 118|[a, s, c, t, ..|
| 123| [d, n]|
| 125| [n, b]|
| 128| [c]|
| 131| [c, t, c, u]|
| 132| [c, u, i]|
| 134|[c, p, j, u, c]|
| 136|[b, a, t, n, c]|
| 137| [b, a]|
| 138| [b, t, c]|
| 141| [s]|

我想创建一个名为“n”的新列此列将包含一个数值,该数值表示在“c”出现之前单元格中最长的字母序列。最长的序列可以在列表中的任何位置。
例如,本节的“解决方案”列(假设没有任何内容被….截断)将为 `0, 1, 3, 5, 3, 2, 4, 4, 4, 4, 2, 2, 1, 4, 2, 5, 5, 2, 3, 1` 到目前为止,我已经尝试实现这个解决方案,但它没有工作,因为解决方案是在一个列表,而不是一个Dataframe。
scala函数来统计特定的事件
任何帮助都将不胜感激。谢谢您!
cclgggtu

cclgggtu1#

下面是如何使用Spark functions ,可以使用spark转换给定的scala函数 functions 如下所示

import org.apache.spark.sql.functions._

df.withColumn("n_trip",
  array_max(
    transform(
      filter(
        split(array_join($"trip", " "), "co"),
        (col: Column) => (col =!= "" || col =!= null)
      ), (col: Column) => size(split(trim(col), " "))
    )
  ))
  .withColumn("n_trip", when($"n_trip".isNull, 0).otherwise($"n_trip"))
  .show(false)

输出:

+-----------+--------------------+------+
|passengerId|trip                |n_trip|
+-----------+--------------------+------+
|10096      |[co]                |0     |
|10351      |[pk]                |1     |
|10436      |[co, co, cn, tj, us]|3     |
|1090       |[dk, tj, jo, jo, ch]|5     |
|11078      |[pk, no, fr]        |3     |
|11332      |[sg, cn, co, bm]    |2     |
|11563      |[us, sg, th, cn]    |4     |
|1159       |[ca, cl, il, sg]    |4     |
|11722      |[dk, dk, pk, sg]    |4     |
|11888      |[au, se, ca, tj]    |4     |
|12394      |[dk, nl]            |2     |
|12529      |[no, be]            |2     |
|12847      |[cn]                |1     |
|13192      |[cn, tk, cg, uk]    |4     |
|13282      |[co, us, iq]        |2     |
|13442      |[cn, pk, jo, us, ch]|5     |
|13610      |[be, ar, tj, no, ch]|5     |
|13772      |[be, at]            |2     |
|13865      |[be, th, cn]        |3     |
|14157      |[sg]                |1     |
+-----------+--------------------+------+
j2datikz

j2datikz2#

您可以编写一个用户定义函数(udf)来计算所需的内容。有很多方法可以计算最长的序列。一个简单的方法是在 "co" ,计算每个子序列的大小并取最大值。

val longuest_seq = udf((x : Seq[String]) => {
    x.reduce(_ +" "+_)
     .split(" *co *")
     .map(_.count(_ == ' ') + 1)
     .max
})

val df = Seq(
    (1, Array("x", "y", "co", "z")),
    (2, Array("x")),
    (3, Array("co", "t")),
    (4, Array("a", "b", "c", "d", "co", "e"))
).toDF("id", "trip")

df.withColumn("n_trips", longuest_seq('trip)).show

这就产生了

+---+-------------------+-------+
| id|               trip|n_trips|
+---+-------------------+-------+
|  1|      [x, y, co, z]|      2|
|  2|                [x]|      1|
|  3|            [co, t]|      1|
|  4|[a, b, c, d, co, e]|      4|
+---+-------------------+-------+

相关问题