spark-前一天的阻力值

fafcakar  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(348)

我每天都在更新 itemPrice 列的值 dailyRecords 在Dataframe中。下面是特定列的模式。

|-- dailyRecords: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dayId: integer (nullable = true)
 |    |    |-- itemPrice: double (nullable = true)
 |    |    |-- itemsPurchased: integer (nullable = true)
 |    |    |-- itemSku: string (nullable = true)

(请注意,可能还有许多其他列)。我们可以假设dayid可以是一个从1递增到365的序列。
所以对于每个 itemSkuitemsPurchased is greater than 0 以及 itemPrice is equal to 0 然后更新 itemPriceprevious dayId (例如:如果 dayId is 10 如果这个条件满足 itemPrice from dayId 9 ),否则请离开 itemPrice 就这样继续下一步 dayId 我们将非常感谢任何通向理想解决方案或可能方法的线索
谢谢!
在考虑注解之后,当要更有效地更新多个列时,另一种替代解决方案是:
上面的架构 dailyRecords 通过组合来自 another dataframe . 因此,考虑到评论,最好更新 itemPrice 当它位于一个单独的Dataframe中时,而不是在将列嵌套在一个列中之后执行此操作 dailyRecords .
下面是我要更新的解决方案 itemPrice 值使用 foldLeft .
https://stackoverflow.com/a/62307771/12322995
请注意我使用的这个解决方案 foldLeft 是因为我要更新的列比 itemPrice 在问题之外。

ntjbwcob

ntjbwcob1#

最好在执行此操作之前在另一个df中生成正确的itemprice struct 以及 collect_list 如下图所示:

scala> val anotherDF = List(
     | (1,10.11,5,"item1"),(2,15.45,3,"item1"),(3,0.0,3,"item1"),(4,17.50,4,"item1"),
     | (1,10.11,5,"item2"),(2,0.0,0,"item2"),(3,16.50,3,"item2"),(4,17.50,4,"item2"),
     | (1,20.20,5,"item3"),(2,0.0,3,"item3"),(3,30.50,3,"item3"),(4,0.0,4,"item3"),(5,0.0,4,"item3")
     | ).toDF("dayId","itemPrice","itemsPurchased","itemSku")
anotherDF: org.apache.spark.sql.DataFrame = [dayId: int, itemPrice: double ... 2 more fields]

scala> anotherDF.show
+-----+---------+--------------+-------+
|dayId|itemPrice|itemsPurchased|itemSku|
+-----+---------+--------------+-------+
|    1|    10.11|             5|  item1|
|    2|    15.45|             3|  item1|
|    3|      0.0|             3|  item1|
|    4|     17.5|             4|  item1|
|    1|    10.11|             5|  item2|
|    2|      0.0|             0|  item2|
|    3|     16.5|             3|  item2|
|    4|     17.5|             4|  item2|
|    1|     20.2|             5|  item3|
|    2|      0.0|             3|  item3|
|    3|     30.5|             3|  item3|
|    4|      0.0|             4|  item3|
|    5|      0.0|             4|  item3|
+-----+---------+--------------+-------+

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val ww = Window.partitionBy("itemSku").orderBy("dayId")
ww: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@4cb9f248

scala> anotherDF.withColumn("updatedPrice", when(col("itemPrice")===0 && col("itemsPurchased")>0, lag("itemPrice",1).over(ww)).otherwise(col("itemPrice"))).show
+-----+---------+--------------+-------+------------+
|dayId|itemPrice|itemsPurchased|itemSku|updatedPrice|
+-----+---------+--------------+-------+------------+
|    1|     20.2|             5|  item3|        20.2|
|    2|      0.0|             3|  item3|        20.2|
|    3|     30.5|             3|  item3|        30.5|
|    4|      0.0|             4|  item3|        30.5|
|    5|      0.0|             4|  item3|         0.0|
|    1|    10.11|             5|  item2|       10.11|
|    2|      0.0|             0|  item2|         0.0|
|    3|     16.5|             3|  item2|        16.5|
|    4|     17.5|             4|  item2|        17.5|
|    1|    10.11|             5|  item1|       10.11|
|    2|    15.45|             3|  item1|       15.45|
|    3|      0.0|             3|  item1|       15.45|
|    4|     17.5|             4|  item1|        17.5|
+-----+---------+--------------+-------+------------+

然后使用 updatedPriceanotherDF 作为你的 itemPrice .

hfyxw5xn

hfyxw5xn2#

方法 dragPricesFromPreviousDay 获取两个参数,一个是要更新的列的列表,另一个是需要对其执行更新的Dataframe。
我注意到了 .na.fill(0, Seq(priceCol)) 因为如果它添加的列没有前一天的值 null 当条件满足时。因此,我们可以通过用文本0或任何其他值填充空值来消除空值。

def dragPricesFromPreviousDay (PricesNeedsUpdateDF: DataFrame, AllColumnsToBeUpdated: List[String]): DataFrame ={

    import spark.implicits._
    val windowFunc = Window.partitionBy("itemSku").orderBy("dayId")

    val dragPricesFromPreviousDayDF = AllColumnsToBeUpdated.foldLeft(scoresNeedsUpdateDF) {
      (df, priceCol) =>
        df.withColumn(priceCol,
          when($"itemsPurchased".gt(0) && $"itemPrice".equalTo(0), lag($"$priceCol",1).over(windowFunc))
            .otherwise($"$priceCol"))
          .na.fill(0, Seq(priceCol))
    }
    dragPricesFromPreviousDayDF
  }

相关问题