Hadoop之Pig

x33g5p2x  于2020-10-30 发布在 Pig  
字(10.5k)|赞(0)|评价(0)|浏览(773)

1**、Pig概述**

Pig是一个基于Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫做Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce任务。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口。用MapReduce进行数据分析,当业务比较复杂的时候,将会是一个很复杂的事情,比如你需要对数据进行很多预处理或转换,以便能够适用MapReduce的处理模式;另一方面,编写MapReduce程序,发布及运行作业将是一个比较耗时的事情。Pig的出现很好地弥补了这一不足,Pig能够让你专心于数据及业务本身,而不是纠结于数据的格式转换以及MapReduce程序的编写。从本质上来说,当你使用Pig进行处理时,Pig本身会在后台生成一系列的MapReduce操作来执行任务,但是这个过程对用户来说是透明的。

相比于Java版的MapReduce API,Pig为大型数据集的处理提供了更高层次的抽象,同时提供了更丰富的数据结构,而且一般都是多值和嵌套的数据结构。Pig还提供了一套更强大的数据变换操作,包括在MapReduce中被忽视的连接Join操作。从Pig的整个执行过程来看,主要包括以下两个部分:

用于描述数据流的语言,称为Pig Latin;
*
用于执行Pig Latin程序的执行环境,当前有两个环境:单JVM中的本地执行环境和Hadoop集群上的分布式执行环境。

在使用过程中,Pig具有多种特点,简单归纳如下:

丰富的运算——它提供了许多运算符来执行诸如join,sort,filter等操作;
*
易于编程——Pig Latin与SQL类似,如果你善于使用SQL,则很容易编写Pig脚本;
*
优化机会——Pig中的任务自动优化其执行,因此程序员只需要关注语言的定义;
*
可扩展性——使用现有的操作符,用户可以开发自己的功能来读取、处理和写入数据;
*
用户自定义函数——Pig提供了在其他编程语言(如Java)中创建用户自定义函数的功能,并且可以调用或嵌入到Pig脚本中;
*
可处理多种数据——Pig可用于分析各种数据,无论是结构化还是非结构化,最终将结果存储到HDFS中。

2**、Apache Pig pk Others**

Apache Pig pk MapReduce

下表列出了Apache Pig和MapReduce之间的主要区别。
Apache Pig

MapReduceApache Pig是一种数据流语言

MapReduce是一种数据处理模式Apache Pig是一种高级语言

MapReduce是低级和刚性的在Apache Pig中执行Join操作非常简单

在MapReduce中执行数据集之间的Join操作是非常困难的任何具备SQL基础知识的新手程序员都可以方便地使用Apache Pig

需要会使用Java进行编程,同时还需要掌握MapReduce编程模型Apache Pig使用多查询方法,从而在很大程序上减少代码的长度

MapReduce将需要几乎20倍的代码行数来执行相同的任务不需要编译,执行时,每个Apache Pig操作符都在内部转换为MapReduce作业

MapReduce作业具有很长的编译过程

Apache Pig pk SQL

下表列出了Apache Pig和SQL之间的主要区别。
Apache Pig

SQLPig Latin是一种程序语言

SQL是一种声明式语言在Apache Pig中,模式是可选的,可以存储数据而无需设计模式(如值存储为$01,$02等)

模式在SQL中是必需的Apache Pig中的数据模型是嵌套关系

SQL中使用的数据模型是平面关系Apache Pig为查询优化提供有限的机会

在SQL中有很多的机会进行查询优化

除了上面的区别,Apache Pig还有如下的不同:

允许在pipeline(流水线)中拆分;
*
允许开发人员在pipeline中的任何位置存储数据;
*
声明执行计划;
*
提供运算符来执行ETL(Extra提取,Transfor转换和Load加载)功能;

Apache Pig pk Hive

Apache Pig和Hive都用于创建MapReduce作业,在某些情况下,Hive以与Apache Pig类似的方式在HDFS上运行,下表列出了它们之间的主要区别。
Apache Pig

HiveApache Pig使用一种名为Pig Latin的语言(最初创建于Yahoo)

Hive使用一种名为HiveSQL的语言(最初创建于Facebook)Pig Latin是一种数据流语言

HiveSQL是一种查询处理语言Pig Latin是一个过程语言,它适合流水线范式

HiveSQL是一种声明性语言Apache Pig可以处理结构化,非结构化和半结构化数据

Hive主要用于结构化数据

3**、Pig的体系结构**

当需要使用Apache Pig执行特定任务时,程序员首先使用Pig Latin语言编写Pig脚本,并使用任意的执行机制(如Grunt Shell,UDFs,Embedded)来执行它们。执行后,这些脚本将通过应用Pig框架的一系列转换来生成所需的输出。在Apache Pig内部,会将这些脚本转换为一系列MapReduce作业,因此,它使得程序员的工作变得简单。Apache Pig的体系结构如下图所示:

Parser**(解析器):**最初,Pig脚本由解析器处理,它检查脚本的语法,类型检查和其他杂项检查,解析器的输出将是DAG(有向无环图),它表示Pig Latin语句和逻辑运算符,在DAG中,脚本的逻辑运算表示为节点,数据流表示为边;

Optimizer**(优化器):**逻辑计划(DAG)传递到逻辑优化器,逻辑优化器执行逻辑优化,例如投影和下推;

Compiler**(编译器):**编译器将优化的逻辑计划编译为一系列MapReduce作业;

Execution Engine**(执行引擎):**最后,MapReduce作业按照排好的顺序依次提交到Hadoop,这些MapReduce作业在Hadoop上执行,产生所需的结果;

4**、Pig Latin的数据模型**

Pig Latin的数据模型是完全嵌套的,它允许复杂的非原子数据类型,例如map和tuple,下图给出了PigLatin数据模型的图形表示:

Atom**(原子):**Pig Latin中的任何单个值,不论其数据类型,都称为Atom,它存储为字符串,可以用作字符串和数字。int,long,float,double,chararray和bytearray是Pig的原子值,一条数据或一个简单的原子值被被称为字段,如:“Special”或“29”;

Tuple**(元组):**由有序字段集合形成的记录称为元组,字段可以是任何类型,元组与RDBMS表中的行类似,如:(Special,29);

Bag**(包):**一个包是一组无序的元组,换句话说,元组(非唯一)的集合被称为包,每个元组可以有任意数量的字段(灵活模式)。包由“{}”表示,它类似于RDBMS中的表,但是与RDBMS中的表不同,不需要每个元组包含相同数量的字段,或者相同位置(列)中的字段具有相同的类型,如:{(Special,29),(Iris,28)}。包可以是关系中的字段,在这种情况下,它被称为内包(inner bag),如:{(Special,29),{(101,zhangsan@163.com),(102,lisi@163.com)}};

Map**(映射):**映射(或数据映射)是一组key-value对,key需要时chararray类型,且应该是唯一的,value可以是任何类型,它由“[]”表示,如:[name/#Special,age/#29];

Relation**(关系):**一个关系是一个元组的包,Pig Latin中的关系是无序的(不能保证按任何特定顺序处理元组)。

5**、Apache Pig的安装及工作模式**

Apache Pig的安装非常简单,将其安装包解压后,再设置下环境变量便可以进行使用,需要注意的是,Apache Pig有两种工作模式,分别是本地模式(操作的是Linux本地目录)和集群模式(操作的是Hadoop集群,需要连接到HDFS)。下面开始介绍Apache Pig的安装和配置。

首先,将Apache Pig的安装包pig-0.17.0.tar.gz上传到主机hadoop221的/root/tools目录下,运行命令tar -zxvfpig-0.17.0.tar.gz -C /root/training/,解压到/root/training目录下;然后,为Pig设置环境变量,运行命令vi/root/.bash_profile,在文件末尾添加如下内容:

PIG_HOME=/root/training/pig-0.17.0

export PIG_HOME

PATH=$PIG_HOME/bin:$PATH

export PATH

保存退出,最后运行source/root/.bash_profile使环境变量生效。接下来启动Pig,运行命令pig –x local,以本地模式运行Pig,输出的关键log信息如下图所示,可以很明显地看到此时Pig运行于本地模式(关键log:Connecting to hadoop file system at: file:///)。

通常情况下,一般都是将Pig运行于集群模式。如果要将Pig运行于集群模式,需要额外设置一个环境变量,该环境变量的名称必须为PIG_CLASSPATH,其路径为Hadoop配置文件所在的目录。运行命令vi /root/.bash_profile,在文件末尾添加如下内容:

PIG_CLASSPATH=/root/training/hadoop-2.7.3/etc/Hadoop

export PIG_CLASSPATH

保存退出,运行命令source /root/.bash_profile,使环境变量生效。运行命令pig,以集群模式启动Apache Pig,输出的log信息如下图所示,可以明显看到此时Pig运行于集群模式(关键log:Connectingto hadoop file system at: hdfs://hadoop221:9000)。

有意思的是,在Pig客户端可直接操作HDFS文件系统,操作方式基本跟在Linux文件系统上一样。运行命令ls /,查看HDFS根目录下的所有文件;运行命令ls /input,查看input目录下的所有文件;运行命令cat /input/file1.txt,查看file1.txt文件的内容,如下图所示:

除此之外,还可以在Pig客户端直接操作Linux本地文件系统。运行命令sh pwd,查看当前处于Linux文件系统的哪个目录;运行命令sh ls,查看Linux系统当前目录下所有文件,如下图所示:

这里简单总结下在Pig中操作HDFS常用的一些命令:

Ls,cd,cat,mkdir,pwd等,跟Linux命令完全一样;
*
copyFromLocal(上传文件),copyToLocal(下载文件);
*
register,define(部署Pig自定义函数的jar包需要使用到的两个命令);
*
等等。

6**、Pig Latin详解**

Pig Latin是一种面向数据分析处理的轻量级脚本语言,可以进行排序、过滤、求和、分组、关联等常用操作,还可以自定义函数。Pig Latin让用户每次只输入一条单独的语句,这条单独的语句只执行一个简单的数据处理,这区别于SQL,SQL要求用户一次性输入一条完成所有计算任务的语句。Pig Latin对需要处理超大数据量的技术人员很有用,在使用Pig Latin做数据分析的时候,不需要经过耗时的数据加载过程,经过分析后的数据还可以以不同的格式输出,以适应不同的应用。

A**、Pig Latin的常用操作**

加载与存储
load

导入外部文件中的数据,存入关系中store

将一个关系存储到文件系统中dump

将关系打印到控制台

过滤

filter

按条件筛选关系中的行distinct

去除关系汇总的重复行foreach

对于集合的每个元素,生成或删除字段stream

使用外部程序对关系进行变换(如将Python程序嵌入到Pig中使用)sample

从关系中随机取样

分组与连接

join

连接两个或多个关系cogroup

在两个或多个关系中分组group

在一个关系中对数据分组cross

获取两个或更多关系字段乘积(叉乘)

排序

order

根据一个或多个字段对某个关系进行排序limit

限制关系的元组个数

合并与分割

union

合并两个或多个关系split

把某个关系切分成两个或多个关系

诊断操作

describe

打印关系的模式explain

打印逻辑和物理计划illustrate

使用生成的输入子集显示逻辑计划的试运行结果

UDF****操作

register

在Pig运行时环境中注册一个jar文件define

为UDF、流式脚本或命令规范新建别名

Pig Latin****命令操作

kill

中止某个MapReduce任务exec

在一个新的Grunt Shell程序中以批处理模式运行一个脚本run

在当前Grunt外壳程序中运行程序quit

退出解释器set

设置Pig选项

Pig Latin****表达式

类型

表达式

描述

示例字段

$n

第n个字段

$0字段

d

字段名

year投影

c.$n,c.f

c.f在关系、包或元组中的字段

user.$0,

user.yearMap查找

m/#k

在映射m中键k对应的值

items‘Coat’类型转换

(t)f

将字段t转换成f类型

(int)age函数型平面化

Fn(f1,f2,…)

在字段上应用函数

fn isGood

(quality)函数型平面化

FLATTEN(f)

从包和元组中去除嵌套

flatten(group)

B**、Pig Latin数据类型**

int (32位有符号整数)
*
long (64位有符号整数)
*
float (32位浮点数)
*
double (64位浮点数)
*
chararray (UTF16格式的字符数组)
*
bytearray (字节数组)
*
tuple(元组):(1, 'world') //可以是任何类型的字段序列
*
bag(包):{(1, 'world'), (2)} //元组的无序多重集合(允许重复元组)
*
map(键值对):['a' 'world'] //一组键值对,键必须是字符数组

C**、Pig Latin常用的内置函数**

计算函数
avg

计算包中项的平均值concat

把两个字节数组或者字符数组连接成一个count

计算包中非空值的个数count_star

计算包中项的个数,包括空值diff

计算两个包的差max

计算包中项的最大值min

计算包中项的最小值size

计算一个类型的大小,数值型的大小为1;

对于字符数组,返回字符的个数;

对于字节数组,返回字节的个数;

对于元组,包,映射,返回其中项的个数。sum

计算一个包中项的值的总和TOKENIZE

对一个字符数组进行标记解析,并把结果词放入一个包

过滤函数

isempty

判断一个包或映射是否为空

加载存储函数

PigStorage

用字段分隔文本格式加载或存储关系,这是默认的存储函数BinStorage

从二进制文件加载一个关系或者把关系存储到二进制文件BinaryStorage

从二进制文件加载只是包含一个类型为bytearray的字段的元组到关系,或以这种格式存储一个关系TextLoader

从纯文本格式加载一个关系PigDump

用元组的tostring()形式存储关系

7**、Pig Latin数据分析实践**

在使用Pig Latin进行数据分析之前,需要先启动historyserver,因为Pig在Hadoop上执行任务后需要与historyserver通信,解析执行日志以确定任务执行是否成功。运行命令mr-jobhistory-daemon.shstart historyserver,启动historyserver,然后在浏览器地址栏中输入192.168.12. 221:19888/jobhistory,看到的界面如下图所示,可以看到所有历史执行的MapReduce任务。

下面使用员工表employee和部门表department为例,进行Pig Latin数据分析实践。这两张表的内容如下图所示:

创建员工表emp

运行命令pig,以集群模式启动Pig,然后运行命令emp = load '/data/employee' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);创建emp表,并运行命令describe emp;查看表结构(schema),运行结果如下图所示,这里需要注意的是,在创建表时并没有指定列的类型,Pig默认都为bytearray。

然而,大部分情况下,在创建表的时候,需要指定列的类型,同时指定列的分隔符(Pig默认的分隔符是制表符,使用using PigStorage命令可以设定分隔符),重新运行命令emp = load '/data/employee' using PigStorage(',')as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);创建emp表,并运行命令describe emp;查看emp表的结构,运行结果如下图所示,可以看到,此时emp表各列的数据类型不尽相同。

查询员工信息(员工号、姓名和薪水)

运行命令emp1= foreach emp generate empno,ename,sal;从员工表中查询员工号、员工姓名以及员工薪水,可以看到该命令执行后,并不会立即触发计算,显示查询结果,再运行命令dump emp1;触发计算,可以看到此时会生成一个MapReduce任务,等待一小段时间后,查询结果显示在屏幕上,运行结果如下图所示:

查询员工信息(按照月薪排序)

运行命令emp2 = order emp by sal;查询员工信息并按照月薪排序,然后运行命令dump emp2;触发计算,运行结果如下图所示:

分组查询(查询每个部门的最高工资)

要完成该项查询任务,可以分两步进行操作:1、对数据表进行查询分组;2、查询每个组中员工薪水的最大值。运行命令emp3 = group emp bydeptno;对emp表进行查询分组,然后运行命令emp31= foreach emp3 generate group,MAX(emp.sal);查询emp31表每个组中员工薪水的最大值,最后运行命令dump emp31;触发计算,运行结果如下图所示:

查询指定部门(如20号部门)的员工

运行命令emp4 = filter emp bydeptno == 20;查询20号部门所有员工信息,然后运行命令dump emp4;触发计算,运行结果如下图所示:

多表查询(查询部门名称和员工姓名)

运行命令dept = load'/data/department' using PigStorage(',')as(deptno:int,dname:chararray,loc:chararray);创建部门表dept,然后运行命令result51 = join emp by deptno,dept by deptno;联合emp表和deptno表中的信息,最后运行命令result52 = foreach result51 generate dept::dname,emp::ename;查询部门名称和员工姓名,并运行命令dump result52;触发计算,运行结果如下图所示:

集合运算(查询10号和20号部门的员工信息)

运行命令emp61= filter emp by deptno==10;查询emp表中部门号为10的员工信息,然后运行命令emp62 = filter emp by deptno==20;查询emp表中部门号为20的员工信息,最后运行命令result6 = union emp61,emp62;求部门号为10和部门号为20的员工信息的并集,并运行命令dump result6;触发计算,运行结果如下图所示:

使用Pig Latin实现WordCount

准备实验数据,创建文件data.txt,在其中输入相应的字符内容,如下图所示:

运行命令mydata= load '/input/data.txt' as(line:chararray);加载data.txt文件中的数据并创建数据表mydata,运行命令words = foreach mydata generateflatten(TOKENIZE(line)) as word;将字符串分割成单个的单词,运行命令groupword= group words by word;对单词进行分组,运行命令countword = foreachgroupword generate group,COUNT(words);统计每组中单词的数量,最后运行命令dumpcountword;触发计算,运行结果如下图所示:

8**、Pig自定义函数**

在Pig中,支持使用Java、Python以及Javascript三种语言编写UDF,而在这三种语言中,Java自定义函数最为成熟,其它两种功能比较有限。在开发自定义函数之前,需要做一些准备工作,新建一个Java Project工程,并从Pig的安装路径下及hadoop的安装路径下拷贝对应的jar包到Eclipse项目工程中,需要的jar包分别位于如下路径:

/root/training/pig-0.14.0/pig-0.14.0-core-h2.jar
*
/root/training/pig-0.14.0/lib
*
/root/training/pig-0.14.0/lib/h2
*
/root/training/hadoop-2.4.1/share/hadoop/common
*
/root/training/hadoop-2.4.1/share/hadoop/common/lib

将所有这些目录下的jar拷贝到新建项目中,并添加到编译路径(Addingto build path)即可。

A**、自定义运算函数**

这里实现的自定义运算函数,其功能是:根据员工的薪水,判断其薪水的级别,若薪水低于1500,返回级别Grade A;若薪水大于等于1500且小于等于3000,返回级别GradeB;若薪水高于3000,则返回级别Grade C。自定义运算函数时,需要继承Pig提供的父类EvalFunc,并重写exec方法,在该方法中实现具体的计算逻辑,而在使用Pig Latin语句进行查询操作时,通过tuple来传递相应的参数值。代码如下图所示:

代码编写完成后,将其导出为pigfunction.jar,并上传到主机hadoop221的/root/temp目录下。运行命令register /root/temp/pigfunction.jar;注册该jar包,运行命令emp1 = foreach emp generateempno,ename,sal,pig.CheckSalaryGrade(sal);查询所有员工信息并判断其薪水级别,最后运行命令dump emp1触发计算,运行结果如下图所示:

B**、自定义过滤函数**

这里实现的自定义过滤函数,其功能是:查询薪水高于3000的员工,若薪水高于3000,则查询出该员工的信息;反之,则不查询该员工的信息。自定义过滤函数时,需要继承Pig提供的父类FilterFunc,并重写exec方法,在该方法中实现具体的判断逻辑,同样地,在使用Pig Latin语句进行查询操作时,通过tuple来传递相应的参数值。代码如下图所示:

这里的运行方式跟前面完全一致,也需要导出jar,并注册该jar,这几个步骤不再赘述。运行命令emp2 = filter emp bypig.IsSalaryTooHigh(sal);查询所有薪水高于3000的员工,并运行命令dump emp2触发计算,运行结果如下图所示:

C**、自定义加载函数**

在开发自定义加载函数之前,需要注意的是,由于加载的数据来源于HDFS,因此还需要导入开发MapReduce程序所需要使用到的相关jar包,前面已经实践编写过MapReduce程序,也应该知道如何获取到这些jar包,这里不再阐述。

相比于自定义运算函数和自定义过滤函数,自定义加载函数要复杂一些,这里实现的自定义加载函数,其功能是:将输入数据的每一行中的每个单词单独读取作为一个tuple(也就是单独作为一行),这样后续对表进行查询操作时就可以直接进行分组,而不再需要进行分词操作。在默认情况下,Pig会将读取的一行数据作为一个tuple,同时按照默认的分隔符(制表符)进行切分,得到的每个字段对应存入到该tuple中的field。自定义加载函数时,需要继承Pig提供的父类LoadFunc,同时还要重写四个方法,以实现从HDFS文件系统指定目录中读取数据,并按照自定义规则逻辑生成相应表bag。代码如下图所示:

代码编写完成后,导出为pigfunction.jar包,并注册该jar包。运行命令emp3 = load '/input/data.txt' using pig.MyLoadFunction();从HDFS的/input/data.txt目录下读取数据,按照自定义加载函数功能逻辑生成相应的表bag,最后运行命令dump emp3;触发计算,运行结果如下图所示:

到这里,Apache Pig的相关知识介绍完毕,有兴趣的朋友可以自己去网上查找相关资料,进一步学习。下期待续……

参考文献:

——《百度百科》

——《CSDN博客》

——《潭州大数据课程课件》

转自https://mp.weixin.qq.com/s/FLo1KjM4ck34G4_xohSFIA

相关文章

微信公众号