PySpark中向量间夹角的点积计算

bgtovc5b  于 4个月前  发布在  Spark
关注(0)|答案(1)|浏览(49)

我尝试在Databricks中使用PySpark计算两个向量之间的Angular 。这在Python中是一个相当琐碎的任务,但我似乎无法在Pyspark中创建一个有效的方法,主要的阻塞点是点积的计算。
由于我无法在Pyspark中计算点积,我选择使用numpy函数在UDF中实现我的方法,但结果没有我希望的那么快。我希望任何关于如何使用更多PySpark本身而不是依赖于numpy来实现这一点的输入。

import pandas as pd
import numpy as np

@udf("float")
def calculateAngle(x1, y1, x2, y2, x3, y3):
    a = np.array([x1,y1])
    b = np.array([x2,y2])
    c = np.array([x3,y3])

    ba = a - b
    bc = c - b

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    returnValue = np.degrees(np.arccos(cosine_angle))
    return returnValue.item()

data = {'x1':1.23, 'y1':3.23, 'x2':1.25, 'y2':3.2, 'x3':1.3, 'y3':2.8,}
df = pd.DataFrame(data, index=[0]) 
df = spark.createDataFrame(df)
df=df.withColumn("angle", calculateAngle('x1', 'y1', 'x2', 'y2', 'x3', 'y3'))
df=df.toPandas()
df

字符串

py49o6xq

py49o6xq1#

Python UDF很慢:Spark functions vs UDF performance?。2维向量之间的Angular 计算可以用Spark表示:

from pyspark.sql import Column
from pyspark.sql import functions as F

data = {'a1': 0, 'a2': 1, 'b1': 0, 'b2': 0, 'c1': 1, 'c2': 1}
_vals = [tuple(v for k, v in data.items())]
_schema = [k for k, v in data.items()]
df = spark.createDataFrame(_vals, _schema)

ba1 = F.col('a1') - F.col('b1')
ba2 = F.col('a2') - F.col('b2')
bc1 = F.col('c1') - F.col('b1')
bc2 = F.col('c2') - F.col('b2')
dot_product = ba1 * bc1 + ba2 * bc2

ba_length = F.sqrt((ba1 ** 2) + (ba2 ** 2))
bc_length = F.sqrt((bc1 ** 2) + (bc2 ** 2))

angle = F.acos(dot_product / (ba_length * bc_length))

df = df.withColumns({
    'angle_radians': angle,
    'angle_degrees': F.degrees(angle),
})
df.show()

# +---+---+---+---+---+---+------------------+-----------------+
# | a1| a2| b1| b2| c1| c2|     angle_radians|    angle_degrees|
# +---+---+---+---+---+---+------------------+-----------------+
# |  0|  1|  0|  0|  1|  1|0.7853981633974484|45.00000000000001|
# +---+---+---+---+---+---+------------------+-----------------+

字符串

相关问题