minby不返回任何结果

3mpgtkmj  于 2021-06-24  发布在  Storm
关注(0)|答案(0)|浏览(110)

我已经建立了从Kafka数据检索风暴拓扑。我想构建一个聚合,其中一个字段上的每个批的最小计数。我尝试在流上使用maxby函数,但是它不显示任何结果,尽管数据在系统中流动,输出函数与其他聚合一起工作。如何以不同的方式实现,或者在当前的实现中可以修复哪些内容?
以下是我当前的实现:

val tridentTopology = new TridentTopology()

    val stream = tridentTopology.newStream("kafka_spout",
      new KafkaTridentSpoutOpaque(spoutConfig))
      .map(new ParserMapFunction, new Fields("created_at", "id", "text", "source", "timestamp_ms",
        "user.id", "user.name", "user.location", "user.url", "user.description", "user.followers_count",
        "user.friends_count", "user.favorite_count", "user.lang", "entities.hashtags"))
      .maxBy("user.followers_count")
      .map(new OutputFunction)

我的自定义输出函数:

class OutputFunction extends MapFunction{

  override def execute(input: TridentTuple): Values = {
    val values = input.getValues.asScala.toList.toString
    println(s"TWEET: $values")
    new Values(values)
  }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题