如何在apache pig输出文件中均匀分布数据?

vaj7vani  于 2021-06-25  发布在  Pig
关注(0)|答案(2)|浏览(299)

我有一个pig拉丁语脚本,它接收一些xml,使用xpath udf提取一些字段,然后存储结果字段:

REGISTER udf-lib-1.0-SNAPSHOT.jar;
DEFINE XPath com.blah.udfs.XPath();

docs = LOAD '$input' USING com.blah.storage.XMLLoader('root') as (content:chararray);

results = FOREACH docs GENERATE XPath(content, 'root/id'), XPath(content, 'root/otherField'), content;

store results into '$output';

注意,我们在集群上使用pig-0.12.0,所以我从pig-0.14.0中删除了xpath/xmloader类,并将它们放在自己的jar中,以便在0.12中使用它们。
上面的脚本运行良好,并生成我要查找的数据。但是,它生成1900多个零件文件,每个文件中只有几MB。我了解了默认的\u parallel选项,所以我将其设置为128以尝试获取128个partfiles。我最后不得不添加一个片段来强制一个reduce阶段来实现这一点。我的脚本现在看起来像:

set default_parallel 128;
REGISTER udf-lib-1.0-SNAPSHOT.jar;
DEFINE XPath com.blah.udfs.XPath();

docs = LOAD '$input' USING com.blah.storage.XMLLoader('root') as (content:chararray);

results = FOREACH docs GENERATE XPath(content, 'root/id'), XPath(content, 'root/otherField'), content;

forced_reduce = FOREACH (GROUP results BY RANDOM()) GENERATE FLATTEN(results);
store forced_reduce into '$output';

同样,这会产生预期的数据。另外,我现在得到128个零件文件。我现在的问题是数据在零件文件中的分布不均匀。有些有8兆,有些有100兆。我在按random():)对它们进行分组时应该预料到这一点。
我的问题是,在保持零件文件大小均匀的情况下,限制零件文件数量的首选方法是什么?我对pig/pig拉丁语不太熟悉,我想我是用一种完全错误的方式来做这件事的。
p、 我之所以关心部件文件的数量,是因为我想用spark处理输出,我们的spark cluster似乎用较少的文件做得更好。

k10s72fa

k10s72fa1#

从第一个代码片段开始,我假设它是map only作业,因为您没有使用任何聚合。
设置pig.maxcombinedsplitsize属性,而不是使用reducer

REGISTER udf-lib-1.0-SNAPSHOT.jar;
    DEFINE XPath com.blah.udfs.XPath();

    docs = LOAD '$input' USING com.blah.storage.XMLLoader('root') as (content:chararray);

    results = FOREACH docs GENERATE XPath(content, 'root/id'), XPath(content, 'root/otherField'), content;

    store results into '$output';        
exec;
        set pig.maxCombinedSplitSize 1000000000; -- 1 GB(given size in bytes)
        x = load '$output' using PigStorage();
        store x into '$output2' using PigStorage();

pig.maxcombinedsplitsize-设置此属性将确保每个Map器读取大约1 gb的数据,并且以上的代码作为标识Map器作业工作,这有助于以1gb的部分文件块写入数据。

pokxtpni

pokxtpni2#

我仍在寻找一种直接从pig脚本执行此操作的方法,但目前我的“解决方案”是在spark进程中重新划分数据,spark进程对pig脚本的输出起作用。我使用 RDD.coalesce 函数来重新平衡数据。

相关问题