Scala Spark - flatMap or an alternative function

koaltpgm asked 6 months ago in Scala

I've just started using Spark after coming from Apache Flink, and I'd like some advice on a transformation requirement. In Flink I would manage this with a stateful flatMap, but I'm not sure whether the same approach applies in Spark, or whether there is a better alternative.
I read from one Delta table and need to write the output to another.
Given the following table format:
| Time | Name | Value |
| -- | -- | -- |
| 2023-11-01T12:51 | ID | 75B |
| 2023-11-01T12:52 | Pressure | 5 |
| 2023-11-01T12:56 | Resistance | 20 |
| 2023-11-01T12:57 | ID | 55C |
| 2023-11-01T12:57 | Pressure | 10 |
The output I need is:
| Time | Name | Value | ID |
| -- | -- | -- | -- |
| 2023-11-01T12:52 | Pressure | 5 | 75B |
| 2023-11-01T12:56 | Resistance | 20 | 75B |
| 2023-11-01T12:57 | Pressure | 10 | 55C |
Essentially, each value should be aligned with the ID that was active at that time; all of the data is a time series.
So my questions are:
1. What is the best approach for this alignment? A stateful flatMap, or can it be managed with simple DataFrame transformations?
2. If a flatMap, what is a good resource for implementing this kind of program?

t5fffqht #1

You can use the row_number function to assign a unique identifier to each row, then use a window with an unbounded preceding range to find the most recent ID row for each record. The resulting DataFrame is then joined to itself:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("2023-11-01T12:51", "ID", "75B"),
  ("2023-11-01T12:52", "Pressure", "5"),
  ("2023-11-01T12:56", "Resistance", "20"),
  ("2023-11-01T12:57", "ID", "55C"),
  ("2023-11-01T12:57", "Pressure", "10")
).toDF("Time", "Name", "Value")

// Window from the first row up to and including the current row, in time order.
val withCurrentRowWindow = Window.orderBy("Time")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

val withIdRowDetected = df
  // Assign a sequential row number in time order.
  .withColumn("row_number", row_number().over(Window.orderBy("Time")))
  // For each row, the row number of the most recent "ID" row seen so far.
  .withColumn("id_row_number", max(
    when($"Name" === lit("ID"), $"row_number")
  ).over(withCurrentRowWindow))

withIdRowDetected.show(false)

// Self-join: attach each non-ID row to the ID row its id_row_number points at.
val result = withIdRowDetected
  .alias("non_ids")
  .where($"Name" =!= lit("ID"))
  .join(withIdRowDetected
    .alias("ids")
    .where($"Name" === lit("ID")),
    $"non_ids.id_row_number" === $"ids.row_number", "inner")
  .select($"non_ids.Time", $"non_ids.Name", $"non_ids.Value", $"ids.Value".alias("ID"))

result.show(false)

Output:

+----------------+----------+-----+----------+-------------+
|Time            |Name      |Value|row_number|id_row_number|
+----------------+----------+-----+----------+-------------+
|2023-11-01T12:51|ID        |75B  |1         |1            |
|2023-11-01T12:52|Pressure  |5    |2         |1            |
|2023-11-01T12:56|Resistance|20   |3         |1            |
|2023-11-01T12:57|ID        |55C  |4         |4            |
|2023-11-01T12:57|Pressure  |10   |5         |4            |
+----------------+----------+-----+----------+-------------+

+----------------+----------+-----+---+
|Time            |Name      |Value|ID |
+----------------+----------+-----+---+
|2023-11-01T12:52|Pressure  |5    |75B|
|2023-11-01T12:56|Resistance|20   |75B|
|2023-11-01T12:57|Pressure  |10   |55C|
+----------------+----------+-----+---+


Note: with a window that has no partitioning, Spark moves all rows to a single partition, so performance can be poor on large datasets.
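For reference, a more compact variant of the same window idea (my sketch, not part of the original answer) avoids the self-join entirely by carrying the latest ID value forward with last(..., ignoreNulls = true); if the data had a natural key such as a device ID, adding partitionBy to the window would also address the performance caveat above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Same cumulative window as above, expressed over rows.
val lastIdWindow = Window.orderBy("Time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val resultCompact = df
  // when(...) is null for non-ID rows; last with ignoreNulls = true skips
  // those nulls, leaving the Value of the most recent ID row.
  .withColumn("ID", last(when($"Name" === "ID", $"Value"), ignoreNulls = true)
    .over(lastIdWindow))
  .where($"Name" =!= "ID")
  .select($"Time", $"Name", $"Value", $"ID")

resultCompact.show(false)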

kokeuurv #2

Here is my solution using window functions:

import org.apache.spark.sql.functions._

val data = Seq((5, "Pressure", "10"), (4, "ID", "55C"), (3, "Resistance", "20"), (2, "Pressure", "5"), (1, "ID", "75B"))
val inputDf = spark.createDataFrame(data).toDF("time", "name", "value")
inputDf.createOrReplaceTempView("t")

// For ID rows, time_tmp is the row's own time; otherwise -1. A running MAX
// in time order then yields, for every row, the time of the most recent ID row.
val sql = """
select time, name, value, MAX(time_tmp) OVER (ORDER BY time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as ID
from (
  select *, CASE name WHEN "ID" THEN time ELSE -1 END as time_tmp
  from t
) as t2
"""
val outputDf = spark.sql(sql)

inputDf.show()
outputDf.show()

Output:

+----+----------+-----+
|time|      name|value|
+----+----------+-----+
|   5|  Pressure|   10|
|   4|        ID|  55C|
|   3|Resistance|   20|
|   2|  Pressure|    5|
|   1|        ID|  75B|
+----+----------+-----+

+----+----------+-----+---+
|time|      name|value| ID|
+----+----------+-----+---+
|   1|        ID|  75B|  1|
|   2|  Pressure|    5|  1|
|   3|Resistance|   20|  1|
|   4|        ID|  55C|  4|
|   5|  Pressure|   10|  4|
+----+----------+-----+---+
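
Note that the ID column here holds the time of the most recent ID row (1 or 4), not the ID value itself (75B/55C). One possible follow-up join to recover the value, sketched as my addition rather than part of the original answer:

// Sketch (my addition): join back on the ID row's time to fetch its value,
// and drop the ID rows themselves to match the asker's expected output.
outputDf.createOrReplaceTempView("with_id_time")
val finalDf = spark.sql("""
select o.time, o.name, o.value, i.value as ID
from with_id_time o
join t i
  on o.ID = i.time and i.name = 'ID'
where o.name <> 'ID'
""")
finalDf.show()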
