Spark弹性分布式数据集(RDD)的内部工作方式

x33g5p2x  于2021-03-14 发布在 其他  
字(9.9k)|赞(0)|评价(0)|浏览(286)

弹性分布式数据集(RDD)是不可变JVM对象的分布式集合,允许您非常快速地执行计算,它们是Apache Spark的支柱。
顾名思义,数据集是分布式的; 它根据某个键拆分为块并分发到执行程序节点。 这样做可以非常快速地对这些数据集运行计算。另外,正如第1章“Understanding Spark”中已经提到的,RDD跟踪(日志)应用于每个块的所有transformations,以加速计算,并在出现问题并且丢失部分数据时提供回退; 在这种情况下,RDD可以重新计算数据。data lineage是防止数据丢失的另一道防线,是数据复制的补充。

本章涵盖以下主题:
  • RDD的内部工作方式
  • 创建RDDs
  • 全局范围与本地范围
  • Transformations
  • Actions

Internal workings of an RDD

RDD是并行运行的。 这是在Spark中工作的最大优势:每个transformation都是并行执行的,以便大幅提高速度。对数据集的transformation是惰性的。 这意味着只有在调用数据集上的action时才会执行任何transformation。 这有助于Spark优化执行。例如,考虑以下非常常见的步骤,分析师通常会这样做以熟悉数据集:

  1. 计算某列中不同值的出现次数
  2. 选择那些以A开头的数据
  3. 将结果输出到屏幕上
    就像前面提到的步骤一样简单,如果只有以字母A开头的item是有意义的,那么计算所有其他item的不同字符是没有意义的。因此,Spark不会按照前面几点执行,而会只计算以A开头的items的个数,然后将结果输出到屏幕上。
    我们用几行代码来讲述这个例子。首先,我们命令Spark使用

.map(lambda v: (v,1))

映射A的值,然后选择那些以'A'开头的记录(使用


.filter(lambda val: val.startswith('A'))

)。如果我们调用

.reduceByKey(operator.add)

方法,它将reduce the dataset并使用add方法(在此示例中为count),来计算每个键的出现次数。 所有这些步骤都会**转换(transform)**成dataset。
其次,我们调用

.collect()

方法。 此步骤是在dataset上的 action,它会最终计算dataset的不同元素的个数。实际上,该action可能会颠倒transform的顺序,并在map之前首先过滤数据,从而导致将较小的数据集传递给reducer。

Creating RDDs

有两种方法可以在PySpark中创建RDD:你可以使用

.parallelize(...)

方法,将一个集合(list或一些元素的array)转换成RDDs:


