如何通过几个超大会话稳定spark流应用程序?

s4chpxco  于 2021-05-29  发布在  Hadoop
关注(0)|答案(0)|浏览(138)

我正在运行一个基于mapwithstate数据流函数的spark流应用程序。应用程序根据记录中的会话id字段将输入记录转换为会话。
会话就是具有相同id的所有记录。然后我在会话级别执行一些分析,以找到异常分数。
我无法稳定我的应用程序,因为在很长一段时间(超过1小时)内,每个批处理的会话数量越来越大。我的理解是,单个会话(键值对)总是由spark中的单个核心处理。我想知道我是否弄错了,是否有解决方案来缓解这个问题并使流应用程序稳定。
我正在Yarn上使用hadoop2.7.2和spark1.6.1。更改批处理时间、阻塞间隔、分区数、执行器数和执行器资源并不能解决这个问题,因为单个任务使应用程序总是阻塞。然而,过滤那些超长会话解决了这个问题。
下面是我正在使用的updatestate函数的代码:

val updateState = (batchTime: Time, key: String, value: Option[scala.collection.Map[String,Any]], state: State[Seq[scala.collection.Map[String,Any]]]) => {
    val session = Seq(value.getOrElse(scala.collection.Map[String,Any]())) ++ state.getOption.getOrElse(Seq[scala.collection.Map[String,Any]]())
    if (state.isTimingOut()) {
      Option(null)
    } else {
      state.update(session)
      Some((key,value,session))
    }
  }

还有Map呼叫:

def updateStreamingState(inputDstream:DStream[scala.collection.Map[String,Any]]): DStream[(String,Option[scala.collection.Map[String,Any]], Seq[scala.collection.Map[String,Any]])] ={//MapWithStateDStream[(String,Option[scala.collection.Map[String,Any]], Seq[scala.collection.Map[String,Any]])] = {
    val spec = StateSpec.function(updateState)
    spec.timeout(Duration(sessionTimeout))
    spec.numPartitions(192) 
    inputDstream.map(ds => (ds(sessionizationFieldName).toString, ds)).mapWithState(spec)
  }

最后,我将为每个数据流应用一个特征计算会话,定义如下:

def computeSessionFeatures(sessionId:String,sessionRecords: Seq[scala.collection.Map[String,Any]]): Session = {  
    val features = Functions.getSessionFeatures(sessionizationFeatures,recordFeatures,sessionRecords)
    val resultSession = new Session(sessionId,sessionizationFieldName,sessionRecords)
    resultSession.features = features
    return resultSession
  }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题