我有一个spark结构的流式处理作业,它使用foreachbatch从源kafka主题读取内容,并将内容写入两个接收器。
Kafka主题
jdbc sink(mariadb)表
下面是一些附加上下文的伪代码:
def write_to_kafka_and_maria(df, epoch_id):
df.persist()
df=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "target_topic") \
.save()
df.write \
.format("jdbc") \
.options(
url="jdbc:mysql:dbserver:port/db",
driver="org.mariadb.jdbc.Driver",
dbtable="tablename",
user="username",
password="password",
batchSize=100000,
numPartition=10
).mode("append") \
.save()
df.unpersist()
# Read from the topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "source_topic") \
.load()
# Write to Kafka and MariaDB
write = df.writeStream.trigger(once=True).foreachBatch(write_to_kafka_and_maria).start()
write.awaitTermination()
当我运行我的工作,我可以看到我的记录到达我的目的地Kafka速度约7-8k记录秒。然而,直到工作接近尾声时,我才看到我的记录到达mariadb。此外,对mariadb的写入不会在对kafka的写入之后立即执行。相反,对mariadb的写入发生在对目标主题的写入完成后10-12分钟。这是预期的行为,还是我应该看到记录一个接一个地成批传播到两个接收器(因为这就是foreachbatch操作的目的)?似乎作业从源kafka主题读取了两次数据,并在写入kafka和mariadb之前在引擎盖下执行了不同的操作。提前谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!