data = sc.parallelize(
[('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), 
('Amber', 9)])

或者是,你可以引用位于本地或外部的一个文件或多个文件:


data_from_file = sc.\
    textFile('/Users/drabast/Documents/PySpark_Data/VS14MORT.txt.gz', 4)

sc.textFile(..., n) 中的最后一个参数指定数据集分成的分区数。
一条经验法则是:将集群中的每个数据集分成两个到四个分区
Spark可以从多种文件系统中读取:本地系统:如NTFS,FAT或Mac OS Extended(HFS +),或分布式系统:如HDFS,S3,Cassandra等等
警惕数据集的读取或保存位置:路径不能包含特殊字符[]。 请注意,这也适用于存储在Amazon S3或Microsoft Azure Data Storage上的路径。
支持多种数据格式:Text, parquet, JSON, Hive tables, 以及使用JDBC驱动程序从关系数据库中读取的数据。请注意,Spark可以自动使用压缩数据集(如前面示例中的Gzipped)。根据数据的读取方式,数据对象的表示方式略有不同。 当我们从文件读取的数据表示为MapPartitionsRDD,使用

.paralellize(...)

读取一个集合时,则为ParallelCollectionRDD。

Schema

RDD是无模式数据结构(与DataFrame不同,我们将在下一章中讨论)。 因此,在使用RDD时,使用Spark并行化(parallelizing)数据集(例如在以下代码片段中)是非常合适的:


data_heterogenous = sc.parallelize([
    ('Ferrari', 'fast'),
    {'Porsche': 100000},
    ['Spain','visited', 4504]
]).collect()

所以,我们几乎可以混合任何东西:元组,字典或列表,Spark永远不会抱怨。
一旦你

.collect()

数据集(也就是说,运行一个action将它带回驱动程序),你可以像在Python中一样访问对象中的数据:


data_heterogenous[1]['Porsche']

这将会产生如下结果:


100000

.collect() 方法将RDD的所有元素返回给驱动程序,并将其序列化为列表。

Reading from files

从文本文件中读取时,文件中的每一行都构成RDD的一个元素.
data_from_file.take(1)命令将产生以下(有些不可读)输出:

为了使其更具可读性,让我们创建一个元素列表,所以每行代表一系列值。

Lambda expressions

在这个例子中,我们将从data_from_file看起来神秘的记录中提取有用的信息。
The code can be found here: https://github.com/drabastomek/learningPySpark/tree/master/Chapter03/LearningPySpark_Chapter03.ipynb.
首先,让我们在下面的代码的帮助下定义该方法,该代码将不可读的行解析成我们可以使用的东西:


def extractInformation(row):
    import re
    import numpy as np
    selected_indices = [
        2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
        ...
        77,78,79,81,82,83,84,85,87,89
    ]
    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})
            ...
            ([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except:
        rs = np.array(['-99'] * len(selected_indices))
    return rs

这里需要提醒一点。 定义纯Python方法会降低应用程序的速度,因为Spark需要在Python解释器和JVM之间不断地来回切换。 只要你可以,你应该使用内置的Spark函数。
接下来,我们导入必要的模块:re模块,因为我们将使用正则表达式来解析记录,以及NumPy,以便于一次选择多个元素。
最后,我们创建一个Regex对象来提取指定的信息并通过它解析行。
解析记录后,我们尝试将列表转换为NumPy数组并返回它; 如果失败,我们返回一个默认值列表-99,所以我们知道这条记录没有正确解析。
我们可以使用.flatMap(...)隐式过滤掉格式错误的记录,并返回空列表[]而不是-99。点击查看详细信息:http://stackoverflow.com/questions/34090624/remove-elements-from-spark-rdd
现在,我们将使用

extractInformation(...)

方法来分割和转换我们的数据集。 请注意,我们只将方法的signature 传递给

.map(...)

:**该方法将在每个分区中一次将RDD的一个元素移交给

extractInformation(...)

方法**:


data_from_file_conv = data_from_file.map(extractInformation)

运行data_from_file_conv.take(1)将产生以下结果(缩写):

Global versus local scope

作为具有前瞻性的PySpark用户,您需要习惯的一件事就是Spark固有的并行性。 即使你熟悉Python,在PySpark中执行脚本也需要改变你的想法。
Spark可以以两种模式运行:本地和集群。 当你在本地运行Spark时,你的代码可能与你目前习惯使用的Python没有什么不同:??原文见下?? 数据和代码可以在单独的工作进程之间复制。

Changes would most likely be more syntactic than anything else but with an added twist that data and code can be copied between separate worker processes.

但是,如果你不小心,使用相同的代码并将其部署到群集可能会引起很多麻烦。 这需要了解Spark如何在集群上执行作业。
在群集模式下,当提交作业执行时,作业将发送到驱动程序(或主节点)。 驱动程序节点为job创建DAG(请参阅第1章,Understanding Spark)并确定哪个执行程序(或工作程序)节点将运行特定tasks。
然后,驱动程序指示workers执行他们的任务,并在完成后将结果返回给驱动程序。 然而,在此之前,驱动程序准备每个任务的闭包:驱动程序上的一组变量和方法,供worker在RDD上执行其task。
这组变量和方法在执行程序的contex中本质上是静态的,也就是说,每个执行程序从驱动程序中获取变量和方法的副本。如果在运行任务时,执行程序更改这些变量或覆盖方法,则不会影响其他执行程序的副本或驱动程序的变量和方法。这可能会导致一些意外行为和运行错误,有时可能很难追查。

Transformations

Transformations会塑造你的数据集。对dataset中的值可以采用方法的包括 mapping, fltering, joining, and
transcoding。在本节中,我们将展示RDD上可用的一些transformations

The .map(...) transformation

可以说你最常使用的是

.map(...)

转换。 该方法应用于RDD的每个元素:对于data_from_file_conv数据集,您可以将其视为每行的转换。
在本示例中,我们将创建一个新的数据集,将死亡年份转换为数值形式:


data_2014 = data_from_file_conv.map(lambda row: int(row[16]))

运行

data_2014.take(10)

将会产生如下结果:

你还可以取更多列,但是必须将它们打包成元组,字典或列表的形式。 让我们将该行的第17个元素同时取出来,以便我们可以确定我们的

.map(...)

按预期工作:


data_2014_2 = data_from_file_conv.map(
    lambda row: (row[16], int(row[16]):)
data_2014_2.take(5)

上面的代码将产生以下结果:

The .flter(...) transformation

另一个最常用的transformation是

.filter(...)

方法,它允许您从数据集中选择符合指定标准的条件的元素。 例如,从data_from_file_conv数据集中,让我们计算2014年在一次事故中死亡的人数:


data_filtered = data_from_file_conv.filter(
    lambda row: row[16] == '2014' and row[21] == '0')
data_filtered.count()

请注意,上述命令可能需要一段时间,具体取决于计算机的速度。 对我们来说,返回结果花了两分多钟

The .flatMap(...) transformation

.flatMap(...)

方法与

.map(...)

的工作方式类似,但它返回展平结果而不是列表。 如果我们执行以下代码:


data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)

它将会产生如下结果:

您可以将此结果与先前生成data_2014_2的命令的结果进行比较。另请注意,如前所述,当您需要解析输入时,

.flatMap(...)

方法可用于过滤掉一些格式错误的记录。内在原理就是,

.flatMap(...)

方法将每一行视为一个列表,然后简单地将所有记录添加到一起; 通过传递一个空列表,丢弃格式错误的记录。

The .distinct(...) transformation

此方法返回指定列中的不同值的列表。 如果您想了解数据集或对其进行验证,这非常有用。 让我们检查性别列是否仅包含男和女; 这将验证我们是否正确解析了数据集。 我们运行以下代码:


distinct_gender = data_from_file_conv.map(
    lambda row: row[5]).distinct()
distinct_gender.collect()

此代码将生成以下输出:

首先,我们只提取包含性别的列。 接下来,我们使用

.distinct()

方法仅选择列表中的不同的值。 最后,我们使用

.collect()

方法将返回值输出。
请注意,这是一种代价高的方法,应该谨慎使用,并且只在必要时使用,因为它会shuffles the data around。

The .sample(...) transformation

.sample(...)方法返回数据集中的随机样本。 第一个参数表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,第二个参数确定抽样率,第三个参数是伪随机数生成器的种子:


fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)

在此示例中,我们从原始数据集中选择了10%的随机样本。 为了确认这一点,让我们打印数据集的大小:


print('Original dataset: {0}, sample: {1}'\
.format(data_from_file_conv.count(), data_sample.count()))

结果如下:

The .leftOuterJoin(...) transformation

.leftOuterJoin(...),就像在SQL中一样,根据在两个数据集中找到的值连接两个RDD,并从左RDD返回记录,并将右侧记录附加在两个RDD相匹配的位置:


rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])
rdd3 = rdd1.leftOuterJoin(rdd2)

