hadoop pig脚本-union with condition

btqmn9zl  于 2021-06-24  发布在  Pig
关注(0)|答案(2)|浏览(278)

我对Pig是完全陌生的。我想使用iid字段合并两个文件a和b,但是我不想输出有a没有的iid(来自b)。这似乎很简单,但我不知道如何正确地做。
下面是我的示例代码,它只包含union:

a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
out = union onschema a,b;
singled = distinct out;
ordered = order singled by iid;
store ordered into '$output';

下面的示例数据只有3列来描述我的期望。请注意,字段实际上是以制表符分隔的。
样本数据a:

1  Name   Tom Linkon
1  Title  Professor
2  Name   Whatever
2  Title  Worker

样本数据b:

1  City  New York
2  City  Columbus
3  City  Fake fake
4  City  Blah Bla

样本输出

1  Name   Tom Linkon
1  Title  Professor
1  City   New York
2  Name   Whatever
2  Title  Worker
2  City   Columbus

非常感谢你的帮助!

zlwx9yxi

zlwx9yxi1#

使用 COGROUP 使用相同的键组织记录,但避免 JOIN 的不受欢迎的交叉积。那么 FILTER 看包里有没有 b 的记录是空的,拆分回两个关系,并执行 UNION :

a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
c = COGROUP a BY iid, b BY iid;
c_filt = FILTER c BY NOT IsEmpty(b);
a_new = FOREACH c_filt GENERATE group AS iid, FLATTEN(a);
b_new = FOREACH c_filt GENERATE group AS iid, FLATTEN(b);
out = UNION ONSCHEMA a_new, b_new;
singled = DISTINCT out;
STORE (ORDER singled BY iid) INTO '$output';

然而,我不喜欢这个解决方案——对于这样一个简单的操作来说,它有太多的行和新的关系。真正需要的是一种把两个袋子合二为一的方法。Pig显然没有提供这个(尽管如果有,请回答这个问题)。不过,您可以编写一个简单的自定义项:

public class MERGE extends EvalFunc<DataBag> {
    public DataBag exec(Tuple input) throws IOException {
        DataBag b = new DefaultDataBag();
        try {
            if (input != null)
                for (int i = 0; i < input.size(); i++)
                    b.addAll((DataBag) input.get(i));
        } catch (Exception e) { return null; }
        return b;
    }
}

有了这个自定义项,解决方案就变成了:

a = load '$input' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
b = load '$data' as (iid:int, field:chararray, v1:chararray, v2:chararray, v3:chararray);
c = FOREACH (COGROUP a BY iid, b BY iid) GENERATE group AS iid, MERGE(a,b) AS bag;
out = FOREACH c {
    uniq = DISTINCT bag;
    GENERATE iid, FLATTEN(bag);
};
STORE (ORDER out BY iid) INTO '$output';

这种方法的另一个优点是,如果您有多个输入,则不需要执行多个输入 FOREACH 在那之后 COGROUP . 只需在下面的语句中添加更多参数 MERGE :

c = FOREACH (COGROUP a BY iid, b BY iid, ..., z BY iid)
    GENERATE group AS iid, MERGE(a,b,...,z) AS bag;
js5cn81o

js5cn81o2#

这个应该能解决你的问题:

f1 = LOAD '/user/hadoop/f1' USING PigStorage('\t') AS (id_f1:int, key_f1:chararray, value_f1:chararray);
f2 = LOAD '/user/hadoop/f2' USING PigStorage('\t') AS (id_f2:int, key_f2:chararray, value_f2:chararray);
f3 = JOIN f1 by id_f1 LEFT OUTER, f2 BY id_f2;
f4 = FOREACH f3 GENERATE id_f1, key_f1, value_f1;
f5 = FOREACH f3 GENERATE id_f2, key_f2, value_f2;
f6 = UNION f4, f5;
f7 = DISTINCT f6;
f8 = ORDER f7 BY $0;
DUMP f8;

相关问题