java—在梁变换中循环使用apache beam顺序处理

l3zydbqr  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(291)

我正在创建一个beam管道来对数据包进行批处理。管道使用cassandraio读取记录。我想分批处理30分钟的数据,然后分组/缝合30分钟的数据并将其写入另一个表。我为每个员工提供300个捆绑包,我需要使用有限的资源(大约2gi)处理至少50个员工。但目前堆的使用率非常高,因此我只能处理1个员工(使用~4gi)。如果我提供更多的数据,就会出现内存不足/堆错误。
是一次处理一名员工的方法。就像一个循环,这样我就可以用我的~2gi按顺序处理所有员工
下面是我的示例代码

Pipeline p = Pipeline.create(options);
 dataToWritetoDb = p.apply(CassandraIO.<LevelOneInputEntity>read()
                    .withHosts(Arrays.asList("127.0.0.1")).withPort(9042).withKeyspace("beam")
                    .withTable("LevelOneInputEntity").withEntity(LevelOneInputEntity.class).withConnectTimeout(1000)
                    .withReadTimeout(5000).withCoder(SerializableCoder.of(LevelOneInputEntity.class)))
.apply(ParDo.of(new ApplyTimeStampDoFnFunction()))
.apply(Window.<EntityClass>into(FixedWindows.of(Duration.standardMinutes(30)))
.apply("Group and create keyvalue bundles",ParDo.of(new BundleKVDoFn(mapper)))

有没有一种方法可以实现下面的逻辑,以便管道为每个员工运行一次?

Pipeline p = Pipeline.create(options);

    for (String employeeId : employeelist) {
        sql = "select * from LevelOneInputEntity where employeeId = "+employeeId;
        dataToWritetoDb =  p.apply(CassandraIO.<LevelOneInputEntity>read()
                            .withHosts(Arrays.asList("127.0.0.1")).withPort(9042).withKeyspace("beam")
                            .withTable("LevelOneInputEntity").withEntity(LevelOneInputEntity.class).withConnectTimeout(1000).withQuery(sql)
                            .withReadTimeout(5000).withCoder(SerializableCoder.of(LevelOneInputEntity.class)))
        .apply(ParDo.of(new ApplyTimeStampDoFnFunction()))
        .apply(Window.<EntityClass>into(FixedWindows.of(Duration.standardMinutes(30)))
        .apply("Group and create keyvalue bundles",ParDo.of(new BundleKVDoFn(mapper)))
    }
gcmastyq

gcmastyq1#

beam没有一种内置的方法来实现这一点,但是您可以在昂贵的dofn中添加锁定,它只允许一次处理一个元素。像这样的

class ExpensiveDoFn extends DoFn<Input, Output> {
  private static Lock lock;
  @ProcessElement
  public void Process(@Element Input input) {
    lock.lock();
    // Do memory-intensive processing here.
    lock.unlock()
  }
}

或者,您可以尝试查看程序中的堆转储,看看是否可以优化内存使用

相关问题