Python: formatting an array of values into a concatenated string with Spark

hjzp0vay · asked on 2021-07-14 · in Spark

I have a Spark DataFrame sdf with millions of rows of GPS coordinates:

+------------------+-------------------+------+
|          latitude|          longitude|radius|
+------------------+-------------------+------+
| 37.75243634842733|-122.41924881935118|    10|
| 37.75344580658182|-122.42006421089171|    10|
| 37.75405656449232|  -122.419216632843|    10|
|37.753649393112184|-122.41784334182738|    10|
| 37.75409897804892| -122.4169099330902|    10|
|37.753937806404586|-122.41549372673035|    10|
| 37.72767062183685| -122.3878937959671|    10|
| 37.72710631810977| -122.3884356021881|    10|
| 37.72605407110467|-122.38841414451599|    10|
| 37.71141865080228|-122.44688630104064|    10|
|37.712199505873926|-122.44474053382874|    10|
|37.713285899241896|-122.44361400604248|    10|
| 37.71428740401767|-122.44260549545288|    10|
|37.712810604103346|-122.44156479835509|    10|
| 37.75405656449232| -122.4169099330902|    10|
|37.753649393112184|-122.41549372673035|    10|
+------------------+-------------------+------+

I want to format the lat/lon values into a single concatenated string (semicolon-separated) so that the coordinates can be fed to an OSRM request. I can achieve this as follows:


import numpy as np

# 1. Convert the Spark DataFrame to a pandas DataFrame

pdf = sdf.toPandas()

# 2. Extract the (latitude, longitude) pairs as a NumPy array

coords = pdf[['latitude', 'longitude']].values
coords

# 3. Formatting NumPy array of (lat, long) coordinates into a concatenated string formatted for the OSRM server

def format_coords(coords: np.ndarray) -> str:
    return ";".join(f"{lon:f},{lat:f}" for lat, lon in coords)

format_coords(coords)

# 4. Desired output:

'-122.419249,37.752436;-122.420064,37.753446;-122.419217,37.754057;-122.417843,37.753649;-122.416910,37.754099;-122.415494,37.753938;-122.387894,37.727671;-122.388436,37.727106;-122.388414,37.726054;-122.446886,37.711419;-122.444741,37.712200;-122.443614,37.713286;-122.442605,37.714287;-122.441565,37.712811;-122.416910,37.754057;-122.415494,37.753649'

Although this works for smaller datasets, the workflow (especially converting the Spark DataFrame to pandas) takes a very long time. Is there a solution that reads the coordinates directly from the Spark DataFrame, without converting it to pandas first?

cnh2zyt3 · Answer #1

You can aggregate the DataFrame as follows:

import pyspark.sql.functions as F

# Build a "lon,lat" string per row, collect the strings into a list,
# and join them with ";" in a single aggregation on the driver.
output = sdf.agg(
    F.concat_ws(';',
        F.collect_list(
            F.format_string('%f,%f', 'longitude', 'latitude')
        )
    )
).head()[0]

print(output)

# '-122.419249,37.752436;-122.420064,37.753446;-122.419217,37.754057;-122.417843,37.753649;-122.416910,37.754099;-122.415494,37.753938;-122.387894,37.727671;-122.388436,37.727106;-122.388414,37.726054;-122.446886,37.711419;-122.444741,37.712200;-122.443614,37.713286;-122.442605,37.714287;-122.441565,37.712811;-122.416910,37.754057;-122.415494,37.753649'
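For reference, once the string is built it can be passed straight to an OSRM HTTP request, which is the original goal. The snippet below is only a minimal sketch of that step (not part of the answer above); it assumes an OSRM server reachable at http://localhost:5000 and the standard /route/v1/driving service, so adjust the host and profile to your setup:

import requests

# Hypothetical usage: OSRM expects "lon,lat" pairs separated by ";",
# which is exactly the format produced above.
url = f"http://localhost:5000/route/v1/driving/{output}"
resp = requests.get(url, params={"overview": "false"})
resp.raise_for_status()
print(resp.json()["routes"][0]["duration"])  # route duration in seconds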
pokxtpni · Answer #2

You can do this with an RDD (resilient distributed dataset); I'm not sure how well it would perform in your case:

# Map each row to a "lon,lat" string, then collect to the driver and join with ";"
sdf2 = sdf.rdd.map(lambda x: f"{x.longitude},{x.latitude}")
print(";".join(sdf2.collect()))

When I ran this on a sample with only a few rows, it returned:

-122.41924881935118,37.75243634842733;-122.42006421089171,37.75344580658182[...]
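Note that both answers ultimately bring every coordinate back to the driver (via head() or collect()), which is unavoidable when a single string is required. If collecting everything in one call is a concern, a possible variant (my own sketch, not from the answer) is to stream rows with toLocalIterator() instead, although the final string still has to fit in driver memory:

# Hypothetical variant: pull rows to the driver one partition at a time
# instead of collecting the whole RDD in a single call.
parts = sdf.rdd.map(lambda x: f"{x.longitude},{x.latitude}").toLocalIterator()
coord_str = ";".join(parts)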
