带有mapgroupstate的结构化流,导致gc和性能问题

w1jd8yoj  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(340)

在我们的应用程序中,我们使用mapgroupwithstate的结构化流,并结合kafka的读取。
启动应用程序后,在最初的批处理期间,如果我看到kafka lastprogress几乎每秒65k,则性能良好。几批之后,性能完全降低到每秒2000个左右。
在mapgroupwithstate函数中,基本上是对状态存储中的值进行更新和比较(下面提供了代码片段)。
Kafka偏移量-100000
启动应用程序后,在最初的批处理期间,如果我看到kafka lastprogress几乎每秒65k,则性能良好。几批之后,性能完全降低到每秒2000个左右。
如果我们看到其中一个executor的线程转储,那么除了sparkui的阻塞线程外,没有可疑的


其中一个执行者的gc统计如下
gc后没有发现太大的差异

代码段

case class MonitoringEvent(InternalID: String, monStartTimestamp: Timestamp, EndTimestamp: Timestamp, Stream: String, ParentID: Option[String])

val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", Config.uatKafkaUrl)
      .option("subscribe", Config.interBranchInputTopic)
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "true")
      .option("maxOffsetsPerTrigger", "100000")
      .option("request.required.acks", "all")
      .load()
      .selectExpr("CAST(value AS STRING)")    

        val me: Dataset[MonitoringEvent] = df.select(from_json($"value", schema).as("data")).select($"data.*").as[MonitoringEvent]

        val IB =   me.groupByKey(x => (x.ParentID.getOrElse(x.InternalID)))
        .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(IBTransformer.mappingFunctionIB _)
        .flatMap(x => x)

    val IBStream = IB
      .select(to_json(struct($"*")).as("value"), $"InternalID".as("key"))
      .writeStream
      .format("kafka")
      .queryName("InterBranch_Events_KafkaWriter")
      .option("kafka.bootstrap.servers", Config.uatKafkaUrl)
      .option("topic", Config.interBranchTopicComplete)
      .option("checkpointLocation", Config.interBranchCheckPointDir)
      .outputMode("update")
      .start()

object IBTransformer extends Serializable {

  case class IBStateStore(InternalID: String, monStartTimestamp: Timestamp)

  def mappingFunctionIB(intrKey: String, intrValue: Iterator[MonitoringEvent], intrState: GroupState[IBStateStore]): Seq[MonitoringEvent] = {

    try {
      if (intrState.hasTimedOut) {
            intrState.remove()
               Seq.empty
      } else {

        val events = intrValue.toSeq
        if (events.map(_.Status).contains(Started)) {

           val tmp = events.filter(x => (x.Status == Started && x.InternalID == intrKey)).head
           val toStore = IBStateStore(tmp.InternalID, tmp.monStartTimestamp)
            intrState.update(toStore)
            intrState.setTimeoutDuration(1200000) 
        }
        val IB = events.filter(_.ParentID.isDefined)
        if (intrState.exists && IB.nonEmpty) {
          val startEvent = intrState.get
          val IBUpdate = IB.map {x => x.copy(InternalID = startEvent.InternalID, monStartTimestamp = startEvent.monStartTimestamp) }
          IBUpdate.foreach(id => intrState.update((IBStateStore(id.InternalID, id.monStartTimestamp)))) // updates the state with new IDs
          IBUpdate
        } else {
          Seq.empty
        }
      }
    }

    catch
    .
    .
    .       
  }
}

使用的执行程序数-8执行程序内存-8g驱动程序内存-8g
我在spark提交脚本中提供的java选项和内存

--executor-memory 8G \
 --executor-cores 8 \
 --num-executors 4 \
 --driver-memory 8G \
--driver-java-options "-Dsun.security.krb5.debug=true -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dconfig.file=configIB.conf -Dlog4j.configuration=IBprocessor.log4j.properties" \

尝试在java选项中使用g1gc,但没有改进。我们持有的钥匙也小于提供的尺寸,所以不确定哪里出了问题。
有什么改进性能和消除gc问题的建议吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题