PySpark streaming application - each batch takes a long time to complete

tquggr8v · asked 2021-05-29 · in Spark

I have a Spark Streaming job that reads data from Kafka (48 partitions) and writes it to a Delta table. The application scales very poorly.
I would like to know where this application can be improved.
Batch interval: 1 minute
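
The transformation code further below starts from an RDD called rdd2, so the Kafka-facing part of the job is not shown. For context, here is a minimal sketch of the driver loop such a job would use, assuming the legacy direct Kafka DStream API of Spark 2.x; the application name, topic name, broker address, and process_batch function are placeholders, not the real values:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # legacy direct Kafka DStream API (Spark 2.x)

spark = SparkSession.builder.appName("endpoints-stream").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 60)  # 1-minute batch interval

# Hypothetical topic name and broker address; the real job reads a 48-partition topic.
stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["endpoints"],
    kafkaParams={"metadata.broker.list": "kafka:9092"},
)

def process_batch(batch_time, rdd):
    # rdd2 feeds the DataFrame code in the "PySpark read/write code" section below
    rdd2 = rdd.map(lambda kv: kv[1])
    # ... build df1..df7 and run the Delta merge here ...

stream.foreachRDD(process_batch)
ssc.start()
ssc.awaitTermination()
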
spark-submit configuration:

--conf "spark.streaming.dynamicAllocation.debug=true" \
--conf "spark.streaming.dynamicAllocation.delay.rounds=10" \
--conf "spark.streaming.dynamicAllocation.releaseRounds=5" \
--conf "spark.streaming.dynamicAllocation.minExecutors=5" \
--conf "spark.streaming.dynamicAllocation.maxExecutors=10" \
--conf "spark.streaming.dynamicAllocation.executorIdleTimeout=60s" \
--conf "spark.streaming.dynamicAllocation.scalingInterval=60" \
--conf "spark.streaming.dynamicAllocation.scalingUpRatio=1.2" \
--conf "spark.streaming.dynamicAllocation.scalingDownRatio=0.8" \
--conf "spark.streaming.dynamicAllocation.rememberBatchSize=1" \
--conf "spark.streaming.dynamicAllocation.reserveRate=0.2" \
--conf "spark.streaming.backpressure.enabled=True" \
--conf "spark.streaming.kafka.maxRatePerPartition=1000" \
--conf "spark.streaming.backpressure.pid.minRate=10000" \
--conf "spark.shuffle.service.enabled=true"  \
--conf "spark.shuffle.consolidateFiles=true"  \
--conf "spark.shuffle.spill=true"  \
--conf "spark.sql.shuffle.partitions=64" \
--conf "spark.cleaner.ttl=1200" \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"  \
--conf "spark.streaming.unpersist=true"  \
--conf "spark.dynamicAllocation.executorIdleTimeout=60s" \
--conf "spark.dynamicAllocation.cachedExecutorIdleTimeout=120s" \
--conf "spark.driver.memory=3G" \
--conf "spark.driver.cores=2"  \
--conf "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"  \
--conf "spark.executor.memoryOverhead=2048"  \
--conf "spark.driver.memoryOverhead=1024" \
--conf "spark.default.parallelism=64" \
--conf "spark.eventLog.enabled=true" \
--num-executors=4 \
--executor-memory 4G \
--executor-cores 4 \
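
If it helps, the effective values of the rate-control settings can be sanity-checked from inside the running application; a minimal sketch, using only keys from the list above:

# Print the effective values of the submit-time settings most relevant to throughput.
for key in ("spark.streaming.backpressure.enabled",
            "spark.streaming.kafka.maxRatePerPartition",
            "spark.streaming.backpressure.pid.minRate",
            "spark.sql.shuffle.partitions",
            "spark.default.parallelism"):
    print(key, "=", spark.conf.get(key, "<not set>"))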

PySpark read/write code:

# Imports assumed by this snippet (not shown in the original post)
from pyspark.sql import functions as F
from pyspark.sql.functions import expr, col, last, row_number, desc, asc, coalesce
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# rdd2 holds the JSON payloads of the current micro-batch read from Kafka
df1 = spark.read.json(rdd2, schema=endpoints_instance_read_schema)
df1.cache()

# Normalize boolean-like strings and convert epoch milliseconds to timestamps
df2 = df1.withColumn("natdetected", expr("CASE WHEN trim(natdetected) in ('1','true','True','t') THEN true WHEN trim(natdetected) in ('0','false','False','f') THEN false ELSE cast(trim(natdetected) as boolean) END")) \
    .withColumn("proxy", expr("CASE WHEN trim(proxy) in ('1','true','True','t') THEN true WHEN trim(proxy) in ('0','false','False','f') THEN false ELSE cast(trim(proxy) as boolean) END")) \
    .withColumn("SIPHTTPProxyTransport", expr("CASE WHEN trim(SIPHTTPProxyTransport) in ('1','true','True','t') THEN true WHEN trim(SIPHTTPProxyTransport) in ('0','false','False','f') THEN false ELSE cast(trim(SIPHTTPProxyTransport) as boolean) END")) \
    .withColumn("creationTime", F.from_unixtime(F.col("creationTime") / 1000, "yyyy-MM-dd HH:mm:ss.SS").cast("timestamp")) \
    .withColumn("leaveTime", F.from_unixtime(F.col("leaveTime") / 1000, "yyyy-MM-dd HH:mm:ss.SS").cast("timestamp"))

# Keep only rows with a call id and co-locate each callguid on a single partition
df3 = df2.filter(df2.callguid.isNotNull())
df4 = df3.repartition(col("callguid"))

# rank_spec picks the newest record per callguid; data_spec is a running window used to carry the last non-null value forward
rank_spec = Window.partitionBy(col("callguid")).orderBy(desc("cdctimestamp"))
data_spec = Window.partitionBy(col("callguid")).orderBy(asc("cdctimestamp")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Rename the source columns to the snake_case names used in the Delta table
df5 = df4.withColumnRenamed("creationTime", "creation_time") \
    .withColumnRenamed("DisconnectReason", "disconnect_reason") \
    .withColumnRenamed("leaveTime", "leave_time") \
    .withColumnRenamed("localaddress", "local_address") \
    .withColumnRenamed("meetingUuid", "meeting_uuid") \
    .withColumnRenamed("mhaddress", "mh_address") \
    .withColumnRenamed("mixaddress", "mixer_address") \
    .withColumnRenamed("natdetected", "nat_detected") \
    .withColumnRenamed("ProxyInfo", "proxy_info") \
    .withColumnRenamed("SIPHTTPProxyTransport", "sip_proxy_transport") \
    .withColumnRenamed("audiomedaddress", "audio_med_address") \
    .withColumnRenamed("ReflectorAddress", "reflector_address") \
    .withColumnRenamed("contentmedaddress", "content_med_address")

# For every column, carry the last non-null value forward within each callguid, then rank rows by recency
df6 = df5.withColumn("creation_time_var", last("creation_time", True).over(data_spec)) \
    .withColumn("disconnect_reason_var", last("disconnect_reason", True).over(data_spec)) \
    .withColumn("leave_time_var", last("leave_time", True).over(data_spec)) \
    .withColumn("local_address_var", last("local_address", True).over(data_spec)) \
    .withColumn("meeting_uuid_var", last("meeting_uuid", True).over(data_spec)) \
    .withColumn("mh_address_var", last("mh_address", True).over(data_spec)) \
    .withColumn("mixer_address_var", last("mixer_address", True).over(data_spec)) \
    .withColumn("nat_detected_var", last("nat_detected", True).over(data_spec)) \
    .withColumn("delta_partition_var", last("delta_partition", True).over(data_spec)) \
    .withColumn("cdctimestamp_var", last("cdctimestamp", True).over(data_spec)) \
    .withColumn("proxy_var", last("proxy", True).over(data_spec)) \
    .withColumn("proxy_info_var", last("proxy_info", True).over(data_spec)) \
    .withColumn("sip_proxy_transport_var", last("sip_proxy_transport", True).over(data_spec)) \
    .withColumn("audio_med_address_var", last("audio_med_address", True).over(data_spec)) \
    .withColumn("reflector_address_var", last("reflector_address", True).over(data_spec)) \
    .withColumn("content_med_address_var", last("content_med_address", True).over(data_spec)) \
    .withColumn("rank", row_number().over(rank_spec))

# Keep only the newest consolidated row per callguid and upsert it into the Delta table
df7 = df6.filter(df6.rank == 1)

DeltaTable.forPath(spark, HDFS_DIR).alias("t").merge(
    df7.alias("s"),
    "s.callguid = t.callguid and s.meeting_uuid = t.meeting_uuid and s.delta_partition = t.delta_partition"
).whenMatchedUpdate(set={
    "cdctimestamp": coalesce("s.cdctimestamp_var", "t.cdctimestamp"),
    "creation_time": coalesce("s.creation_time_var", "t.creation_time"),
    "disconnect_reason": coalesce("s.disconnect_reason_var", "t.disconnect_reason"),
    "leave_time": coalesce("s.leave_time_var", "t.leave_time"),
    "local_address": coalesce("s.local_address_var", "t.local_address"),
    "mh_address": coalesce("s.mh_address_var", "t.mh_address"),
    "mixer_address": coalesce("s.mixer_address_var", "t.mixer_address"),
    "nat_detected": coalesce("s.nat_detected_var", "t.nat_detected"),
    "proxy": coalesce("s.proxy_var", "t.proxy"),
    "proxy_info": coalesce("s.proxy_info_var", "t.proxy_info"),
    "sip_proxy_transport": coalesce("s.sip_proxy_transport_var", "t.sip_proxy_transport"),
    "audio_med_address": coalesce("s.audio_med_address_var", "t.audio_med_address"),
    "reflector_address": coalesce("s.reflector_address_var", "t.reflector_address"),
    "content_med_address": coalesce("s.content_med_address_var", "t.content_med_address"),
}).whenNotMatchedInsert(values={
    "callguid": "s.callguid",
    "delta_partition": "s.delta_partition",
    "cdctimestamp": "s.cdctimestamp_var",
    "creation_time": "s.creation_time_var",
    "disconnect_reason": "s.disconnect_reason_var",
    "leave_time": "s.leave_time_var",
    "local_address": "s.local_address_var",
    "meeting_uuid": "s.meeting_uuid_var",
    "mh_address": "s.mh_address_var",
    "mixer_address": "s.mixer_address_var",
    "nat_detected": "s.nat_detected_var",
    "proxy": "s.proxy_var",
    "proxy_info": "s.proxy_info_var",
    "sip_proxy_transport": "s.sip_proxy_transport_var",
    "audio_med_address": "s.audio_med_address_var",
    "reflector_address": "s.reflector_address_var",
    "content_med_address": "s.content_med_address_var",
}).execute()
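
To see what a batch actually wrote, the target table can be read back directly after the merge; a small sketch, where the callguid filter value is just a placeholder:

# Inspect the merged rows for one call id after a batch completes.
spark.read.format("delta").load(HDFS_DIR) \
    .filter("callguid = '<some-callguid>'") \
    .orderBy("cdctimestamp", ascending=False) \
    .show(truncate=False)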

Below are screenshots from the Spark UI.

No answers yet.

