spark从kafka读取数据,并使用自定义模式注册表存储到minio

pw136qt2  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(399)

我们在一个Kafka主题中制作所有事件 input-data-topic ,我们的系统有两个独特的事件,分别有自己的模式。我们有自己的自定义模式注册表来维护系统中的模式。
事件例如:
{id:“”,name:“”,designation:“”,vertical:“”,operation:“创建/更新”,schemaid:“schema1”}
{id:,操作:“delete”,schemaid:“schema2”}
总流量:
所有事件->Kafka( input-data-topic )->集群中的spark作业(scala)->minio

val events = spark.readStream
  .format("kafka")
  .option("failOnDataLoss", "false")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-data-topic")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("event"))
  .select(col("event.*"))

   ........
   ........
   ........

这里我们为一个事件添加了一个静态模式,如何根据输入事件的模式id从自定义模式注册表中获取模式?
预期流量:
Kafka( input-data-topic )->集群中的spark job(scala)应该点击schema registry并验证schema,然后发送到minio->minio

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题