How do the Flink and Beam SDKs handle windowing, and which is more efficient?

wn9m85ua  posted on 2021-06-21 in Flink

I am comparing the Apache Beam SDK with the Flink SDK for stream processing, to establish the cost/advantages of using Beam as an additional framework.
I have a very simple setup where a stream of data is read from a Kafka source and processed in parallel by a cluster of nodes running Flink.
From my understanding of how these SDKs work, the simplest way to process a stream of data window by window is:
Using Apache Beam (running on Flink):
1.1. Create a Pipeline object.
1.2. Create a PCollection of Kafka records.
1.3. Apply the windowing function.
1.4. Transform the pipeline to key by window.
1.5. Group the records by key (i.e. by window).
1.6. Apply whatever function is needed to the windowed records.
Using the Flink SDK:
2.1. Create a DataStream from the Kafka source.
2.2. Transform it into a keyed stream by providing a key function.
2.3. Apply the windowing function.
2.4. Apply whatever function is needed to the windowed records.
While the Flink solution appears programmatically more concise, in my experience it is less efficient with large volumes of data. I can only imagine the overhead is introduced by the key-extraction function, since Beam does not require this step.
My question is: am I comparing like for like? Aren't these processes equivalent? What could explain the Beam pipeline being more efficient, given that it uses Flink as a runner (all other conditions being the same)?
This is the code using the Beam SDK:

PipelineOptions options = PipelineOptionsFactory.create();

    //Run with Flink
    FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
    flinkPipelineOptions.setRunner(FlinkRunner.class);
    flinkPipelineOptions.setStreaming(true);
    flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(flinkPipelineOptions);

    // Create a PCollection of Kafka records
    PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.readBytes()
            .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
            .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
            .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));

    //Apply the windowing function
    PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
            Window.into(SlidingWindows.of(Duration.standardSeconds(5))
                    .every(Duration.standardSeconds(1))));

    //Transform the pipeline to key by window
    PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
            windowedKafkaCollection.apply(
                    ParDo.of(
                            new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                @ProcessElement
                                public void processElement(ProcessContext context, IntervalWindow window) {
                                    context.output(KV.of(window, context.element()));
                                }
                            }));
    //Group records by key (window)
    PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
            .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());

    //Process windowed data
    PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
            .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

    // Run the pipeline.
    p.run().waitUntilFinish();

This is the code using the Flink SDK:

// Create a Streaming Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(6);

//Connect to Kafka
Properties properties = new Properties();   
properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
properties.setProperty("group.id", CONSUMER_GROUP);

DataStream<ObjectNode> stream = env
            .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));

//Key by id
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())

        //Set the windowing function.
        .timeWindow(Time.seconds(5L), Time.seconds(1L))

        //Process Windowed Data
        .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));

// execute program
env.execute("Using Flink SDK");

Any input would be much appreciated.

Edit

I thought I should add some metrics that may be relevant.

Network Received Bytes

Flink:
taskmanager.2    2,644,786,446
taskmanager.3    2,645,765,232
taskmanager.1    2,827,676,598
taskmanager.6    2,422,309,148
taskmanager.4    2,428,570,491
taskmanager.5    2,431,368,644

Beam:
taskmanager.2    4,092,154,160
taskmanager.3    4,435,132,862
taskmanager.1    4,766,399,314
taskmanager.6    4,425,190,393
taskmanager.4    4,096,576,110
taskmanager.5    4,092,849,114

CPU Utilisation (Max)

Flink:
taskmanager.2    93.00%
taskmanager.3    92.00%
taskmanager.1    91.00%
taskmanager.6    90.00%
taskmanager.4    90.00%
taskmanager.5    92.00%

Beam:
taskmanager.2    52.0%
taskmanager.3    71.0%
taskmanager.1    72.0%
taskmanager.6    40.0%
taskmanager.4    56.0%
taskmanager.5    26.0%

Beam seems to use considerably more network, whereas Flink uses significantly more CPU. Could this mean that Beam is parallelising the processing in a more efficient way?

Edit no. 2

I am pretty sure that the PueCalculatorFn classes are equivalent, but I will share the code here in case there is an obvious difference between the two processes.

Beam

public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
private transient List<IKafkaConsumption> realEnergyRecords;
private transient List<IKafkaConsumption> itEnergyRecords;

@ProcessElement
public void processElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
    KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
    Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
    Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
    Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();

    //Calculate Pue
    IPueResult result = calculatePue(element.getKey(), records);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);

    //Return Pue keyed by Window
    c.output(KV.of(intervalWindowResult, result));
}

private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Transform the results into a stream
    Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);

    //Iterate through each reading and add to the increment count
    streamOfRecords
            .map(record -> {
                byte[] valueBytes = record.getKV().getValue();
                assert valueBytes != null;
                String valueString = new String(valueBytes);
                assert !valueString.isEmpty();
                return KV.of(record, valueString);
            }).map(kv -> {
        Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
        KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
        return KV.of(kv.getKey(), consumption);

    }).forEach(consumptionRecord -> {
                switch (consumptionRecord.getKey().getTopic()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        realEnergyRecords.add(consumptionRecord.getValue());
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        itEnergyRecords.add(consumptionRecord.getValue());
                        break;
                }
            }
    );

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }

    //Create a PueResult object to return
    IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
    return new PueResult(intervalWindow, pue.stripTrailingZeros());
}

@Override
protected void finalize() throws Throwable {
    super.finalize();
    RecordSenderFactory.closeSender();
    WindowSenderFactory.closeSender();
}
}
Flink

public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {
private transient List<KafkaConsumption> realEnergyRecords;
private transient List<KafkaConsumption> itEnergyRecords;

@Override
public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
    Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
    Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
    BigDecimal pue = calculatePue(iterable);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()), itEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()));

    //Create PueResult object to return
    IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());

    //Collect result
    collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));

}

protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Iterate through each reading and add to the increment count
    StreamSupport.stream(iterable.spliterator(), false)
            .forEach(object -> {
                switch (object.get("topic").textValue()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                        realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                        itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                }

            });

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }
    return pue;
}

}

Below is the custom deserialiser used in the Beam example.

KafkaConsumptionDeserialiser

public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {

public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
    if(jsonElement == null) {
        return null;
    } else {
        JsonObject jsonObject = jsonElement.getAsJsonObject();
        JsonElement id = jsonObject.get("id");
        JsonElement energyConsumed = jsonObject.get("energyConsumed");
        Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
        Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class);
        JsonElement topic = jsonObject.get("topic");
        Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class);
        return new KafkaConsumption(id != null ? id.getAsInt() : 0,
                energyConsumed != null ? energyConsumed.getAsDouble() : 0.0D,
                duration, topic != null ? topic.getAsString() : "", eventTime);
    }
  }

}

mznpcxlj (Answer 1)

I'm not sure why the Beam pipeline you wrote is faster, but semantically it is not the same as the Flink job. Similar to how windowing works in Flink, once you assign a window in Beam, all following operations automatically take the windowing into account. You don't need to group by window.
Your Beam pipeline definition can be simplified as follows:

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
 Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
    .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();

As for performance, it depends on many factors, but keep in mind that Beam is an abstraction layer on top of Flink. Generally speaking, I would be surprised if you saw improved performance with Beam on Flink.
Edit: to clarify further, you are not grouping on the JSON "id" field in the Beam pipeline, which you do in the Flink snippet.
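If you want the two jobs to be semantically comparable, a rough, untested sketch of keying the Beam pipeline by the same "id" field might look like this (the Gson-based JSON parsing mirrors what the Flink snippet does; the variable names are purely illustrative):

PCollection<KV<Integer, KafkaRecord<byte[], byte[]>>> keyedById =
        windowedKafkaCollection.apply(
                ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<Integer, KafkaRecord<byte[], byte[]>>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        //Extract the "id" field from the JSON payload, as the Flink keyBy does
                        String json = new String(c.element().getKV().getValue());
                        int id = new JsonParser().parse(json).getAsJsonObject().get("id").getAsInt();
                        c.output(KV.of(id, c.element()));
                    }
                }));

//Because the sliding window was applied upstream, this grouping happens
//per key and per window automatically
PCollection<KV<Integer, Iterable<KafkaRecord<byte[], byte[]>>>> groupedById =
        keyedById.apply(GroupByKey.<Integer, KafkaRecord<byte[], byte[]>>create());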


t3psigkw (Answer 2)

For what it's worth, if the window processing can be pre-aggregated with reduce() or aggregate(), the native Flink job should perform better than it currently does; see the sketch below.
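As a rough illustration only (not tested against your data model), incremental pre-aggregation with aggregate() could look something like this; the accumulator layout and the PUE formula are assumptions based on the JSON fields and the calculation shown in the question:

stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())
        .timeWindow(Time.seconds(5L), Time.seconds(1L))
        //Sum the readings incrementally as they arrive instead of buffering
        //every record in window state for process()
        .aggregate(new AggregateFunction<ObjectNode, double[], Double>() {
            @Override
            public double[] createAccumulator() {
                return new double[2]; //[0] = real energy, [1] = IT energy
            }

            @Override
            public double[] add(ObjectNode record, double[] acc) {
                double consumed = record.get("energyConsumed").asDouble();
                if (REAL_ENERGY_TOPIC.equals(record.get("topic").textValue())) {
                    acc[0] += consumed;
                } else {
                    acc[1] += consumed;
                }
                return acc;
            }

            @Override
            public Double getResult(double[] acc) {
                //PUE = total energy / IT energy; guard against division by zero
                return acc[1] == 0.0 ? 0.0 : acc[0] / acc[1];
            }

            @Override
            public double[] merge(double[] a, double[] b) {
                return new double[]{a[0] + b[0], a[1] + b[1]};
            }
        });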
Many details, such as the choice of state backend, serialisation, checkpointing and so on, can also have a big impact on performance; see the configuration sketch below.
Is the same Flink being used in both cases, i.e. the same version and the same configuration?
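For reference, these are the kinds of settings worth pinning down on both sides; a minimal sketch, where the backend choice, checkpoint URI and interval are placeholders rather than values taken from the question:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//Where keyed/window state lives; FsStateBackend keeps working state on the heap
//and checkpoints to the given URI (RocksDB is the usual alternative for large state)
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

//Periodic checkpoints; the interval trades recovery time against runtime overhead
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE);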
