我正在运行一个基于mapwithstate数据流函数的spark流应用程序。应用程序根据记录中的会话id字段将输入记录转换为会话。
会话就是具有相同id的所有记录。然后我在会话级别执行一些分析,以找到异常分数。
我无法稳定我的应用程序,因为在很长一段时间(超过1小时)内,每个批处理的会话数量越来越大。我的理解是,单个会话(键值对)总是由spark中的单个核心处理。我想知道我是否弄错了,是否有解决方案来缓解这个问题并使流应用程序稳定。
我正在Yarn上使用hadoop2.7.2和spark1.6.1。更改批处理时间、阻塞间隔、分区数、执行器数和执行器资源并不能解决这个问题,因为单个任务使应用程序总是阻塞。然而,过滤那些超长会话解决了这个问题。
下面是我正在使用的updatestate函数的代码:
val updateState = (batchTime: Time, key: String, value: Option[scala.collection.Map[String,Any]], state: State[Seq[scala.collection.Map[String,Any]]]) => {
val session = Seq(value.getOrElse(scala.collection.Map[String,Any]())) ++ state.getOption.getOrElse(Seq[scala.collection.Map[String,Any]]())
if (state.isTimingOut()) {
Option(null)
} else {
state.update(session)
Some((key,value,session))
}
}
还有Map呼叫:
def updateStreamingState(inputDstream:DStream[scala.collection.Map[String,Any]]): DStream[(String,Option[scala.collection.Map[String,Any]], Seq[scala.collection.Map[String,Any]])] ={//MapWithStateDStream[(String,Option[scala.collection.Map[String,Any]], Seq[scala.collection.Map[String,Any]])] = {
val spec = StateSpec.function(updateState)
spec.timeout(Duration(sessionTimeout))
spec.numPartitions(192)
inputDstream.map(ds => (ds(sessionizationFieldName).toString, ds)).mapWithState(spec)
}
最后,我将为每个数据流应用一个特征计算会话,定义如下:
def computeSessionFeatures(sessionId:String,sessionRecords: Seq[scala.collection.Map[String,Any]]): Session = {
val features = Functions.getSessionFeatures(sessionizationFeatures,recordFeatures,sessionRecords)
val resultSession = new Session(sessionId,sessionizationFieldName,sessionRecords)
resultSession.features = features
return resultSession
}
暂无答案!
目前还没有任何答案,快来回答吧!