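I have a Kafka implementation without any schema validation.
The Kafka producer basically publishes Person messages to Kafka using the Avro serializer.
I use avro-maven-plugin to generate the Person POJO class from the Avro schema.
Below is the Avro schema I designed: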
{
"namespace": "com.example.kafka.messageproducer",
"type": "record",
"name": "PersonMessage",
"doc": "Schema for Person validation",
"fields": [
{ "doc": "The firstName of a person.", "name": "firstName","type": "string" },
{ "doc": "The lastName of a person", "name": "lastName","type": "string"},
{ "doc": "The gender", "name": "gender", "type": "string"},
{ "doc": "The age", "name": "age", "type": "int" }
]
}
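I have uploaded this Avro schema to the Confluent Schema Registry, and message validation works fine.
The Kafka documentation is written in AsyncAPI, and my goal is to obtain the same Avro schema by converting the AsyncAPI schema YAML.
Is there a way to convert an AsyncAPI YAML file into an Avro schema file?
Below is my AsyncAPI file: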
{
"asyncapi": "2.6.0",
"id": "urn:com:blaise:person",
"info": {
"title": "Person AsynchAPI",
"version": "1.0.0"
},
"servers": {
"non-prod-eu-west-3": {
"url": "pkc-75m1o.europe-west3.gcp.confluent.cloud:9092",
"protocol": "kafka"
}
},
"defaultContentType": "application/json",
"channels": {
"person": {
"publish": {
"message": {
"name": "personupload",
"summary": "Inform the wolrd about the new person created",
"description": "Kafka message related to person\n* publish a message on kafka bus when a new person is created\n",
"contentType": "application/json",
"headers": {
"$ref": "#/components/messageTraits/commonHeaders/headers"
},
"payload": {
"$ref": "#/components/schemas/personPayload"
}
}
}
}
},
"components": {
"messageTraits": {
"commonHeaders": {
"headers": {
"type": "object",
"properties": {
"header1": {
"type": "string",
"description": "the first header"
},
"header2": {
"type": "string",
"description": "the second header"
},
"header3": {
"type": "string",
"description": "the third header"
}
}
}
}
},
"schemas": {
"personPayload": {
"type": "object",
"properties": {
"id": {
"$ref": "#/components/schemas/personId"
},
"firstName": {
"$ref": "#/components/schemas/firstName"
},
"lastName": {
"$ref": "#/components/schemas/lastName"
},
"gender": {
"$ref": "#/components/schemas/gender"
},
"age": {
"$ref": "#/components/schemas/age"
}
},
"required": [
"firstName",
"lastName"
]
},
"personId": {
"type": "string",
"format": "uuid",
"description": "unique uiid to identify a person \nexamples: \n- \"9acabc86-d5e7-41bd-a210-f10bb1984105\n"
},
"firstName": {
"type": "string",
"description": "The firstName of a person \nexamples: \n- Robert\n"
},
"lastName": {
"type": "string",
"description": "The lastName of a person \nexamples: \n- Lafange\n"
},
"gender": {
"type": "string",
"description": "The gender of a persom \n",
"enum": [
"MALE",
"EMALE"
]
},
"age": {
"type": "integer",
"format": "int32",
"description": "The age of a person\n"
}
}
}
}
I was able to convert it to a JSON file using the tool at https://studio.asyncapi.com/. But the result is not an Avro schema at all. Here is the result:
{
"asyncapi": "2.6.0",
"id": "urn:com:blaise:person",
"info": {
"title": "Person AsynchAPI",
"version": "1.0.0"
},
"servers": {
"non-prod-eu-west-3": {
"url": "pkc-75m1o.europe-west3.gcp.confluent.cloud:9092",
"protocol": "kafka"
}
},
"defaultContentType": "application/json",
"channels": {
"person": {
"publish": {
"message": {
"name": "personupload",
"summary": "Inform the wolrd about the new person created",
"description": "Kafka message related to person\n* publish a message on kafka bus when a new person is created\n",
"contentType": "application/json",
"headers": {
"$ref": "#/components/messageTraits/commonHeaders/headers"
},
"payload": {
"$ref": "#/components/schemas/personPayload"
}
}
}
}
},
"components": {
"messageTraits": {
"commonHeaders": {
"headers": {
"type": "object",
"properties": {
"header1": {
"type": "string",
"description": "the first header"
},
"header2": {
"type": "string",
"description": "the second header"
},
"header3": {
"type": "string",
"description": "the third header"
}
}
}
}
},
"schemas": {
"personPayload": {
"type": "object",
"properties": {
"id": {
"$ref": "#/components/schemas/personId"
},
"firstName": {
"$ref": "#/components/schemas/firstName"
},
"lastName": {
"$ref": "#/components/schemas/lastName"
},
"gender": {
"$ref": "#/components/schemas/gender"
},
"age": {
"$ref": "#/components/schemas/age"
}
},
"required": [
"firstName",
"lastName"
]
},
"personId": {
"type": "string",
"format": "uuid",
"description": "unique uiid to identify a person \nexamples: \n- \"9acabc86-d5e7-41bd-a210-f10bb1984105\n"
},
"firstName": {
"type": "string",
"description": "The firstName of a person \nexamples: \n- Robert\n"
},
"lastName": {
"type": "string",
"description": "The lastName of a person \nexamples: \n- Lafange\n"
},
"gender": {
"type": "string",
"description": "The gender of a persom \n",
"enum": [
"MALE",
"EMALE"
]
},
"age": {
"type": "integer",
"format": "int32",
"description": "The age of a person\n"
}
}
}
}
**Is there a way to convert an AsyncAPI YAML file into an Avro schema file?** Thank you very much.
1 Answer
Confluent Schema Registry supports JSON Schema, and an AsyncAPI Schema Object is roughly a JSON Schema Draft 07 object.
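To make that concrete, here is a rough sketch of how the payload could be pulled out of the AsyncAPI document as a standalone JSON Schema. The function names `resolve_refs` and `extract_payload_schema` are made up for illustration, the sample document is a trimmed copy of the one in the question, and the code assumes only local `#/...` references with no cycles and no escaped JSON Pointer characters.

```python
import json


def resolve_refs(node, root):
    """Recursively inline local '#/...' references found in node.

    Assumption: references are local, acyclic, and contain no escaped
    JSON Pointer characters (~0, ~1).
    """
    if isinstance(node, dict):
        ref = node.get("$ref")
        if isinstance(ref, str) and ref.startswith("#/"):
            target = root
            for part in ref[2:].split("/"):
                target = target[part]
            return resolve_refs(target, root)
        return {key: resolve_refs(value, root) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_refs(item, root) for item in node]
    return node


def extract_payload_schema(asyncapi_doc, channel, operation="publish"):
    """Return the message payload of one channel as a self-contained schema."""
    message = asyncapi_doc["channels"][channel][operation]["message"]
    schema = resolve_refs(message["payload"], asyncapi_doc)
    # Tag the result as Draft 07, which the AsyncAPI Schema Object roughly follows.
    return {"$schema": "http://json-schema.org/draft-07/schema#", **schema}


# Trimmed version of the AsyncAPI document from the question.
doc = {
    "channels": {"person": {"publish": {"message": {
        "payload": {"$ref": "#/components/schemas/personPayload"}}}}},
    "components": {"schemas": {
        "personPayload": {
            "type": "object",
            "properties": {
                "firstName": {"$ref": "#/components/schemas/firstName"},
                "age": {"$ref": "#/components/schemas/age"},
            },
            "required": ["firstName"],
        },
        "firstName": {"type": "string"},
        "age": {"type": "integer", "format": "int32"},
    }},
}

schema = extract_payload_schema(doc, "person")
print(json.dumps(schema, indent=2))
```

The printed schema no longer contains any `$ref`, so it could be registered with the Schema Registry as a JSON Schema subject instead of an Avro one.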
Read more: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#serializer-and-formatter
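If you still need an actual Avro schema, a small hand-rolled mapping may be enough for a flat payload like yours. The following is only a sketch under several assumptions: `to_avro_type` and `to_avro_record` are hypothetical helpers, the input schema has already had its `$ref` entries inlined, only flat objects with primitive properties are handled, `enum` is ignored (the field stays a plain string, as in your handwritten schema), and properties missing from `required` become nullable unions.

```python
import json

# JSON Schema primitive types that map directly onto Avro primitives.
PRIMITIVES = {"string": "string", "boolean": "boolean", "number": "double"}


def to_avro_type(schema):
    """Map one primitive JSON Schema property to an Avro type name."""
    kind = schema.get("type")
    if kind == "integer":
        # Assumption: only an explicit int32 format is narrowed to Avro int.
        return "int" if schema.get("format") == "int32" else "long"
    if kind in PRIMITIVES:
        return PRIMITIVES[kind]
    raise ValueError(f"unsupported JSON Schema type: {kind!r}")


def to_avro_record(name, namespace, schema):
    """Build an Avro record from a flat, already dereferenced object schema."""
    required = set(schema.get("required", []))
    fields = []
    for field_name, field_schema in schema.get("properties", {}).items():
        avro_type = to_avro_type(field_schema)
        field = {"name": field_name, "type": avro_type}
        doc = field_schema.get("description", "").strip()
        if doc:
            field["doc"] = doc
        if field_name not in required:
            # Optional properties become a nullable union with a null default.
            field["type"] = ["null", avro_type]
            field["default"] = None
        fields.append(field)
    return {"type": "record", "name": name, "namespace": namespace, "fields": fields}


# Flat payload schema, shaped like personPayload after inlining its references.
person = {
    "type": "object",
    "properties": {
        "firstName": {"type": "string", "description": "The firstName of a person"},
        "lastName": {"type": "string"},
        "age": {"type": "integer", "format": "int32"},
    },
    "required": ["firstName", "lastName"],
}

avro = to_avro_record("PersonMessage", "com.example.kafka.messageproducer", person)
print(json.dumps(avro, indent=2))
```

Note that this does not reproduce your handwritten schema exactly: `age` and `gender` are not listed under `required` in the AsyncAPI file, so they come out as `["null", ...]` unions here, while your Avro schema declares them as mandatory.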
Let me know if it works :)