flink:如何将不推荐使用的fold转换为aggregate?

yvgpqqbh  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(387)

我遵循flink的快速入门示例:监视wikipedia编辑流。
这个例子是用java编写的,我用scala实现它,如下所示:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits.keyBy( _.getUser )
      .timeWindow(Time.seconds(5))
      .fold(("", 0L)) {
        (acc: (String, Long), event: WikipediaEditEvent) => {
          (event.getUser, acc._2 + event.getByteDiff)
        }
      }

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }
}

然而 fold flink中的函数已被弃用,并且 aggregate 建议使用函数。

但是我没有找到关于如何转换不推荐的 foldaggregrate .
你知道怎么做吗?可能不仅仅是申请 aggregrate .
更新
我还有另一个实现,如下所示:

/**
 * Wikipedia Edit Monitoring
 */
object WikipediaEditMonitoring {
  def main(args: Array[String]) {
    // set up the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)

    val result = edits
      .map( e => UserWithEdits(e.getUser, e.getByteDiff) )
      .keyBy( "user" )
      .timeWindow(Time.seconds(5))
      .sum("edits")

    result.print

    // execute program
    env.execute("Wikipedia Edit Monitoring")
  }

  /**Data type for words with count */
  case class UserWithEdits(user: String, edits: Long)
}

我也想知道如何使用自定义的实现 AggregateFunction .
更新
我遵循了以下文档:aggregatefunction,但有以下问题:
在接口的源代码中 AggregateFunction 对于1.3版,您将看到 add 真的回来了 void :

void add(IN value, ACC accumulator);

但是对于版本1.4 AggregateFunction ,正在返回:

ACC add(IN value, ACC accumulator);

我该怎么处理?
我使用的flink版本是 1.3.2 这个版本的文档没有 AggregateFunction ,但artifactory中还没有1.4版。

cwtwac6a

cwtwac6a1#

你会发现一些文档 AggregateFunction 在Flink1.4文档中,包括一个示例。
1.3.2中包含的版本仅限于与可变累加器类型一起使用,其中add操作修改累加器。Flink1.4已经修复了这个问题,但是还没有发布。

13z8s7eq

13z8s7eq2#

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}

class SumAggregate extends AggregateFunction[WikipediaEditEvent, (String, Int), (String, Int)] {
  override def createAccumulator() = ("", 0)

  override def add(value: WikipediaEditEvent, accumulator: (String, Int)) = (value.getUser, value.getByteDiff + accumulator._2)

  override def getResult(accumulator: (String, Int)) = accumulator

  override def merge(a: (String, Int), b: (String, Int)) = (a._1, a._2 + b._2)
}

object WikipediaAnalysis extends App {
  val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val edits: DataStream[WikipediaEditEvent] = see.addSource(new WikipediaEditsSource())

  val result: DataStream[(String, Int)] = edits
    .keyBy(_.getUser)
    .timeWindow(Time.seconds(5))
    .aggregate(new SumAggregate)
//    .fold(("", 0))((acc, event) => (event.getUser, acc._2 + event.getByteDiff))
  result.print()

  result.map(_.toString()).addSink(new FlinkKafkaProducer08[String]("localhost:9092", "wiki-result", new SimpleStringSchema()))
  see.execute("Wikipedia User Edit Volume")
}

相关问题