pyspark 如何在spark框架中不使用udf在两个列表之间生成点积?

camsedfj  于 4个月前  发布在  Spark
关注(0)|答案(1)|浏览(44)

我有一组物联网数据,在python作业中由azure Databricks转换。Databricks集群是13.3 LTS(包括Apache Spark 3.4.1,Scala 2.12)Standard_DS3_v2
我把这些信息放进一个增量表里。
我从delta表中检索数据,并在for循环中的2小时帧内开始将数据与其本身进行对抗,通过这样做,我正在进行大量的对抗(n*n)。
问题是udf可以工作,但速度很慢,1000条消息需要15分钟,而我需要在150万条消息的流量下停留不到10分钟。
我对udf做了以下操作:

for row in rows:
    try:
    mse = udf(lambda x : sum( (a - b)*(a - b) for a, b in zip(x[:-1] , row["enc"][:-1]))/450 )
    df_compare = df_compare.withColumn('diff_enc',sqrt(mse(df_entrance_compare.enc)))

字符串
其中rows是一个具有相同模式的df_compare和row[“enc”]的数组,并且df_compare.enc包含每个单元格的451个元素的列表:
| 列编码|
| --|
| [1.0、2.0、3.0、4.0、.]|
| [1.0、2.0、3.0、4.0、.]|
有没有一个更聪明更快的方法来使这个计算更快使用Spark?
停止使用Databricks而使用noslql数据库并在函数中进行计算可能是一个更好的主意吗?

ltqd579y

ltqd579y1#

如果你可以升级到Spark 3.5.0,你将可以使用reduce,它允许你用纯Spark函数来表达均方误差计算,没有UDF:

df = spark.sql("select array(1,2,2,3,3,4) as x, array(1,2,3,4,5,6) as enc")
df.withColumn("mse", reduce(arrays_zip(col("x"), col("enc")),
                     lit(0.0),
                     lambda acc, e: acc + pow(e["x"] - e["enc"], 2) / array_size(col("x")))
             ).show()
+------------------+------------------+------------------+
|                 x|               enc|               mse|
+------------------+------------------+------------------+
|[1, 2, 2, 3, 3, 4]|[1, 2, 3, 4, 5, 6]|1.6666666666666665|
+------------------+------------------+------------------+

字符串

相关问题