I have a DataFrame with an array-type column in which the number of elements differs from row to row, as shown by GPS_Array_Size in the input DataFrame below.
I need to POST HTTP requests to an external service that accepts 5000 elements at a time, embedding the requestid field into each tuple (as shown in the output DataFrame).
Input DataFrame schema :-
root
|-- requestid: string (nullable = true)
|-- GPS_Array: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- timestamp: double (nullable = true)
| | |-- GPSLatitude: double (nullable = true)
| | |-- GPSLongitude: double (nullable = true)
|-- GPS_Array_Size: long (nullable = true)
Input DataFrame :-
requestid | GPS_Array | GPS_Array_Size
aaa | [{"timestamp": a1, "GPSLatitude": a1, "GPSLongitude": a1}, {"timestamp": a2, "GPSLatitude": a2, "GPSLongitude": a2}, ... {"timestamp": a6431, "GPSLatitude": a6431, "GPSLongitude": a6431}] | 6431
bbb | [{"timestamp": b1, "GPSLatitude": b1, "GPSLongitude": b1}, {"timestamp": b2, "GPSLatitude": b2, "GPSLongitude": b2}, ... {"timestamp": b11876, "GPSLatitude": b11876, "GPSLongitude": b11876}] | 11876
ccc | [{"timestamp": c1, "GPSLatitude": c1, "GPSLongitude": c1}, {"timestamp": c2, "GPSLatitude": c2, "GPSLongitude": c2}, ... {"timestamp": c763, "GPSLatitude": c763, "GPSLongitude": c763}] | 763
ddd | [{"timestamp": d1, "GPSLatitude": d1, "GPSLongitude": d1}, {"timestamp": d2, "GPSLatitude": d2, "GPSLongitude": d2}, ... {"timestamp": d5187, "GPSLatitude": d5187, "GPSLongitude": d5187}] | 5187
eee | [{"timestamp": e1, "GPSLatitude": e1, "GPSLongitude": e1}, {"timestamp": e2, "GPSLatitude": e2, "GPSLongitude": e2}, ... {"timestamp": e1023, "GPSLatitude": e1023, "GPSLongitude": e1023}] | 1023
Output DataFrame schema :-
root
|-- New_GPS_Array: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- requestid: string (nullable = true)
| | |-- timestamp: double (nullable = true)
| | |-- GPSLatitude: double (nullable = true)
| | |-- GPSLongitude: double (nullable = true)
|-- New_GPS_Array_Size: long (nullable = true)
Output DataFrame :-
New_GPS_Array | New_GPS_Array_Size
[{"requestid": "aaa_1", "timestamp": a1, "GPSLatitude": a1, "GPSLongitude": a1}, {"requestid": "aaa_1", "timestamp": a2, "GPSLatitude": a2, "GPSLongitude": a2}, ... {"requestid": "aaa_1", "timestamp": a5000, "GPSLatitude": a5000, "GPSLongitude": a5000}] | 5000
[{"requestid": "aaa_2", "timestamp": a5001, ...}, ... {"requestid": "aaa_2", "timestamp": a6431, ...}, {"requestid": "bbb_1", "timestamp": b1, ...}, ... {"requestid": "bbb_1", "timestamp": b3569, ...}] | 5000
[{"requestid": "bbb_2", "timestamp": b3570, ...}, {"requestid": "bbb_2", "timestamp": b3571, ...}, ... {"requestid": "bbb_2", "timestamp": b8569, ...}] | 5000
[{"requestid": "bbb_3", "timestamp": b8570, ...}, ... {"requestid": "bbb_3", "timestamp": b11876, ...}, {"requestid": "ccc_1", "timestamp": c1, ...}, ... {"requestid": "ccc_1", "timestamp": c763, ...}, {"requestid": "ddd_1", "timestamp": d1, ...}, ... {"requestid": "ddd_1", "timestamp": d930, ...}] | 5000
[{"requestid": "ddd_2", "timestamp": d931, ...}, ... {"requestid": "ddd_2", "timestamp": d5187, ...}, {"requestid": "eee_1", "timestamp": e1, ...}, ... {"requestid": "eee_1", "timestamp": e743, ...}] | 5000
[{"requestid": "eee_2", "timestamp": e744, ...}, {"requestid": "eee_2", "timestamp": e745, ...}, ... {"requestid": "eee_2", "timestamp": e1023, ...}] | 280
As shown, the requestid has to change for each new batch.
My solution is to use the explode function to create one row per element of GPS_Array, and then assign an index based on the row number. So if I have 700 million rows after exploding, they are numbered 1 to 700 million. Dividing the index by 5000 gives a quotient, and I group on that quotient.
Row numbers 0 to 4999 get quotient 0, row numbers 5000 to 9999 get quotient 1, row numbers 10000 to 14999 get quotient 2, and so on.
explodedDF
  .withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))
  .withColumn("id", floor($"index" / 5000))
  .groupBy("id")
  .agg(collect_list($"GPS_Array"))
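The quotient scheme just described can be checked with a small standalone sketch. It is written in plain Python rather than Scala only so that it runs without a Spark cluster; the names (`batch_size`, `exploded`) are illustrative, and the two request sizes are reused from the example above (aaa: 6431 elements, eee: 1023):

```python
# Standalone sketch (no Spark) of the batching arithmetic described above:
# give every exploded element a 0-based global index, integer-divide by the
# batch size, and group by the quotient.
from collections import defaultdict

batch_size = 5000

# Exploded elements as (requestid, timestamp) pairs.
exploded = [("aaa", t) for t in range(1, 6432)] + \
           [("eee", t) for t in range(1, 1024)]

batches = defaultdict(list)
for index, element in enumerate(exploded):        # global 0-based index
    batches[index // batch_size].append(element)  # quotient = batch id

# Indexes 0..4999 land in batch 0, 5000..9999 in batch 1, and so on.
sizes = {batch_id: len(elems) for batch_id, elems in batches.items()}
print(sizes)  # {0: 5000, 1: 2454}
```

With 7454 exploded elements this yields one full batch of 5000 and a remainder batch of 2454; note that, as in the question's output, a batch can span two requests (batch 1 starts with the tail of aaa and ends with eee).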
I believe the indexing part will pull all the records onto a single Spark worker node, because the index has to be incremented sequentially across every record.
Can the elements be batched into groups of 5000 per request more efficiently?
Versions: Spark 2.4.5 and Scala 2.11