如何在join条件下用一个函数合并两个Dataframe?

deikduxw  于 2021-05-18  发布在  Spark
关注(0)|答案(2)|浏览(415)

我想用一个条件合并两个Dataframe。
在这个条件下,有一个计算浮点的函数。此函数的输入是转换为浮点的字符串。
这是我的密码:

def calculateStraightLineDistance(lat1, lon1, lat2, lon2):
    p = pi/180
    a = 0.5 - cos((lat2-lat1)*p)/2 + cos(lat1*p) * cos(lat2*p) * (1-cos((lon2-lon1)*p))/2
    return 12742 * asin(sqrt(a))

join_condition = [calculateStraightLineDistance(float(sd.lat_soc), float(sd.lon_soc),
                                               float(ad.lat_agen), float(ad.lon_agen)) <= 50.0 ]

merged_data = sd.join(ad, join_condition, 'left')

但是,此代码引发以下错误:

TypeError                                 Traceback (most recent call last)
<ipython-input-30-7e6c5d9332f8> in <module>()
      9 
     10 
---> 11 join_condition = [calculateStraightLineDistance(float(sd.lat_soc), float(sd.lon_soc),
     12                                                float(ad.lat_agen), float(ad.lon_agen)) <= 50.0 ]
     13 

TypeError: float() argument must be a string or a number, not 'Column'

我不确定这里有什么问题。以前的条件,没有任何铸造和功能,工作良好。例如:

join_condition = [ad.lat_agen >=  sd.lat_soc]

我做错什么了?
编辑:

ad= spark.sql(agencies_query)
ad.lat_agen.cast("float")
ad.printSchema()

将打印:

|-- lat_agen: string (nullable = true)
 |-- lon_agen: string (nullable = true)
q5lcpyga

q5lcpyga1#

你正在应用一个函数 float 到一列 sd.lat_soc . float 正如消息明确指出的,只接受字符串或数字,而不接受列对象。如果要将列的内容转换为浮点值,请使用: sd.lat_soc.cast("float") .

osh3o9ms

osh3o9ms2#

尽管史蒂文的答案是我在网上能找到的,但我还是没能成功。
然而,我是如何做到这一点的:

ad = ad.withColumn("lat_gen", ad.lat_agen.cast(FloatType()))

printschema提供预期的输出:

ad.printschema()
``` `|-- lat_agen: float (nullable = true` )

相关问题