用另一个流丰富一个流

nsc4cvqm  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(205)

我想使用apache flink实现以下功能。我有一个主流,必须丰富的另一个流的数据。这个主流有属性为“site”和“timestamp”的元素。另一个流(我们称之为countrystream)具有属性“site”和“country”。countrystream应该跟踪站点使用的最新国家/地区。例如,如果 ("klm.com", "netherlands") 先到的是元组,过了一段时间 ("klm.com", "france") 到了,那么“klm.com”应该指向“法国”(因为这是后者)。所以,它应该保持一种状态。假设一个元组(“klm.com”,100)到达了主流。现在应该充实到 ("klm.com", 100, "france") . 如果在countrystream中找不到某个站点,则应该用“?”来充实它。比如说, ("stackoverflow.com", 150, "?") . 我该如何存档?

3yhwsihp

3yhwsihp1#

我找到了解决办法(花了我一些时间)。这有效率吗?可以改进吗?这是否意味着我的迭代流不能有检查点?

val env = StreamExecutionEnvironment.getExecutionEnvironment

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
        .iterate(
            iteration => {
                (iteration, iteration)
            }
        )

mainStream
    .coGroup(infoStream)
        .where[String]((x: String) => x)
        .equalTo(_._2)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) {
            (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => {
                first.foreach((key: String) => {
                        val matchingRecords = second
                            .filter(_._2 == key)
                        if (matchingRecords.nonEmpty) {
                            val matchingRecord = matchingRecords.maxBy(_._1)
                            out.collect((matchingRecord._2, matchingRecord._3))
                        }
                    }
                )
            }
        }
    .print()

env.execute("proof_of_concept")

相关问题