spark-dataframe-to-dict与set

ajsxfq5m  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(519)

我的sparkDataframe的输出有问题。文件的大小可以从几gb到50 gb以上

SparkDF = spark.read.format("csv").options(header="true", delimiter="|", maxColumns="100000").load(my_file.csv)

这给了我正确的数据框,我想要的。但根据要求,我需要将列名和集合中与该键相关的所有值作为键。
例如:

df = {'col1': ['1', '2', '3', '4'], 'col2': ['Jean', 'Cecil', 'Annie', 'Maurice'], 'col3': ['test', 'aaa', 'bbb', 'ccc','ddd']}
df = pd.DataFrame(data=d)

最后应该给我:

{'col1': {'1', '2', '3', '4'},'col2': {'Jean', 'Cecil', 'Annie', 'Maurice'},'col3': {'test', 'aaa', 'bbb', 'ccc','ddd'}

我实现了以下功能:

def columnDict(dataFrame):
    colDict = dict(zip(dataFrame.schema.names, zip(*dataFrame.collect())))
    return colDict if colDict else dict.fromkeys(dataFrame.schema.names, ())

但是,它返回了一个dict,其中一个元组作为值,而不是我所需要的集合。
我想要么把字典中的元组转换成一个集合,要么直接把字典作为一个集合作为我函数的输出。
编辑:
完整要求:
除了上面提到的字典之外,还有一本包含类似数据的字典,供检查。
意味着我加载到spark df并转换为字典的文件包含必须对照另一个字典检查的数据。
我们的目标是对照check字典检查dict(加载的文件)中的每个键,首先检查它们是否存在,然后检查它们的值是否是check值的子集。
如果我把check数据加载到一个dataframe中,它会是这样的:(注意,我可能无法改变它是dict的事实,我将看看是否可以从dict修改为spark df)

df = {'KeyName': ['col1', 'col2', 'col3'], 'ValueName': ['1, 2, 3, 4', 'Jean, Cecil, Annie, Maurice, Annie, Maurice', 'test, aaa, bbb, ccc,ddd,eee']}
df = pd.DataFrame(data=df)
print(df)

      KeyName                                    ValueName
0    col1                                   1, 2, 3, 4
1    col2  Jean, Cecil, Annie, Maurice, Annie, Maurice
2    col3                  test, aaa, bbb, ccc,ddd,eee

所以最后,我的文件中的数据应该是一行的子集,该行与我的dict具有相同的键名。
我有点被遗留代码卡住了,我正努力将其迁移到spark databricks。
编辑2:希望这能奏效。我上传了两个文件和修改过的数据:https://filebin.net/1rnnvqn2b0ww7qc8
fakedata.csv包含我用上述代码加载的数据,必须是第二个代码的子集。fakedatachecker.csv包含实际完整可用集的数据
编辑3:忘记添加fakedata中的所有空字符串以及fakedatachecker中的空字符串都不应考虑在内

eoxn13cs

eoxn13cs1#

所以我不确定我是否完全理解了你的用例。但是让我们试着写一个初稿。
据我所知,你有一个包含所有数据的第一个文件。以及一个文件检查器,其中的键必须位于data foreach列中。并且应该过滤掉数据中存在的其他键。
这可以通过初始数据和数据检查器之间的内部连接来完成。如果数据检查器中没有太多的键,spark应该自动广播数据检查器Dataframe以优化连接。
这里是代码的初稿,这还不是完全自动化的等待您的第一个问题和意见。

首先,让我们导入所需的函数和数据:

from pyspark.sql.functions import col
from pyspark.sql import Window

spark.sql("set spark.sql.caseSensitive=true")

data = (
    spark
    .read
    .format("csv")
    .options(header=True, delimiter="|", maxColumns="100000")
    .load("FakeData.csv")
    .na.drop()
)

data_checker = (
    spark
    .read
    .format("csv")
    .options(header=True, delimiter="|", maxColumns="100000")
    .load("FakeDataChecker.csv")
    .na.drop(subset=["ValueName"])
)

我们根据需要删除空值,您可以使用 subset 关键字

然后让我们准备连接Dataframe

data_checker_date = data_checker.filter(col("KeyName") == "DATE").select(col("ValueName").alias("date"))
data_checker_location = data_checker.filter(col("KeyName") == "LOCATION").select(col("ValueName").alias("location"))
data_checker_location_id = data_checker.filter(col("KeyName") == "LOCATIONID").select(col("ValueName").alias("locationid"))
data_checker_type = data_checker.filter(col("KeyName") == "TYPE").select(col("ValueName").alias("type"))

我们需要在连接期间对列使用别名,以避免重复的列名。我们在删除列时指定区分大小写的选项,这样就不会删除大写的初始列。

最后,我们通过内部连接过滤掉数据检查器中不存在的所有键:

(
    data
    .join(data_checker_date, data.DATE == data_checker_date.date)
    .join(data_checker_location, data.LOCATION == data_checker_location.location)
    .join(data_checker_location_id, data.LOCATIONID == data_checker_location_id.locationid)
    .join(data_checker_type, data.TYPE == data_checker_type.type)
    .drop("date", "location", "locationid", "type")
    .show()
)

在接下来的步骤中,我们可以通过检索列的不同键名(例如:“date”、“location”等)来实现自动化,这样以后就不必复制粘贴代码4次或x次。
在以下方面:

from pyspark.sql.functions import collect_set

distinct_keynames = data_checker.select(collect_set('KeyName').alias('KeyName')).first()['KeyName']

for keyname in distinct_keynames:
    etc... implement the logic of chaining joins

相关问题