Flink 将其数据发送到异步功能后清除状态是否安全?

btxsgosb  于 6个月前  发布在  Apache
关注(0)|答案(1)|浏览(68)

我想构建一个应用程序,它对数据进行一些聚合,并使用计时器将聚合发送到一个异步步骤,该步骤将其转储到其他地方。在onTimer函数中发送数据后,我清除状态。
比如说

@Override
public void onTimer(long timestamp, KeyedProcessFunction<KEY, IN, Aggregation>.OnTimerContext ctx, Collector<Aggregation> out) throws Exception {
    out.collect(new Aggregation(ctx.getCurrentKey(), aggregation.get()));
    aggregation.clear();
}

字符串
该流将被传递到AsyncDataStream,如下所示:

SingleOutputStreamOperator<Aggregation> aggregations;
AsyncDataStream.unorderedWaitWithRetry(aggregations, new AsyncDatabaseRequest(), 10, TimeUnit.SECONDS, 1000, asyncRetryStrategy).addSink(new DiscardingSink<>());


在将聚合发送到目标步骤后清除状态是否安全?如果它无法将数据写入目标,会发生什么?

3qpi33ja

3qpi33ja1#

如果工作流本身失败,那么Flink的exactly once模式(假设你已经正确配置了)将确保数据不会被丢弃。这可能意味着一条记录被多次写入外部服务,所以你必须处理这种情况。
因此,如果写入失败导致工作流失败,您应该没有问题。
如果您不希望写入失败导致工作流终止/重新启动,那么您可以自行决定如何不丢弃任何数据。例如,您仍然可以从BLOG函数生成结果,但带有错误信息,然后将BLOG函数的流拆分为OK & failure流,并对failure流进行特殊处理。

相关问题