flink aggregatefunction累加器慢

hof1towb  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(211)

我在用flink做数据聚合代码,比如

.keyby()
.timewidow()
.aggregate(new AggCount(), new AggWindow())

class AggCountNew() extends AggregateFunction[((String, String, Int), Message), OutMessage, OutMessage] {

  private val logger = LoggerFactory.getLogger(getClass)

  override def createAccumulator(): OutMessage = OutMessage(null, null, 0, new mutable.OpenHashMap[String, Long](128), new mutable.OpenHashMap[String, Long](32768), Set(), Set())

  override def add(value: ((String, String, Int), OutMessage), accumulator: OutMessage): OutMessage = {
    val dataMap= accumulator.dataMap
    dataMap(value._2.device) = dataMap.getOrElse[Long](value._2.device, 0) + 1
    accumulator
  }

  override def getResult(accumulator: HbaseOneDayMessage): HbaseOneDayMessage = {
    return accumulator
  }

  override def merge(a: Message, b: Message): HbaseOneDayMessage = {
    a
  }
}

当datamap有超过1000个密钥时,吞吐量很低

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题