数据集有一个从其主排序键构建的分区键在查询时,spark不会搜索特定分区中的一个键并将其全部读取

qq24tv8q  于 2021-07-12  发布在  Spark
关注(0)|答案(2)|浏览(408)

使用apachespark,我创建了一些关于法国城市的会计数据。
这个 Dataset 具有以下主要字段:

city_code, establishment_id, account_number, amount, city name,  department  
29045    , 2904521051      , 6105          , 23.51 , Dirinon  ,  29
29046    , 2907425498      , 4031          , 17.20 , Douarnenez, 29

它是按 orderBy("city_code", "establishment_id", "account_number") 然后被一个 repartition(col("department")) 他们在法国有100多个部门: 0195 ,以及一些特殊情况,如 2A , 2B ,和 971 , 972 , 973 , 974 , 976 . 部门是由三个字符组成的字符串。
保存在 Parquet 文件夹。
我看了一下Parquet文件夹,发现里面有200个街区。
我有点惊讶:我不是应该找到大约100个吗?每个部门一个?
然后,我尝试一个查询。我要这个城市的会计数据 29046 . 它是函数中唯一一个返回其所有机构的相关帐户的参数。
我收到他们很好,但我的日志告诉我,我的Parquet文件的所有块都是红色的这样做。我原以为只需要一个:包含分区的那个 29 .
我开始怀疑自己:但我为什么要相信这一点?apachespark怎么知道任何一个城市的代码的形状 29 必须在有代码的分区中搜索 29 ?
我不太懂分区,这里。我把一边的键和另一边的分区键搞混了:它们的链接不是那么紧密,我相信。
我在这里写了多少错误,我应该怎么做才能达到我想要的结果?
我已经尝试了麦克在回答中提出的改变。
comptes 首先,我用一个Parquet文件执行了这些操作,没有 substr(city_code, 1, 2) 对于分区:

comptes = comptes.orderBy("codeCommune", "siret", "numeroCompte");
comptes = comptes.withColumn("partitionCommune", substring(col("codeCommune"), 1, 2));
comptes = comptes.repartition(col("partitionCommune"));
comptes.write().parquet("myStore");

然后是对 codeCommune 29046

Dataset<Row> comptes = session.read().parquet("myStore");
comptes.where(col("codeCommune").equalTo("29046")).show();

数据集的生成和磁盘写入(200块)需要697秒。
城市代码的查询尝试 29046 读取所有的200个块并取9。
然后,我将编写parquet文件的行与这个行交换,并重新运行parquet文件的生成和查询。现在它正在创建一个带有分区的Parquet文件:

comptes.write().partitionBy("partitionCommune").parquet("myStore");

数据集的生成及其在磁盘上的写入(近500个块)需要875秒。
查询 city_code 29046读取所有500个块,也需要大约9秒。
(编辑:我有一个bug orderBy 在写入Parquet文件之前,transform不是最后一个,我的数据没有被排序,导致文件结尾有20000个块,因为试图将未排序的数据作为分区写入!)
所以,它起作用了。

pgky5nke

pgky5nke1#

从医生那里,如果你知道的话 df.repartition 基于列,并且不指定分区数,它将生成默认的分区数,即200。所以你有200个街区。
作者的行为( .write.partitionBy )与Dataframe略有不同。它将创建的分区(也称为块/文件)的数量等于您指定的分区列中的不同值的数量( N ). 如果不指定分区,默认行为是将Dataframe的每个分区转储到一个单独的文件中。如果您指定了一个,那么行为将是将Dataframe的每个分区转储到 N 单独的文件(不转储空分区)。
要实现所需的功能,方法是指定Dataframe和写入程序的分区。例如

df.repartition('col').write.partitionBy('col').parquet('path')

请注意,我想你可以按城市代码的前两位和部门进行划分。

df.withColumn('city_code_first_two', F.substring('city_code', 1, 2)).repartition('department', 'city_code_first_two').write.partitionBy('department', 'city_code_first_two').parquet('path')

这将产生相同数量的文件,前提是文件之间存在1:1的关系 department 以及 city_code_first_two .

qcuzuvrc

qcuzuvrc2#

“我希望spark在特定分区中搜索密钥,但事实并非如此”
spark不跟踪哪些键将被分配到哪个分区。这意味着,如果在用于分区的列中筛选某个特定值,它仍然必须遍历所有分区。
按列对数据集进行分区只会确保列中的相同值将被分配到同一分区,这可以在处理数据时提高性能,因为每个分区都是在单个执行器上进行窄转换的。
请记住,spark是一个分布式计算引擎,而不是一个可转位的数据存储。
如另一个答案中所述,您可以在存储数据时按一列或多列对数据进行分区。此外,您还可以存储数据。
顺便说一句,如果您发现200太多,您可以随时指定分区的数量

df.repartition(#ofPartitions, Column... partitionExprs)

相关问题