pyspark 在Apache Spark中,有没有一种方法可以强制DataFrame在特定节点上执行?

euoag5mw  于 5个月前  发布在  Spark
关注(0)|答案(1)|浏览(47)

首先,让我描述一下我的设置。我有两台通过以太网连接的PC。PC A执行主节点和工作节点功能,而PC B仅作为工作节点运行。由于某些限制,我不能使用HDFS,HBase或数据库等分布式存储系统。因此,我需要直接从本地文件夹(/root/data)创建DataFrames,其中A和B都有。
A在/root/data中有从1.txt到2000.txt的文件,而B在/root/data中有从2001.txt到4000.txt的文件。我的任务涉及使用Pandas_udf计算每个文本文件中出现的字符。生成的DataFrame应该由4000行组成,字符计数作为列值。重要的是,文件1到2000.txt必须在A上处理,而文件2001到4000.txt在B。下面是我的代码。

spark = SparkSession.builder.config(conf=conf).appName("wordcount").getOrCreate()

file_paths = [f"/root/data/{i}.txt" for i in range(1, 4001)]

data = [(path,) for path in file_paths]
df = spark.createDataFrame(data, ["path"])

df = df.withColumn("count", my_udf_wordCount(df['path']))

save_path = "/root/data/result"
df.write.format('com.databricks.spark.csv') \
        .mode('overwrite') \
        .option("header", "true") \
        .save(save_path)

@pandas_udf(IntegerType())
def my_udf_wordCount(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for path_series in iterator:
        for path in path_series:
            file_path = path.strip()
            with open(file_path, 'r') as file:
                text = file.read()
                words = text.split()
                word_count = len(words)
                yield pd.Series(word_count)

字符串
当然,如果我像这样提交代码,它会导致错误,因为对1到4000.txt的处理会混合到节点A和B中。我会遇到错误,因为4000个请求会在A和B之间混合,(例如,发送到节点A的/root/data/2001 - 4000.txt路径之一或发送到B的1 - 2001.txt路径)。x1c 0d1x
我想要的是基于“path”列将任务分发到节点,该列对应于4000个DataFrame中的每个DataFrame中的文件。具有“/root/data/2.txt”的任务应该分发到节点A,而具有“/root/data/3000.txt”的任务应该发送到节点B。如何修改代码来实现这一点?
此外,从1到4000的任务都应该从单个DataFrame执行,并且任务必须作为节点A和节点B的集群提交。没有单独的选项可以为节点A或节点B提交作业。
如何将任务分发到DataFrame中“Path”对应的路径上存在文件的节点?

r6vfmomb

r6vfmomb1#

尝试通过向Scala的SparkContext.makeRDD()函数传递(file_name,node_host)元组来创建RDD,并使用RDD.mapPartitions()函数处理文件。

更新:我无法让sc._jsc.makeRDD()在PySpark中工作:它失败了“Py 4JException:Method makeRDD([class java.util.ArrayList])does not exist”错误。请尝试让它工作或自己用其他方法创建ParallelCollectionRDD。如果失败了,我建议如果可以的话使用Scala API采取相同的方法;它是Spark中的主要语言,拥有最完整的API。

# processes a single file, returns an iterator with a single (path, count) tuple.
def word_count(iterator):
    path = next(iterator)
    with open(path) as f:
       ...

# creates a partition for each row/path and assignes a preferred location to it. 
count_by_path = sc._jsc.makeRDD([("/root/data/1.txt", ["host_1"]), ... , ("/root/data/4000.txt", ["host_2"])])
        .mapPartitions(word_count)

spark.createDataFrame(count_by_path, ['path', 'count'])
    .write
    . ...

字符串
请参阅下面的资源了解更多信息。

相关问题