如何在pig中生成重复组?

5us2dqdw  于 2021-06-24  发布在  Pig
关注(0)|答案(1)|浏览(326)

我使用 Pig 来准备数据,遇到了一个看似简单却无法解决的问题。例如,我有如下输入(input),其中包含 id 和 name 两列:

id |  name  
-------------
 1  |  Alicia
 2  |  Ana   
 3  |  Benita
 4  |  Berta 
 5  |  Bertha

我期望得到如下输出(output):(我们可以使用 for 循环功能来实现这一点吗?)

id     |  name  
--------------------------
 1_XX_1  |  Alicia_id_1
 2_XX_1  |  Ana_id_1   
 3_XX_1  |  Benita_id_1
 4_XX_1  |  Berta_id_1
 5_XX_1  |  Bertha_id_1

 1_XX_2  |  Alicia_id_2
 2_XX_2  |  Ana_id_2   
 3_XX_2  |  Benita_id_2
 4_XX_2  |  Berta_id_2
 5_XX_2  |  Bertha_id_2

 1_XX_3  |  Alicia_id_3
 2_XX_3  |  Ana_id_3   
 3_XX_3  |  Benita_id_3
 4_XX_3  |  Berta_id_3
 5_XX_3  |  Bertha_id_3
xqnpmsa8

xqnpmsa81#

您可以使用 UDF 来实现这一点;输入需要被复制的次数通过构造参数指定,因此该 UDF 可以重复使用。下面的 UDF 就可以做到这一点。

package pigexerciseudf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class replicateinput extends EvalFunc<DataBag>
{
    public replicateinput()
    {

    }
    int rep_factor=0;
    public replicateinput(String a)
    {
        rep_factor=Integer.parseInt(a);
    }

    public DataBag exec(Tuple input) throws IOException
    {
        BagFactory bf=BagFactory.getInstance();
        DataBag output=bf.newDefaultBag();
            try
            {
            for(int i=1;i<=rep_factor;i++)
            {
                TupleFactory tp=TupleFactory.getInstance();
                Tuple t1=tp.newTuple(2);
                String key=(String)input.get(0);
                System.out.println("key="+key);
                String value=(String)input.get(1);
                String key_out=key+"_XX_"+i;
                String value_out=value+"_id_"+i;
                t1.set(0,key_out);
                t1.set(1,value_out);
                output.add(t1);
            }
            return output;
            }   
            catch(Exception e)
            {
                throw new IOException(e);
            }
    }

    public Schema outputschema(Schema input)
    {
        try
        {
        List<Schema.FieldSchema> mylist=new ArrayList<Schema.FieldSchema>();
        mylist.add(new Schema.FieldSchema("key_out",DataType.CHARARRAY));
        mylist.add(new Schema.FieldSchema("value_out",DataType.CHARARRAY));
        Schema tupleschema=new Schema(mylist);
        Schema bagschema=new Schema(new Schema.FieldSchema("pair",tupleschema,DataType.TUPLE));
        Schema returnbagsc=new Schema(new Schema.FieldSchema("pairs",bagschema,DataType.BAG));
        return returnbagsc;
        }
        catch(FrontendException e)
        {
            throw new RuntimeException("not able to defime the schema");
        }
    }
}

输入文件:
1,Alicia
2,Ana
3,Benita
4,Berta
5,Bertha

-- Make the jar containing the UDF available to Pig.
REGISTER '/path/to/pigexerciseudf.jar';
-- Bind the UDF to an alias; the constructor argument '3' is the replication factor.
DEFINE replicat pigexerciseudf.replicateinput('3');
-- Read comma-separated rows as two chararray fields.
records = LOAD '/home/hduser/exer.dat' USING PigStorage(',') AS (a:chararray, b:chararray);
-- FLATTEN turns each tuple of the returned bag into its own output row.
replicated = FOREACH records GENERATE FLATTEN(replicat(a, b)) AS (line:chararray);
DUMP replicated;

输出:
(1_XX_1,Alicia_id_1)
(1_XX_2,Alicia_id_2)
(1_XX_3,Alicia_id_3)
(2_XX_1,Ana_id_1)
(2_XX_2,Ana_id_2)
(2_XX_3,Ana_id_3)
(3_XX_1,Benita_id_1)
(3_XX_2,Benita_id_2)
(3_XX_3,Benita_id_3)
(4_XX_1,Berta_id_1)
(4_XX_2,Berta_id_2)
(4_XX_3,Berta_id_3)
(5_XX_1,Bertha_id_1)
(5_XX_2,Bertha_id_2)
(5_XX_3,Bertha_id_3)

相关问题