pyspark通过限制行数来分割密钥

c8ib6hqw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(410)

我有一个数据框,有3列,如下所示:

+-------+--------------------+-------------+
|  id   |      reports       |      hash   |
+-------+--------------------+-------------+
|abc    | [[1,2,3], [4,5,6]] |     9q5     |
|def    | [[1,2,3], [4,5,6]] |     9q5     |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |
|lmn    | [[1,2,3], [4,5,6]] |     abc     |
|opq    | [[1,2,3], [4,5,6]] |     abc     |
|rst    | [[1,2,3], [4,5,6]] |     abc     |
+-------+--------------------+-------------+

现在我的问题是我需要限制每个散列的行数。
我在想我可以转换散列。 9q5 in 9q5_1 对于前1k排, 9q5_2 对于第二个1k,依此类推,对于散列中的每个值。
有一个类似的帖子,但它是不同的,那里的Dataframe是分裂的,我想保留一个单一的和改变键值。
关于如何做到这一点有什么建议吗?谢谢

vd8tlhqk

vd8tlhqk1#

我找到了解决办法。我使用window函数创建一个新列,其中geohash列中的每个值都有一个增量索引。然后,我应用一个udf函数,该函数根据原始的geohash和索引合成我需要的新哈希值geohash'\ux。

partition_size_limit = 10
generate_indexed_geohash_udf = udf(lambda geohash, index: "{0}_{1}".format(geohash, int(index / partition_size_limit)))
window = Window.partitionBy(df_split['geohash']).orderBy(df_split['id'])
df_split.select('*', rank().over(window).alias('index')).withColumn("indexed_geohash", generate_indexed_geohash_udf('geohash', 'index'))

结果是:

+-------+--------------------+-------------+-------------+-----------------+
|  id   |      reports       |      hash   |    index    | indexed_geohash |
+-------+--------------------+-------------+-------------+-----------------+
|abc    | [[1,2,3], [4,5,6]] |     9q5     |      1      |     9q5_0       |
|def    | [[1,2,3], [4,5,6]] |     9q5     |      2      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      3      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      4      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      5      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      6      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      7      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      8      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      9      |     9q5_0       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      10     |     9q5_1       |
|ghi    | [[1,2,3], [4,5,6]] |     9q5     |      11     |     9q5_1       |
|lmn    | [[1,2,3], [4,5,6]] |     abc     |      1      |     abc_0       |
|opq    | [[1,2,3], [4,5,6]] |     abc     |      2      |     abc_0       |
|rst    | [[1,2,3], [4,5,6]] |     abc     |      3      |     abc_0       |
+-------+--------------------+-------------+-------------+-----------------+

编辑:史蒂文的回答也非常有效

partition_size_limit = 10
window = Window.partitionBy(df_split['geohash']).orderBy(df_split['id'])
df_split.select('*', rank().over(window).alias('index')).withColumn("indexed_geohash", F.concat_ws("_", F.col("geohash"), F.floor((F.col("index") / F.lit(partition_size_limit))).cast("String")))

相关问题