java—如何根据状态更改事件以分布式方式计算flink的状态中有多少“客户机”?我需要有状态的对象

hm2xizp9  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(282)

我正在使用kafka->flink->elasticsearch在java中进行poc项目。
在Kafka上,会产生大量不可预测的事件,从0到数千个事件/秒,比如某个特定的主题。

{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."}

flink将消耗这些事件,并应每秒钟搜索每个状态中的事件数,例如:

{"stateA":54, "stateB":100, ... "stateJ":34}

我有10个州: [Created, ... , Deleted] 平均寿命为15分钟。状态一秒钟可以改变两次。理论上可以增加新的状态。
为了每时每刻都沉入溪流,我想用Flink的时间窗https://flink.apache.org/news/2015/12/04/introducing-windows.html
问题是我需要有状态的对象,其中包含 guid->previous-state 以及 stateX->count 以便能够在新事件发生时增加/减少计数。
我找到一份关于有状态蒸汽处理的文件草稿https://cwiki.apache.org/confluence/display/flink/stateful+stream+processing
我对flink和流处理是新手,我还没有深入研究flink有状态流处理。在第一阶段,我考虑使用静态对象,但是当启动几个flink示例时,这种方法就行不通了。
我想问你:
你觉得这种方法怎么样?
flink适合这种流处理吗?
你解决这个问题的方法是什么?
另外,我也很欣赏窗口状态流解决方案(或其他解决方案)的一些代码片段。
谢谢,

dy1byipe

dy1byipe1#

像下面这样的怎么样?
它使用15分钟的窗口,之后窗口状态将被清除。它还使用一个自定义触发器,每秒计算一次窗口。对于windowing操作,有一个reducefunction只保留每个guid的最新状态,还有一个windowfunction发出(state,1)元组。然后我们按这个状态键,然后求和。我想这会给你你想要的结果。

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val stream = env.addSource(new FlinkKafkaProducer(...))

val results = stream
  .keyBy(_.guid)
  .timeWindow(Time.minutes(15))
  .trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
  .apply(
    (e1, e2) => e2,
    (k, w, i, c: Collector[(String, Long)]) => {
      if (i.head != null) c.collect((i.head.state, 1))
    }
  )
  .keyBy(0)
  .timeWindow(Time.seconds(1))
  .sum(1)
  .addSink(new ElasticsearchSink<>(...))

env.execute("Count States")

具有周期性点火的ProcessingTimeTrigger定义如下:

object ProcessingTimeTriggerWithPeriodicFirings {
  def apply(intervalMs: Long) = {
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
  }
}

class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
  extends Trigger[Event, TimeWindow] {

  private val startTimeDesc =
    new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)

  override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    val startTime = ctx.getPartitionedState(startTimeDesc)
    if (startTime.value == 0) {
      startTime.update(window.getStart)
      ctx.registerProcessingTimeTimer(window.getEnd)
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    if (time == window.getEnd) {
      TriggerResult.PURGE
    }
    else {
      ctx.registerProcessingTimeTimer(time + intervalMs)
      TriggerResult.FIRE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}

相关问题