在rdd3上运行

.collect(...)

将产生以下内容:

这是另一种代价较高方法,应该谨慎使用,并且只在必要时使用,因为它 shuffles the data around会造成性能损失。
你在这里看到的是来自RDD rdd1的所有元素以及来自RDD rdd2的相应值。 如您所见,值“a”在rdd3中显示两次,“a”在RDD rdd2中显示两次。来自rdd1的值b仅显示一次,并与来自rdd2的值“6”连接。 有两个地方值得注意:rdd1中的值'c'在rdd2中没有相应的键,因此返回的元组中的值显示为None,并且,由于我们执行左外连接,因此来自 rdd2的'd'消失了,和预测的一致。
如果我们使用

.join(...)

方法,我们就只能得到'a'和'b'的值,因为只有这两个值在这两个RDD之间相交。 运行以下代码:


rdd4 = rdd1.join(rdd2)
rdd4.collect()

它将产生以下输出:

另一个有用的方法是

.intersection(...)

,它返回两个RDD中相同的记录。 执行以下代码:


rdd5 = rdd1.intersection(rdd2)
rdd5.collect()

结果为:

The .repartition(...) transformation

重新分区数据集会更改数据集分区的分区数。 这个功能应该谨慎使用,只有在真正需要的时候才能使用,因为它会 shuffles the data around,这实际上会对性能方面造成显着影响:


rdd1 = rdd1.repartition(4)
len(rdd1.glom().collect())

上面的代码打印出4,即为新的分区数。
与.collect()相比,.glom()方法生成一个列表,其中每个元素是指定分区中的数据集的所有元素的一个列表; 返回的主列表包含与分区数一样多的元素个数。

Actions*

与transformtations相比,actions在数据集上执行计划好的task;一旦你完成转换你的数据,就可以执行transformtations。这可能不包含任何transformtation操作(例如,.take(n)将只返回RDD中的n条记录,即使您没有对其进行任何transformtations)又可能是需要执行整个transformtation链。

The .take(...) method

这可以说是我们用过的最有用的方法(就像.map(...)方法一样)。 该方法优先于.collect(...),因为它只返回单个数据分区中的前m行,而不是像 .collect(...),它返回整个RDD。 处理大型数据集时,这一点尤为重要:


