Kafka: How to convert an AsyncAPI schema to an Avro schema

Posted by t0ybt7op 5 months ago in Apache

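I have a Kafka implementation without any schema validation.
The Kafka producer basically publishes Person messages to Kafka using the Avro serializer.
I use the avro-maven-plugin to generate the Person POJO class from the Avro schema.
Here is the Avro schema I designed: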

{
  "namespace": "com.example.kafka.messageproducer",
  "type": "record",  
  "name": "PersonMessage",
  "doc": "Schema for Person validation",
  "fields": [
    { "doc": "The firstName of a person.", "name": "firstName","type": "string" },
    { "doc": "The lastName of a person", "name": "lastName","type": "string"},
    { "doc": "The gender", "name": "gender", "type": "string"},
    { "doc": "The age", "name": "age", "type": "int" }
  ]
}

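I have uploaded this Avro schema to the Confluent Schema Registry, and message validation works fine.
The Kafka documentation is written in AsyncAPI, and my goal is to obtain the same Avro schema by converting the AsyncAPI schema YAML.
Is there a way to convert an AsyncAPI YAML file into an Avro schema file?
Here is my AsyncAPI file: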

{
"asyncapi": "2.6.0",
"id": "urn:com:blaise:person",
"info": {
  "title": "Person AsynchAPI",
  "version": "1.0.0"
},
"servers": {
  "non-prod-eu-west-3": {
    "url": "pkc-75m1o.europe-west3.gcp.confluent.cloud:9092",
    "protocol": "kafka"
  }
},
"defaultContentType": "application/json",
"channels": {
  "person": {
    "publish": {
      "message": {
        "name": "personupload",
        "summary": "Inform the wolrd about the new person created",
        "description": "Kafka message related to person\n* publish a message on kafka bus when a new person is created\n",
        "contentType": "application/json",
        "headers": {
          "$ref": "#/components/messageTraits/commonHeaders/headers"
        },
        "payload": {
          "$ref": "#/components/schemas/personPayload"
        }
      }
    }
  }
},
"components": {
  "messageTraits": {
    "commonHeaders": {
      "headers": {
        "type": "object",
        "properties": {
          "header1": {
            "type": "string",
            "description": "the first header"
          },
          "header2": {
            "type": "string",
            "description": "the second header"
          },
          "header3": {
            "type": "string",
            "description": "the third header"
          }
        }
      }
    }
  },
  "schemas": {
    "personPayload": {
      "type": "object",
      "properties": {
        "id": {
          "$ref": "#/components/schemas/personId"
        },
        "firstName": {
          "$ref": "#/components/schemas/firstName"
        },
        "lastName": {
          "$ref": "#/components/schemas/lastName"
        },
        "gender": {
          "$ref": "#/components/schemas/gender"
        },
        "age": {
          "$ref": "#/components/schemas/age"
        }
      },
      "required": [
        "firstName",
        "lastName"
      ]
    },
    "personId": {
      "type": "string",
      "format": "uuid",
      "description": "unique uiid to identify a person        \nexamples: \n- \"9acabc86-d5e7-41bd-a210-f10bb1984105\n"
    },
    "firstName": {
      "type": "string",
      "description": "The firstName of a person                \nexamples: \n- Robert\n"
    },
    "lastName": {
      "type": "string",
      "description": "The lastName of a person                \nexamples: \n- Lafange\n"
    },
    "gender": {
      "type": "string",
      "description": "The gender of a persom           \n",
      "enum": [
        "MALE",
        "EMALE"
      ]
    },
    "age": {
      "type": "integer",
      "format": "int32",
      "description": "The age of a person\n"
    }
  }
 }
}


I was able to convert it to a JSON file using this tool: https://studio.asyncapi.com/. But the result is not an Avro schema at all. Here is the result:

{
  "asyncapi": "2.6.0",
  "id": "urn:com:blaise:person",
  "info": {
    "title": "Person AsynchAPI",
    "version": "1.0.0"
  },
  "servers": {
    "non-prod-eu-west-3": {
      "url": "pkc-75m1o.europe-west3.gcp.confluent.cloud:9092",
      "protocol": "kafka"
    }
  },
  "defaultContentType": "application/json",
  "channels": {
    "person": {
      "publish": {
        "message": {
          "name": "personupload",
          "summary": "Inform the wolrd about the new person created",
          "description": "Kafka message related to person\n* publish a message on kafka bus when a new person is created\n",
          "contentType": "application/json",
          "headers": {
            "$ref": "#/components/messageTraits/commonHeaders/headers"
          },
          "payload": {
            "$ref": "#/components/schemas/personPayload"
          }
        }
      }
    }
  },
  "components": {
    "messageTraits": {
      "commonHeaders": {
        "headers": {
          "type": "object",
          "properties": {
            "header1": {
              "type": "string",
              "description": "the first header"
            },
            "header2": {
              "type": "string",
              "description": "the second header"
            },
            "header3": {
              "type": "string",
              "description": "the third header"
            }
          }
        }
      }
    },
    "schemas": {
      "personPayload": {
        "type": "object",
        "properties": {
          "id": {
            "$ref": "#/components/schemas/personId"
          },
          "firstName": {
            "$ref": "#/components/schemas/firstName"
          },
          "lastName": {
            "$ref": "#/components/schemas/lastName"
          },
          "gender": {
            "$ref": "#/components/schemas/gender"
          },
          "age": {
            "$ref": "#/components/schemas/age"
          }
        },
        "required": [
          "firstName",
          "lastName"
        ]
      },
      "personId": {
        "type": "string",
        "format": "uuid",
        "description": "unique uiid to identify a person        \nexamples: \n- \"9acabc86-d5e7-41bd-a210-f10bb1984105\n"
      },
      "firstName": {
        "type": "string",
        "description": "The firstName of a person                \nexamples: \n- Robert\n"
      },
      "lastName": {
        "type": "string",
        "description": "The lastName of a person                \nexamples: \n- Lafange\n"
      },
      "gender": {
        "type": "string",
        "description": "The gender of a persom           \n",
        "enum": [
          "MALE",
          "EMALE"
        ]
      },
      "age": {
        "type": "integer",
        "format": "int32",
        "description": "The age of a person\n"
      }
    }
  }
}

**Is there a way to convert an AsyncAPI YAML file into an Avro schema file?** Thank you very much.

Answer #1, by qnzebej0

Confluent Schema Registry supports JSON Schema, and an AsyncAPI Schema Object is roughly a JSON Schema Draft 07 object.
Read more: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#serializer-and-formatter
Let me know if it works :)
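
To make this a bit more concrete, here is a minimal Python sketch, not an official AsyncAPI or Confluent tool. It assumes the payload only uses local `$ref` pointers and simple scalar types, as in the question. `resolve_refs`, `to_avro_type`, and `to_avro_record` are hypothetical helper names made up for this example, and the document is trimmed to the payload schemas. The first step inlines the `$ref`s so the payload becomes a standalone JSON Schema; the second step is a deliberately small, hand-rolled mapping to an Avro record, in case Avro is still what you need.

```python
import json

# Trimmed copy of the AsyncAPI document from the question (payload schemas only).
ASYNCAPI_DOC = {
    "components": {
        "schemas": {
            "personPayload": {
                "type": "object",
                "properties": {
                    "id": {"$ref": "#/components/schemas/personId"},
                    "firstName": {"$ref": "#/components/schemas/firstName"},
                    "lastName": {"$ref": "#/components/schemas/lastName"},
                    "gender": {"$ref": "#/components/schemas/gender"},
                    "age": {"$ref": "#/components/schemas/age"},
                },
                "required": ["firstName", "lastName"],
            },
            "personId": {"type": "string", "format": "uuid"},
            "firstName": {"type": "string"},
            "lastName": {"type": "string"},
            "gender": {"type": "string", "enum": ["MALE", "EMALE"]},
            "age": {"type": "integer", "format": "int32"},
        }
    }
}


def resolve_refs(node, root):
    """Recursively inline local '#/...' references.

    Assumption: no cyclic references and no escaped JSON Pointer tokens.
    """
    if isinstance(node, dict):
        ref = node.get("$ref")
        if isinstance(ref, str) and ref.startswith("#/"):
            target = root
            for part in ref[2:].split("/"):
                target = target[part]
            return resolve_refs(target, root)
        return {key: resolve_refs(value, root) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_refs(item, root) for item in node]
    return node


# Hypothetical, intentionally incomplete JSON Schema -> Avro type mapping.
PRIMITIVES = {"string": "string", "boolean": "boolean", "number": "double"}


def to_avro_type(schema):
    json_type = schema.get("type")
    if json_type == "integer":
        return "long" if schema.get("format") == "int64" else "int"
    if json_type in PRIMITIVES:
        # Note: "enum" and "format" on strings are ignored in this sketch.
        return PRIMITIVES[json_type]
    raise ValueError(f"unsupported JSON Schema type: {json_type!r}")


def to_avro_record(schema, name, namespace):
    """Map a flat JSON Schema object to an Avro record definition."""
    required = set(schema.get("required", []))
    fields = []
    for prop, sub_schema in schema.get("properties", {}).items():
        avro_type = to_avro_type(sub_schema)
        if prop in required:
            fields.append({"name": prop, "type": avro_type})
        else:
            # Optional properties become nullable unions with a null default.
            fields.append({"name": prop, "type": ["null", avro_type], "default": None})
    return {"type": "record", "name": name, "namespace": namespace, "fields": fields}


payload_schema = resolve_refs(
    ASYNCAPI_DOC["components"]["schemas"]["personPayload"], ASYNCAPI_DOC
)
avro_schema = to_avro_record(
    payload_schema, "PersonMessage", "com.example.kafka.messageproducer"
)
print(json.dumps(avro_schema, indent=2))
```

`payload_schema` is the standalone JSON Schema that you could try registering with the registry's JSON Schema support instead of Avro. The generated Avro record will not be identical to your hand-written one: it also contains `id`, and because only `firstName` and `lastName` are listed under `required`, the other fields come out as nullable unions. Nested objects, arrays, and enums would need extra handling.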
