如何在pig拉丁语中执行“updatewhere”操作,该操作涉及来自两个别名的未知记录数的变量?

9ceoxa92  于 2021-06-25  发布在  Pig
关注(0)|答案(2)|浏览(262)

我对Pig不熟悉。下面是一些我要实现的伪代码:

FOREACH split_records {
  UPDATE updated_volume SET 
  open=updated_volume.open*split_records.multiply_by/split_records.divide_by,
  close=updated_volume.close*split_records.multiply_by/split_records.divide_by

  WHERE split_records.symbol=updated_volume.symbol AND 
  updated_volume.date < split_records.split_date
}

以下是我到目前为止关于Pig的代码:

FOREACH split_records {
  SPLIT updated_volume INTO split_yes IF updated_volume.symbol==split_records.symbol AND 
  updated_volume.date < split_records.splitDate, split_no IF 
  updated_volume.symbol!=split_records.symbol OR 
  updated_volume.date > split_Records.splitDate;

  updated_splits = FOREACH split_yes GENERATE
  symbol,
  date,
  (split_yes.open*split_records.multiply_by/split_records.divide_by) AS open,
  (split_yes.close*split_records.multiply_by/split_records.divide_by) AS close;

  updated_volume = UNION updated_splits, split_no;
};

上面的代码给了我一个错误:不匹配的输入'split'需要generate,所以它肯定不起作用。但基本上,我尝试模拟一个“update..where”操作,其中where条件依赖于一个变量,该变量是遍历另一组记录的结果,其长度/计数未知。
我有一个模糊的印象,即pig不是那种用于迭代的语言,所以我对实现这一点的任何方法都持开放态度。

t30tvxxf

t30tvxxf1#

我认为这段代码做了一些与您试图做的事情类似的事情。对于更新的\u卷中的每条记录,它将应用对其进行后期处理的所有相应的拆分\u记录。

cgrp = COGROUP updated_volume BY symbol, split_records BY symbol;
SPLIT cgrp INTO
    did_split IF SIZE(split_records) > 0,
    did_not_split OTHERWISE;

-- reflatten data for symbols that did not split
not_updated     =   FOREACH did_not_split GENERATE
                        FLATTEN(updated_volume);

-- update data for symbols that did split
to_be_updated   =   FOREACH did_split GENERATE
                        FLATTEN(updated_volume) AS (symbol, volume_date, open, close),
                        split_records;
updated         =   FOREACH to_be_updated {
                        applicable_splits = FILTER split_records BY date >= volume_date;
                        GENERATE
                            symbol, volume_date AS date,
                            -- NOTE: you would have to write a quick udf
                            --       in jython (or java) to calculate the product
                            --       of a bag of numbers
                            open * my_udfs.product(split_records.multiply_by) / PRODUCT(split_records.divide_by)
                            AS open,
                            close * my_udfs.product(split_records.multiply_by) / PRODUCT(split_records.divide_by)
                            AS close;
                    }

updated_volume  = UNION updated, not_updated;
ndh0cuux

ndh0cuux2#

你能用(条件)吗?true:对于“打开”和“关闭”列为false。

相关问题