如何从flink runner上的google数据流(apache beam)向kafka发送消息

w8f9ii69  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(392)

我试着写一个概念证明,从Kafka那里获取信息,用光束在Flink上变换,然后把结果推到另一个Kafka主题上。
我使用了kafkawindowedwordcount示例作为起点,这是我想做的第一部分,但是它输出到文本文件,而不是kafka。flinkkafkaproducer08看起来很有希望,但我不知道如何将它插入管道。我在想,它会被一个无限链接槽,或一些这样的 Package ,但这似乎并不存在。
对我要做的事有什么建议或想法吗?
我正在运行最新的孵化器beam(截至昨晚从github),集群模式下的flink1.0.0和kafka 0.9.0.1,所有这些都在google计算引擎上(debian jessie)。

vfwfrxfs

vfwfrxfs1#

beam中当前没有无界Sink类。大多数无边界接收器都是使用 ParDo .
您可能希望查看Kafka约连接器。这是一个kafka读取器,在所有的梁运行程序中工作,并实现并行读取、检查点和其他功能 UnboundedSource API。这个pull请求还包括tophashtags示例管道中的一个原始接收器,它通过在 ParDo :

class KafkaWriter extends DoFn<String, Void> {

  private final String topic;
  private final Map<String, Object> config;
  private transient KafkaProducer<String, String> producer = null;

  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer",    StringSerializer.class.getName(),
        "value.serializer",  StringSerializer.class.getName());
  }

  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close();
  }

  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}

当然,我们希望在 KafkaIO 也。它实际上与 KafkaWriter 但使用起来要简单得多。

llew8vvj

llew8vvj2#

2016年,apache beam/dataflow中添加了用于写入kafka的sink转换。有关详细信息,请参阅javadoc KafkaIO 以apachebeam为例。

相关问题