data_first = data_from_file_conv.take(1)

如果你想要一些随机化的记录,你可以使用.takeSample(...)代替,它有三个参数:首先是是否是有放回抽样,第二个是指定要返回的记录数,第三个是随机数发生器的种子:


data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)

The .collect(...) method

此方法将RDD的所有元素返回给驱动程序。 由于我们刚刚对此提出了警告,我们不会在此重复。

The .reduce(...) method

.reduce(...)方法使用指定的方法减少RDD的元素。
您可以使用它来汇总RDD的元素:


rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

总和的结果为15。
我们首先使用

.map(...)

转换创建rdd1的所有值的列表,然后使用

.reduce(...)

方法来处理结果。 每个分区上的

reduce(...)

方法运行求和方法(此处表示为lambda)并将总和返回到发生最终聚合的驱动程序节点。
这里需要注意一点。 作为reducer传递的函数需要是关联的,也就是说,当元素的顺序改变时,结果不会改变,并且可交换,即改变操作的顺序也不会改变结果。
关联性规则的例子是(5 + 2)+ 3 = 5 +(2 + 3),并且可交换的是5 + 2 + 3 = 3 + 2 + 5.因此,你需要注意你传递给reducer的是哪些函数 。
如果忽略上述规则,可能会遇到bug(假设您的代码完全运行)。 例如,假设我们有以下RDD(只有一个分区!):


data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)

如果我们要以我们希望的方式,将当前结果除以后续结果以reduce数据,我们期望值为10:


works = data_reduce.reduce(lambda x, y: x / y)

但是,如果要将数据分区为三个分区,结果将是错误的:


data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y: x / y)

产生的结果为0.004
.reduceByKey(...)方法的工作方式与.reduce(...)方法类似,但它逐个键地执行减少:


data_key = sc.parallelize(
    [('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)], 4)
data_key.reduceByKey(lambda x, y: x + y).collect()

上面代码产生以下结果:

The .count(...) method

.count(...)方法计算RDD中的元素总数。 使用以下代码:


data_reduce.count()

此代码产生的结果为6,即:data_reduce RDD中的确切元素总数。

.count(...)

方法产生与以下方法相同的结果,但不需要将整个数据集移动到驱动程序:


len(data_reduce.collect()) # WRONG -- DON'T DO THIS!

如果数据集采用键值形式,则可以使用

.countByKey()

方法获取不同键的计数。 运行以下代码:


data_key.countByKey().items()

这行代码会产生如下结果:

The .saveAsTextFile(...) method

顾名思义,

.saveAsTextFile(...)

将RDD保存到文本文件:每个分区到一个单独的文件夹:


data_key.saveAsTextFile(
    '/Users/drabast/Documents/PySpark_Data/data_key.txt')

要重新读该文件,需要解析它,因为所有行都被视为字符串:


def parseInput(row):
    import re
    pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    row_split = pattern.split(row)
    return (row_split[1], int(row_split[2]))

data_key_reread = sc \
    .textFile(
        '/Users/drabast/Documents/PySpark_Data/data_key.txt') \
    .map(parseInput)
    data_key_reread.collect()

读取的键列表与我们最初的键匹配:

The .foreach(...) method

这是一种以迭代方式将相同的函数应用于RDD的每个元素的方法; 与

.map(..)

相比,

.foreach(...)

方法以逐个方式将defned函数应用于每个记录。 当您想要将数据保存到PySpark本身不支持的数据库时,它非常有用。
在这里,我们将使用它来打印所有存储在data_key RDD中的记录(到CLI - 而不是Jupyter Notebook):


def f(x):
    print(x)
data_key.foreach(f)

如果您现在跳转到CLI,您应该看到打印出的所有记录。 请注意,每次顺序有可能不同。

Summary

RDD是Spark的支柱; 这种 无模式数据结构(schema-less data structures) 是我们将在Spark中处理的最基本的数据结构。
在本章中,我们介绍了通过

.parallelize(...)

方法以及从文本文件中读取数据来从文本文件创建RDD的方法。 此外,还显示了处理非结构化数据的一些方法。
Spark中的Transformations 是惰性的 ——它们仅在调用action时应用。 在本章中,我们讨论并介绍了最常用的transformations
和actions; ; PySpark文档包含更多方法 http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD 。
Scala和Python RDD之间的一个主要区别是速度:Python RDD可能比它们的Scala速度慢得多
在下一章中,我们介绍一种数据结构——DataFrames,该结构使PySpark的应用程序与Scala编写的应用程序具有相同的表现。

相关文章