Spark - grouping all elements of an array column into batches using Scala

yrdbyhpb posted in Spark on 2021-07-14

I have a DataFrame with an array-type column whose element count varies from row to row, as shown by GPS_Array_Size in the input DataFrame below.
I need to POST to an external HTTP service in batches of 5000 elements at a time, embedding the requestid field into each tuple (as shown in the output DataFrame).
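To get a feel for the scale, the five requests in the example below total 25,280 elements, which at 5000 elements per call means six HTTP requests (five full batches and a final one of 280). A minimal sketch of that arithmetic in plain Scala, using the sizes from the example tables:

```scala
// Element counts per requestid, taken from the example input DataFrame below.
val sizes = Map("aaa" -> 6431, "bbb" -> 11876, "ccc" -> 763, "ddd" -> 5187, "eee" -> 1023)
val batchSize = 5000

val total       = sizes.values.sum                            // 25280 elements overall
val fullBatches = total / batchSize                           // 5 batches of exactly 5000
val remainder   = total % batchSize                           // 280 elements left over
val httpCalls   = fullBatches + (if (remainder > 0) 1 else 0) // 6 HTTP requests in total

println(s"$total elements -> $httpCalls HTTP calls ($fullBatches full, last batch of $remainder)")
```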
Input DataFrame schema:-

root
 |-- requestid: string (nullable = true)
 |-- GPS_Array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- timestamp: double (nullable = true)
 |    |    |-- GPSLatitude: double (nullable = true)
 |    |    |-- GPSLongitude: double (nullable = true)
 |-- GPS_Array_Size: long (nullable = true)

Input DataFrame:-
requestid | GPS_Array | GPS_Array_Size
aaa | [{"timestamp":a1,"GPSLatitude":a1,"GPSLongitude":a1}, {"timestamp":a2,"GPSLatitude":a2,"GPSLongitude":a2}, ... {"timestamp":a6431,"GPSLatitude":a6431,"GPSLongitude":a6431}] | 6431
bbb | [{"timestamp":b1,"GPSLatitude":b1,"GPSLongitude":b1}, {"timestamp":b2,"GPSLatitude":b2,"GPSLongitude":b2}, ... {"timestamp":b11876,"GPSLatitude":b11876,"GPSLongitude":b11876}] | 11876
ccc | [{"timestamp":c1,"GPSLatitude":c1,"GPSLongitude":c1}, {"timestamp":c2,"GPSLatitude":c2,"GPSLongitude":c2}, ... {"timestamp":c763,"GPSLatitude":c763,"GPSLongitude":c763}] | 763
ddd | [{"timestamp":d1,"GPSLatitude":d1,"GPSLongitude":d1}, {"timestamp":d2,"GPSLatitude":d2,"GPSLongitude":d2}, ... {"timestamp":d5187,"GPSLatitude":d5187,"GPSLongitude":d5187}] | 5187
eee | [{"timestamp":e1,"GPSLatitude":e1,"GPSLongitude":e1}, {"timestamp":e2,"GPSLatitude":e2,"GPSLongitude":e2}, ... {"timestamp":e1023,"GPSLatitude":e1023,"GPSLongitude":e1023}] | 1023
Output DataFrame schema:-

root
 |-- New_GPS_Array: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- requestid: string (nullable = true)
 |    |    |-- timestamp: double (nullable = true)
 |    |    |-- GPSLatitude: double (nullable = true)
 |    |    |-- GPSLongitude: double (nullable = true)
 |-- New_GPS_Array_Size: long (nullable = true)

Output DataFrame:-
New_GPS_Array | New_GPS_Array_Size
[{"requestid":"aaa_1","timestamp":a1,"GPSLatitude":a1,"GPSLongitude":a1}, ... {"requestid":"aaa_1","timestamp":a5000,"GPSLatitude":a5000,"GPSLongitude":a5000}] | 5000
[{"requestid":"aaa_2","timestamp":a5001, ...}, ... {"requestid":"aaa_2","timestamp":a6431, ...}, {"requestid":"bbb_1","timestamp":b1, ...}, ... {"requestid":"bbb_1","timestamp":b3569, ...}] | 5000
[{"requestid":"bbb_2","timestamp":b3570, ...}, ... {"requestid":"bbb_2","timestamp":b8569, ...}] | 5000
[{"requestid":"bbb_3","timestamp":b8570, ...}, ... {"requestid":"bbb_3","timestamp":b11876, ...}, {"requestid":"ccc_1","timestamp":c1, ...}, ... {"requestid":"ccc_1","timestamp":c763, ...}, {"requestid":"ddd_1","timestamp":d1, ...}, ... {"requestid":"ddd_1","timestamp":d930, ...}] | 5000
[{"requestid":"ddd_2","timestamp":d931, ...}, ... {"requestid":"ddd_2","timestamp":d5187, ...}, {"requestid":"eee_1","timestamp":e1, ...}, ... {"requestid":"eee_1","timestamp":e743, ...}] | 5000
[{"requestid":"eee_2","timestamp":e744, ...}, ... {"requestid":"eee_2","timestamp":e1023, ...}] | 280
As shown above, the requestid embedded in the tuples changes for each new batch (the original requestid plus a batch suffix, e.g. aaa_1, aaa_2).
My solution is to use the explode function to create one row for each element of GPS_Array, and then assign an index based on the row number. So if I have 700 million rows after exploding, they are numbered 1 to 700 million. Dividing the index by 5000 gives a quotient, and I then group by that quotient: rows 0 to 4999 get quotient 0, rows 5000 to 9999 get quotient 1, rows 10000 to 14999 get quotient 2, and so on.
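The quotient idea can be checked locally in plain Scala before involving Spark: `grouped(5000)` plays the role of `floor(index / 5000)`, and a per-request counter produces the `requestid_n` suffixes. This is an illustrative local sketch (the `Point` class and element counts are stand-ins for the real exploded rows), not the actual distributed job:

```scala
// Toy stand-in for the exploded rows: one (requestid, element) pair per row, in order.
case class Point(requestid: String, timestamp: Double)

val exploded: Seq[Point] =
  Seq("aaa" -> 6431, "bbb" -> 11876, "ccc" -> 763, "ddd" -> 5187, "eee" -> 1023)
    .flatMap { case (id, n) => (1 to n).map(i => Point(id, i.toDouble)) }

// Quotient grouping: element k lands in batch k / 5000, exactly like floor(index / 5000).
val batches: Seq[Seq[Point]] = exploded.grouped(5000).map(_.toSeq).toSeq

// Re-tag each point with "<requestid>_<n>", where n counts batches per original request.
val batchCounter = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
val tagged: Seq[Seq[Point]] = batches.map { batch =>
  val seenInThisBatch = scala.collection.mutable.Set.empty[String]
  batch.map { p =>
    // Bump the counter the first time a requestid appears in this batch.
    if (seenInThisBatch.add(p.requestid)) batchCounter(p.requestid) += 1
    p.copy(requestid = s"${p.requestid}_${batchCounter(p.requestid)}")
  }
}
```

Running this reproduces the shapes in the output table: five batches of 5000 and a last batch of 280, with the second batch starting at aaa_2 and ending at bbb_1.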
explodedDF
  .withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))
  .withColumn("id", floor($"index" / 5000))
  .groupBy("id")
  .agg(collect_list($"GPS_Array"))
I believe the indexing step pulls every record onto a single Spark worker node, because Window.orderBy without a partitionBy has to increment the row number across all records in one partition.
Is there a more efficient way to batch the elements into groups of 5000 per request?
Versions: Spark 2.4.5 and Scala 2.11

No answers yet!

