无法将pig元组传递给python udf

zour9fqk  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(280)

我有master.txt,它有10k条记录,所以每一行都是一个元组&所有的元组都需要传递给python udf。由于它有多条记录,所以在存储p2preportmap时会出现以下错误。请帮忙
错误如下:
无法打开别名p2preportmap的迭代器。后端错误:org.apache.pig.backend.executionengine.executeption:错误0:标量在输出中有多行。第一个:(010301,mts,mm),第二个:(010b06,mts,tn)(共同原因:“join”然后“foreach…”。。。generate foo.bar“应该是”foo::bar“)
清管器脚本如下:

REGISTER 'smsiuc_udf.py' using streaming_python as smsiuc_udfs;
cdrs = load '2016040111*' USING PigStorage('|','-tagFile') ;

mastergtrec = load 'master.txt' USING PigStorage(',','-tagFile');

mastergt = FOREACH mastergtrec GENERATE (chararray) UPPER($1) as opcdpc, (chararray) UPPER($2) as gtoptname,(chararray) UPPER($3) as gtoptcircle;

mastergttup = FOREACH mastergt generate TOTUPLE(opcdpc,gtoptname,gtoptcircle) as mstgttup;

cdrrecord = FOREACH cdrs GENERATE (chararray) UPPER($1) as aparty, (chararray) UPPER($2) as bparty,$3 as smssentdate,$4 as smssenttime,($29=='6' ? 'S' : 'F') as status,(chararray) UPPER($26) as srcgt,(chararray) UPPER($27) as destgt,($12=='405899136999995' ? 'MTSDEL-CDMA' : ($12=='919875089998' ? 'MTSRAJ-GSM' : ($12=='405899150999995' ? 'MTSCHN-CDMA' : $12) ) ) as smscgt, (chararray)$0 as cdrfname,(chararray) $13 as prepost;

filteredp2pcdrs = FILTER cdrrecord by smsiuc_udfs.pullp2pcdrs(aparty,bparty,srcgt,destgt) and status == 'S' and SUBSTRING(smssentdate,4,6) == '$MON';

groupp2pcdrs = GROUP filteredp2pcdrs by (srcgt,destgt,aparty,bparty,smscgt,status,prepost);

distinctp2pcdrs= FOREACH groupp2pcdrs {
uniq = DISTINCT filteredp2pcdrs.(srcgt,destgt,aparty,bparty,smscgt,status,prepost);
GENERATE FLATTEN(group),COUNT(uniq) as cnt;
};

p2preportmap = FOREACH distinctp2pcdrs GENERATE smsiuc_udfs.p2preport(srcgt,destgt,aparty,bparty,mastergttup ),smscgt,status,prepost,cnt
vkc1a9a2

vkc1a9a21#

这可以通过添加一个虚拟列然后分组来完成。
dummy=foreach p2preportmap生成1,$0,$1。。。。
分组=按$0分组

mec1mxoz

mec1mxoz2#

我举一个例子,我有两个关系a和b

1,2,3
3,4,5
4,5,6

b

1
2
3
1
2
3
1
2
3

现在我想要一个PythonUDF,它可以查找打印输出的第一列,如下所示。

((1,{(1,2,3)}))
((2,))
((3,{(3,4,5)}))
((1,{(1,2,3)}))
((2,))
((3,{(3,4,5)}))
((1,{(1,2,3)}))
((2,))
((3,{(3,4,5)}))

首先我把一个列按第一列分组,然后按1分组,这样我就有一行了

c = group A by $0
e = group c by 1

python自定义项如下所示

def pythonudf(value,map):
    print map
    temp = None
    for a in map:
        if a[0] == value:
            temp = a[1]
    return value,temp

现在你用这个自定义项

D = foreach B generate myudf.pythonudf($0,e.$1);

相关问题