Pig join includes a bag that must be filtered by a value outside the bag

c90pui9n  posted on 2021-05-29  in  Hadoop

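I'm coming up to speed on Pig and combining web log data with stock pricing history from two sources. Dates and times are normalized to timestamps, and the join is performed on the stock symbol. The timestamps do not match.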

jnd = JOIN web_time BY w_sym, stock_sort BY group;

The group contains a bag of symbol-specific stock data. This is the combined schema:
jnd:{web_time::ip:chararray,web_time::user:chararray,web_time::w_time:long,web_time::url:chararray,stock_sort::sort:{(sym:chararray,time:long,price:double)}}
I need to filter the stock_sort bag by comparing web_time::w_time with time; the two are never an exact match. Sample jnd data looks like this:
(14.192.253.226,greedy,1213201721000,"GET /VLCCF.html HTTP/1.0",{(VLCCF,1265361975000,13.84),(VLCCF,1265262560000,14.16),(VLCCF,1265192740000,14.44),(VLCCF,1265099390000,14.48),(VLCCF,1265028034000,14.5),(VLCCF,1262678148000,13.76),(VLCCF,1262607761000,13.82),(VLCCF,1233832497000,16.9),(VLCCF,1233740569000,16.96),(VLCCF,884004754000,23.99),(VLCCF,883720431000,23.57)})
Using the value in $2, I ultimately need to filter out all but one entry, but for now I'm just trying to remove the tuples with a lesser timestamp.

flake = FOREACH jnd {
    fits = FILTER jnd BY (w_time > time);
    GENERATE ip, user, w_time, url, fits;
    }

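The above doesn't work. Step 1 is to remove all bag tuples whose timestamp is less than the desired time (w_time). w_time is not part of the group. Does this really require a UDF, or am I missing something simple? I'm at a standstill.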

Development environment

Apache Pig version 0.15.0.2.4.0.0-169 (rexported), compiled Feb 10 2016, 07:50:04; Hadoop 2.7.1.2.4.0.0-169, Subversion git@github.com:hortonworks/hadoop.git -r 26104d8ac833884c8776473823007f17; 4-node Hortonworks cluster
Any input would be appreciated.

az31mfrm  #1

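I think that inside your foreach you need to filter the stock bag, not jnd, and the filter condition should be jnd.w_time > time. I managed to write out the whole flow without a UDF. See below.
I used two files:
xact.txt: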

VLCCF,1265361975000,13.84
VLCCF,1265262560000,14.16
VLCCF,1265192740000,14.44
VLCCF,1265099390000,14.48
VLCCF,1265028034000,14.5
VLCCF,1262678148000,13.76
VLCCF,1262607761000,13.82
VLCCF,1233832497000,16.9
VLCCF,1233740569000,16.96
VLCCF,884004754000,23.99
VLCCF,883720431000,23.5

stock.txt:
14.192.253.226,greedy,1213201721000,"GET /VLCCF.html HTTP/1.0",VLCCF

stock = load 'stock.txt' using PigStorage(',') as (
ip:chararray,
user:chararray,
w_time:long,
url:chararray,
symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
symbol:chararray,
time:long,
price:double
);

xact_grouped = foreach(group xact by symbol) generate
    group, xact;

joined = join stock by symbol, xact_grouped by group;

filtered = foreach joined {
    grp = filter xact by time < joined.w_time;
    generate ip, grp;
};

dump filtered;

This gave me:
(14.192.253.226,{(VLCCF,884004754000,23.99),(VLCCF,883720431000,23.5)})
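As a cross-check of which tuples should survive, the same per-record filter can be sketched in plain Python (not Pig). This only mirrors the comparison time < w_time on the sample rows from xact.txt and stock.txt; the helper name filter_bag is hypothetical and exists only for this illustration.

```python
# Plain-Python sketch of the nested filter: keep only the bag tuples whose
# time is earlier than the web record's w_time.
# filter_bag is a hypothetical helper name, not part of Pig.

def filter_bag(w_time, bag):
    """Return the (symbol, time, price) tuples with time < w_time."""
    return [t for t in bag if t[1] < w_time]

# Rows copied from xact.txt.
xact = [
    ("VLCCF", 1265361975000, 13.84),
    ("VLCCF", 1265262560000, 14.16),
    ("VLCCF", 1265192740000, 14.44),
    ("VLCCF", 1265099390000, 14.48),
    ("VLCCF", 1265028034000, 14.5),
    ("VLCCF", 1262678148000, 13.76),
    ("VLCCF", 1262607761000, 13.82),
    ("VLCCF", 1233832497000, 16.9),
    ("VLCCF", 1233740569000, 16.96),
    ("VLCCF", 884004754000, 23.99),
    ("VLCCF", 883720431000, 23.5),
]

# w_time of the single row in stock.txt.
w_time = 1213201721000

fits = filter_bag(w_time, xact)
for row in fits:
    print(row)
```

Only the two quotes from before the web request remain, which matches the bag in the Pig output.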
Edit: alternatively:

stock = load 'stock.txt' using PigStorage(',') as (
ip:chararray,
user:chararray,
w_time:long,
url:chararray,
symbol:chararray
);

xact = load 'xact.txt' using PigStorage(',') as (
symbol:chararray,
time:long,
price:double
);

joined = join stock by symbol, xact by symbol;

joined_filtered = foreach (filter joined by time < w_time) generate
    ip as ip,
    user as user,
    w_time as w_time,
    stock::symbol as symbol,
    time as time,
    price as price;

grouped = foreach (group joined_filtered by (ip, user, w_time)) generate
    flatten(group),
    joined_filtered;
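The question's end goal is to keep only one entry per web record. It doesn't say which one, so the sketch below assumes it is the most recent quote before w_time. It is again plain Python rather than Pig, and latest_quote_before is a hypothetical name used only for illustration.

```python
# Plain-Python sketch: pick the single most recent quote before w_time.
# Assumption: "all but one entry" means the latest earlier quote.
# latest_quote_before is a hypothetical helper name, not part of Pig.

def latest_quote_before(w_time, bag):
    """Return the (symbol, time, price) tuple with the largest time < w_time,
    or None when no quote precedes w_time."""
    earlier = [t for t in bag if t[1] < w_time]
    if not earlier:
        return None
    return max(earlier, key=lambda t: t[1])

# A few rows copied from xact.txt.
quotes = [
    ("VLCCF", 1233832497000, 16.9),
    ("VLCCF", 1233740569000, 16.96),
    ("VLCCF", 884004754000, 23.99),
    ("VLCCF", 883720431000, 23.5),
]

best = latest_quote_before(1213201721000, quotes)
print(best)
```

In Pig, the same idea would probably be a nested ORDER BY time DESC followed by LIMIT 1 inside a FOREACH over the grouped relation, though I have not run that against this data.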
