LEAD/LAG functions

vmjh9lq9 posted on 2021-06-25 in Pig

Does Apache Pig have functions similar to the LEAD/LAG functions in SQL? Or is there any Pig function that can look back at the previous row?

2w3kk1z5 (#1)

Yes, there are predefined functions for this. See the Over() and Stitch() UDFs in piggybank. The documentation for Over() lists some examples.
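For readers unfamiliar with the SQL semantics being asked about, here is a minimal plain-Java sketch of what lag and lead return over one already-sorted partition. It illustrates the semantics only and is not the API of Over() or Stitch(); the class and method names (LeadLagSketch, lag, lead, shift) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: models SQL-style LAG/LEAD over an ordered list.
class LeadLagSketch {

    // For each position, the value `offset` rows earlier, or null if none exists.
    static <T> List<T> lag(List<T> ordered, int offset) {
        return shift(ordered, -offset);
    }

    // For each position, the value `offset` rows later, or null if none exists.
    static <T> List<T> lead(List<T> ordered, int offset) {
        return shift(ordered, offset);
    }

    private static <T> List<T> shift(List<T> ordered, int delta) {
        List<T> out = new ArrayList<>(ordered.size());
        for (int i = 0; i < ordered.size(); i++) {
            int j = i + delta;
            // Positions that fall outside the partition have no neighbour.
            out.add(j >= 0 && j < ordered.size() ? ordered.get(j) : null);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(10, 20, 30);
        System.out.println(lag(values, 1));  // [null, 10, 20]
        System.out.println(lead(values, 1)); // [20, 30, null]
    }
}
```

Note that lead needs the following row, so it can only be computed once the whole ordered partition is available, for example as a sorted bag.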

dfddblmv (#2)

OK, here is my first attempt. Please note that I only started learning how to write UDFs today.
The Maven pom.xml file contains:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.0.0-cdh4.1.0</version>
</dependency>
...

The Java UDF class:

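import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

// Returns the value this UDF saw on its previous call (null on the first call).
// The result is only meaningful when rows reach the UDF in sorted order.
public class GenericLag extends EvalFunc<String>{

    // Value from the previous row, kept between exec() calls.
    private String lagObject = null;

    @Override
    public String exec(Tuple input) throws IOException {
        try {
            String returnObject = getLagObject();
            // Remember the current value for the next call; a null field is stored as null
            // instead of throwing a NullPointerException on toString().
            if (input.get(0) == null) {
                setLagObject(null);
            } else {
                setLagObject(input.get(0).toString());
            }
            return returnObject;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public String getLagObject() {
        return lagObject;
    }
    public void setLagObject(String lagObject) {
        this.lagObject = lagObject;
    }

}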
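To make the behaviour of this stateful approach concrete, here is a minimal sketch that mimics the same logic in plain Java, without any Pig dependency. StatefulLagSketch and its next method are hypothetical stand-ins for the UDF and its exec method; the point is that each call returns whatever the previous call stored, so the result depends entirely on the order in which rows arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the UDF: keeps the previous value between calls.
class StatefulLagSketch {

    private String previous = null;

    // Returns the value stored by the previous call, then stores the current one.
    String next(Object current) {
        String result = previous;
        previous = (current == null) ? null : current.toString();
        return result;
    }

    public static void main(String[] args) {
        StatefulLagSketch lag = new StatefulLagSketch();
        List<String> out = new ArrayList<>();
        // Mixed input types are all converted to their string form.
        for (Object row : Arrays.asList("a", 2, 3.5)) {
            out.add(lag.next(row));
        }
        System.out.println(out); // [null, a, 2]
    }
}
```

The first call always yields null because there is no earlier row, which is why downstream expressions have to tolerate a null lag value.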

At first I used Object instead of String everywhere "String" appears above, but that produced this error:

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 2080: Foreach currently does not handle type Unknown

I also had to call setLagObject(input.get(0).toString()); rather than setLagObject(input.get(0)); otherwise I got errors such as:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
java.lang.ClassCastException: java.lang.Double cannot be cast to java.lang.String
java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.String

Here is how I use it in Pig:

REGISTER /path/to/compiled/file.jar;
DEFINE LAG fully.qualified.domain.name.GenericLag();

A = LOAD '/hdfs/path/to/directory' USING PigStorage(',') AS (
    important_order_by_field:int
    ,second_important_order_by_field:chararray
    ,...
    ,string_field_to_lag:chararray
    ,int_field_to_lag:int
    ,date_field_to_lag:chararray
);
B = FOREACH A GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    ,string_field_to_lag
    ,int_field_to_lag
    ,ToDate(date_field_to_lag, 'yyyy-MM-dd HH:mm:ss') AS date_field_to_lag
    ;
-- Sort the projected relation; the lag is only meaningful on ordered rows.
C = ORDER B BY important_order_by_field, second_important_order_by_field;
D = FOREACH C GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    ,LAG(string_field_to_lag) AS lag_string
    ,(int) LAG(int_field_to_lag) AS lag_int
    ,(date_field_to_lag IS NULL ? 
        null : 
        ToDate(SUBSTRING(REPLACE(LAG(date_field_to_lag), 'T', ' ') ,0,19), 'yyyy-MM-dd HH:mm:ss')) AS lag_date
    ;
DUMP D;

If the last expression were written without the null check, like this:

ToDate(SUBSTRING(REPLACE(LAG(date_field_to_lag), 'T', ' ') ,0,19), 'yyyy-MM-dd HH:mm:ss') AS lag_date

it would fail with the following error:

ERROR org.apache.pig.tools.grunt.Grunt - ERROR 1066: Unable to open iterator for alias LAGGED_RHODES. Backend error : null

The logs then show:

java.lang.NullPointerException at org.joda.time.format.DateTimeFormatterBuilder$NumberFormatter.parseInto(DateTimeFormatterBuilder.java:1200)

This happens because the lag value for the first row is null.
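The null guard and the string clean-up can be sketched in plain Java as follows. This is only an illustration: it uses java.time as a stand-in for the Joda-Time classes Pig uses internally, and LaggedDateSketch and parseLagged are hypothetical names. It mirrors the REPLACE, SUBSTRING and ToDate steps from the script and returns null instead of parsing when there is no previous row.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns a lagged date string back into a date, tolerating null.
class LaggedDateSketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static LocalDateTime parseLagged(String lagged) {
        // The first row has no predecessor, so there is nothing to parse.
        if (lagged == null) {
            return null;
        }
        // Same clean-up as REPLACE(..., 'T', ' ') followed by SUBSTRING(..., 0, 19).
        String normalized = lagged.replace('T', ' ');
        if (normalized.length() > 19) {
            normalized = normalized.substring(0, 19);
        }
        return LocalDateTime.parse(normalized, FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(parseLagged(null));                       // null
        System.out.println(parseLagged("2021-06-25T10:15:30.000Z")); // 2021-06-25T10:15:30
    }
}
```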

trnvg8h3 (#3)

Here is an alternative:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class GenericLag2 extends EvalFunc<Tuple>{

    private List<String> lagObjects = null;

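    @Override
    public Tuple exec(Tuple input) throws IOException {
        try {
            Tuple output = null;
            if (lagObjects == null) {
                // First row: there is no previous row, so the result stays null.
                lagObjects = new ArrayList<String>();
            } else {
                // Emit the values remembered from the previous row.
                output = TupleFactory.getInstance().newTuple(lagObjects.size());
                for (int i = 0; i < lagObjects.size(); i++) {
                    output.set(i, lagObjects.get(i));
                }
                lagObjects.clear();
            }

            // Remember the current row (including the first one) for the next call.
            for (int i = 0; i < input.size(); i++) {
                Object field = input.get(i);
                lagObjects.add(field == null ? null : field.toString());
            }
            return output;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }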

    @Override
    public Schema outputSchema(Schema input) {
        Schema tupleSchema = new Schema();
        try {
            for (int i = 0; i < input.size(); i++) {
                tupleSchema.add(new FieldSchema("lag_" + i, DataType.CHARARRAY));
            }
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.TUPLE));
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }
}

I think this will be faster, but I am not sure, because you then have to do the following:

... 
C = ORDER B BY important_order_by_field, second_important_order_by_field;
D = FOREACH C GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    -- Name the flattened fields so the next statement can refer to them.
    ,FLATTEN(LAG(
        string_field_to_lag
        ,int_field_to_lag
        ,date_field_to_lag
    )) AS (string_field_to_lag, int_field_to_lag, date_field_to_lag)
    ;
E = FOREACH D GENERATE
    important_order_by_field
    ,second_important_order_by_field
    ,...
    ,string_field_to_lag
    ,(int) int_field_to_lag
    ,(date_field_to_lag IS NULL ? 
        null :
        ToDate(SUBSTRING(REPLACE(date_field_to_lag, 'T', ' '), 0, 19),  'yyyy-MM-dd HH:mm:ss')) 
        as date_field_to_lag
;
DUMP E;
