我在用flink做数据聚合代码,比如
.keyBy()
.timeWindow()
.aggregate(new AggCount(), new AggWindow())
class AggCountNew() extends AggregateFunction[((String, String, Int), Message), OutMessage, OutMessage] {
  private val logger = LoggerFactory.getLogger(getClass)

  /** Creates a fresh, empty accumulator for a new key/window.
    *
    * NOTE(review): the original pre-sized the second map at 32768 buckets. That
    * allocates a large backing array up front for EVERY key/window accumulator,
    * and is a likely contributor to the reported throughput drop once the number
    * of distinct keys exceeds ~1000 — let the maps grow on demand instead.
    */
  override def createAccumulator(): OutMessage =
    OutMessage(null, null, 0,
      new mutable.OpenHashMap[String, Long](),
      new mutable.OpenHashMap[String, Long](),
      Set(), Set())

  /** Folds one input record into the accumulator (mutates it in place and
    * returns it, which is the recommended pattern for Flink accumulators).
    *
    * Fix: the input element type must match the declared IN type parameter
    * `((String, String, Int), Message)` — the original declared `OutMessage`
    * here, which cannot satisfy the AggregateFunction signature.
    */
  override def add(value: ((String, String, Int), Message), accumulator: OutMessage): OutMessage = {
    val dataMap = accumulator.dataMap
    // Per-device occurrence count. Assumes Message exposes `device: String` — TODO confirm.
    dataMap(value._2.device) = dataMap.getOrElse[Long](value._2.device, 0L) + 1
    accumulator
  }

  /** Emits the accumulator as the window result (ACC and OUT are the same type).
    * Fix: signature must use the declared ACC/OUT type `OutMessage`, and Scala
    * needs no `return` — the last expression is the result.
    */
  override def getResult(accumulator: OutMessage): OutMessage = accumulator

  /** Merges two partial accumulators (used for merging windows / combiners).
    *
    * Fix: the original returned `a` unchanged and silently dropped everything
    * accumulated in `b`, losing counts whenever Flink actually merges. Here the
    * per-device counts from `b` are summed into `a`.
    */
  override def merge(a: OutMessage, b: OutMessage): OutMessage = {
    b.dataMap.foreach { case (device, count) =>
      a.dataMap(device) = a.dataMap.getOrElse[Long](device, 0L) + count
    }
    // NOTE(review): OutMessage carries a second map and two Sets; if they hold
    // per-window state they must be merged here as well — TODO confirm fields.
    a
  }
}
当 dataMap 中有超过 1000 个键（key）时，吞吐量很低
暂无答案!
目前还没有任何答案,快来回答吧!