首先,让我描述一下我的设置。我有两台通过以太网连接的PC。PC A执行主节点和工作节点功能,而PC B仅作为工作节点运行。由于某些限制,我不能使用HDFS,HBase或数据库等分布式存储系统。因此,我需要直接从本地文件夹(/root/data)创建DataFrames,其中A和B都有。
A在/root/data中有从1.txt到2000.txt的文件,而B在/root/data中有从2001.txt到4000.txt的文件。我的任务涉及使用Pandas_udf计算每个文本文件中出现的字符。生成的DataFrame应该由4000行组成,字符计数作为列值。重要的是,文件1到2000.txt必须在A上处理,而文件2001到4000.txt在B。下面是我的代码。
spark = SparkSession.builder.config(conf=conf).appName("wordcount").getOrCreate()
file_paths = [f"/root/data/{i}.txt" for i in range(1, 4001)]
data = [(path,) for path in file_paths]
df = spark.createDataFrame(data, ["path"])
df = df.withColumn("count", my_udf_wordCount(df['path']))
save_path = "/root/data/result"
df.write.format('com.databricks.spark.csv') \
.mode('overwrite') \
.option("header", "true") \
.save(save_path)
@pandas_udf(IntegerType())
def my_udf_wordCount(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for path_series in iterator:
for path in path_series:
file_path = path.strip()
with open(file_path, 'r') as file:
text = file.read()
words = text.split()
word_count = len(words)
yield pd.Series(word_count)
字符串
当然,如果我像这样提交代码,它会导致错误,因为对1到4000.txt的处理会混合到节点A和B中。我会遇到错误,因为4000个请求会在A和B之间混合,(例如,发送到节点A的/root/data/2001 - 4000.txt路径之一或发送到B的1 - 2001.txt路径)。x1c 0d1x
我想要的是基于“path”列将任务分发到节点,该列对应于4000个DataFrame中的每个DataFrame中的文件。具有“/root/data/2.txt”的任务应该分发到节点A,而具有“/root/data/3000.txt”的任务应该发送到节点B。如何修改代码来实现这一点?
此外,从1到4000的任务都应该从单个DataFrame执行,并且任务必须作为节点A和节点B的集群提交。没有单独的选项可以为节点A或节点B提交作业。
如何将任务分发到DataFrame中“Path”对应的路径上存在文件的节点?
1条答案
按热度按时间r6vfmomb1#
尝试通过向Scala的SparkContext.makeRDD()函数传递(file_name,node_host)元组来创建RDD,并使用RDD.mapPartitions()函数处理文件。
更新:我无法让
sc._jsc.makeRDD()
在PySpark中工作:它失败了“Py 4JException:Method makeRDD([class java.util.ArrayList])does not exist”错误。请尝试让它工作或自己用其他方法创建ParallelCollectionRDD。如果失败了,我建议如果可以的话使用Scala API采取相同的方法;它是Spark中的主要语言,拥有最完整的API。字符串
请参阅下面的资源了解更多信息。