为什么我的spark结构化流式作业没有批量写入jdbc?

brqmpdu1  于 2021-05-17  发布在  Spark
关注(0)|答案(0)|浏览(340)

我有一个spark结构的流式处理作业,它使用foreachbatch从源kafka主题读取内容,并将内容写入两个接收器。
Kafka主题
jdbc sink(mariadb)表
下面是一些附加上下文的伪代码:

def write_to_kafka_and_maria(df, epoch_id):
    df.persist()

    df=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    df.write \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("topic", "target_topic") \
      .save()

    df.write \
      .format("jdbc") \
      .options(
        url="jdbc:mysql:dbserver:port/db",
        driver="org.mariadb.jdbc.Driver",
        dbtable="tablename",
        user="username",
        password="password",
        batchSize=100000,
        numPartition=10
      ).mode("append") \
      .save()

     df.unpersist()

# Read from the topic

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "source_topic") \
  .load()

# Write to Kafka and MariaDB

write = df.writeStream.trigger(once=True).foreachBatch(write_to_kafka_and_maria).start()

write.awaitTermination()

当我运行我的工作,我可以看到我的记录到达我的目的地Kafka速度约7-8k记录秒。然而,直到工作接近尾声时,我才看到我的记录到达mariadb。此外,对mariadb的写入不会在对kafka的写入之后立即执行。相反,对mariadb的写入发生在对目标主题的写入完成后10-12分钟。这是预期的行为,还是我应该看到记录一个接一个地成批传播到两个接收器(因为这就是foreachbatch操作的目的)?似乎作业从源kafka主题读取了两次数据,并在写入kafka和mariadb之前在引擎盖下执行了不同的操作。提前谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题