我是spark的初学者,我被困在如何使用dataframe发出sql请求。
我有以下两个Dataframe。
df_zones
+-----------------+-----------------+----------------------+---------------------+
|id |geomType |geom |rayon |
+-----------------+-----------------+----------------------+---------------------+
|30 |Polygon |[00 00 00 00 01 0...] |200 |
|32 |Point |[00 00 00 00 01 0.. ] |320179 |
+-----------------+-----------------+----------------------+---------------------+
df_tracking
+-----------------+-----------------+----------------------+
|idZones |Longitude |Latitude |
+-----------------+-----------------+----------------------+
|[30,50,100,] | -7.6198783 |33.5942549 |
|[20,140,39,] |-7.6198783 |33.5942549 |
+-----------------+-----------------+----------------------+
我想执行以下请求。
"SELECT zones.* FROM zones WHERE zones.id IN ("
+ idZones
+ ") AND ((zones.geomType='Polygon' AND (ST_WITHIN(ST_GeomFromText(CONCAT('POINT(',"
+ longitude
+ ",' ',"
+ latitude
+ ",')'),4326),zones.geom))) OR ( (zones.geomType='LineString' OR zones.geomType='Point') AND ST_Intersects(ST_buffer(zones.geom,(zones.rayon/100000)),ST_GeomFromText(CONCAT('POINT(',"
+ longitude
+ ",' ',"
+ latitude
+ ",')'),4326)))) "
我真的被卡住了,我应该加入两个Dataframe还是什么?我尝试用id和idzone连接两个Dataframe,如下所示:
df_tracking.select(explode(col("idZones").as ("idZones"))).join(df_zones,col("idZones").equalTo(df_zones.col("id")));
但在我看来,加入并不是正确的选择。
我需要你的帮助。
谢谢您
1条答案
按热度按时间y53ybaqx1#
你可以转换
df_tracking.idZones eg: [20, 140, 39]
变成一个Array()
类型和用途array_contains()
在连接一系列元素的同时,使事情变得更简单。示例代码:
输出:
edit-1:在上面的继续中,通过定义
SPARK UDF's
下面的代码片段为您提供了一个简单的想法。输出: