Spark不可变的分布式数据集合DataFrames

x33g5p2x  于2021-03-14 发布在 其他  
字(11.4k)|赞(0)|评价(0)|浏览(156)

DataFrame是一个不可变的分布式数据集合,它的结构类似于关系数据库中的表的命名列。 作为SchemaRDD在Apache Spark 1.0中作为实验性功能引入,它们作为Apache Spark 1.3版本的一部分重命名为DataFrames。对于熟悉Python Pandas DataFrame或R DataFrame的读者来说,Spark DataFrame是一个类似的概念,它允许用户轻松处理结构化数据(例如,数据表); 也有一些差异所以也请不要期望过高。
通过将结构强加到分布式数据集合上,这允许Spark用户在Spark SQL中查询结构化数据或使用表达式方法(而不是lambdas)。 在本章中,我们将使用这两种方法,都会包含代码示例。通过构建结构数据,这使得Apache Spark引擎(特别是Catalyst Optimizer)能够显着提高Spark查询的性能。 在Spark的早期API(即RDD)中,由于Java JVM和Py4J之间的通信开销,在Python中执行查询可能会显著减慢。


如果您熟悉在以前版本的Spark(即Spark 1.x)中使用DataFrame,您会注意到在Spark 2.0中我们使用SparkSession而不是SQLContext。各种Spark contexts:HiveContext,SQLContext,StreamingContext和SparkContext已在SparkSession中合并在一起。 这样,你就可以将此 session作为读取数据,处理元数据,配置和群集资源管理的入口点。有关更多信息,请参阅How to use SparkSession in Apache
Spark 2.0 (http://bit.ly/2br0Fr1)


在本章中,您将了解以下内容:
  • Python到RDD的通信方式
  • 对Spark 的 Catalyst Optimizer 快速了解
  • 用DataFrames加速PySpark
  • 创建DataFrame
  • 简单的DataFrame查询
  • 与RDD互操作
  • 使用DataFrame API进行查询
  • 使用Spark SQL查询
  • 使用DataFrames实现准点飞行表现的应用

Python to RDD communications

每当使用RDD执行PySpark程序时,执行作业可能会有很大的开销。 如下图所示,在PySpark驱动程序中,Spark Context使用Py4j使用JavaSparkContext启动JVM。任何RDD的transformations 最初都映射到Java中的PythonRDD对象。
将这些任务推送到Spark Worker之后,PythonRDD对象使用管道启动Python子进程,以发送要在Python中处理的代码和数据:

虽然这种方法允许PySpark将数据处理分发给多个worker上的多个Python子进程,但正如您所看到的,Python和JVM之间存在大量的context切换和通信开销。

Catalyst Optimizer refresh

正如第1章“Understanding Spark”中所述,Spark SQL引擎如此之快的主要原因之一是Catalyst Optimizer。 对于具有数据库背景的读者,此图看起来类似于关系数据库管理系统(RDBMS)的逻辑/物理规划器和成本模型/基于成本的优化:

这一点的重要性在于,与立即处理查询相反,Spark引擎的Catalyst Optimizer编译并优化逻辑计划,并具有成本优化器,可确定生成的最有效的物理计划。
作为Project Tungsten的一部分,通过生成字节代码(code generation or codegen)而不是解释每行数据,可以进一步提高性能。 有关Tungsten的更多详细信息,请参阅第1章“Understanding Spark”中的“Project Tungsten”部分。
如前所述,优化器基于函数式编程构造,其设计考虑了两个目的:简化向Spark SQL添加新的优化技术和功能的过程,并允许外部开发人员扩展优化器(例如,添加data-source-specifc规则,支持新数据类型等)。

Speeding up PySpark with DataFrames

与没有被优化的RDD查询相比,DataFrames和Catalyst Optimizer(以及Project Tungsten)的重要性在于PySpark查询的性能提升。如下图所示,在引入DataFrame之前,Python查询速度通常是使用RDD的相同Scala查询的两倍。通常,查询性能的这种下降是由于Python和JVM之间的通信开销造成的:

来源:Source: Introducing DataFrames in Apache-spark for Large Scale Data Science at http://bit.ly/2blDBI1

使用DataFrames,不仅Python性能有了显着提高,而且Python,Scala,SQL和R之间的性能也都相差无几。


值得注意的是,虽然使用DataFrames,PySpark通常会更快,但也有一些例外。 最突出的一个是使用Python UDF,这导致Python和JVM之间的往返通信。 注意,如果计算是在RDD上完成的,这将是最糟糕的情况。
翻译备注,在 Apache Spark 中使用 UDF:https://www.iteblog.com/archives/2038.html


Python可以利用Spark中的性能优化,即使Catalyst Optimizer的代码库是用Scala编写的。 基本上,它是一个大约2,000行代码的Python包,它允许PySpark DataFrame查询显着更快。
总而言之,Python DataFrames(以及SQL,Scala DataFrames和R DataFrames)都能够使用Catalyst Optimizer(根据以下更新的图表):

Creating DataFrames

通常,您将通过使用SparkSession导入数据(或在PySpark shell中调用spark)来创建DataFrame。


注意:在Spark 1.x版本中,您通常必须使用sqlContext。


在以后的章节中,我们将讨论如何将数据导入本地文件系统,Hadoop分布式文件系统(HDFS)或其他云存储系统(例如,S3或WASB)。
首先,我们将通过生成数据来创建DataFrame,而不是访问文件系统。 在这种情况下,我们将首先创建stringJSONRDD RDD,然后将其转换为DataFrame。 此代码段以JSON格式创建由游泳者(他们的ID,姓名,年龄和眼睛颜色)组成的RDD。

Generating our own JSON data

下面,我们将生成最初的stringJSONRDD RDD:

stringJSONRDD = sc.parallelize(("""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}""")
)

现在我们已经创建了RDD,我们将使用SparkSession read.json方法(即spark.read.json(...))将其转换为DataFrame。 我们还将使用.createOrReplaceTempView方法创建一个临时表。


在Spark 1.x中,此方法是.registerTempTable,它作为Spark 2.x的一部分被弃用。


Creating a DataFrame

以下是创建DataFrame的代码:

swimmersJSON = spark.read.json(stringJSONRDD)

Creating a temporary table

以下是创建临时表的代码:

swimmersJSON.createOrReplaceTempView("swimmersJSON")

如前几章所述,许多RDD操作都是transformations,在执行 action操作之前不会执行transformations。 例如,在前面的代码片段中,sc.parallelize是一个transformation,它只有在使用spark.read.json从RDD转换为DataFrame时才执行。请注意,在此下文的代码段笔记本(左下角附近)的屏幕截图中,直到包含spark.read.json操作的第二个单元格才会执行Spark作业。
为了进一步强调这一点,在下图的右窗格中,我们提供了DAG执行图。
在下面的屏幕截图中,您可以看到Spark作业的parallelize操作来自生成RDD stringJSONRDD的第一个单元格,而map和mapPartitions操作是创建DataFrame所需的操作:

Spark UI of the DAG visualization of the spark.read.json(stringJSONRDD) job.

在下面的屏幕截图中,您可以看到parallelize操作的stage来自生成RDD stringJSONRDD的第一个单元格,而map和mapPartitions操作是创建DataFrame所需的操作:

Spark UI of the DAG visualization of the stages within the spark.read.json(stringJSONRDD) job

值得注意的是,parallelize,map和mapPartitions都是RDD transformations。 在DataFrame操作中包含spark.read.json(在本例中),不仅是RDD transformations,还包括将RDD转换为DataFrame的action。这是一个重要的提示,因为即使你正在执行DataFrame操作,要调试这些操作,还需要记住,你需要了解Spark UI中的RDD操作。
请注意,创建临时表是DataFrame transformation,在执行DataFrame action之前不会执行(例如,在下一节中要执行的SQL查询中)。


DataFrame transformations和actions类似于RDD transformations和actions,因为有一组操作是惰性的( transformations)。 但是,与RDD相比,DataFrames操作并不是那么懒惰,主要是由于Catalyst Optimizer。


Simple DataFrame queries

现在已经创建了swimmersJSON DataFrame,我们将能够运行DataFrame API以及针对它的SQL查询。 让我们从一个简单的查询开始,显示DataFrame中的所有行。

DataFrame API query

要使用DataFrame API执行此操作,您可以使用


```方法,该方法将前n行打印到控制台:
*运行.show()方法将默认显示前10行。*

DataFrame API

swimmersJSON.show()

输出如下所示:
![](http://img.saoniuhuo.com/images/202110/84651633587348450.jpg)
## **SQL query**
如果你更喜欢编写SQL语句,可以编写以下查询:

spark.sql("select * from swimmersJSON").collect()

输出如下所示:
![](http://img.saoniuhuo.com/images/202110/60991633587348807.jpg)
我们使用.collect()方法,该方法将所有记录作为Row对象列表返回。 请注意,您可以对DataFrame和SQL查询使用collect()或show()方法。只要确保如果你使用.collect(),这是一个小的DataFrame,因为它将返回DataFrame中的所有行,并将它们从执行程序移回驱动程序。 您可以使用

```take(<n>)

```或

```show(<n>)

```,它允许您通过指定

```<n>

```来限制返回的行数:
# **Interoperating with RDDs**
将现有RDD转换为DataFrame(或dataset[T])有两种不同的方法:使用反射推断模式,或以编程方式指定模式。前者允许您编写更简洁的代码(当您的Spark应用程序已经知道架构时),而后者允许DataFrames的列和它的数据类型仅在运行时显示。 注意,反射参考的是模式反射而不是Python反射。
## **Inferring the schema using reflection**
在构建DataFrame并运行查询的过程中,我们跳过了自动定义此DataFrame的模式这一事实。最初,通过将键/值对列表作为** kwargs传递给类Row来构造Row对象。 然后,Spark SQL将Row对象的RDD转换为DataFrame,其中键是列,数据类型是通过对数据进行采样来推断的。
***kwargs构造允许你在运行时将可变数量的参数传递给方法。*
回到代码,在最初创建swimmersJSON DataFrame之后,在不指定模式的情况下,您将通过使用printSchema()方法注意到模式定义:

Print the schema

swimmersJSON.printSchema()

输出如下:
![](http://img.saoniuhuo.com/images/202110/81341633587349206.jpg)
但是,如果我们想要指定模式,因为在这个例子中,我们知道id实际上是long而不是字符串?
## **Programmatically specifying the schema**
在这种情况下,让我们通过引入Spark SQL数据类型(pyspark.sql.types)以编程方式指定模式,并为此示例生成一些.csv数据:

Import types

from pyspark.sql.types import *

Generate comma delimited data

stringCSVRDD = sc.parallelize([
(123, 'Katie', 19, 'brown'),
(234, 'Michael', 22, 'green'),
(345, 'Simone', 23, 'blue')
])

首先,我们将根据下面的[schema]变量将模式编码为字符串。 然后我们将使用StructType和StructField定义模式:

```# Specify schema
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

注意,StructField类按以下方式细分:

  • name: 字段的名字
  • dataType: 字段的数据类型
  • nullable: 指示此字段的值是否可以为null
    最后,我们将我们创建的模式(schema)应用于stringCSVRDD RDD(即生成的.csv数据)并创建临时视图,以便我们可以使用SQL查询它:
# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)
# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

在这个例子中,我们对模式进行了细粒度控制,并且可以指定id是一个long(而不是上一节中的字符串):

swimmers.printSchema()

输出如下:

在许多情况下,可以推断出模式(根据上一节),就不需要指定模式,如前面的示例所示。

Querying with the DataFrame API

如上一节所述,您可以先使用collect(),show()或take()来查看DataFrame中的数据(最后两个包括限制返回行数的选项)。

Number of rows

要获取DataFrame中的行数,可以使用count()方法:

swimmers.count()

Out[13]: 3

Running flter statements

要运行filter语句,可以使用filter子句; 在下面的代码片段中,我们使用select子句来指定要返回的列:

# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()
# Another way to write the above query is below
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).
show()

此查询的输出是仅选择id和age列,其中age = 22:

如果我们只想取回具有以眼睛颜色的单词以字母b开头的游泳者的名字,我们可以使用类似SQL的语法,如下面的代码所示:

# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").
show()

输出如下:

Querying with SQL

让我们运行相同的查询,除了这次,我们将使用针对相同DataFrame的SQL查询来执行此操作。 回想一下,这个DataFrame是可访问的,因为我们为 swimmers执行了.createOrReplaceTempView方法。

Number of rows

以下是使用SQL获取DataFrame中的行数的代码段:

spark.sql("select count(1) from swimmers").show()

输出如下所示:

Running filter statements using the where Clauses

要使用SQL运行filter语句,可以使用where子句,如以下代码段所示:

# Get the id, age where age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

此查询的输出是仅选择age = 22的id和age列:

与DataFrame API查询一样,如果我们想要找回眼睛颜色单词以字母b开头的swimmers 的名字,我们也可以使用类似的语法:

spark.sql(
"select name, eyeColor from swimmers where eyeColor like 'b%'").show()

输出如下所示:

使用Spark SQL和DataFrame时的一个重要注意事项是,虽然使用CSV,JSON和各种数据格式很容易,但Spark SQL分析查询最常见的存储格式是Parquet文件格式。它是许多其他数据处理系统支持的columnar 格式,Spark SQL支持读取和写入Parquet文件,自动保留原始数据的模式。

DataFrame scenario – on-time flight performance

为了展示您可以使用DataFrames进行的查询类型,让我们看一下按时飞行性能的用例。 我们将分析航空公司准点性能和航班延误原因:准时数据集,并将其加入机场数据集。机场数据集是从开放航班机场,航空公司和航线获得的数据,以更好地理解与飞行延迟相关的变量。

Preparing the source datasets

我们将首先通过指定其路径位置并使用SparkSession导入它们来处理源机场和飞行性能数据集:

# Set File Paths
flightPerfFilePath =
"/databricks-datasets/flights/departuredelays.csv"
airportsFilePath =
"/databricks-datasets/flights/airport-codes-na.txt"
# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true',
inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")
# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")
# Cache the Departure Delays dataset
flightPerf.cache()

请注意,我们使用CSV读取器(spark.read.csv)导入数据,该读取器适用于任何指定的分隔符(请注意,机场数据是tab分隔的,飞行性能数据是逗号分隔的)。 最后,我们缓存飞行数据集,以便后续查询更快。

Joining flight performance and airports

DataFrames / SQL的一个常见任务是连接两个不同的数据集; 它通常是要求更高的操作之一(从性能角度来看)。 使用DataFrames,默认情况下会包含许多针对这些联接的性能优化:

# Query Sum of Flight Delays by City and Origin Code
# (for Washington State)
spark.sql("""
    select a.City, f.origin, sum(f.delay) as Delays
    from FlightPerformance f
        join airports a
            on a.IATA = f.origin
    where a.State = 'WA'
    group by a.City, f.origin
    order by sum(f.delay) desc"""
).show()

在我们的场景中,我们正在查询华盛顿州的城市和原始代码的总延误。 这将要求通过国际航空运输协会(IATA)代码将飞行性能数据与机场数据相结合。 查询的输出如下:

使用笔记本(如Databricks,iPython,Jupyter和Apache Zeppelin),您可以更轻松地执行和可视化您的查询。 在以下示例中,我们将使用Databricks笔记本。 在我们的Python笔记本中,我们可以使用%sql函数在该笔记本单元格中执行SQL语句:

%sql
-- Query Sum of Flight Delays by City and Origin Code (for Washington
State)
select a.City, f.origin, sum(f.delay) as Delays
    from FlightPerformance f
        join airports a
            on a.IATA = f.origin
where a.State = 'WA'
group by a.City, f.origin
order by sum(f.delay) desc

这与上一个查询相同,由于格式化,所以更易于阅读。 在我们的Databricks笔记本示例中,我们可以快速将这些数据可视化为条形图:

Visualizing our flight-performance data

让我们继续可视化我们的数据,但按照美国大陆的所有州进行细分:

%sql
-- Query Sum of Flight Delays by State (for the US)
select a.State, sum(f.delay) as Delays
    from FlightPerformance f
        join airports a
            on a.IATA = f.origin
where a.Country = 'USA'
group by a.State

输出条形图如下:

但是,将这些数据视为地图会更酷; 单击图表左下角的条形图图标,您可以从许多不同的原始导航中进行选择,包括地图:

DataFrames的主要好处之一是信息的结构类似于表格。 因此,无论您使用的是笔记本电脑还是您喜欢的BI工具,您都可以快速查看数据。

Spark Dataset API

在讨论了Spark DataFrames之后,让我们快速回顾一下Spark Dataset API。 在Spark Spark 1.6中引入,Spark Dataset的目标是提供一个API,允许用户轻松地表达域对象的转换,同时还提供强大的Spark SQL执行引擎的性能和好处。作为Spark 2.0版本的一部分(如下图所示),DataFrame API将合并到Dataset API中,从而统一所有库中的数据处理功能。 由于这种统一,开发人员现在学习或记忆的概念较少,并且使用单个高级且类型安全的API(称为Dataset):

从概念上讲,Spark DataFrame是泛型对象Dataset [Row]的集合的别名,其中Row是通用的无类型JVM对象。 相比之下,Dataset 是强类型JVM对象的集合,由您在Scala或Java中定义的案例类决定。最后一点特别重要,因为这意味着PySpark不支持Dataset API,因为缺少类型增强功能。 请注意,对于PySpark中不可用的Dataset API部分,可以通过转换为RDD或使用UDF来访问它们。

Summary

使用Spark DataFrames,Python开发人员可以使用更简单的抽象层,该层也可能显着更快。 Python最初在Spark中运行速度较慢的一个主要原因是Python子流程和JVM之间有一个的通信层。对于Python DataFrame用户,我们有一个围绕Scala DataFrames的Python包装器,它避免了Python子进程/ JVM通信开销。 Spark DataFrames通过我们在本章中回顾的Catalyst Optimizer和Project Tungsten进行了许多性能增强。
在本章中,我们还回顾了如何使用Spark DataFrames并使用DataFrames处理准时飞行性能方案。
在本章中,我们通过生成数据或利用现有数据集来创建和使用DataFrame。
在下一章中,我们将讨论如何转换和理解你自己的数据。

相关文章