map转换性能sparkDataframevs rdd

gg0vcinb  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(336)

我有一个四节点hadoop集群(mapr),每个集群有40gb内存。我需要在大数据集(5亿行)的一个字段上“应用”一个函数。我的代码流程是从配置单元表中读取数据作为sparkDataframe,并对其中一列应用所需的函数,如下所示:

schema = StructType([StructField("field1", IntegerType(), False), StructField("field2", StringType(), False),StructField("field3", FloatType(), False)])
udfCos = udf(lambda row: function_call(row), schema)
result = SparkDataFrame.withColumn("temp", udfCos(stringArgument))

类似的rdd版本可能如下所示:

result = sparkRDD.map(lambda row: function_call(row))

我想提高这段代码的性能,确保代码以最大的并行度和较低的吞吐量运行--我需要帮助在我的问题上下文中使用spark概念,如“重新分区”“sparkconf中的并行度值”或其他方法。感谢您的帮助。
我的spark启动参数:

MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 10 --driver-cores 10 --driver-memory 30g --executor-memory 7g --executor-cores 5 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="150"
eulz3vhy

eulz3vhy1#

对于调整应用程序,您需要知道一些事情
1) 您需要监视您的应用程序,看您的集群是否利用不足,您创建的应用程序使用了多少资源
可以使用各种工具进行监控,例如ganglia从ganglia可以找到cpu、内存和网络使用情况。
2) 根据对cpu和内存使用情况的观察,您可以更好地了解应用程序需要什么样的调优
形成你的Spark点
在spark-defaults.conf中
您可以指定需要什么样的序列化,应用程序需要多少驱动程序内存和执行程序内存,甚至可以更改垃圾收集算法。
下面是几个示例,您可以根据自己的需求调整此参数

spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions  -XX:MaxPermSize=2G -XX:+UseG1GC
spark.driver.extraJavaOptions    -XX:MaxPermSize=6G -XX:+UseG1GC

有关更多详细信息,请参阅http://spark.apache.org/docs/latest/tuning.html

相关问题